Coverage for /usr/local/lib/python3.12/site-packages/prefect/runner/server.py: 0%

60 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1from __future__ import annotations 

2 

3import uuid 

4from typing import TYPE_CHECKING, Any, Callable, Optional 

5 

6import uvicorn 

7from fastapi import APIRouter, FastAPI, status 

8from fastapi.responses import JSONResponse 

9from typing_extensions import Literal 

10 

11from prefect.logging import get_logger 

12from prefect.settings import ( 

13 PREFECT_RUNNER_POLL_FREQUENCY, 

14 PREFECT_RUNNER_SERVER_HOST, 

15 PREFECT_RUNNER_SERVER_LOG_LEVEL, 

16 PREFECT_RUNNER_SERVER_MISSED_POLLS_TOLERANCE, 

17 PREFECT_RUNNER_SERVER_PORT, 

18) 

19from prefect.types._datetime import now as now_fn 

20from prefect.utilities.asyncutils import run_coro_as_sync 

21 

22if TYPE_CHECKING: 

23 import logging 

24 

25 from prefect.runner import Runner 

26 

27from pydantic import BaseModel 

28 

# Module-level logger, obtained from Prefect's `get_logger` under the
# "runner.webserver" name.
logger: "logging.Logger" = get_logger("runner.webserver")

# Literal type that admits only the strings "deployment", "flow" and "task".
RunnableEndpoint = Literal["deployment", "flow", "task"]

32 

33 

class RunnerGenericFlowRunRequest(BaseModel):
    """
    Request model made of an entrypoint plus two optional fields.

    Only `entrypoint` is required; `parameters` and `parent_task_run_id`
    both default to `None`.
    """

    # Required string naming the entrypoint.
    # NOTE(review): presumably the entrypoint of the flow to run -- confirm
    # against the code that consumes this model.
    entrypoint: str
    # Optional mapping from string keys to arbitrary values.
    parameters: Optional[dict[str, Any]] = None
    # Optional UUID; by its name, the id of a parent task run.
    parent_task_run_id: Optional[uuid.UUID] = None

38 

39 

def perform_health_check(
    runner: "Runner", delay_threshold: int | None = None
) -> Callable[..., JSONResponse]:
    """
    Build the handler that reports whether the runner has polled recently.

    Args:
        runner: the runner whose `last_polled` timestamp is inspected
        delay_threshold: the largest number of seconds allowed since the last
            poll; when omitted it is
            PREFECT_RUNNER_SERVER_MISSED_POLLS_TOLERANCE multiplied by
            PREFECT_RUNNER_POLL_FREQUENCY

    Returns:
        A zero-argument callable. Each call returns a 503 `JSONResponse` when
        more than the threshold has elapsed since `runner.last_polled`, and a
        200 `JSONResponse` with the message "OK" otherwise.
    """
    # Resolve the threshold once, while building the handler, so the closure
    # below always compares against a concrete number (an explicit 0 is kept).
    if delay_threshold is not None:
        max_delay = delay_threshold
    else:
        max_delay = (
            PREFECT_RUNNER_SERVER_MISSED_POLLS_TOLERANCE.value()
            * PREFECT_RUNNER_POLL_FREQUENCY.value()
        )

    # Deliberately left without a return annotation, as in the original:
    # the web framework inspects handler annotations.
    def _health_check():
        seconds_since_poll = (now_fn("UTC") - runner.last_polled).total_seconds()

        if seconds_since_poll > max_delay:
            return JSONResponse(
                status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
                content={"message": "Runner is unresponsive at this time"},
            )
        return JSONResponse(status_code=status.HTTP_200_OK, content={"message": "OK"})

    return _health_check

64 

65 

def run_count(runner: "Runner") -> Callable[..., int]:
    """
    Build the handler that reports how many entries the runner is tracking.

    Args:
        runner: the runner whose `_flow_run_process_map` is measured

    Returns:
        A zero-argument callable returning the current number of entries in
        the runner's `_flow_run_process_map`.
    """

    def _run_count() -> int:
        # The map is measured on every call, so the result follows later
        # additions and removals.
        return len(runner._flow_run_process_map)  # pyright: ignore[reportPrivateUsage]

    return _run_count

72 

73 

def shutdown(runner: "Runner") -> Callable[..., JSONResponse]:
    """
    Build the handler that stops the runner.

    Args:
        runner: the runner to stop

    Returns:
        A zero-argument callable that calls `runner.stop()` and then returns
        a 200 `JSONResponse` with the message "OK".
    """

    def _shutdown():
        runner.stop()
        return JSONResponse(status_code=status.HTTP_200_OK, content={"message": "OK"})

    return _shutdown

80 

81 

async def build_server(runner: "Runner") -> FastAPI:
    """
    Build a FastAPI server for a runner.

    Three routes are registered, each bound to the given runner:
    GET /health, GET /run_count and POST /shutdown.

    Args:
        runner: the runner this server interacts with and monitors

    Returns:
        The FastAPI application with the runner routes included.
    """
    router = APIRouter()
    router.add_api_route(
        "/health", perform_health_check(runner=runner), methods=["GET"]
    )
    router.add_api_route("/run_count", run_count(runner=runner), methods=["GET"])
    router.add_api_route("/shutdown", shutdown(runner=runner), methods=["POST"])

    webserver = FastAPI()
    webserver.include_router(router)
    return webserver

100 

101 

def start_webserver(runner: "Runner", log_level: str | None = None) -> None:
    """
    Run a FastAPI server for a runner.

    The host and port are read from PREFECT_RUNNER_SERVER_HOST and
    PREFECT_RUNNER_SERVER_PORT.

    Args:
        runner: the runner this server interacts with and monitors
        log_level: the log level to use for the server; when it is `None` or
            empty, PREFECT_RUNNER_SERVER_LOG_LEVEL is used instead
    """
    host = PREFECT_RUNNER_SERVER_HOST.value()
    port = PREFECT_RUNNER_SERVER_PORT.value()
    log_level = log_level or PREFECT_RUNNER_SERVER_LOG_LEVEL.value()
    webserver = run_coro_as_sync(build_server(runner))
    # These asserts exist only to narrow types for static checkers; the
    # branch never executes at runtime.
    if TYPE_CHECKING:
        assert webserver is not None, "webserver should be built"
        assert log_level is not None, "log_level should be set"

    # Uvicorn supports only lowercase log_level
    uvicorn.run(webserver, host=host, port=port, log_level=log_level.lower())