Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/root.py: 54%
20 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1"""
2Contains the `hello` route for testing and healthcheck purposes.
3"""
5import os 1a
7from fastapi import Depends, status 1a
8from fastapi.responses import JSONResponse 1a
10from prefect.server.database import PrefectDBInterface, provide_database_interface 1a
11from prefect.server.utilities.server import PrefectRouter 1a
12from prefect.settings.context import get_current_settings 1a
14router: PrefectRouter = PrefectRouter(prefix="", tags=["Root"]) 1a
17@router.get("/hello") 1a
18async def hello() -> str: 1a
19 """Say hello!"""
20 return "👋"
23@router.get("/ready") 1a
24async def perform_readiness_check( 1a
25 db: PrefectDBInterface = Depends(provide_database_interface),
26) -> JSONResponse:
27 is_db_connectable = await db.is_db_connectable()
29 if is_db_connectable:
30 return JSONResponse(
31 status_code=status.HTTP_200_OK,
32 content={"message": "OK"},
33 )
35 return JSONResponse(
36 status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
37 content={"message": "Database is not available"},
38 )
41if get_current_settings().testing.test_mode: 41 ↛ 43line 41 didn't jump to line 43 because the condition on line 41 was never true1a
43 @router.get("/pid")
44 async def pid() -> JSONResponse:
45 """
46 Returns the process ID of the current process.
48 Used for testing multi-worker server.
49 """
50 return JSONResponse(
51 status_code=status.HTTP_200_OK, content={"pid": os.getpid()}
52 )