Coverage for /usr/local/lib/python3.12/site-packages/prefect/workers/server.py: 0%

21 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1from typing import Any 

2 

3import uvicorn 

4from fastapi import APIRouter, FastAPI, status 

5from fastapi.responses import JSONResponse 

6 

7from prefect.settings import ( 

8 PREFECT_WORKER_WEBSERVER_HOST, 

9 PREFECT_WORKER_WEBSERVER_PORT, 

10) 

11from prefect.workers.base import BaseWorker 

12 

13 

14def build_healthcheck_server( 

15 worker: BaseWorker[Any, Any, Any], 

16 query_interval_seconds: float, 

17 log_level: str = "error", 

18) -> uvicorn.Server: 

19 """ 

20 Build a healthcheck FastAPI server for a worker. 

21 

22 Args: 

23 worker (BaseWorker | ProcessWorker): the worker whose health we will check 

24 log_level (str): the log 

25 """ 

26 app = FastAPI() 

27 router = APIRouter() 

28 

29 def perform_health_check(): 

30 did_recently_poll = worker.is_worker_still_polling( 

31 query_interval_seconds=query_interval_seconds 

32 ) 

33 

34 if not did_recently_poll: 

35 return JSONResponse( 

36 status_code=status.HTTP_503_SERVICE_UNAVAILABLE, 

37 content={"message": "Worker may be unresponsive at this time"}, 

38 ) 

39 return JSONResponse(status_code=status.HTTP_200_OK, content={"message": "OK"}) 

40 

41 router.add_api_route("/health", perform_health_check, methods=["GET"]) 

42 

43 app.include_router(router) 

44 

45 config = uvicorn.Config( 

46 app=app, 

47 host=PREFECT_WORKER_WEBSERVER_HOST.value(), 

48 port=PREFECT_WORKER_WEBSERVER_PORT.value(), 

49 log_level=log_level, 

50 ) 

51 return uvicorn.Server(config=config) 

52 

53 

54def start_healthcheck_server( 

55 worker: BaseWorker[Any, Any, Any], 

56 query_interval_seconds: float, 

57 log_level: str = "error", 

58) -> None: 

59 """ 

60 Run a healthcheck FastAPI server for a worker. 

61 

62 Args: 

63 worker (BaseWorker | ProcessWorker): the worker whose health we will check 

64 log_level (str): the log level to use for the server 

65 """ 

66 server = build_healthcheck_server(worker, query_interval_seconds, log_level) 

67 server.run()