Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/root.py: 75%

20 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1""" 

2Contains the `hello` route for testing and healthcheck purposes. 

3""" 

4 

5import os 1a

6 

7from fastapi import Depends, status 1a

8from fastapi.responses import JSONResponse 1a

9 

10from prefect.server.database import PrefectDBInterface, provide_database_interface 1a

11from prefect.server.utilities.server import PrefectRouter 1a

12from prefect.settings.context import get_current_settings 1a

13 

# Router that the endpoints in this module are registered on: no path prefix,
# grouped under the "Root" tag.
router: PrefectRouter = PrefectRouter(
    prefix="",
    tags=["Root"],
)

15 

16 

@router.get("/hello")
async def hello() -> str:
    """Return a fixed waving-hand greeting.

    Takes no input and always responds with the same string.
    """
    greeting = "👋"
    return greeting

21 

22 

@router.get("/ready")
async def perform_readiness_check(
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> JSONResponse:
    """Report whether the server can reach its database.

    Args:
        db: Database interface, injected through ``provide_database_interface``.

    Returns:
        A 200 response with ``{"message": "OK"}`` when
        ``db.is_db_connectable()`` yields a truthy value, otherwise a 503
        response with ``{"message": "Database is not available"}``.
    """
    # Failure path first; the success response is the fall-through.
    # Coverage report note: in the recorded run the database was always
    # connectable, so this 503 branch was never taken.
    if not await db.is_db_connectable():
        return JSONResponse(
            status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
            content={"message": "Database is not available"},
        )

    return JSONResponse(
        status_code=status.HTTP_200_OK,
        content={"message": "OK"},
    )

39 

40 

# The /pid route is only registered when the current settings have
# testing.test_mode enabled at import time.
# Coverage report note: this condition was never true in the recorded run,
# so the route below was not registered or exercised.
if get_current_settings().testing.test_mode:

    @router.get("/pid")
    async def pid() -> JSONResponse:
        """
        Report the process ID of the process handling the request.

        Used for testing multi-worker server.
        """
        current_pid = os.getpid()
        return JSONResponse(
            content={"pid": current_pid},
            status_code=status.HTTP_200_OK,
        )