Coverage for /usr/local/lib/python3.12/site-packages/prefect/runner/server.py: 0%
60 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1from __future__ import annotations
3import uuid
4from typing import TYPE_CHECKING, Any, Callable, Optional
6import uvicorn
7from fastapi import APIRouter, FastAPI, status
8from fastapi.responses import JSONResponse
9from typing_extensions import Literal
11from prefect.logging import get_logger
12from prefect.settings import (
13 PREFECT_RUNNER_POLL_FREQUENCY,
14 PREFECT_RUNNER_SERVER_HOST,
15 PREFECT_RUNNER_SERVER_LOG_LEVEL,
16 PREFECT_RUNNER_SERVER_MISSED_POLLS_TOLERANCE,
17 PREFECT_RUNNER_SERVER_PORT,
18)
19from prefect.types._datetime import now as now_fn
20from prefect.utilities.asyncutils import run_coro_as_sync
22if TYPE_CHECKING:
23 import logging
25 from prefect.runner import Runner
27from pydantic import BaseModel
29logger: "logging.Logger" = get_logger("runner.webserver")
31RunnableEndpoint = Literal["deployment", "flow", "task"]
34class RunnerGenericFlowRunRequest(BaseModel):
35 entrypoint: str
36 parameters: Optional[dict[str, Any]] = None
37 parent_task_run_id: Optional[uuid.UUID] = None
40def perform_health_check(
41 runner: "Runner", delay_threshold: int | None = None
42) -> Callable[..., JSONResponse]:
43 if delay_threshold is None:
44 delay_threshold = (
45 PREFECT_RUNNER_SERVER_MISSED_POLLS_TOLERANCE.value()
46 * PREFECT_RUNNER_POLL_FREQUENCY.value()
47 )
49 def _health_check():
50 now = now_fn("UTC")
51 poll_delay = (now - runner.last_polled).total_seconds()
53 if TYPE_CHECKING:
54 assert delay_threshold is not None
56 if poll_delay > delay_threshold:
57 return JSONResponse(
58 status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
59 content={"message": "Runner is unresponsive at this time"},
60 )
61 return JSONResponse(status_code=status.HTTP_200_OK, content={"message": "OK"})
63 return _health_check
66def run_count(runner: "Runner") -> Callable[..., int]:
67 def _run_count() -> int:
68 run_count = len(runner._flow_run_process_map) # pyright: ignore[reportPrivateUsage]
69 return run_count
71 return _run_count
74def shutdown(runner: "Runner") -> Callable[..., JSONResponse]:
75 def _shutdown():
76 runner.stop()
77 return JSONResponse(status_code=status.HTTP_200_OK, content={"message": "OK"})
79 return _shutdown
82async def build_server(runner: "Runner") -> FastAPI:
83 """
84 Build a FastAPI server for a runner.
86 Args:
87 runner: the runner this server interacts with and monitors
88 """
89 webserver = FastAPI()
90 router = APIRouter()
92 router.add_api_route(
93 "/health", perform_health_check(runner=runner), methods=["GET"]
94 )
95 router.add_api_route("/run_count", run_count(runner=runner), methods=["GET"])
96 router.add_api_route("/shutdown", shutdown(runner=runner), methods=["POST"])
97 webserver.include_router(router)
99 return webserver
102def start_webserver(runner: "Runner", log_level: str | None = None) -> None:
103 """
104 Run a FastAPI server for a runner.
106 Args:
107 runner: the runner this server interacts with and monitors
108 log_level: the log level to use for the server
109 """
110 host = PREFECT_RUNNER_SERVER_HOST.value()
111 port = PREFECT_RUNNER_SERVER_PORT.value()
112 log_level = log_level or PREFECT_RUNNER_SERVER_LOG_LEVEL.value()
113 webserver = run_coro_as_sync(build_server(runner))
114 if TYPE_CHECKING:
115 assert webserver is not None, "webserver should be built"
116 assert log_level is not None, "log_level should be set"
118 uvicorn.run(
119 webserver, host=host, port=port, log_level=log_level.lower()
120 ) # Uvicorn supports only lowercase log_level