Coverage for /usr/local/lib/python3.12/site-packages/prefect/client/base.py: 18%

245 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1import copy 1a

2import sys 1a

3import threading 1a

4import time 1a

5import uuid 1a

6from collections import defaultdict 1a

7from collections.abc import AsyncGenerator, Awaitable, MutableMapping 1a

8from contextlib import asynccontextmanager 1a

9from datetime import datetime, timezone 1a

10from logging import Logger 1a

11from typing import TYPE_CHECKING, Any, Callable, Optional, Protocol, runtime_checkable 1a

12 

13import anyio 1a

14import httpx 1a

15from asgi_lifespan import LifespanManager 1a

16from httpx import HTTPStatusError, Request, Response 1a

17from typing_extensions import Self 1a

18 

19import prefect 1a

20from prefect._internal.compatibility.starlette import status 1a

21from prefect.client import constants 1a

22from prefect.client.schemas.objects import CsrfToken 1a

23from prefect.exceptions import PrefectHTTPStatusError 1a

24from prefect.logging import get_logger 1a

25from prefect.settings import ( 1a

26 PREFECT_API_URL, 

27 PREFECT_CLIENT_MAX_RETRIES, 

28 PREFECT_CLIENT_RETRY_EXTRA_CODES, 

29 PREFECT_CLIENT_RETRY_JITTER_FACTOR, 

30 PREFECT_CLOUD_API_URL, 

31 PREFECT_SERVER_ALLOW_EPHEMERAL_MODE, 

32 get_current_settings, 

33) 

34from prefect.utilities.collections import AutoEnum 1a

35from prefect.utilities.math import bounded_poisson_interval, clamped_poisson_interval 1a

36 

# Datastores for lifespan management. Both dicts are keyed by a tuple of
# (thread identity, application identity).
APP_LIFESPANS: dict[tuple[int, int], LifespanManager] = {}
APP_LIFESPANS_REF_COUNTS: dict[tuple[int, int], int] = {}
# Blocks concurrent access to the above dicts per thread. The index is the thread
# identity; a lock is created on first lookup for a thread.
APP_LIFESPANS_LOCKS: dict[int, anyio.Lock] = defaultdict(anyio.Lock)


# Module-level logger shared by the client classes defined in this file.
logger: Logger = get_logger("client")


# ASGI application types, defined here for type checking.
Scope = MutableMapping[str, Any]
Message = MutableMapping[str, Any]

# Callables an ASGI application is handed to receive and send messages.
Receive = Callable[[], Awaitable[Message]]
Send = Callable[[Message], Awaitable[None]]

55 

56 

@runtime_checkable
class ASGIApp(Protocol):
    """Structural type for an ASGI application: an async callable taking
    `(scope, receive, send)`."""

    async def __call__(
        self,
        scope: Scope,
        receive: Receive,
        send: Send,
    ) -> None:
        """Protocol stub; implementations provide the actual behavior."""
        ...

60 

61 

@asynccontextmanager
async def app_lifespan_context(app: ASGIApp) -> AsyncGenerator[None, None]:
    """
    A context manager that calls startup/shutdown hooks for the given application.

    Lifespan contexts are cached per application to avoid calling the lifespan hooks
    more than once if the context is entered in nested code. A no-op context will be
    returned if the context for the given application is already being managed.

    This manager is robust to concurrent access within the event loop. For example,
    if you have concurrent contexts for the same application, it is guaranteed that
    startup hooks will be called before their context starts and shutdown hooks will
    only be called after their context exits.

    A reference count is used to support nested use of clients without running
    lifespan hooks excessively. The first client context entered will create and enter
    a lifespan context. Each subsequent client will increment a reference count but will
    not create a new lifespan context. When each client context exits, the reference
    count is decremented. When the last client context exits, the lifespan will be
    closed.

    In simple nested cases, the first client context will be the one to exit the
    lifespan. However, if client contexts are entered concurrently they may not exit
    in a consistent order. If the first client context was responsible for closing
    the lifespan, it would have to wait for all other client contexts to exit to
    avoid firing shutdown hooks while the application is in use. Waiting for the other
    clients to exit can introduce deadlocks, so, instead, the first client will exit
    without closing the lifespan context and reference counts will be used to ensure
    the lifespan is closed once all of the clients are done.

    If the startup hooks fail, the exception propagates to the caller and nothing is
    cached, so a later call creates a fresh lifespan manager instead of reusing one
    that never started.

    Args:
        app: The ASGI application whose lifespan should be managed.

    Yields:
        None, once the application's lifespan has been entered.
    """
    # TODO: A deadlock has been observed during multithreaded use of clients while this
    #       lifespan context is being used. This has only been reproduced on Python 3.7
    #       and while we hope to discourage using multiple event loops in threads, this
    #       bug may emerge again.
    #       See https://github.com/PrefectHQ/orion/pull/1696
    thread_id = threading.get_ident()

    # The id of the application is used instead of the hash so each application instance
    # is managed independently even if they share the same settings. We include the
    # thread id since applications are managed separately per thread.
    key = (thread_id, id(app))

    # On exception, this will be populated with exception details
    exc_info = (None, None, None)

    # Get a lock unique to this thread since anyio locks are not threadsafe
    lock = APP_LIFESPANS_LOCKS[thread_id]

    async with lock:
        if key in APP_LIFESPANS:
            # The lifespan is already being managed, just increment the reference count
            APP_LIFESPANS_REF_COUNTS[key] += 1
        else:
            # Create a new lifespan manager
            context = LifespanManager(app, startup_timeout=30, shutdown_timeout=30)

            # Enter the context before releasing the lock so startup hooks are
            # complete before another client can be used. The manager is only
            # registered once startup has succeeded: if `__aenter__` raises (or is
            # cancelled), no entry is left behind for later callers to mistake for
            # a running lifespan.
            await context.__aenter__()

            APP_LIFESPANS[key] = context
            APP_LIFESPANS_REF_COUNTS[key] = 1

    try:
        yield
    except BaseException:
        # Captured so the lifespan's `__aexit__` sees the consumer's exception
        exc_info = sys.exc_info()
        raise
    finally:
        # If we do not shield against anyio cancellation, the lock will return
        # immediately and the code in its context will not run, leaving the lifespan
        # open
        with anyio.CancelScope(shield=True):
            async with lock:
                # After the consumer exits the context, decrement the reference count
                APP_LIFESPANS_REF_COUNTS[key] -= 1

                # If this is the last context to exit, close the lifespan
                if APP_LIFESPANS_REF_COUNTS[key] <= 0:
                    APP_LIFESPANS_REF_COUNTS.pop(key)
                    context = APP_LIFESPANS.pop(key)
                    await context.__aexit__(*exc_info)

144 

145 

class PrefectResponse(httpx.Response):
    """
    An `httpx.Response` subclass whose `raise_for_status` raises Prefect's
    `PrefectHTTPStatusError` so callers get more informative error messages.
    """

    def raise_for_status(self) -> Response:
        """
        Raise if the response represents an HTTP error status.

        Any `HTTPStatusError` raised by httpx is converted with
        `PrefectHTTPStatusError.from_httpx_error`, which carries additional
        information that the plain `HTTPStatusError` does not. The new exception is
        chained to the httpx error's own `__cause__` rather than to the httpx error.
        """
        try:
            result = super().raise_for_status()
        except HTTPStatusError as httpx_error:
            raise PrefectHTTPStatusError.from_httpx_error(
                httpx_error
            ) from httpx_error.__cause__
        return result

    @classmethod
    def from_httpx_response(cls: type[Self], response: httpx.Response) -> Response:
        """
        Build a `PrefectResponse` out of an existing `httpx.Response`.

        A shallow copy of the response is made and its `__class__` is reassigned, so
        method lookups resolve against `PrefectResponse` while every other attribute
        of the response is carried over as-is. The object passed in is not modified.
        """
        clone = copy.copy(response)
        clone.__class__ = cls
        return clone

177 

178 

class PrefectHttpxAsyncClient(httpx.AsyncClient):
    """
    A Prefect wrapper for the async httpx client with support for retry-after headers
    for the provided status codes (typically 429, 502 and 503).

    Additionally, this client calls `raise_for_status` on responses unless it was
    created with `raise_on_all_errors=False`.

    For more details on rate limit headers, see:
    [Configuring Cloudflare Rate Limiting](https://support.cloudflare.com/hc/en-us/articles/115001635128-Configuring-Rate-Limiting-from-UI)
    """

    def __init__(
        self,
        *args: Any,
        enable_csrf_support: bool = False,
        raise_on_all_errors: bool = True,
        **kwargs: Any,
    ):
        """
        Create the client.

        Args:
            *args: Forwarded to `httpx.AsyncClient`.
            enable_csrf_support: If set, CSRF headers are attached to POST, PUT,
                PATCH and DELETE requests.
            raise_on_all_errors: If set, `send` calls `raise_for_status` on every
                response before returning it.
            **kwargs: Forwarded to `httpx.AsyncClient`.
        """
        self.enable_csrf_support: bool = enable_csrf_support
        self.csrf_token: Optional[str] = None
        self.csrf_token_expiration: Optional[datetime] = None
        self.csrf_client_id: uuid.UUID = uuid.uuid4()
        self.raise_on_all_errors: bool = raise_on_all_errors

        super().__init__(*args, **kwargs)

        user_agent = (
            f"prefect/{prefect.__version__} (API {constants.SERVER_API_VERSION})"
        )
        self.headers["User-Agent"] = user_agent

        # Add custom headers from settings
        custom_headers = get_current_settings().client.custom_headers
        for header_name, header_value in custom_headers.items():
            # Prevent overriding critical headers
            if header_name.lower() in {
                "user-agent",
                "prefect-csrf-token",
                "prefect-csrf-client",
            }:
                logger.warning(
                    f"Custom header '{header_name}' is ignored because it conflicts with "
                    f"a protected header managed by Prefect. Protected headers include: "
                    f"User-Agent, Prefect-Csrf-Token, Prefect-Csrf-Client"
                )
            else:
                self.headers[header_name] = header_value

    @staticmethod
    def _retry_after_seconds(value: str) -> Optional[float]:
        """
        Convert a `Retry-After` header value into a non-negative delay in seconds.

        Both header forms are accepted: a number of seconds, or an HTTP-date (in
        which case the delay is measured from the current time). Returns `None`
        when the value cannot be interpreted or is not finite, so the caller can
        fall back to its own back-off.
        """
        # Imported locally so this class does not rely on additional file-level
        # imports.
        import math
        from email.utils import parsedate_to_datetime

        try:
            seconds = float(value)
        except ValueError:
            try:
                retry_at = parsedate_to_datetime(value)
            except (TypeError, ValueError):
                return None
            if retry_at.tzinfo is None:
                # A date parsed without an offset is treated as UTC
                retry_at = retry_at.replace(tzinfo=timezone.utc)
            seconds = (retry_at - datetime.now(timezone.utc)).total_seconds()

        if not math.isfinite(seconds):
            return None

        # A negative number or a date in the past means "retry now"
        return max(0.0, seconds)

    async def _send_with_retry(
        self,
        request: Request,
        send: Callable[[Request], Awaitable[Response]],
        send_args: tuple[Any, ...],
        send_kwargs: dict[str, Any],
        retry_codes: set[int] = set(),
        retry_exceptions: tuple[type[Exception], ...] = tuple(),
    ):
        """
        Send a request and retry it if it fails.

        Sends the provided request and retries it up to PREFECT_CLIENT_MAX_RETRIES times
        if the request either raises an exception listed in `retry_exceptions` or
        receives a response with a status code listed in `retry_codes`.

        Retries are not counted against the limit if the response headers contains
        a reserved value, indicating that the server is undergoing maintenance. These
        requests will retry indefinitely until the header is no longer returned.

        Retries will be delayed based on either the retry header (preferred) or
        exponential backoff if a retry header is not provided or cannot be
        interpreted.

        Args:
            request: The request to send.
            send: The coroutine function that actually sends the request.
            send_args: Extra positional arguments passed to `send`.
            send_kwargs: Extra keyword arguments passed to `send`.
            retry_codes: Response status codes that trigger a retry. Never mutated.
            retry_exceptions: Exception types that trigger a retry.

        Returns:
            The first non-retryable response, or the last retryable response once
            the retry limit is reached.

        Raises:
            Any exception in `retry_exceptions` once the retry limit is reached, and
            any other exception raised by `send` immediately.
        """
        try_count = 0
        response = None

        if TYPE_CHECKING:
            # older httpx versions type method as str | bytes | Unknown
            # but in reality it is always a string.
            assert isinstance(request.method, str)  # type: ignore

        is_change_request = request.method.lower() in {"post", "put", "patch", "delete"}

        if self.enable_csrf_support and is_change_request:
            await self._add_csrf_headers(request=request)

        while try_count <= PREFECT_CLIENT_MAX_RETRIES.value():
            retry_seconds = None
            exc_info = None
            # Tracks whether the response to *this* attempt asked for a delay. A
            # response left over from an earlier attempt must not influence the
            # jitter used after an exception.
            server_requested_delay = False

            try:
                response = await send(request, *send_args, **send_kwargs)
            except retry_exceptions:  # type: ignore
                try_count += 1
                if try_count > PREFECT_CLIENT_MAX_RETRIES.value():
                    raise
                # Otherwise, we will ignore this error but capture the info for logging
                exc_info = sys.exc_info()
            else:
                if response.headers.get("Prefect-Maintenance") != "true":
                    try_count += 1

                # We got a response; check if it's a CSRF error, otherwise
                # return immediately if it is not retryable
                if (
                    response.status_code == status.HTTP_403_FORBIDDEN
                    and "Invalid CSRF token" in response.text
                ):
                    # We got a CSRF error, clear the token and try again
                    self.csrf_token = None
                    await self._add_csrf_headers(request)
                elif response.status_code not in retry_codes:
                    return response

                if "Retry-After" in response.headers:
                    retry_seconds = self._retry_after_seconds(
                        response.headers["Retry-After"]
                    )
                    server_requested_delay = retry_seconds is not None

            if try_count > PREFECT_CLIENT_MAX_RETRIES.value():
                # The retry budget is spent, so no further attempt will be made;
                # skip the delay and hand back the failed response right away.
                break

            # Use an exponential back-off if not set in a header
            if retry_seconds is None:
                retry_seconds = 2**try_count

            # Add jitter
            jitter_factor = PREFECT_CLIENT_RETRY_JITTER_FACTOR.value()
            if retry_seconds > 0 and jitter_factor > 0:
                if server_requested_delay:
                    # Always wait for _at least_ retry seconds if requested by the API
                    retry_seconds = bounded_poisson_interval(
                        retry_seconds, retry_seconds * (1 + jitter_factor)
                    )
                else:
                    # Otherwise, use a symmetrical jitter
                    retry_seconds = clamped_poisson_interval(
                        retry_seconds, jitter_factor
                    )

            logger.debug(
                (
                    "Encountered retryable exception during request. "
                    if exc_info
                    else (
                        "Received response with retryable status code"
                        f" {response.status_code if response else 'unknown'}. "
                    )
                )
                + f"Another attempt will be made in {retry_seconds}s. "
                "This is attempt"
                f" {try_count}/{PREFECT_CLIENT_MAX_RETRIES.value() + 1}.",
                exc_info=exc_info,
            )
            await anyio.sleep(retry_seconds)

        assert response is not None, (
            "Retry handling ended without response or exception"
        )

        # We ran out of retries, return the failed response
        return response

    async def send(self, request: Request, *args: Any, **kwargs: Any) -> Response:
        """
        Send a request with automatic retry behavior for the following status codes:

        - 403 Forbidden, if the request failed due to CSRF protection
        - 408 Request Timeout
        - 429 CloudFlare-style rate limiting
        - 502 Bad Gateway
        - 503 Service unavailable
        - Any additional status codes provided in `PREFECT_CLIENT_RETRY_EXTRA_CODES`

        The response is returned as a `PrefectResponse`. If `raise_on_all_errors` is
        set, `raise_for_status` is called on it first.
        """

        super_send = super().send
        response = await self._send_with_retry(
            request=request,
            send=super_send,
            send_args=args,
            send_kwargs=kwargs,
            retry_codes={
                status.HTTP_429_TOO_MANY_REQUESTS,
                status.HTTP_503_SERVICE_UNAVAILABLE,
                status.HTTP_502_BAD_GATEWAY,
                status.HTTP_408_REQUEST_TIMEOUT,
                *PREFECT_CLIENT_RETRY_EXTRA_CODES.value(),
            },
            retry_exceptions=(
                httpx.ReadTimeout,
                httpx.PoolTimeout,
                httpx.ConnectTimeout,
                # `ConnectionResetError` when reading socket raises as a `ReadError`
                httpx.ReadError,
                # Sockets can be closed during writes resulting in a `WriteError`
                httpx.WriteError,
                # Uvicorn bug, see https://github.com/PrefectHQ/prefect/issues/7512
                httpx.RemoteProtocolError,
                # HTTP2 bug, see https://github.com/PrefectHQ/prefect/issues/7442
                httpx.LocalProtocolError,
            ),
        )

        # Convert to a Prefect response to add nicer errors messages
        response = PrefectResponse.from_httpx_response(response)

        if self.raise_on_all_errors:
            response.raise_for_status()

        return response

    async def _add_csrf_headers(self, request: Request):
        """
        Attach the CSRF token and client id headers to `request`.

        A token is requested from `/csrf-token` when none is held or the held one
        has expired. If that endpoint answers 404, or 422 with a body saying CSRF
        protection is disabled, CSRF support is switched off for this client and
        the request is left untouched. Any other HTTP error is re-raised.
        """
        now = datetime.now(timezone.utc)

        if not self.enable_csrf_support:
            return

        if not self.csrf_token or (
            self.csrf_token_expiration and now > self.csrf_token_expiration
        ):
            token_request = self.build_request(
                "GET", f"/csrf-token?client={self.csrf_client_id}"
            )

            try:
                token_response = await self.send(token_request)
            except PrefectHTTPStatusError as exc:
                old_server = exc.response.status_code == status.HTTP_404_NOT_FOUND
                unconfigured_server = (
                    exc.response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
                    and "CSRF protection is disabled." in exc.response.text
                )

                if old_server or unconfigured_server:
                    # The token endpoint is either unavailable, suggesting an
                    # older server, or CSRF protection is disabled. In either
                    # case we should disable CSRF support.
                    self.enable_csrf_support = False
                    return

                raise

            token: CsrfToken = CsrfToken.model_validate(token_response.json())
            self.csrf_token = token.token
            self.csrf_token_expiration = token.expiration

        request.headers["Prefect-Csrf-Token"] = self.csrf_token
        request.headers["Prefect-Csrf-Client"] = str(self.csrf_client_id)

420 

421 

class PrefectHttpxSyncClient(httpx.Client):
    """
    A Prefect wrapper for the sync httpx client with support for retry-after headers
    for the provided status codes (typically 429, 502 and 503).

    Additionally, this client calls `raise_for_status` on responses unless it was
    created with `raise_on_all_errors=False`.

    For more details on rate limit headers, see:
    [Configuring Cloudflare Rate Limiting](https://support.cloudflare.com/hc/en-us/articles/115001635128-Configuring-Rate-Limiting-from-UI)
    """

    def __init__(
        self,
        *args: Any,
        enable_csrf_support: bool = False,
        raise_on_all_errors: bool = True,
        **kwargs: Any,
    ):
        """
        Create the client.

        Args:
            *args: Forwarded to `httpx.Client`.
            enable_csrf_support: If set, CSRF headers are attached to POST, PUT,
                PATCH and DELETE requests.
            raise_on_all_errors: If set, `send` calls `raise_for_status` on every
                response before returning it.
            **kwargs: Forwarded to `httpx.Client`.
        """
        self.enable_csrf_support: bool = enable_csrf_support
        self.csrf_token: Optional[str] = None
        self.csrf_token_expiration: Optional[datetime] = None
        self.csrf_client_id: uuid.UUID = uuid.uuid4()
        self.raise_on_all_errors: bool = raise_on_all_errors

        super().__init__(*args, **kwargs)

        user_agent = (
            f"prefect/{prefect.__version__} (API {constants.SERVER_API_VERSION})"
        )
        self.headers["User-Agent"] = user_agent

        # Add custom headers from settings
        custom_headers = get_current_settings().client.custom_headers
        for header_name, header_value in custom_headers.items():
            # Prevent overriding critical headers
            if header_name.lower() in {
                "user-agent",
                "prefect-csrf-token",
                "prefect-csrf-client",
            }:
                logger.warning(
                    f"Custom header '{header_name}' is ignored because it conflicts with "
                    f"a protected header managed by Prefect. Protected headers include: "
                    f"User-Agent, Prefect-Csrf-Token, Prefect-Csrf-Client"
                )
            else:
                self.headers[header_name] = header_value

    @staticmethod
    def _retry_after_seconds(value: str) -> Optional[float]:
        """
        Convert a `Retry-After` header value into a non-negative delay in seconds.

        Both header forms are accepted: a number of seconds, or an HTTP-date (in
        which case the delay is measured from the current time). Returns `None`
        when the value cannot be interpreted or is not finite, so the caller can
        fall back to its own back-off.
        """
        # Imported locally so this class does not rely on additional file-level
        # imports.
        import math
        from email.utils import parsedate_to_datetime

        try:
            seconds = float(value)
        except ValueError:
            try:
                retry_at = parsedate_to_datetime(value)
            except (TypeError, ValueError):
                return None
            if retry_at.tzinfo is None:
                # A date parsed without an offset is treated as UTC
                retry_at = retry_at.replace(tzinfo=timezone.utc)
            seconds = (retry_at - datetime.now(timezone.utc)).total_seconds()

        if not math.isfinite(seconds):
            return None

        # A negative number or a date in the past means "retry now"; clamping also
        # keeps a negative value from reaching `time.sleep`, which rejects it.
        return max(0.0, seconds)

    def _send_with_retry(
        self,
        request: Request,
        send: Callable[[Request], Response],
        send_args: tuple[Any, ...],
        send_kwargs: dict[str, Any],
        retry_codes: set[int] = set(),
        retry_exceptions: tuple[type[Exception], ...] = tuple(),
    ):
        """
        Send a request and retry it if it fails.

        Sends the provided request and retries it up to PREFECT_CLIENT_MAX_RETRIES times
        if the request either raises an exception listed in `retry_exceptions` or
        receives a response with a status code listed in `retry_codes`.

        Retries are not counted against the limit if the response headers contains
        a reserved value, indicating that the server is undergoing maintenance. These
        requests will retry indefinitely until the header is no longer returned.

        Retries will be delayed based on either the retry header (preferred) or
        exponential backoff if a retry header is not provided or cannot be
        interpreted.

        Args:
            request: The request to send.
            send: The function that actually sends the request.
            send_args: Extra positional arguments passed to `send`.
            send_kwargs: Extra keyword arguments passed to `send`.
            retry_codes: Response status codes that trigger a retry. Never mutated.
            retry_exceptions: Exception types that trigger a retry.

        Returns:
            The first non-retryable response, or the last retryable response once
            the retry limit is reached.

        Raises:
            Any exception in `retry_exceptions` once the retry limit is reached, and
            any other exception raised by `send` immediately.
        """
        try_count = 0
        response = None

        if TYPE_CHECKING:
            # older httpx versions type method as str | bytes | Unknown
            # but in reality it is always a string.
            assert isinstance(request.method, str)  # type: ignore

        is_change_request = request.method.lower() in {"post", "put", "patch", "delete"}

        if self.enable_csrf_support and is_change_request:
            self._add_csrf_headers(request=request)

        while try_count <= PREFECT_CLIENT_MAX_RETRIES.value():
            retry_seconds = None
            exc_info = None
            # Tracks whether the response to *this* attempt asked for a delay. A
            # response left over from an earlier attempt must not influence the
            # jitter used after an exception.
            server_requested_delay = False

            try:
                response = send(request, *send_args, **send_kwargs)
            except retry_exceptions:  # type: ignore
                try_count += 1
                if try_count > PREFECT_CLIENT_MAX_RETRIES.value():
                    raise
                # Otherwise, we will ignore this error but capture the info for logging
                exc_info = sys.exc_info()
            else:
                if response.headers.get("Prefect-Maintenance") != "true":
                    try_count += 1

                # We got a response; check if it's a CSRF error, otherwise
                # return immediately if it is not retryable
                if (
                    response.status_code == status.HTTP_403_FORBIDDEN
                    and "Invalid CSRF token" in response.text
                ):
                    # We got a CSRF error, clear the token and try again
                    self.csrf_token = None
                    self._add_csrf_headers(request)
                elif response.status_code not in retry_codes:
                    return response

                if "Retry-After" in response.headers:
                    retry_seconds = self._retry_after_seconds(
                        response.headers["Retry-After"]
                    )
                    server_requested_delay = retry_seconds is not None

            if try_count > PREFECT_CLIENT_MAX_RETRIES.value():
                # The retry budget is spent, so no further attempt will be made;
                # skip the delay and hand back the failed response right away.
                break

            # Use an exponential back-off if not set in a header
            if retry_seconds is None:
                retry_seconds = 2**try_count

            # Add jitter
            jitter_factor = PREFECT_CLIENT_RETRY_JITTER_FACTOR.value()
            if retry_seconds > 0 and jitter_factor > 0:
                if server_requested_delay:
                    # Always wait for _at least_ retry seconds if requested by the API
                    retry_seconds = bounded_poisson_interval(
                        retry_seconds, retry_seconds * (1 + jitter_factor)
                    )
                else:
                    # Otherwise, use a symmetrical jitter
                    retry_seconds = clamped_poisson_interval(
                        retry_seconds, jitter_factor
                    )

            logger.debug(
                (
                    "Encountered retryable exception during request. "
                    if exc_info
                    else (
                        "Received response with retryable status code"
                        f" {response.status_code if response else 'unknown'}. "
                    )
                )
                + f"Another attempt will be made in {retry_seconds}s. "
                "This is attempt"
                f" {try_count}/{PREFECT_CLIENT_MAX_RETRIES.value() + 1}.",
                exc_info=exc_info,
            )
            time.sleep(retry_seconds)

        assert response is not None, (
            "Retry handling ended without response or exception"
        )

        # We ran out of retries, return the failed response
        return response

    def send(self, request: Request, *args: Any, **kwargs: Any) -> Response:
        """
        Send a request with automatic retry behavior for the following status codes:

        - 403 Forbidden, if the request failed due to CSRF protection
        - 408 Request Timeout
        - 429 CloudFlare-style rate limiting
        - 502 Bad Gateway
        - 503 Service unavailable
        - Any additional status codes provided in `PREFECT_CLIENT_RETRY_EXTRA_CODES`

        The response is returned as a `PrefectResponse`. If `raise_on_all_errors` is
        set, `raise_for_status` is called on it first.
        """

        super_send = super().send
        response = self._send_with_retry(
            request=request,
            send=super_send,
            send_args=args,
            send_kwargs=kwargs,
            retry_codes={
                status.HTTP_429_TOO_MANY_REQUESTS,
                status.HTTP_503_SERVICE_UNAVAILABLE,
                status.HTTP_502_BAD_GATEWAY,
                status.HTTP_408_REQUEST_TIMEOUT,
                *PREFECT_CLIENT_RETRY_EXTRA_CODES.value(),
            },
            retry_exceptions=(
                httpx.ReadTimeout,
                httpx.PoolTimeout,
                httpx.ConnectTimeout,
                # `ConnectionResetError` when reading socket raises as a `ReadError`
                httpx.ReadError,
                # Sockets can be closed during writes resulting in a `WriteError`
                httpx.WriteError,
                # Uvicorn bug, see https://github.com/PrefectHQ/prefect/issues/7512
                httpx.RemoteProtocolError,
                # HTTP2 bug, see https://github.com/PrefectHQ/prefect/issues/7442
                httpx.LocalProtocolError,
            ),
        )

        # Convert to a Prefect response to add nicer errors messages
        response = PrefectResponse.from_httpx_response(response)

        if self.raise_on_all_errors:
            response.raise_for_status()

        return response

    def _add_csrf_headers(self, request: Request):
        """
        Attach the CSRF token and client id headers to `request`.

        A token is requested from `/csrf-token` when none is held or the held one
        has expired. If that endpoint answers 404, or 422 with a body saying CSRF
        protection is disabled, CSRF support is switched off for this client and
        the request is left untouched. Any other HTTP error is re-raised.
        """
        now = datetime.now(timezone.utc)

        if not self.enable_csrf_support:
            return

        if not self.csrf_token or (
            self.csrf_token_expiration and now > self.csrf_token_expiration
        ):
            token_request = self.build_request(
                "GET", f"/csrf-token?client={self.csrf_client_id}"
            )

            try:
                token_response = self.send(token_request)
            except PrefectHTTPStatusError as exc:
                old_server = exc.response.status_code == status.HTTP_404_NOT_FOUND
                unconfigured_server = (
                    exc.response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
                    and "CSRF protection is disabled." in exc.response.text
                )

                if old_server or unconfigured_server:
                    # The token endpoint is either unavailable, suggesting an
                    # older server, or CSRF protection is disabled. In either
                    # case we should disable CSRF support.
                    self.enable_csrf_support = False
                    return

                raise

            token: CsrfToken = CsrfToken.model_validate(token_response.json())
            self.csrf_token = token.token
            self.csrf_token_expiration = token.expiration

        request.headers["Prefect-Csrf-Token"] = self.csrf_token
        request.headers["Prefect-Csrf-Client"] = str(self.csrf_client_id)

663 

664 

class ServerType(AutoEnum):
    """The kinds of API server that `determine_server_type` can report."""

    # No API URL is configured and ephemeral mode is allowed
    EPHEMERAL = AutoEnum.auto()
    # An API URL is configured and it is not a cloud URL
    SERVER = AutoEnum.auto()
    # An API URL is configured and it is a cloud URL
    CLOUD = AutoEnum.auto()
    # No API URL is configured and ephemeral mode is not allowed
    UNCONFIGURED = AutoEnum.auto()

670 

671 

def determine_server_type() -> ServerType:
    """
    Work out which kind of server the current settings point at.

    Returns:
        - `ServerType.CLOUD` if an API URL is configured and it starts with the
            configured cloud API URL
        - `ServerType.SERVER` if an API URL is configured and it is not a cloud URL
        - `ServerType.EPHEMERAL` if no API URL is configured and ephemeral mode is
            allowed
        - `ServerType.UNCONFIGURED` if no API URL is configured and ephemeral mode is
            not allowed
    """
    configured_url = PREFECT_API_URL.value()

    # An explicit API URL wins: only the cloud/server distinction remains.
    if configured_url is not None:
        points_at_cloud = configured_url.startswith(PREFECT_CLOUD_API_URL.value())
        return ServerType.CLOUD if points_at_cloud else ServerType.SERVER

    # No URL configured: fall back to an ephemeral server when that is permitted.
    if PREFECT_SERVER_ALLOW_EPHEMERAL_MODE.value():
        return ServerType.EPHEMERAL
    return ServerType.UNCONFIGURED