Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/server.py: 57%

434 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1""" 

2Defines the Prefect REST API FastAPI app. 

3""" 

4 

5from __future__ import annotations 1a

6 

7import asyncio 1a

8import atexit 1a

9import base64 1a

10import contextlib 1a

11import gc 1a

12import logging 1a

13import mimetypes 1a

14import os 1a

15import random 1a

16import shutil 1a

17import socket 1a

18import sqlite3 1a

19import subprocess 1a

20import sys 1a

21import time 1a

22from contextlib import AsyncExitStack, asynccontextmanager 1a

23from functools import wraps 1a

24from hashlib import sha256 1a

25from typing import Any, AsyncGenerator, Awaitable, Callable, Optional 1a

26 

27import anyio 1a

28import asyncpg 1a

29import httpx 1a

30import sqlalchemy as sa 1a

31import sqlalchemy.exc 1a

32import sqlalchemy.orm.exc 1a

33from docket import Docket 1a

34from fastapi import Depends, FastAPI, Request, Response 1a

35from fastapi.encoders import jsonable_encoder 1a

36from fastapi.exceptions import RequestValidationError 1a

37from fastapi.middleware.cors import CORSMiddleware 1a

38from fastapi.middleware.gzip import GZipMiddleware 1a

39from fastapi.openapi.utils import get_openapi 1a

40from fastapi.responses import JSONResponse 1a

41from fastapi.staticfiles import StaticFiles 1a

42from starlette.exceptions import HTTPException 1a

43from typing_extensions import Self 1a

44 

45import prefect 1a

46import prefect.server.api as api 1a

47import prefect.settings 1a

48from prefect._internal.compatibility.starlette import status 1a

49from prefect._internal.observability import configure_logfire 1a

50from prefect.client.constants import SERVER_API_VERSION 1a

51from prefect.logging import get_logger 1a

52from prefect.server.api.background_workers import background_worker 1a

53from prefect.server.api.dependencies import EnforceMinimumAPIVersion 1a

54from prefect.server.exceptions import ObjectNotFoundError 1a

55from prefect.server.services.base import RunInEphemeralServers, RunInWebservers, Service 1a

56from prefect.server.utilities.database import get_dialect 1a

57from prefect.settings import ( 1a

58 PREFECT_API_DATABASE_CONNECTION_URL, 

59 PREFECT_API_LOG_RETRYABLE_ERRORS, 

60 PREFECT_DEBUG_MODE, 

61 PREFECT_MEMO_STORE_PATH, 

62 PREFECT_MEMOIZE_BLOCK_AUTO_REGISTRATION, 

63 PREFECT_SERVER_API_BASE_PATH, 

64 PREFECT_SERVER_EPHEMERAL_STARTUP_TIMEOUT_SECONDS, 

65 PREFECT_UI_SERVE_BASE, 

66 get_current_settings, 

67) 

68from prefect.utilities.hashing import hash_objects 1a

69 

# Handle returned by `configure_logfire()`; `create_api_app` only instruments
# the API app when this value is truthy.
logfire: Any | None = configure_logfire()

# Titles passed to the outer app (`TITLE`), the REST API app and its OpenAPI
# schema (`API_TITLE`), and the UI app (`UI_TITLE`).
TITLE = "Prefect Server"
API_TITLE = "Prefect Prefect REST API"
UI_TITLE = "Prefect Prefect REST API UI"
# Version string used for the outer app and the generated OpenAPI schema.
API_VERSION: str = prefect.__version__
# migrations should run only once per app start; the ephemeral API can potentially
# create multiple apps in a single process
LIFESPAN_RAN_FOR_APP: set[Any] = set()

# Module logger used by the exception handlers and startup helpers in this file.
logger: "logging.Logger" = get_logger("server")

# Dependency added to every API router by `create_api_app`.
enforce_minimum_version: EnforceMinimumAPIVersion = EnforceMinimumAPIVersion(
    # this should be <= SERVER_API_VERSION; clients that send
    # a version header under this value will be rejected
    minimum_api_version="0.8.0",
    logger=logger,
)

88 

89 

# Routers that `create_api_app` includes, in this order, into every API app it
# builds. When `final=True` is passed there, each router's `routes` attribute
# is deleted after inclusion.
API_ROUTERS = (
    api.flows.router,
    api.flow_runs.router,
    api.task_runs.router,
    api.flow_run_states.router,
    api.task_run_states.router,
    api.deployments.router,
    api.saved_searches.router,
    api.logs.router,
    api.concurrency_limits.router,
    api.concurrency_limits_v2.router,
    api.block_types.router,
    api.block_documents.router,
    api.workers.router,
    api.task_workers.router,
    api.work_queues.router,
    api.artifacts.router,
    api.block_schemas.router,
    api.block_capabilities.router,
    api.collections.router,
    api.variables.router,
    api.csrf_token.router,
    api.events.router,
    api.automations.router,
    api.templates.router,
    api.ui.flows.router,
    api.ui.flow_runs.router,
    api.ui.schemas.router,
    api.ui.task_runs.router,
    api.admin.router,
    api.root.router,
)

122 

# Exact message looked for among the `args` of a `sqlite3.OperationalError` to
# recognise a lock failure when no `sqlite_errorname` match is found.
SQLITE_LOCKED_MSG = "database is locked"

124 

125 

class _SQLiteLockedOperationalErrorFilter(logging.Filter):
    """Decide whether log records about SQLite lock failures are emitted.

    Records that do not carry a SQLAlchemy `OperationalError` wrapping a
    `sqlite3.OperationalError` always pass. Records for busy/locked errors pass
    only when the `server.log_retryable_errors` setting allows it.
    """

    def filter(self, record: logging.LogRecord) -> bool:
        # Pull the exception instance out of the record, if there is one
        raised: BaseException | None = None
        if record.exc_info:
            raised = record.exc_info[1]

        # Only SQLAlchemy operational errors are inspected for a driver error
        driver_error = (
            getattr(raised, "orig", None)
            if isinstance(raised, sqlalchemy.exc.OperationalError)
            else None
        )
        if not isinstance(driver_error, sqlite3.OperationalError):
            return True

        is_busy = getattr(driver_error, "sqlite_errorname", None) in {
            "SQLITE_BUSY",
            "SQLITE_BUSY_SNAPSHOT",
        }
        is_locked = is_busy or SQLITE_LOCKED_MSG in getattr(driver_error, "args", [])
        if not is_locked:
            return True

        # Lock failures are logged only when the setting asks for it
        return get_current_settings().server.log_retryable_errors

146 

147 

# Filter instance installed by `_install_sqlite_locked_log_filter`; stays `None`
# until that function has run once, which is how repeat installs are avoided.
_SQLITE_LOCKED_LOG_FILTER: _SQLiteLockedOperationalErrorFilter | None = None

149 

150 

def _install_sqlite_locked_log_filter() -> None:
    """Attach the SQLite lock filter to the "uvicorn.error" logger, at most once.

    The installed instance is remembered in `_SQLITE_LOCKED_LOG_FILTER`, so
    later calls are no-ops.
    """
    global _SQLITE_LOCKED_LOG_FILTER

    if _SQLITE_LOCKED_LOG_FILTER is None:
        lock_filter = _SQLiteLockedOperationalErrorFilter()
        logging.getLogger("uvicorn.error").addFilter(lock_filter)
        _SQLITE_LOCKED_LOG_FILTER = lock_filter

160 

161 

class SPAStaticFiles(StaticFiles):
    """
    `StaticFiles` variant for single page applications.

    When looking up a path raises an `HTTPException`, the index page is
    served instead of the error.
    """

    async def get_response(self, path: str, scope: Any) -> Response:
        static_files = super()
        try:
            response = await static_files.get_response(path, scope)
        except HTTPException:
            # Fall back to the application's index page
            response = await static_files.get_response("./index.html", scope)
        return response

175 

176 

class RequestLimitMiddleware:
    """
    ASGI middleware that caps how many calls into the wrapped app run at once.

    This is a blunt tool for limiting SQLite concurrent writes which will cause
    failures at high volume. Ideally, the limit would apply only to routes that
    perform writes.
    """

    def __init__(self, app: Any, limit: float):
        # Build the limiter first; `limit` is the number of concurrent slots
        self._limiter = anyio.CapacityLimiter(limit)
        self.app = app

    async def __call__(self, scope: Any, receive: Any, send: Any) -> None:
        wrapped_app = self.app
        # Hold one slot of the limiter for the whole downstream call
        async with self._limiter:
            await wrapped_app(scope, receive, send)

193 

194 

async def validation_exception_handler(
    request: Request, exc: RequestValidationError
) -> JSONResponse:
    """Return a 422 response describing why request validation failed.

    The body carries a fixed message, the validation errors reported by `exc`,
    and the request body attached to `exc`.
    """
    payload = {
        "exception_message": "Invalid request received.",
        "exception_detail": exc.errors(),
        "request_body": exc.body,
    }
    return JSONResponse(
        content=jsonable_encoder(payload),
        status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
    )

209 

210 

async def integrity_exception_handler(request: Request, exc: Exception) -> JSONResponse:
    """Log the active exception and answer with a 409 conflict response."""
    logger.error("Encountered exception in request:", exc_info=True)

    detail = (
        "Data integrity conflict. This usually means a "
        "unique or foreign key constraint was violated. "
        "See server logs for details."
    )
    return JSONResponse(
        status_code=status.HTTP_409_CONFLICT,
        content={"detail": detail},
    )

224 

225 

def is_client_retryable_exception(exc: Exception) -> bool:
    """Report whether `exc` is an error a client may retry.

    A SQLAlchemy `OperationalError` wrapping a `sqlite3.OperationalError` is
    retryable only for busy/locked failures. Any other exception is retryable
    when it is an instance of one of the database error types listed below.
    """
    if isinstance(exc, sqlalchemy.exc.OperationalError) and isinstance(
        exc.orig, sqlite3.OperationalError
    ):
        sqlite_error = exc.orig
        is_busy = getattr(sqlite_error, "sqlite_errorname", None) in {
            "SQLITE_BUSY",
            "SQLITE_BUSY_SNAPSHOT",
        }
        # Answer here in both cases so other SQLite operational errors never
        # reach the generic `DBAPIError` check below
        return is_busy or SQLITE_LOCKED_MSG in getattr(sqlite_error, "args", [])

    retryable_types = (
        sqlalchemy.exc.DBAPIError,
        asyncpg.exceptions.QueryCanceledError,
        asyncpg.exceptions.ConnectionDoesNotExistError,
        asyncpg.exceptions.CannotConnectNowError,
        sqlalchemy.exc.InvalidRequestError,
        sqlalchemy.orm.exc.DetachedInstanceError,
    )
    return isinstance(exc, retryable_types)

253 

254 

def replace_placeholder_string_in_files(
    directory: str,
    placeholder: str,
    replacement: str,
    allowed_extensions: list[str] | None = None,
) -> None:
    """
    Recursively replace a placeholder string in the files under a directory.

    Args:
        directory: root of the tree to walk
        placeholder: text to search for in each matching file
        replacement: text written in place of every occurrence of `placeholder`
        allowed_extensions: file name suffixes to process; defaults to
            `.txt`, `.html`, `.css`, `.js` and `.json`

    Files are read and written as UTF-8. A file is only written back when its
    contents actually changed.
    """
    if allowed_extensions is None:
        allowed_extensions = [".txt", ".html", ".css", ".js", ".json"]

    # `str.endswith` accepts a tuple, so build the suffix tuple once
    suffixes = tuple(allowed_extensions)

    for root, _, files in os.walk(directory):
        for file_name in files:
            if not file_name.endswith(suffixes):
                continue

            file_path = os.path.join(root, file_name)

            with open(file_path, "r", encoding="utf-8") as f:
                original_text = f.read()

            updated_text = original_text.replace(placeholder, replacement)
            if updated_text == original_text:
                # Nothing to replace; leave the file untouched
                continue

            with open(file_path, "w", encoding="utf-8") as f:
                f.write(updated_text)

280 

281 

def copy_directory(directory: str, path: str) -> None:
    """Copy every entry of `directory` into `path`, creating `path` if needed.

    Sub-directories already present at the destination are removed and copied
    afresh (symlinks are kept as symlinks). Every copied file gets mode 0o700.
    """
    os.makedirs(path, exist_ok=True)

    for name in os.listdir(directory):
        src = os.path.join(directory, name)
        dst = os.path.join(path, name)

        if not os.path.isdir(src):
            shutil.copy2(src, dst)
            # Ensure copied file is writeable
            os.chmod(dst, 0o700)
            continue

        # Replace any previous copy of this sub-directory wholesale
        if os.path.exists(dst):
            shutil.rmtree(dst)
        shutil.copytree(src, dst, symlinks=True)

        # ensure copied files are writeable
        for parent, _, filenames in os.walk(dst):
            for filename in filenames:
                os.chmod(os.path.join(parent, filename), 0o700)

300 

301 

async def custom_internal_exception_handler(
    request: Request, exc: Exception
) -> JSONResponse:
    """
    Turn an unhandled exception into a JSON error response, logging it first.

    Exceptions that `is_client_retryable_exception` accepts produce a 503 and
    are logged only when `PREFECT_API_LOG_RETRYABLE_ERRORS` is set; everything
    else is always logged and produces a 500.
    """
    if not is_client_retryable_exception(exc):
        logger.error("Encountered exception in request:", exc_info=True)
        return JSONResponse(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            content={"exception_message": "Internal Server Error"},
        )

    if PREFECT_API_LOG_RETRYABLE_ERRORS.value():
        logger.error("Encountered retryable exception in request:", exc_info=True)

    return JSONResponse(
        status_code=status.HTTP_503_SERVICE_UNAVAILABLE,
        content={"exception_message": "Service Unavailable"},
    )

325 

326 

async def prefect_object_not_found_exception_handler(
    request: Request, exc: ObjectNotFoundError
) -> JSONResponse:
    """Answer an `ObjectNotFoundError` with a 404 carrying the error text."""
    body = {"exception_message": str(exc)}
    return JSONResponse(status_code=status.HTTP_404_NOT_FOUND, content=body)

334 

335 

# API apps already built by `create_api_app`, keyed by
# (settings hash key, hash of `fast_api_app_kwargs` or None).
API_APP_CACHE: dict[tuple[str, str | None], FastAPI] = {}

337 

338 

def create_api_app(
    dependencies: list[Any] | None = None,
    health_check_path: str = "/health",
    version_check_path: str = "/version",
    fast_api_app_kwargs: dict[str, Any] | None = None,
    final: bool = False,
    ignore_cache: bool = False,
) -> FastAPI:
    """
    Create a FastAPI app that includes the Prefect REST API

    Args:
        dependencies: a list of global dependencies to add to each Prefect REST API
            router; the list is not modified, the minimum-version check is always
            added after these
        health_check_path: the health check route path
        version_check_path: the route path that returns the server API version
        fast_api_app_kwargs: kwargs to pass to the FastAPI constructor
        final: whether this will be the last instance of the Prefect server to be
            created in this process, so that additional optimizations may be applied
        ignore_cache: if set, a new app will be created even if the settings and
            fast_api_app_kwargs match an existing app in the cache

    Returns:
        a FastAPI app that serves the Prefect REST API
    """
    import hmac

    cache_key = (
        prefect.settings.get_current_settings().hash_key(),
        hash_objects(fast_api_app_kwargs) if fast_api_app_kwargs else None,
    )

    if cache_key in API_APP_CACHE and not ignore_cache:
        return API_APP_CACHE[cache_key]

    fast_api_app_kwargs = fast_api_app_kwargs or {}
    api_app = FastAPI(title=API_TITLE, **fast_api_app_kwargs)

    if logfire:
        logfire.instrument_fastapi(api_app)  # pyright: ignore

    api_app.add_middleware(GZipMiddleware)

    # NOTE: the route handlers below deliberately have no docstrings; FastAPI
    # would publish them as operation descriptions in the OpenAPI schema.
    @api_app.get(health_check_path, tags=["Root"])
    async def health_check() -> bool:  # type: ignore[reportUnusedFunction]
        return True

    @api_app.get(version_check_path, tags=["Root"])
    async def server_version() -> str:  # type: ignore[reportUnusedFunction]
        return SERVER_API_VERSION

    # always include version checking; build a new list so the caller's list is
    # never mutated (repeated calls would otherwise keep appending to it)
    dependencies = [*(dependencies or []), Depends(enforce_minimum_version)]

    for router in API_ROUTERS:
        api_app.include_router(router, dependencies=dependencies)
        if final:
            # Important note about how FastAPI works:
            #
            # When including a router, FastAPI copies the routes and builds entirely new
            # Pydantic models to represent the request bodies of the routes in the
            # router.  This is because the dependencies may change if the same router is
            # included multiple times, but it also means that we are holding onto an
            # entire set of Pydantic models on the original routers for the duration of
            # the server process that will never be used.
            #
            # Because Prefect does not reuse routers, we are free to clean up the routes
            # because we know they won't be used again.  Thus, if we have the hint that
            # this is the final instance we will create in this process, we can clean up
            # the routes on the original source routers to conserve memory (~50-55MB as
            # of introducing this change).
            del router.routes

    if final:
        gc.collect()

    auth_string = prefect.settings.PREFECT_SERVER_API_AUTH_STRING.value()

    if auth_string is not None:
        # Compared as bytes so `hmac.compare_digest` also accepts non-ASCII text
        expected_credentials = auth_string.encode("utf-8")

        def unauthorized() -> JSONResponse:
            return JSONResponse(
                status_code=status.HTTP_401_UNAUTHORIZED,
                content={"exception_message": "Unauthorized"},
            )

        @api_app.middleware("http")
        async def token_validation(request: Request, call_next: Any):  # type: ignore[reportUnusedFunction]
            # used for probes in k8s and such
            if (
                request.url.path.endswith(("health", "ready"))
                and request.method.upper() == "GET"
            ):
                return await call_next(request)

            header_token = request.headers.get("Authorization")
            if header_token is None:
                return unauthorized()

            try:
                scheme, creds = header_token.split()
                # Explicit check rather than `assert`, which `python -O` removes
                if scheme != "Basic":
                    raise ValueError("unsupported authorization scheme")
                supplied_credentials = base64.b64decode(creds)
            except ValueError:
                # Malformed header, wrong scheme, or invalid base64
                return unauthorized()

            # Constant-time comparison to avoid leaking how much of the
            # credential matched
            if not hmac.compare_digest(supplied_credentials, expected_credentials):
                return unauthorized()

            return await call_next(request)

    API_APP_CACHE[cache_key] = api_app

    return api_app

452 

453 

def create_ui_app(ephemeral: bool) -> FastAPI:
    """
    Create the FastAPI app that serves the UI.

    The app always exposes a `ui-settings` route under the configured serve
    base. The static UI bundle is mounted only when the packaged UI files
    exist, the UI is enabled in settings, and `ephemeral` is false.

    Args:
        ephemeral: when true, the static UI files are neither prepared nor mounted

    Returns:
        the FastAPI app for the UI
    """
    ui_app = FastAPI(title=UI_TITLE)

    serve_base = prefect.settings.PREFECT_UI_SERVE_BASE.value()
    route_prefix = serve_base.rstrip("/")
    # Contents of the marker file: identifies the version and serve base the
    # static directory was prepared for
    marker_contents = f"{prefect.__version__}:{serve_base}"
    marker_name = "UI_SERVE_BASE"
    static_dir = (
        prefect.settings.PREFECT_UI_STATIC_DIRECTORY.value()
        or prefect.__ui_static_subpath__
    )

    if os.name == "nt":
        # Windows defaults to text/plain for .js files
        mimetypes.init()
        mimetypes.add_type("application/javascript", ".js")

    @ui_app.get(f"{route_prefix}/ui-settings")
    def ui_settings() -> dict[str, Any]:  # type: ignore[reportUnusedFunction]
        auth_mode = (
            "BASIC" if prefect.settings.PREFECT_SERVER_API_AUTH_STRING.value() else None
        )
        return {
            "api_url": prefect.settings.PREFECT_UI_API_URL.value(),
            "csrf_enabled": prefect.settings.PREFECT_SERVER_CSRF_PROTECTION_ENABLED.value(),
            "auth": auth_mode,
            "flags": [],
        }

    def static_files_are_current() -> bool:
        # True only when the marker file exists and matches the current
        # version and serve base
        if not os.path.exists(static_dir):
            return False
        try:
            with open(os.path.join(static_dir, marker_name), "r") as marker:
                recorded = marker.read()
        except FileNotFoundError:
            return False
        return recorded == marker_contents

    def prepare_static_files() -> None:
        if not os.path.exists(static_dir):
            os.makedirs(static_dir)

        copy_directory(str(prefect.__ui_static_path__), str(static_dir))
        replace_placeholder_string_in_files(
            str(static_dir),
            "/PREFECT_UI_SERVE_BASE_REPLACE_PLACEHOLDER",
            route_prefix,
        )

        # Record what the files were prepared for, so a later start can tell
        # whether they have to be copied again
        with open(os.path.join(static_dir, marker_name), "w") as marker:
            marker.write(marker_contents)

    ui_app.add_middleware(GZipMiddleware)

    serve_static_ui = (
        os.path.exists(prefect.__ui_static_path__)
        and prefect.settings.PREFECT_UI_ENABLED.value()
        and not ephemeral
    )
    if not serve_static_ui:
        return ui_app

    # Copy the packaged files again whenever the marker is missing or stale
    if not static_files_are_current():
        prepare_static_files()

    ui_app.mount(
        PREFECT_UI_SERVE_BASE.value(),
        SPAStaticFiles(directory=static_dir),
        name="ui_root",
    )

    return ui_app

529 

530 

# Apps already built by `create_app`, keyed by the tuple that function builds:
# (settings hash key, ephemeral, webserver_only).
APP_CACHE: dict[tuple[str, bool, bool], FastAPI] = {}

532 

533 

def _memoize_block_auto_registration(
    fn: Callable[[], Awaitable[None]],
) -> Callable[[], Awaitable[None]]:
    """
    Decorator to handle skipping the wrapped function if the block registry has
    not changed since the last invocation

    A hash of the block registry, the collection blocks data and the database
    connection URL is compared against the value saved in the memo store file.
    On a match the wrapped function is skipped; otherwise it runs and the new
    hash is saved. Failures to read or write the memo store are logged as
    warnings and never prevent the wrapped function from running.
    """
    import toml

    import prefect.plugins
    from prefect.blocks.core import Block
    from prefect.server.models.block_registration import _load_collection_blocks_data
    from prefect.utilities.dispatch import get_registry_for_type

    @wraps(fn)
    async def wrapper(*args: Any, **kwargs: Any) -> None:
        # Memoization disabled: always run the wrapped function
        if not PREFECT_MEMOIZE_BLOCK_AUTO_REGISTRATION.value():
            await fn(*args, **kwargs)
            return

        # Ensure collections are imported and have the opportunity to register types
        # before loading the registry
        prefect.plugins.load_prefect_collections()

        blocks_registry = get_registry_for_type(Block)
        collection_blocks_data = await _load_collection_blocks_data()
        current_blocks_loading_hash = hash_objects(
            blocks_registry,
            collection_blocks_data,
            PREFECT_API_DATABASE_CONNECTION_URL.value(),
            hash_algo=sha256,
        )

        memo_store_path = PREFECT_MEMO_STORE_PATH.value()
        try:
            if memo_store_path.exists():
                saved_blocks_loading_hash = toml.load(memo_store_path).get(
                    "block_auto_registration"
                )
                if (
                    saved_blocks_loading_hash is not None
                    and current_blocks_loading_hash == saved_blocks_loading_hash
                ):
                    if PREFECT_DEBUG_MODE.value():
                        logger.debug(
                            "Skipping block loading due to matching hash for block "
                            "auto-registration found in memo store."
                        )
                    return
        except Exception as exc:
            # Report the resolved path, not the setting object
            logger.warning(
                f"Unable to read memo_store.toml from {memo_store_path} during "
                f"block auto-registration: {exc!r}.\n"
                "All blocks will be registered."
            )

        await fn(*args, **kwargs)

        # Only persist a hash when one could be computed
        if current_blocks_loading_hash is not None:
            try:
                if not memo_store_path.exists():
                    memo_store_path.touch(mode=0o0600)

                memo_store_path.write_text(
                    toml.dumps({"block_auto_registration": current_blocks_loading_hash})
                )
            except Exception as exc:
                logger.warning(
                    "Unable to write to memo_store.toml at"
                    f" {memo_store_path} after block auto-registration:"
                    f" {exc!r}.\n Subsequent server start ups will perform block"
                    " auto-registration, which may result in slower server startup."
                )

    return wrapper

610 

611 

def create_app(
    settings: Optional[prefect.settings.Settings] = None,
    ephemeral: bool = False,
    webserver_only: bool = False,
    final: bool = False,
    ignore_cache: bool = False,
) -> FastAPI:
    """
    Create a FastAPI app that includes the Prefect REST API and UI

    Args:
        settings: The settings to use to create the app. If not set, settings are pulled
            from the context.
        ephemeral: If set, the application will be treated as ephemeral. The UI
            and services will be disabled.
        webserver_only: If set, the webserver and UI will be available but all background
            services will be disabled.
        final: whether this will be the last instance of the Prefect server to be
            created in this process, so that additional optimizations may be applied
        ignore_cache: If set, a new application will be created even if the settings
            match. Otherwise, an application is returned from the cache.

    Returns:
        The outer FastAPI app, with the API app mounted under the API base path
        (or "/api") and the UI app mounted at "/". The API app is also available
        as the `api_app` attribute, and `openapi` is replaced with a function
        returning the API schema.
    """
    settings = settings or prefect.settings.get_current_settings()
    # The cache key uses the arguments as passed, before the environment
    # variable overrides below are applied.
    cache_key = (settings.hash_key(), ephemeral, webserver_only)
    ephemeral = ephemeral or bool(os.getenv("PREFECT__SERVER_EPHEMERAL"))
    webserver_only = webserver_only or bool(os.getenv("PREFECT__SERVER_WEBSERVER_ONLY"))
    final = final or bool(os.getenv("PREFECT__SERVER_FINAL"))

    from prefect.logging.configuration import setup_logging

    setup_logging()

    if cache_key in APP_CACHE and not ignore_cache:
        return APP_CACHE[cache_key]

    # TODO: Move these startup functions out of this closure into the top-level or
    # another dedicated location
    async def run_migrations():
        """Ensure the database is created and up to date with the current migrations"""
        if prefect.settings.PREFECT_API_DATABASE_MIGRATE_ON_START:
            from prefect.server.database import provide_database_interface

            db = provide_database_interface()
            await db.create_db()

    @_memoize_block_auto_registration
    async def add_block_types():
        """Add all registered blocks to the database"""
        if not prefect.settings.PREFECT_API_BLOCKS_REGISTER_ON_START:
            return

        from prefect.server.database import provide_database_interface
        from prefect.server.models.block_registration import run_block_auto_registration

        db = provide_database_interface()
        session = await db.session()

        async with session:
            await run_block_auto_registration(session=session)

    @asynccontextmanager
    async def lifespan(app: FastAPI) -> AsyncGenerator[None, None]:
        """
        Run startup work once per app, then keep Docket, the background worker
        and the selected services running until the app shuts down.
        """
        # Startup already ran for this app: do nothing on re-entry
        if app in LIFESPAN_RAN_FOR_APP:
            yield
            return

        await run_migrations()
        await add_block_types()

        # Pick which service class to run: `webserver_only` takes precedence
        # over `ephemeral`; otherwise the base `Service` class is used.
        Services: type[Service] | None = (
            RunInWebservers
            if webserver_only
            else RunInEphemeralServers
            if ephemeral
            else Service
        )

        # Everything entered on the stack is exited when the lifespan ends
        async with AsyncExitStack() as stack:
            docket = await stack.enter_async_context(
                Docket(name=settings.server.docket.name, url=settings.server.docket.url)
            )
            await stack.enter_async_context(background_worker(docket))
            # `api_app` is assigned later in `create_app`; it is resolved when
            # the lifespan actually runs
            api_app.state.docket = docket
            if Services:
                await stack.enter_async_context(Services.running())
            LIFESPAN_RAN_FOR_APP.add(app)
            yield

    # NOTE(review): not referenced anywhere else in this function as shown;
    # confirm whether it is still needed.
    def on_service_exit(service: Service, task: asyncio.Task[None]) -> None:
        """
        Added as a callback for completion of services to log exit
        """
        try:
            # Retrieving the result will raise the exception
            task.result()
        except Exception:
            logger.error(f"{service.name} service failed!", exc_info=True)
        else:
            logger.info(f"{service.name} service stopped!")

    app = FastAPI(
        title=TITLE,
        version=API_VERSION,
        lifespan=lifespan,
    )
    api_app = create_api_app(
        fast_api_app_kwargs={
            "exception_handlers": {
                # NOTE: FastAPI special cases the generic `Exception` handler and
                #       registers it as a separate middleware from the others
                Exception: custom_internal_exception_handler,
                RequestValidationError: validation_exception_handler,
                sa.exc.IntegrityError: integrity_exception_handler,
                ObjectNotFoundError: prefect_object_not_found_exception_handler,
            }
        },
        final=final,
        ignore_cache=ignore_cache,
    )
    ui_app = create_ui_app(ephemeral)

    # middleware
    # Each CORS setting is a comma-separated string split into a list
    app.add_middleware(
        CORSMiddleware,
        allow_origins=prefect.settings.PREFECT_SERVER_CORS_ALLOWED_ORIGINS.value().split(
            ","
        ),
        allow_methods=prefect.settings.PREFECT_SERVER_CORS_ALLOWED_METHODS.value().split(
            ","
        ),
        allow_headers=prefect.settings.PREFECT_SERVER_CORS_ALLOWED_HEADERS.value().split(
            ","
        ),
    )

    # Limit the number of concurrent requests when using a SQLite database to reduce
    # chance of errors where the database cannot be opened due to a high number of
    # concurrent writes
    if (
        get_dialect(prefect.settings.PREFECT_API_DATABASE_CONNECTION_URL.value()).name
        == "sqlite"
    ):
        _install_sqlite_locked_log_filter()
        app.add_middleware(RequestLimitMiddleware, limit=100)

    if prefect.settings.PREFECT_SERVER_CSRF_PROTECTION_ENABLED.value():
        app.add_middleware(api.middleware.CsrfMiddleware)

    if prefect.settings.PREFECT_API_ENABLE_METRICS:
        from prometheus_client import CONTENT_TYPE_LATEST, generate_latest

        # Expose the latest Prometheus metrics on the API app
        @api_app.get("/metrics")
        async def metrics() -> Response:  # type: ignore[reportUnusedFunction]
            return Response(content=generate_latest(), media_type=CONTENT_TYPE_LATEST)

    # Serve the `static` directory that sits next to this module
    api_app.mount(
        "/static",
        StaticFiles(
            directory=os.path.join(
                os.path.dirname(os.path.realpath(__file__)), "static"
            )
        ),
        name="static",
    )
    app.api_app = api_app
    # Mount the API under the configured base path, or "/api" when none is set
    if PREFECT_SERVER_API_BASE_PATH:
        app.mount(PREFECT_SERVER_API_BASE_PATH.value(), app=api_app, name="api")
    else:
        app.mount("/api", app=api_app, name="api")
    app.mount("/", app=ui_app, name="ui")

    def openapi():
        """
        Convenience method for extracting the user facing OpenAPI schema from the API app.

        This method is attached to the global public app for easy access.
        """
        partial_schema = get_openapi(
            title=API_TITLE,
            version=API_VERSION,
            routes=api_app.routes,
        )
        # Shallow copy: "paths" is rebuilt below, other values are shared
        new_schema = partial_schema.copy()
        new_schema["paths"] = {}
        # Every path is prefixed with the literal "/api"
        for path, value in partial_schema["paths"].items():
            new_schema["paths"][f"/api{path}"] = value

        new_schema["info"]["x-logo"] = {"url": "static/prefect-logo-mark-gradient.png"}
        return new_schema

    app.openapi = openapi

    APP_CACHE[cache_key] = app
    return app

806 

807 

# Logger used by `SubprocessASGIServer` for its startup message; obtained from
# `get_logger()` with no name, unlike the "server" logger above.
subprocess_server_logger: "logging.Logger" = get_logger()

809 

810 

class SubprocessASGIServer:
    """
    Runs the Prefect API in a uvicorn subprocess bound to 127.0.0.1.

    Instances are cached in a class-level registry keyed by the `port` passed
    to the constructor (including `None`), so constructing the class twice
    with the same port returns the same object. The instance can be used as a
    context manager: entering starts the server, exiting stops it.
    """

    # Registry of live instances, keyed by the port given to the constructor.
    _instances: dict[int | None, "SubprocessASGIServer"] = {}
    # Candidate ports used when no explicit port is requested.
    _port_range: range = range(8000, 9000)

    def __new__(cls, port: int | None = None, *args: Any, **kwargs: Any) -> Self:
        """
        Return an instance of the server associated with the provided port.
        Prevents multiple instances from being created for the same port.
        """
        if port not in cls._instances:
            instance = super().__new__(cls)
            cls._instances[port] = instance
        return cls._instances[port]

    def __init__(self, port: Optional[int] = None):
        """
        Initialize the server state.

        `__init__` runs on every construction, including when `__new__` hands
        back a cached instance, so the state is only set up the first time.

        Args:
            port: The port to serve on; if None, one is chosen in `start()`.
        """
        if not hasattr(self, "_initialized"):
            self.port: Optional[int] = port
            self.server_process: subprocess.Popen[Any] | None = None
            self.running: bool = False
            self._initialized = True

    def find_available_port(self) -> int:
        """
        Pick a random port from `_port_range` that can currently be bound.

        Returns:
            A port number that was bindable on 127.0.0.1 at the time of the check.

        Raises:
            RuntimeError: If no bindable port is found within 10 attempts.
        """
        max_attempts = 10
        for _ in range(max_attempts):
            port = random.choice(self._port_range)
            if self.is_port_available(port):
                return port
            time.sleep(random.uniform(0.1, 0.5))  # Random backoff
        raise RuntimeError("Unable to find an available port after multiple attempts")

    @staticmethod
    def is_port_available(port: int) -> bool:
        """
        Report whether a TCP socket can be bound to 127.0.0.1 on `port`.

        The probe socket is closed before returning, so the result is only a
        snapshot; another process may take the port afterwards.
        """
        with contextlib.closing(
            socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        ) as sock:
            try:
                sock.bind(("127.0.0.1", port))
                return True
            except OSError:  # socket.error is an alias of OSError
                return False

    @property
    def address(self) -> str:
        """Base URL of the server on the loopback interface."""
        return f"http://127.0.0.1:{self.port}"

    @property
    def api_url(self) -> str:
        """URL of the API, i.e. `address` with the `/api` suffix."""
        return f"{self.address}/api"

    def start(self, timeout: Optional[int] = None) -> None:
        """
        Start the server in a separate process. Safe to call multiple times; only starts
        the server once.

        Args:
            timeout: The maximum time to wait for the server to start. When
                falsy, PREFECT_SERVER_EPHEMERAL_STARTUP_TIMEOUT_SECONDS is used.

        Raises:
            RuntimeError: If no response was received from the health endpoint
                before the timeout elapsed.
            httpx.HTTPStatusError: If the last health response received was an
                error response.
        """
        if self.running:
            return

        if self.port is None:
            self.port = self.find_available_port()
        assert self.port is not None, "Port must be provided or available"
        help_message = (
            f"Starting temporary server on {self.address}\nSee "
            "https://docs.prefect.io/v3/concepts/server#how-to-guides "
            "for more information on running a dedicated Prefect server."
        )
        subprocess_server_logger.info(help_message)
        try:
            self.running = True
            self.server_process = self._run_uvicorn_command()
            atexit.register(self.stop)
            with httpx.Client() as client:
                response = None
                max_wait_time = (
                    timeout
                    or PREFECT_SERVER_EPHEMERAL_STARTUP_TIMEOUT_SECONDS.value()
                )
                # Use a wall-clock deadline so that time spent inside requests
                # and process restarts counts against the timeout too.
                deadline = time.monotonic() + max_wait_time
                while time.monotonic() < deadline:
                    # Exit code 3 is treated as "retry on another port";
                    # NOTE(review): presumably uvicorn's startup-failure code
                    # (e.g. port already taken) - confirm.
                    if self.server_process.poll() == 3:
                        self.port = self.find_available_port()
                        self.server_process = self._run_uvicorn_command()
                        continue
                    try:
                        response = client.get(f"{self.api_url}/health")
                    except httpx.ConnectError:
                        # Server is not accepting connections yet; keep polling.
                        pass
                    else:
                        if response.status_code == 200:
                            break
                    time.sleep(0.1)

                if response is not None:
                    response.raise_for_status()
                else:
                    error_message = "Timed out while attempting to connect to ephemeral Prefect API server."
                    if self.server_process.poll() is not None:
                        error_message += f" Ephemeral server process exited with code {self.server_process.returncode}."
                    if self.server_process.stdout:
                        error_message += (
                            f" stdout: {self.server_process.stdout.read()}"
                        )
                    if self.server_process.stderr:
                        error_message += (
                            f" stderr: {self.server_process.stderr.read()}"
                        )
                    raise RuntimeError(error_message)
        except Exception:
            # Do not leave a half-started child behind: a later start() would
            # overwrite `server_process` and the old child could never be stopped.
            self._terminate_process()
            self.running = False
            raise

    def _run_uvicorn_command(self) -> subprocess.Popen[Any]:
        """
        Launch uvicorn serving `prefect.server.api.server:create_app` on `self.port`.

        The child environment is built from, in increasing precedence: the
        current process environment, the ephemeral-server overrides below, and
        the explicitly set values of the current Prefect settings.

        Returns:
            The handle of the spawned uvicorn process.
        """
        # used to turn off serving the UI
        server_env = {
            "PREFECT_UI_ENABLED": "0",
            "PREFECT__SERVER_EPHEMERAL": "1",
            "PREFECT__SERVER_FINAL": "1",
        }
        return subprocess.Popen(
            args=[
                sys.executable,
                "-m",
                "uvicorn",
                "--app-dir",
                str(prefect.__module_path__.parent),
                "--factory",
                "prefect.server.api.server:create_app",
                "--host",
                "127.0.0.1",
                "--port",
                str(self.port),
                "--log-level",
                "error",
                "--lifespan",
                "on",
            ],
            env={
                **os.environ,
                **server_env,
                **get_current_settings().to_environment_variables(exclude_unset=True),
            },
        )

    def _terminate_process(self) -> None:
        """Terminate and reap the server subprocess, if there is one."""
        process = self.server_process
        if process is None:
            return
        process.terminate()
        try:
            process.wait(timeout=5)
        except subprocess.TimeoutExpired:
            process.kill()
            # Ensure the process is reaped to avoid ResourceWarning
            process.wait()
        finally:
            self.server_process = None

    def stop(self) -> None:
        """
        Stop the server subprocess, if any, and mark the server as not running.

        The registry entry for `self.port` is removed only when it refers to
        this instance, so stopping one server never evicts a different server
        that happens to be registered under the same port number.
        """
        if self.server_process:
            subprocess_server_logger.info(
                f"Stopping temporary server on {self.address}"
            )
            self._terminate_process()
        if self._instances.get(self.port) is self:
            del self._instances[self.port]
        if self.running:
            self.running = False

    def __enter__(self) -> Self:
        """Start the server and return this instance."""
        self.start()
        return self

    def __exit__(self, *args: Any) -> None:
        """Stop the server when leaving the `with` block."""
        self.stop()