Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/dependencies.py: 79%
80 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1"""
2Utilities for injecting FastAPI dependencies.
3"""
5from __future__ import annotations 1a
7import logging 1a
8import re 1a
9from base64 import b64decode 1a
10from typing import Annotated, Any, Optional 1a
11from uuid import UUID 1a
13from docket import Docket as Docket_ 1a
14from fastapi import Body, Depends, Header, HTTPException 1a
15from packaging.version import Version 1a
16from starlette.requests import Request 1a
18from prefect._internal.compatibility.starlette import status 1a
19from prefect.server import schemas 1a
20from prefect.settings import PREFECT_API_DEFAULT_LIMIT 1a
23def provide_request_api_version( 1a
24 x_prefect_api_version: str = Header(None),
25) -> Version | None:
26 if not x_prefect_api_version: 26 ↛ 30line 26 didn't jump to line 30 because the condition on line 26 was always true1cdb
27 return 1cdb
29 # parse version
30 try:
31 _, _, _ = [int(v) for v in x_prefect_api_version.split(".")]
32 except ValueError:
33 raise HTTPException(
34 status_code=status.HTTP_400_BAD_REQUEST,
35 detail=(
36 "Invalid X-PREFECT-API-VERSION header format.Expected header in format"
37 f" 'x.y.z' but received {x_prefect_api_version}"
38 ),
39 )
40 return Version(x_prefect_api_version)
43class EnforceMinimumAPIVersion: 1a
44 """
45 FastAPI Dependency used to check compatibility between the version of the api
46 and a given request.
48 Looks for the header 'X-PREFECT-API-VERSION' in the request and compares it
49 to the api's version. Rejects requests that are lower than the minimum version.
50 """
52 def __init__(self, minimum_api_version: str, logger: logging.Logger): 1a
53 self.minimum_api_version = minimum_api_version 1a
54 versions = [int(v) for v in minimum_api_version.split(".")] 1a
55 self.api_major: int = versions[0] 1a
56 self.api_minor: int = versions[1] 1a
57 self.api_patch: int = versions[2] 1a
58 self.logger = logger 1a
60 async def __call__( 1a
61 self,
62 x_prefect_api_version: str = Header(None),
63 ) -> None:
64 request_version = x_prefect_api_version 1cgdfeb
66 # if no version header, assume latest and continue
67 if not request_version: 1cgdfeb
68 return 1cdfeb
70 # parse version
71 try: 1cgdeb
72 major, minor, patch = [int(v) for v in request_version.split(".")] 1cgdeb
73 except ValueError: 1cgdeb
74 await self._notify_of_invalid_value(request_version) 1cgdeb
75 raise HTTPException( 1cgdeb
76 status_code=status.HTTP_400_BAD_REQUEST,
77 detail=(
78 "Invalid X-PREFECT-API-VERSION header format."
79 f"Expected header in format 'x.y.z' but received {request_version}"
80 ),
81 )
83 if (major, minor, patch) < (self.api_major, self.api_minor, self.api_patch):
84 await self._notify_of_outdated_version(request_version)
85 raise HTTPException(
86 status_code=status.HTTP_400_BAD_REQUEST,
87 detail=(
88 f"The request specified API version {request_version} but this "
89 f"server requires version {self.minimum_api_version} or higher."
90 ),
91 )
93 async def _notify_of_invalid_value(self, request_version: str): 1a
94 self.logger.error( 1cgdeb
95 f"Invalid X-PREFECT-API-VERSION header format: '{request_version}'"
96 )
98 async def _notify_of_outdated_version(self, request_version: str): 1a
99 self.logger.error(
100 f"X-PREFECT-API-VERSION header specifies version '{request_version}' "
101 f"but minimum allowed version is '{self.minimum_api_version}'"
102 )
105def LimitBody() -> Any: 1a
106 """
107 A `fastapi.Depends` factory for pulling a `limit: int` parameter from the
108 request body while determining the default from the current settings.
109 """
111 def get_limit( 1a
112 limit: int = Body(
113 None,
114 description="Defaults to PREFECT_API_DEFAULT_LIMIT if not provided.",
115 ),
116 ):
117 default_limit = PREFECT_API_DEFAULT_LIMIT.value() 1cdfeb
118 limit = limit if limit is not None else default_limit 1cdfeb
119 if not limit >= 0: 1cdfeb
120 raise HTTPException( 1cdb
121 status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
122 detail="Invalid limit: must be greater than or equal to 0.",
123 )
124 if limit > default_limit: 1cdfeb
125 raise HTTPException( 1cdb
126 status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
127 detail=f"Invalid limit: must be less than or equal to {default_limit}.",
128 )
129 return limit 1cdfeb
131 return Depends(get_limit) 1a
134def get_created_by( 1a
135 prefect_automation_id: Optional[UUID] = Header(None, include_in_schema=False),
136 prefect_automation_name: Optional[str] = Header(None, include_in_schema=False),
137) -> Optional[schemas.core.CreatedBy]:
138 """A dependency that returns the provenance information to use when creating objects
139 during this API call."""
140 if prefect_automation_id and prefect_automation_name: 1cdfb
141 try: 1dfb
142 display_value = b64decode(prefect_automation_name.encode()).decode() 1dfb
143 except Exception:
144 display_value = None
146 if display_value: 146 ↛ 153line 146 didn't jump to line 153 because the condition on line 146 was always true1dfb
147 return schemas.core.CreatedBy( 1dfb
148 id=prefect_automation_id,
149 type="AUTOMATION",
150 display_value=display_value,
151 )
153 return None 1cdb
156def get_updated_by( 1a
157 prefect_automation_id: Optional[UUID] = Header(None, include_in_schema=False),
158 prefect_automation_name: Optional[str] = Header(None, include_in_schema=False),
159) -> Optional[schemas.core.UpdatedBy]:
160 """A dependency that returns the provenance information to use when updating objects
161 during this API call."""
162 if prefect_automation_id and prefect_automation_name: 162 ↛ 163line 162 didn't jump to line 163 because the condition on line 162 was never true1cdb
163 return schemas.core.UpdatedBy(
164 id=prefect_automation_id,
165 type="AUTOMATION",
166 display_value=prefect_automation_name,
167 )
169 return None 1cdb
172def is_ephemeral_request(request: Request) -> bool: 1a
173 """
174 A dependency that returns whether the request is to an ephemeral server.
175 """
176 return "ephemeral-prefect" in str(request.base_url) 1c
179PREFECT_CLIENT_USER_AGENT_PATTERN = re.compile( 1a
180 r"^prefect/(\d+\.\d+\.\d+(?:[a-z.+0-9]+)?) \(API \S+\)$"
181)
184def get_prefect_client_version( 1a
185 user_agent: Annotated[Optional[str], Header(include_in_schema=False)] = None,
186) -> Optional[str]:
187 """
188 Attempts to parse out the Prefect client version from the User-Agent header.
190 The Prefect client sets the User-Agent header like so:
191 f"prefect/{prefect.__version__} (API {constants.SERVER_API_VERSION})"
192 """
193 if not user_agent: 193 ↛ 194line 193 didn't jump to line 194 because the condition on line 193 was never true1cb
194 return None
196 if client_version := PREFECT_CLIENT_USER_AGENT_PATTERN.match(user_agent): 196 ↛ 197line 196 didn't jump to line 197 because the condition on line 196 was never true1cb
197 return client_version.group(1)
198 return None 1cb
201def docket(request: Request) -> Docket_: 1a
202 return request.app.state.docket 1cdb
205Docket = Annotated[Docket_, Depends(docket)] 1a