Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/server.py: 57%
434 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1"""
2Defines the Prefect REST API FastAPI app.
3"""
5from __future__ import annotations 1a
7import asyncio 1a
8import atexit 1a
9import base64 1a
10import contextlib 1a
11import gc 1a
12import logging 1a
13import mimetypes 1a
14import os 1a
15import random 1a
16import shutil 1a
17import socket 1a
18import sqlite3 1a
19import subprocess 1a
20import sys 1a
21import time 1a
22from contextlib import AsyncExitStack, asynccontextmanager 1a
23from functools import wraps 1a
24from hashlib import sha256 1a
25from typing import Any, AsyncGenerator, Awaitable, Callable, Optional 1a
27import anyio 1a
28import asyncpg 1a
29import httpx 1a
30import sqlalchemy as sa 1a
31import sqlalchemy.exc 1a
32import sqlalchemy.orm.exc 1a
33from docket import Docket 1a
34from fastapi import Depends, FastAPI, Request, Response 1a
35from fastapi.encoders import jsonable_encoder 1a
36from fastapi.exceptions import RequestValidationError 1a
37from fastapi.middleware.cors import CORSMiddleware 1a
38from fastapi.middleware.gzip import GZipMiddleware 1a
39from fastapi.openapi.utils import get_openapi 1a
40from fastapi.responses import JSONResponse 1a
41from fastapi.staticfiles import StaticFiles 1a
42from starlette.exceptions import HTTPException 1a
43from typing_extensions import Self 1a
45import prefect 1a
46import prefect.server.api as api 1a
47import prefect.settings 1a
48from prefect._internal.compatibility.starlette import status 1a
49from prefect._internal.observability import configure_logfire 1a
50from prefect.client.constants import SERVER_API_VERSION 1a
51from prefect.logging import get_logger 1a
52from prefect.server.api.background_workers import background_worker 1a
53from prefect.server.api.dependencies import EnforceMinimumAPIVersion 1a
54from prefect.server.exceptions import ObjectNotFoundError 1a
55from prefect.server.services.base import RunInEphemeralServers, RunInWebservers, Service 1a
56from prefect.server.utilities.database import get_dialect 1a
57from prefect.settings import ( 1a
58 PREFECT_API_DATABASE_CONNECTION_URL,
59 PREFECT_API_LOG_RETRYABLE_ERRORS,
60 PREFECT_DEBUG_MODE,
61 PREFECT_MEMO_STORE_PATH,
62 PREFECT_MEMOIZE_BLOCK_AUTO_REGISTRATION,
63 PREFECT_SERVER_API_BASE_PATH,
64 PREFECT_SERVER_EPHEMERAL_STARTUP_TIMEOUT_SECONDS,
65 PREFECT_UI_SERVE_BASE,
66 get_current_settings,
67)
68from prefect.utilities.hashing import hash_objects 1a
# Optional Logfire handle returned by `configure_logfire()`; it is only used when
# truthy (to instrument the API app).
logfire: Any | None = configure_logfire()

# Titles handed to the FastAPI constructors and to the OpenAPI schema builder.
TITLE = "Prefect Server"
# NOTE(review): "Prefect Prefect" reads like a duplicated word. These strings are
# emitted at runtime as app/OpenAPI titles, so confirm before changing them.
API_TITLE = "Prefect Prefect REST API"
UI_TITLE = "Prefect Prefect REST API UI"
# Version reported by the top-level app and the OpenAPI schema.
API_VERSION: str = prefect.__version__
# migrations should run only once per app start; the ephemeral API can potentially
# create multiple apps in a single process
LIFESPAN_RAN_FOR_APP: set[Any] = set()

# Module-wide logger for the server.
logger: "logging.Logger" = get_logger("server")

# Dependency added to every API router so that client versions are checked.
enforce_minimum_version: EnforceMinimumAPIVersion = EnforceMinimumAPIVersion(
    # this should be <= SERVER_API_VERSION; clients that send
    # a version header under this value will be rejected
    minimum_api_version="0.8.0",
    logger=logger,
)
# Routers that `create_api_app` includes, in this order, into every API app.
API_ROUTERS = (
    api.flows.router,
    api.flow_runs.router,
    api.task_runs.router,
    api.flow_run_states.router,
    api.task_run_states.router,
    api.deployments.router,
    api.saved_searches.router,
    api.logs.router,
    api.concurrency_limits.router,
    api.concurrency_limits_v2.router,
    api.block_types.router,
    api.block_documents.router,
    api.workers.router,
    api.task_workers.router,
    api.work_queues.router,
    api.artifacts.router,
    api.block_schemas.router,
    api.block_capabilities.router,
    api.collections.router,
    api.variables.router,
    api.csrf_token.router,
    api.events.router,
    api.automations.router,
    api.templates.router,
    api.ui.flows.router,
    api.ui.flow_runs.router,
    api.ui.schemas.router,
    api.ui.task_runs.router,
    api.admin.router,
    api.root.router,
)
# Message looked up in the `args` of a SQLite `OperationalError` to recognise lock
# contention when the error's `sqlite_errorname` did not already match.
SQLITE_LOCKED_MSG = "database is locked"
class _SQLiteLockedOperationalErrorFilter(logging.Filter):
    """
    Logging filter for retryable SQLite lock failures.

    A record whose exception is a SQLAlchemy `OperationalError` wrapping a
    SQLite busy / "database is locked" error is emitted only when the
    `server.log_retryable_errors` setting is enabled. Every other record is
    always let through.
    """

    def filter(self, record: logging.LogRecord) -> bool:
        """Return whether `record` should be emitted."""
        error: BaseException | None = None
        if record.exc_info:
            error = record.exc_info[1]

        # Only SQLAlchemy operational errors carry the driver error we inspect
        wrapped = (
            getattr(error, "orig", None)
            if isinstance(error, sqlalchemy.exc.OperationalError)
            else None
        )

        if isinstance(wrapped, sqlite3.OperationalError):
            is_busy = getattr(wrapped, "sqlite_errorname", None) in {
                "SQLITE_BUSY",
                "SQLITE_BUSY_SNAPSHOT",
            }
            if is_busy or SQLITE_LOCKED_MSG in getattr(wrapped, "args", []):
                # Retryable lock error: defer to the user's logging preference
                return get_current_settings().server.log_retryable_errors

        return True
# The filter instance once it has been attached to the "uvicorn.error" logger;
# remains None until `_install_sqlite_locked_log_filter` has run.
_SQLITE_LOCKED_LOG_FILTER: _SQLiteLockedOperationalErrorFilter | None = None
def _install_sqlite_locked_log_filter() -> None:
    """
    Attach the SQLite lock filter to the "uvicorn.error" logger.

    The installed filter is remembered in `_SQLITE_LOCKED_LOG_FILTER`, so calling
    this function again is a no-op.
    """
    global _SQLITE_LOCKED_LOG_FILTER

    if _SQLITE_LOCKED_LOG_FILTER is None:
        lock_filter = _SQLiteLockedOperationalErrorFilter()
        uvicorn_error_logger = logging.getLogger("uvicorn.error")
        uvicorn_error_logger.addFilter(lock_filter)
        _SQLITE_LOCKED_LOG_FILTER = lock_filter
class SPAStaticFiles(StaticFiles):
    """
    `StaticFiles` variant for serving single page applications.

    `get_response` is overridden so that a lookup which raises an
    `HTTPException` is answered with the application's index page instead.
    """

    async def get_response(self, path: str, scope: Any) -> Response:
        """Serve `path`, falling back to `./index.html` on `HTTPException`."""
        try:
            response = await super().get_response(path, scope)
        except HTTPException:
            # Fall back to the index page so the application still loads
            response = await super().get_response("./index.html", scope)
        return response
class RequestLimitMiddleware:
    """
    ASGI middleware that caps how many requests the API handles concurrently.

    This is a blunt instrument for limiting concurrent SQLite writes, which cause
    failures at high volume. Ideally the limit would only apply to routes that
    perform writes.
    """

    def __init__(self, app: Any, limit: float):
        """
        Args:
            app: the ASGI application to wrap
            limit: capacity given to the underlying `anyio.CapacityLimiter`
        """
        self._limiter = anyio.CapacityLimiter(limit)
        self.app = app

    async def __call__(self, scope: Any, receive: Any, send: Any) -> None:
        """Forward the call to the wrapped app while holding a limiter slot."""
        limiter = self._limiter
        async with limiter:
            await self.app(scope, receive, send)
async def validation_exception_handler(
    request: Request, exc: RequestValidationError
) -> JSONResponse:
    """
    Answer a request validation failure with HTTP 422.

    The response body carries a fixed message, the validation errors reported by
    `exc`, and the request body that failed validation.
    """
    payload = {
        "exception_message": "Invalid request received.",
        "exception_detail": exc.errors(),
        "request_body": exc.body,
    }
    encoded_payload = jsonable_encoder(payload)
    return JSONResponse(
        content=encoded_payload,
        status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
    )
async def integrity_exception_handler(request: Request, exc: Exception) -> JSONResponse:
    """Log a database integrity error and answer the request with HTTP 409."""
    logger.error("Encountered exception in request:", exc_info=True)

    detail = (
        "Data integrity conflict. This usually means a "
        "unique or foreign key constraint was violated. "
        "See server logs for details."
    )
    return JSONResponse(
        status_code=status.HTTP_409_CONFLICT,
        content={"detail": detail},
    )
def is_client_retryable_exception(exc: Exception) -> bool:
    """
    Tell whether `exc` is an error a client may retry.

    A SQLAlchemy `OperationalError` wrapping a SQLite `OperationalError` is
    retryable only when it reports a busy / "database is locked" condition. Any
    other exception is retryable when it is one of the listed SQLAlchemy or
    asyncpg error types.
    """
    wraps_sqlite_error = isinstance(
        exc, sqlalchemy.exc.OperationalError
    ) and isinstance(exc.orig, sqlite3.OperationalError)

    if wraps_sqlite_error:
        # Decide here so SQLite errors never reach the generic `DBAPIError` check
        sqlite_error = exc.orig
        if getattr(sqlite_error, "sqlite_errorname", None) in {
            "SQLITE_BUSY",
            "SQLITE_BUSY_SNAPSHOT",
        }:
            return True
        return SQLITE_LOCKED_MSG in getattr(sqlite_error, "args", [])

    retryable_types = (
        sqlalchemy.exc.DBAPIError,
        asyncpg.exceptions.QueryCanceledError,
        asyncpg.exceptions.ConnectionDoesNotExistError,
        asyncpg.exceptions.CannotConnectNowError,
        sqlalchemy.exc.InvalidRequestError,
        sqlalchemy.orm.exc.DetachedInstanceError,
    )
    return isinstance(exc, retryable_types)
def replace_placeholder_string_in_files(
    directory: str,
    placeholder: str,
    replacement: str,
    allowed_extensions: list[str] | None = None,
) -> None:
    """
    Recursively replace a placeholder string in the files below `directory`.

    Files are read and written as UTF-8. A file that does not contain the
    placeholder is left untouched on disk.

    Args:
        directory: root of the directory tree to walk
        placeholder: the text to search for
        replacement: the text written in place of each occurrence of `placeholder`
        allowed_extensions: file name suffixes to process; defaults to `.txt`,
            `.html`, `.css`, `.js` and `.json`
    """
    if allowed_extensions is None:
        allowed_extensions = [".txt", ".html", ".css", ".js", ".json"]

    # `str.endswith` accepts a tuple, so a single call checks every suffix
    suffixes = tuple(allowed_extensions)

    for root, _, files in os.walk(directory):
        for file_name in files:
            if not file_name.endswith(suffixes):
                continue

            file_path = os.path.join(root, file_name)
            with open(file_path, "r", encoding="utf-8") as f:
                file_data = f.read()

            # Nothing to substitute: avoid rewriting the file for no reason
            if placeholder not in file_data:
                continue

            with open(file_path, "w", encoding="utf-8") as f:
                f.write(file_data.replace(placeholder, replacement))
def copy_directory(directory: str, path: str) -> None:
    """
    Copy the contents of `directory` into `path`, creating `path` when needed.

    A sub-directory that already exists at the destination is removed before it
    is copied again (with `symlinks=True`); plain files are copied with
    `shutil.copy2`. Every copied file is given mode `0o700` so that it can be
    modified afterwards.
    """
    os.makedirs(path, exist_ok=True)

    for entry in os.listdir(directory):
        src = os.path.join(directory, entry)
        dst = os.path.join(path, entry)

        if not os.path.isdir(src):
            shutil.copy2(src, dst)
            # Ensure copied file is writeable
            os.chmod(dst, 0o700)
            continue

        if os.path.exists(dst):
            shutil.rmtree(dst)
        shutil.copytree(src, dst, symlinks=True)

        # ensure copied files are writeable
        copied_files = (
            os.path.join(parent, file_name)
            for parent, _, file_names in os.walk(dst)
            for file_name in file_names
        )
        for copied_file in copied_files:
            os.chmod(copied_file, 0o700)
async def custom_internal_exception_handler(
    request: Request, exc: Exception
) -> JSONResponse:
    """
    Turn an unhandled exception into a JSON error response.

    Errors that clients can retry produce a 503 and are logged only when
    `PREFECT_API_LOG_RETRYABLE_ERRORS` is enabled; everything else is always
    logged and produces a 500.
    """
    if not is_client_retryable_exception(exc):
        logger.error("Encountered exception in request:", exc_info=True)
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content={"exception_message": "Internal Server Error"},
        )

    if PREFECT_API_LOG_RETRYABLE_ERRORS.value():
        logger.error("Encountered retryable exception in request:", exc_info=True)

    return JSONResponse(
        status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
        content={"exception_message": "Service Unavailable"},
    )
async def prefect_object_not_found_exception_handler(
    request: Request, exc: ObjectNotFoundError
) -> JSONResponse:
    """Answer an `ObjectNotFoundError` with HTTP 404 and the error's message."""
    message = str(exc)
    return JSONResponse(
        status_code=status.HTTP_404_NOT_FOUND,
        content={"exception_message": message},
    )
# API apps built by `create_api_app`, keyed by
# (hash key of the current settings, hash of the FastAPI kwargs or None).
API_APP_CACHE: dict[tuple[str, str | None], FastAPI] = {}
def create_api_app(
    dependencies: list[Any] | None = None,
    health_check_path: str = "/health",
    version_check_path: str = "/version",
    fast_api_app_kwargs: dict[str, Any] | None = None,
    final: bool = False,
    ignore_cache: bool = False,
) -> FastAPI:
    """
    Create a FastAPI app that includes the Prefect REST API

    Args:
        dependencies: a list of global dependencies to add to each Prefect REST API
            router; the list itself is not modified
        health_check_path: the health check route path
        version_check_path: the route path that returns the server API version
        fast_api_app_kwargs: kwargs to pass to the FastAPI constructor
        final: whether this will be the last instance of the Prefect server to be
            created in this process, so that additional optimizations may be applied
        ignore_cache: if set, a new app will be created even if the settings and
            fast_api_app_kwargs match an existing app in the cache

    Returns:
        a FastAPI app that serves the Prefect REST API
    """
    # Local import: only needed for the constant-time credential comparison
    from secrets import compare_digest

    cache_key = (
        prefect.settings.get_current_settings().hash_key(),
        hash_objects(fast_api_app_kwargs) if fast_api_app_kwargs else None,
    )

    if cache_key in API_APP_CACHE and not ignore_cache:
        return API_APP_CACHE[cache_key]

    fast_api_app_kwargs = fast_api_app_kwargs or {}
    api_app = FastAPI(title=API_TITLE, **fast_api_app_kwargs)

    if logfire:
        logfire.instrument_fastapi(api_app)  # pyright: ignore

    api_app.add_middleware(GZipMiddleware)

    @api_app.get(health_check_path, tags=["Root"])
    async def health_check() -> bool:  # type: ignore[reportUnusedFunction]
        return True

    @api_app.get(version_check_path, tags=["Root"])
    async def server_version() -> str:  # type: ignore[reportUnusedFunction]
        return SERVER_API_VERSION

    # always include version checking; a new list is built so that the caller's
    # `dependencies` argument is never mutated
    dependencies = [*(dependencies or []), Depends(enforce_minimum_version)]

    for router in API_ROUTERS:
        api_app.include_router(router, dependencies=dependencies)
        if final:
            # Important note about how FastAPI works:
            #
            # When including a router, FastAPI copies the routes and builds entirely new
            # Pydantic models to represent the request bodies of the routes in the
            # router. This is because the dependencies may change if the same router is
            # included multiple times, but it also means that we are holding onto an
            # entire set of Pydantic models on the original routers for the duration of
            # the server process that will never be used.
            #
            # Because Prefect does not reuse routers, we are free to clean up the routes
            # because we know they won't be used again. Thus, if we have the hint that
            # this is the final instance we will create in this process, we can clean up
            # the routes on the original source routers to conserve memory (~50-55MB as
            # of introducing this change).
            del router.routes

    if final:
        gc.collect()

    auth_string = prefect.settings.PREFECT_SERVER_API_AUTH_STRING.value()

    if auth_string is not None:
        expected_credentials = auth_string.encode("utf-8")

        def unauthorized() -> JSONResponse:
            return JSONResponse(
                status_code=status.HTTP_401_UNAUTHORIZED,
                content={"exception_message": "Unauthorized"},
            )

        @api_app.middleware("http")
        async def token_validation(request: Request, call_next: Any):  # type: ignore[reportUnusedFunction]
            # used for probes in k8s and such
            if (
                request.url.path.endswith(("health", "ready"))
                and request.method.upper() == "GET"
            ):
                return await call_next(request)

            header_token = request.headers.get("Authorization")
            if header_token is None:
                return unauthorized()

            try:
                scheme, creds = header_token.split()
                decoded = base64.b64decode(creds).decode("utf-8")
            except ValueError:
                # Wrong number of parts, invalid base64 or invalid UTF-8
                return unauthorized()

            # Explicit check rather than `assert`, which is removed under `-O`
            if scheme != "Basic":
                return unauthorized()

            # Constant-time comparison of the supplied and expected credentials
            if not compare_digest(decoded.encode("utf-8"), expected_credentials):
                return unauthorized()

            return await call_next(request)

    API_APP_CACHE[cache_key] = api_app

    return api_app
def create_ui_app(ephemeral: bool) -> FastAPI:
    """
    Create the FastAPI app that serves the Prefect UI.

    The app always exposes a `ui-settings` route below the UI base URL. When the
    packaged UI files exist, the UI is enabled and the app is not ephemeral, the
    static files are copied to the static directory (unless a previous copy was
    made for the same version and base URL) and mounted at the UI base URL.

    Args:
        ephemeral: if set, the static UI files are not prepared or mounted
    """
    ui_app = FastAPI(title=UI_TITLE)
    base_url = prefect.settings.PREFECT_UI_SERVE_BASE.value()
    cache_key = f"{prefect.__version__}:{base_url}"
    stripped_base_url = base_url.rstrip("/")
    static_dir = (
        prefect.settings.PREFECT_UI_STATIC_DIRECTORY.value()
        or prefect.__ui_static_subpath__
    )
    reference_file_name = "UI_SERVE_BASE"

    if os.name == "nt":
        # Windows defaults to text/plain for .js files
        mimetypes.init()
        mimetypes.add_type("application/javascript", ".js")

    @ui_app.get(f"{stripped_base_url}/ui-settings")
    def ui_settings() -> dict[str, Any]:  # type: ignore[reportUnusedFunction]
        settings_payload: dict[str, Any] = {}
        settings_payload["api_url"] = prefect.settings.PREFECT_UI_API_URL.value()
        settings_payload["csrf_enabled"] = (
            prefect.settings.PREFECT_SERVER_CSRF_PROTECTION_ENABLED.value()
        )
        if prefect.settings.PREFECT_SERVER_API_AUTH_STRING.value():
            settings_payload["auth"] = "BASIC"
        else:
            settings_payload["auth"] = None
        settings_payload["flags"] = []
        return settings_payload

    def reference_file_matches_base_url() -> bool:
        """Whether the static directory was prepared for the current cache key."""
        reference_file_path = os.path.join(static_dir, reference_file_name)

        if not os.path.exists(static_dir):
            return False

        try:
            with open(reference_file_path, "r") as f:
                recorded_key = f.read()
        except FileNotFoundError:
            return False
        return recorded_key == cache_key

    def create_ui_static_subpath() -> None:
        """Copy the packaged UI into the static directory and record the cache key."""
        if not os.path.exists(static_dir):
            os.makedirs(static_dir)

        copy_directory(str(prefect.__ui_static_path__), str(static_dir))
        replace_placeholder_string_in_files(
            str(static_dir),
            "/PREFECT_UI_SERVE_BASE_REPLACE_PLACEHOLDER",
            stripped_base_url,
        )

        # Create a file to indicate that the static files have been copied
        # This is used to determine if the static files need to be copied again
        # when the server is restarted
        marker_path = os.path.join(static_dir, reference_file_name)
        with open(marker_path, "w") as f:
            f.write(cache_key)

    ui_app.add_middleware(GZipMiddleware)

    serve_static_ui = (
        os.path.exists(prefect.__ui_static_path__)
        and prefect.settings.PREFECT_UI_ENABLED.value()
        and not ephemeral
    )
    if serve_static_ui:
        # Copy the static files again when they were never copied or when the
        # recorded version/base URL no longer matches
        if not reference_file_matches_base_url():
            create_ui_static_subpath()

        ui_app.mount(
            PREFECT_UI_SERVE_BASE.value(),
            SPAStaticFiles(directory=static_dir),
            name="ui_root",
        )

    return ui_app
# Fully assembled apps built by `create_app`, keyed by
# (settings hash key, ephemeral, webserver_only).
APP_CACHE: dict[tuple[str, bool, bool], FastAPI] = {}
def _memoize_block_auto_registration(
    fn: Callable[[], Awaitable[None]],
) -> Callable[[], Awaitable[None]]:
    """
    Decorator to handle skipping the wrapped function if the block registry has
    not changed since the last invocation

    A hash of the block registry, the collection blocks data and the database
    connection URL is compared with the hash saved in the memo store file. When
    they match, `fn` is skipped; otherwise `fn` runs and the new hash is saved.
    Failures to read or write the memo store are logged and never prevent `fn`
    from running.

    Args:
        fn: the coroutine function performing block auto-registration

    Returns:
        the wrapping coroutine function
    """
    import toml

    import prefect.plugins
    from prefect.blocks.core import Block
    from prefect.server.models.block_registration import _load_collection_blocks_data
    from prefect.utilities.dispatch import get_registry_for_type

    @wraps(fn)
    async def wrapper(*args: Any, **kwargs: Any) -> None:
        if not PREFECT_MEMOIZE_BLOCK_AUTO_REGISTRATION.value():
            await fn(*args, **kwargs)
            return

        # Ensure collections are imported and have the opportunity to register types
        # before loading the registry
        prefect.plugins.load_prefect_collections()

        blocks_registry = get_registry_for_type(Block)
        collection_blocks_data = await _load_collection_blocks_data()
        current_blocks_loading_hash = hash_objects(
            blocks_registry,
            collection_blocks_data,
            PREFECT_API_DATABASE_CONNECTION_URL.value(),
            hash_algo=sha256,
        )

        memo_store_path = PREFECT_MEMO_STORE_PATH.value()
        try:
            if memo_store_path.exists():
                saved_blocks_loading_hash = toml.load(memo_store_path).get(
                    "block_auto_registration"
                )
                if (
                    saved_blocks_loading_hash is not None
                    and current_blocks_loading_hash == saved_blocks_loading_hash
                ):
                    if PREFECT_DEBUG_MODE.value():
                        logger.debug(
                            "Skipping block loading due to matching hash for block "
                            "auto-registration found in memo store."
                        )
                    return
        except Exception as exc:
            # Best effort: an unreadable memo store only costs a full registration.
            # The resolved path is logged rather than the setting object itself.
            logger.warning(
                f"Unable to read memo_store.toml from {memo_store_path} during "
                f"block auto-registration: {exc!r}.\n"
                "All blocks will be registered."
            )

        await fn(*args, **kwargs)

        if current_blocks_loading_hash is not None:
            try:
                if not memo_store_path.exists():
                    # Create the file with owner-only permissions before writing
                    memo_store_path.touch(mode=0o0600)

                memo_store_path.write_text(
                    toml.dumps({"block_auto_registration": current_blocks_loading_hash})
                )
            except Exception as exc:
                logger.warning(
                    "Unable to write to memo_store.toml at"
                    f" {memo_store_path} after block auto-registration:"
                    f" {exc!r}.\n Subsequent server start ups will perform block"
                    " auto-registration, which may result in slower server startup."
                )

    return wrapper
def create_app(
    settings: Optional[prefect.settings.Settings] = None,
    ephemeral: bool = False,
    webserver_only: bool = False,
    final: bool = False,
    ignore_cache: bool = False,
) -> FastAPI:
    """
    Create a FastAPI app that includes the Prefect REST API and UI

    The `PREFECT__SERVER_EPHEMERAL`, `PREFECT__SERVER_WEBSERVER_ONLY` and
    `PREFECT__SERVER_FINAL` environment variables, when set to a non-empty value,
    turn on `ephemeral`, `webserver_only` and `final` respectively.

    Args:
        settings: The settings to use to create the app. If not set, settings are pulled
            from the context.
        ephemeral: If set, the application will be treated as ephemeral. The UI
            and services will be disabled.
        webserver_only: If set, the webserver and UI will be available but all background
            services will be disabled.
        final: whether this will be the last instance of the Prefect server to be
            created in this process, so that additional optimizations may be applied
        ignore_cache: If set, a new application will be created even if the settings
            match. Otherwise, an application is returned from the cache.

    Returns:
        The assembled FastAPI app. The API app is also exposed as `app.api_app`
        and `app.openapi` is replaced with a builder for the API's schema.
    """
    settings = settings or prefect.settings.get_current_settings()
    # NOTE(review): the cache key uses `ephemeral` / `webserver_only` as passed
    # in, i.e. before the environment-variable overrides below are applied.
    cache_key = (settings.hash_key(), ephemeral, webserver_only)
    ephemeral = ephemeral or bool(os.getenv("PREFECT__SERVER_EPHEMERAL"))
    webserver_only = webserver_only or bool(os.getenv("PREFECT__SERVER_WEBSERVER_ONLY"))
    final = final or bool(os.getenv("PREFECT__SERVER_FINAL"))

    from prefect.logging.configuration import setup_logging

    # Logging is configured on every call, including calls served from the cache
    setup_logging()

    if cache_key in APP_CACHE and not ignore_cache:
        return APP_CACHE[cache_key]

    # TODO: Move these startup functions out of this closure into the top-level or
    # another dedicated location
    async def run_migrations():
        """Ensure the database is created and up to date with the current migrations"""
        if prefect.settings.PREFECT_API_DATABASE_MIGRATE_ON_START:
            from prefect.server.database import provide_database_interface

            db = provide_database_interface()
            await db.create_db()

    @_memoize_block_auto_registration
    async def add_block_types():
        """Add all registered blocks to the database"""
        if not prefect.settings.PREFECT_API_BLOCKS_REGISTER_ON_START:
            return

        from prefect.server.database import provide_database_interface
        from prefect.server.models.block_registration import run_block_auto_registration

        db = provide_database_interface()
        session = await db.session()

        async with session:
            await run_block_auto_registration(session=session)

    @asynccontextmanager
    async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
        # Startup work runs at most once per app object
        if app in LIFESPAN_RAN_FOR_APP:
            yield
            return

        await run_migrations()
        await add_block_types()

        # Pick which service class is started alongside the webserver
        Services: type[Service] | None = (
            RunInWebservers
            if webserver_only
            else RunInEphemeralServers
            if ephemeral
            else Service
        )

        # Everything entered on the stack is exited when the lifespan ends
        async with AsyncExitStack() as stack:
            docket = await stack.enter_async_context(
                Docket(name=settings.server.docket.name, url=settings.server.docket.url)
            )
            await stack.enter_async_context(background_worker(docket))
            # `api_app` is assigned further down in `create_app`; it is looked up
            # when the lifespan actually runs
            api_app.state.docket = docket
            if Services:
                await stack.enter_async_context(Services.running())
            LIFESPAN_RAN_FOR_APP.add(app)
            yield

    # NOTE(review): `on_service_exit` is not referenced anywhere else inside
    # `create_app`; confirm whether it is still needed.
    def on_service_exit(service: Service, task: asyncio.Task[None]) -> None:
        """
        Added as a callback for completion of services to log exit
        """
        try:
            # Retrieving the result will raise the exception
            task.result()
        except Exception:
            logger.error(f"{service.name} service failed!", exc_info=True)
        else:
            logger.info(f"{service.name} service stopped!")

    app = FastAPI(
        title=TITLE,
        version=API_VERSION,
        lifespan=lifespan,
    )
    api_app = create_api_app(
        fast_api_app_kwargs={
            "exception_handlers": {
                # NOTE: FastAPI special cases the generic `Exception` handler and
                #       registers it as a separate middleware from the others
                Exception: custom_internal_exception_handler,
                RequestValidationError: validation_exception_handler,
                sa.exc.IntegrityError: integrity_exception_handler,
                ObjectNotFoundError: prefect_object_not_found_exception_handler,
            }
        },
        final=final,
        ignore_cache=ignore_cache,
    )
    ui_app = create_ui_app(ephemeral)

    # middleware
    # The CORS settings are comma-separated strings
    app.add_middleware(
        CORSMiddleware,
        allow_origins=prefect.settings.PREFECT_SERVER_CORS_ALLOWED_ORIGINS.value().split(
            ","
        ),
        allow_methods=prefect.settings.PREFECT_SERVER_CORS_ALLOWED_METHODS.value().split(
            ","
        ),
        allow_headers=prefect.settings.PREFECT_SERVER_CORS_ALLOWED_HEADERS.value().split(
            ","
        ),
    )

    # Limit the number of concurrent requests when using a SQLite database to reduce
    # chance of errors where the database cannot be opened due to a high number of
    # concurrent writes
    if (
        get_dialect(prefect.settings.PREFECT_API_DATABASE_CONNECTION_URL.value()).name
        == "sqlite"
    ):
        _install_sqlite_locked_log_filter()
        app.add_middleware(RequestLimitMiddleware, limit=100)

    if prefect.settings.PREFECT_SERVER_CSRF_PROTECTION_ENABLED.value():
        app.add_middleware(api.middleware.CsrfMiddleware)

    if prefect.settings.PREFECT_API_ENABLE_METRICS:
        # Imported lazily so the import only happens when metrics are enabled
        from prometheus_client import CONTENT_TYPE_LATEST, generate_latest

        @api_app.get("/metrics")
        async def metrics() -> Response:  # type: ignore[reportUnusedFunction]
            return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)

    # Static assets are served from the "static" directory next to this module
    api_app.mount(
        "/static",
        StaticFiles(
            directory=os.path.join(
                os.path.dirname(os.path.realpath(__file__)), "static"
            )
        ),
        name="static",
    )
    app.api_app = api_app
    # The API is mounted before the UI, which is mounted at the root path
    if PREFECT_SERVER_API_BASE_PATH:
        app.mount(PREFECT_SERVER_API_BASE_PATH.value(), app=api_app, name="api")
    else:
        app.mount("/api", app=api_app, name="api")
    app.mount("/", app=ui_app, name="ui")

    def openapi():
        """
        Convenience method for extracting the user facing OpenAPI schema from the API app.

        This method is attached to the global public app for easy access.
        """
        partial_schema = get_openapi(
            title=API_TITLE,
            version=API_VERSION,
            routes=api_app.routes,
        )
        new_schema = partial_schema.copy()
        new_schema["paths"] = {}
        # NOTE(review): paths are always prefixed with "/api" here, while the API
        # mount point above may come from PREFECT_SERVER_API_BASE_PATH.
        for path, value in partial_schema["paths"].items():
            new_schema["paths"][f"/api{path}"] = value

        new_schema["info"]["x-logo"] = {"url": "static/prefect-logo-mark-gradient.png"}
        return new_schema

    app.openapi = openapi

    APP_CACHE[cache_key] = app
    return app
# Logger used by `SubprocessASGIServer` for its start and stop messages.
subprocess_server_logger: "logging.Logger" = get_logger()
class SubprocessASGIServer:
    """
    Runs the Prefect server with uvicorn in a child process.

    Instances are shared per `port` argument: constructing the class twice with
    the same port returns the same object. A server created without a port picks
    a free one from `_port_range` when it is started. The class can be used as a
    context manager, which starts the server on entry and stops it on exit.
    """

    # Instances keyed by the `port` argument given at construction time
    # (`None` for servers that choose their port when started)
    _instances: dict[int | None, "SubprocessASGIServer"] = {}
    # Ports considered by `find_available_port`
    _port_range: range = range(8000, 9000)

    def __new__(cls, port: int | None = None, *args: Any, **kwargs: Any) -> Self:
        """
        Return an instance of the server associated with the provided port.
        Prevents multiple instances from being created for the same port.
        """
        if port not in cls._instances:
            instance = super().__new__(cls)
            cls._instances[port] = instance
        return cls._instances[port]

    def __init__(self, port: Optional[int] = None):
        """
        Args:
            port: the port to serve on; when omitted a port is chosen on start
        """
        # This ensures initialization happens only once
        if not hasattr(self, "_initialized"):
            self.port: Optional[int] = port
            self.server_process: subprocess.Popen[Any] | None = None
            self.running: bool = False
            self._initialized = True

    def find_available_port(self) -> int:
        """
        Pick a random port from `_port_range` that can currently be bound.

        Raises:
            RuntimeError: if no available port is found within 10 attempts
        """
        max_attempts = 10
        for _ in range(max_attempts):
            port = random.choice(self._port_range)
            if self.is_port_available(port):
                return port
            time.sleep(random.uniform(0.1, 0.5))  # Random backoff
        raise RuntimeError("Unable to find an available port after multiple attempts")

    @staticmethod
    def is_port_available(port: int) -> bool:
        """Return whether `port` can be bound on 127.0.0.1 right now."""
        with contextlib.closing(
            socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        ) as sock:
            try:
                sock.bind(("127.0.0.1", port))
                return True
            except socket.error:
                return False

    @property
    def address(self) -> str:
        """Base URL of the server."""
        return f"http://127.0.0.1:{self.port}"

    @property
    def api_url(self) -> str:
        """URL of the API served by this server."""
        return f"{self.address}/api"

    def start(self, timeout: Optional[int] = None) -> None:
        """
        Start the server in a separate process. Safe to call multiple times; only starts
        the server once.

        The health endpoint is polled until it answers with a 200 or the timeout
        elapses. If the process exits with code 3, a new port is picked and the
        process is started again.

        Args:
            timeout: The maximum time to wait for the server to start; falls back to
                `PREFECT_SERVER_EPHEMERAL_STARTUP_TIMEOUT_SECONDS` when not given

        Raises:
            RuntimeError: if no response was received before the timeout
        """
        if not self.running:
            if self.port is None:
                self.port = self.find_available_port()
            assert self.port is not None, "Port must be provided or available"
            help_message = (
                f"Starting temporary server on {self.address}\nSee "
                "https://docs.prefect.io/v3/concepts/server#how-to-guides "
                "for more information on running a dedicated Prefect server."
            )
            subprocess_server_logger.info(help_message)
            try:
                self.running = True
                self.server_process = self._run_uvicorn_command()
                # Make sure the child process is stopped at interpreter exit
                atexit.register(self.stop)
                with httpx.Client() as client:
                    response = None
                    elapsed_time = 0
                    max_wait_time = (
                        timeout
                        or PREFECT_SERVER_EPHEMERAL_STARTUP_TIMEOUT_SECONDS.value()
                    )
                    while elapsed_time < max_wait_time:
                        if self.server_process.poll() == 3:
                            # The process exited with code 3: retry on another port
                            self.port = self.find_available_port()
                            self.server_process = self._run_uvicorn_command()
                            continue
                        try:
                            response = client.get(f"{self.api_url}/health")
                        except httpx.ConnectError:
                            # Not accepting connections yet; keep polling
                            pass
                        else:
                            if response.status_code == 200:
                                break
                        time.sleep(0.1)
                        elapsed_time += 0.1
                    if response:
                        response.raise_for_status()
                    if not response:
                        error_message = "Timed out while attempting to connect to ephemeral Prefect API server."
                        if self.server_process.poll() is not None:
                            error_message += f" Ephemeral server process exited with code {self.server_process.returncode}."
                        if self.server_process.stdout:
                            error_message += (
                                f" stdout: {self.server_process.stdout.read()}"
                            )
                        if self.server_process.stderr:
                            error_message += (
                                f" stderr: {self.server_process.stderr.read()}"
                            )
                        raise RuntimeError(error_message)
            except Exception:
                self.running = False
                raise

    def _run_uvicorn_command(self) -> subprocess.Popen[Any]:
        """Spawn uvicorn serving `create_app` on `self.port` and return the process."""
        # used to turn off serving the UI
        server_env = {
            "PREFECT_UI_ENABLED": "0",
            "PREFECT__SERVER_EPHEMERAL": "1",
            "PREFECT__SERVER_FINAL": "1",
        }
        return subprocess.Popen(
            args=[
                sys.executable,
                "-m",
                "uvicorn",
                "--app-dir",
                str(prefect.__module_path__.parent),
                "--factory",
                "prefect.server.api.server:create_app",
                "--host",
                "127.0.0.1",
                "--port",
                str(self.port),
                "--log-level",
                "error",
                "--lifespan",
                "on",
            ],
            # Later entries win: current settings override `server_env`, which
            # overrides the inherited environment
            env={
                **os.environ,
                **server_env,
                **get_current_settings().to_environment_variables(exclude_unset=True),
            },
        )

    def stop(self) -> None:
        """
        Stop the server process, if any, and mark the server as not running.

        The process is terminated and killed if it has not exited after five
        seconds. The registry entry for `self.port` is removed only when it
        refers to this instance.
        """
        if self.server_process:
            subprocess_server_logger.info(
                f"Stopping temporary server on {self.address}"
            )
            self.server_process.terminate()
            try:
                self.server_process.wait(timeout=5)
            except subprocess.TimeoutExpired:
                self.server_process.kill()
                # Ensure the process is reaped to avoid ResourceWarning
                self.server_process.wait()
            finally:
                self.server_process = None
        # A server created without an explicit port is registered under `None`,
        # so `self.port` may be the key of a different server object; never
        # evict an entry that belongs to another instance.
        if self._instances.get(self.port) is self:
            del self._instances[self.port]
        if self.running:
            self.running = False

    def __enter__(self) -> Self:
        self.start()
        return self

    def __exit__(self, *args: Any) -> None:
        self.stop()