Coverage for /usr/local/lib/python3.12/site-packages/prefect/client/base.py: 18% (245 statements)
coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
import copy
import sys
import threading
import time
import uuid
from collections import defaultdict
from collections.abc import AsyncGenerator, Awaitable, MutableMapping
from contextlib import asynccontextmanager
from datetime import datetime, timezone
from logging import Logger
from typing import TYPE_CHECKING, Any, Callable, Optional, Protocol, runtime_checkable

import anyio
import httpx
from asgi_lifespan import LifespanManager
from httpx import HTTPStatusError, Request, Response
from typing_extensions import Self

import prefect
from prefect._internal.compatibility.starlette import status
from prefect.client import constants
from prefect.client.schemas.objects import CsrfToken
from prefect.exceptions import PrefectHTTPStatusError
from prefect.logging import get_logger
from prefect.settings import (
    PREFECT_API_URL,
    PREFECT_CLIENT_MAX_RETRIES,
    PREFECT_CLIENT_RETRY_EXTRA_CODES,
    PREFECT_CLIENT_RETRY_JITTER_FACTOR,
    PREFECT_CLOUD_API_URL,
    PREFECT_SERVER_ALLOW_EPHEMERAL_MODE,
    get_current_settings,
)
from prefect.utilities.collections import AutoEnum
from prefect.utilities.math import bounded_poisson_interval, clamped_poisson_interval

# Datastores for lifespan management, keys should be a tuple of thread and app
# identities.
APP_LIFESPANS: dict[tuple[int, int], LifespanManager] = {}
APP_LIFESPANS_REF_COUNTS: dict[tuple[int, int], int] = {}
# Blocks concurrent access to the above dicts per thread. The index should be the
# thread identity.
APP_LIFESPANS_LOCKS: dict[int, anyio.Lock] = defaultdict(anyio.Lock)

logger: Logger = get_logger("client")

# Define ASGI application types for type checking
Scope = MutableMapping[str, Any]
Message = MutableMapping[str, Any]

Receive = Callable[[], Awaitable[Message]]
Send = Callable[[Message], Awaitable[None]]


@runtime_checkable
class ASGIApp(Protocol):
    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: ...
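
# Illustrative sketch (not part of the upstream module): a minimal ASGI callable
# matching the `ASGIApp` protocol above. Note that `runtime_checkable` isinstance
# checks only verify that `__call__` exists, not the signature itself. The app below
# simply answers every HTTP request with an empty 204 response.
async def example_asgi_app(scope: Scope, receive: Receive, send: Send) -> None:
    if scope["type"] == "http":
        await send({"type": "http.response.start", "status": 204, "headers": []})
        await send({"type": "http.response.body", "body": b""})
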
@asynccontextmanager
async def app_lifespan_context(app: ASGIApp) -> AsyncGenerator[None, None]:
    """
    A context manager that calls startup/shutdown hooks for the given application.

    Lifespan contexts are cached per application to avoid calling the lifespan hooks
    more than once if the context is entered in nested code. A no-op context will be
    returned if the context for the given application is already being managed.

    This manager is robust to concurrent access within the event loop. For example,
    if you have concurrent contexts for the same application, it is guaranteed that
    startup hooks will be called before their context starts and shutdown hooks will
    only be called after their context exits.

    A reference count is used to support nested use of clients without running
    lifespan hooks excessively. The first client context entered will create and enter
    a lifespan context. Each subsequent client will increment a reference count but
    will not create a new lifespan context. When each client context exits, the
    reference count is decremented. When the last client context exits, the lifespan
    will be closed.

    In simple nested cases, the first client context will be the one to exit the
    lifespan. However, if client contexts are entered concurrently they may not exit
    in a consistent order. If the first client context were responsible for closing
    the lifespan, it would have to wait for all other client contexts to exit to
    avoid firing shutdown hooks while the application is in use. Waiting for the other
    clients to exit can introduce deadlocks, so, instead, the first client will exit
    without closing the lifespan context and reference counts will be used to ensure
    the lifespan is closed once all of the clients are done.
    """
    # TODO: A deadlock has been observed during multithreaded use of clients while this
    # lifespan context is being used. This has only been reproduced on Python 3.7
    # and while we hope to discourage using multiple event loops in threads, this
    # bug may emerge again.
    # See https://github.com/PrefectHQ/orion/pull/1696
    thread_id = threading.get_ident()

    # The id of the application is used instead of the hash so each application instance
    # is managed independently even if they share the same settings. We include the
    # thread id since applications are managed separately per thread.
    key = (thread_id, id(app))

    # On exception, this will be populated with exception details
    exc_info = (None, None, None)

    # Get a lock unique to this thread since anyio locks are not threadsafe
    lock = APP_LIFESPANS_LOCKS[thread_id]

    async with lock:
        if key in APP_LIFESPANS:
            # The lifespan is already being managed, just increment the reference count
            APP_LIFESPANS_REF_COUNTS[key] += 1
        else:
            # Create a new lifespan manager
            APP_LIFESPANS[key] = context = LifespanManager(
                app, startup_timeout=30, shutdown_timeout=30
            )
            APP_LIFESPANS_REF_COUNTS[key] = 1

            # Ensure we enter the context before releasing the lock so startup hooks
            # are complete before another client can be used
            await context.__aenter__()

    try:
        yield
    except BaseException:
        exc_info = sys.exc_info()
        raise
    finally:
        # If we do not shield against anyio cancellation, the lock will return
        # immediately and the code in its context will not run, leaving the lifespan
        # open
        with anyio.CancelScope(shield=True):
            async with lock:
                # After the consumer exits the context, decrement the reference count
                APP_LIFESPANS_REF_COUNTS[key] -= 1
                # If this is the last context to exit, close the lifespan
                if APP_LIFESPANS_REF_COUNTS[key] <= 0:
                    APP_LIFESPANS_REF_COUNTS.pop(key)
                    context = APP_LIFESPANS.pop(key)
                    await context.__aexit__(*exc_info)
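
# Illustrative sketch (not part of the upstream module): entering the lifespan context
# for an in-process ASGI app. Nested entries for the same app only increment the
# reference count, so startup/shutdown hooks run exactly once; the `app` parameter is
# a placeholder for any ASGI application object.
async def example_lifespan_usage(app: ASGIApp) -> None:
    async with app_lifespan_context(app):
        # Startup hooks have completed here and the app is safe to call.
        async with app_lifespan_context(app):
            pass  # Re-entering is effectively a no-op apart from the ref count.
    # The last context to exit runs the shutdown hooks.
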
class PrefectResponse(httpx.Response):
    """
    A Prefect wrapper for the `httpx.Response` class.

    Provides more informative error messages.
    """

    def raise_for_status(self) -> Response:
        """
        Raise an exception if the response contains an HTTPStatusError.

        The `PrefectHTTPStatusError` contains useful additional information that
        is not contained in the `HTTPStatusError`.
        """
        try:
            return super().raise_for_status()
        except HTTPStatusError as exc:
            raise PrefectHTTPStatusError.from_httpx_error(exc) from exc.__cause__

    @classmethod
    def from_httpx_response(cls: type[Self], response: httpx.Response) -> Response:
        """
        Create a `PrefectResponse` from an `httpx.Response`.

        By changing the `__class__` attribute of the Response, we change the method
        resolution order to look for methods defined in PrefectResponse, while leaving
        everything else about the original Response instance intact.
        """
        new_response = copy.copy(response)
        new_response.__class__ = cls
        return new_response
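
# Illustrative sketch (not part of the upstream module): `from_httpx_response` upgrades
# an existing response by swapping its `__class__`, so the wrapped object keeps the
# original status code, headers, and body while gaining the richer error behavior of
# `PrefectResponse.raise_for_status`.
def example_wrap_response(response: httpx.Response) -> Response:
    wrapped = PrefectResponse.from_httpx_response(response)
    assert wrapped.status_code == response.status_code  # original state is preserved
    return wrapped
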
class PrefectHttpxAsyncClient(httpx.AsyncClient):
    """
    A Prefect wrapper for the async httpx client with support for retry-after headers
    for the provided status codes (typically 429, 502 and 503).

    Additionally, this client will always call `raise_for_status` on responses.

    For more details on rate limit headers, see:
    [Configuring Cloudflare Rate Limiting](https://support.cloudflare.com/hc/en-us/articles/115001635128-Configuring-Rate-Limiting-from-UI)
    """

    def __init__(
        self,
        *args: Any,
        enable_csrf_support: bool = False,
        raise_on_all_errors: bool = True,
        **kwargs: Any,
    ):
        self.enable_csrf_support: bool = enable_csrf_support
        self.csrf_token: Optional[str] = None
        self.csrf_token_expiration: Optional[datetime] = None
        self.csrf_client_id: uuid.UUID = uuid.uuid4()
        self.raise_on_all_errors: bool = raise_on_all_errors

        super().__init__(*args, **kwargs)

        user_agent = (
            f"prefect/{prefect.__version__} (API {constants.SERVER_API_VERSION})"
        )
        self.headers["User-Agent"] = user_agent

        # Add custom headers from settings
        custom_headers = get_current_settings().client.custom_headers
        for header_name, header_value in custom_headers.items():
            # Prevent overriding critical headers
            if header_name.lower() in {
                "user-agent",
                "prefect-csrf-token",
                "prefect-csrf-client",
            }:
                logger.warning(
                    f"Custom header '{header_name}' is ignored because it conflicts with "
                    f"a protected header managed by Prefect. Protected headers include: "
                    f"User-Agent, Prefect-Csrf-Token, Prefect-Csrf-Client"
                )
            else:
                self.headers[header_name] = header_value

    async def _send_with_retry(
        self,
        request: Request,
        send: Callable[[Request], Awaitable[Response]],
        send_args: tuple[Any, ...],
        send_kwargs: dict[str, Any],
        retry_codes: set[int] = set(),
        retry_exceptions: tuple[type[Exception], ...] = tuple(),
    ):
        """
        Send a request and retry it if it fails.

        Sends the provided request and retries it up to PREFECT_CLIENT_MAX_RETRIES times
        if the request either raises an exception listed in `retry_exceptions` or
        receives a response with a status code listed in `retry_codes`.

        Retries are not counted against the limit if the response headers contain
        a reserved value, indicating that the server is undergoing maintenance. These
        requests will retry indefinitely until the header is no longer returned.

        Retries will be delayed based on either the retry header (preferred) or
        exponential backoff if a retry header is not provided.
        """
        try_count = 0
        response = None

        if TYPE_CHECKING:
            # older httpx versions type method as str | bytes | Unknown
            # but in reality it is always a string.
            assert isinstance(request.method, str)  # type: ignore

        is_change_request = request.method.lower() in {"post", "put", "patch", "delete"}

        if self.enable_csrf_support and is_change_request:
            await self._add_csrf_headers(request=request)

        while try_count <= PREFECT_CLIENT_MAX_RETRIES.value():
            retry_seconds = None
            exc_info = None

            try:
                response = await send(request, *send_args, **send_kwargs)
            except retry_exceptions:  # type: ignore
                try_count += 1
                if try_count > PREFECT_CLIENT_MAX_RETRIES.value():
                    raise
                # Otherwise, we will ignore this error but capture the info for logging
                exc_info = sys.exc_info()
            else:
                if response.headers.get("Prefect-Maintenance") != "true":
                    try_count += 1

                # We got a response; check if it's a CSRF error, otherwise
                # return immediately if it is not retryable
                if (
                    response.status_code == status.HTTP_403_FORBIDDEN
                    and "Invalid CSRF token" in response.text
                ):
                    # We got a CSRF error, clear the token and try again
                    self.csrf_token = None
                    await self._add_csrf_headers(request)
                elif response.status_code not in retry_codes:
                    return response

                if "Retry-After" in response.headers:
                    retry_seconds = float(response.headers["Retry-After"])

            # Use an exponential back-off if not set in a header
            if retry_seconds is None:
                retry_seconds = 2**try_count

            # Add jitter
            jitter_factor = PREFECT_CLIENT_RETRY_JITTER_FACTOR.value()
            if retry_seconds > 0 and jitter_factor > 0:
                if response is not None and "Retry-After" in response.headers:
                    # Always wait for _at least_ retry seconds if requested by the API
                    retry_seconds = bounded_poisson_interval(
                        retry_seconds, retry_seconds * (1 + jitter_factor)
                    )
                else:
                    # Otherwise, use a symmetrical jitter
                    retry_seconds = clamped_poisson_interval(
                        retry_seconds, jitter_factor
                    )

            logger.debug(
                (
                    "Encountered retryable exception during request. "
                    if exc_info
                    else (
                        "Received response with retryable status code"
                        f" {response.status_code if response else 'unknown'}. "
                    )
                )
                + f"Another attempt will be made in {retry_seconds}s. "
                "This is attempt"
                f" {try_count}/{PREFECT_CLIENT_MAX_RETRIES.value() + 1}.",
                exc_info=exc_info,
            )
            await anyio.sleep(retry_seconds)

        assert response is not None, (
            "Retry handling ended without response or exception"
        )

        # We ran out of retries, return the failed response
        return response

    async def send(self, request: Request, *args: Any, **kwargs: Any) -> Response:
        """
        Send a request with automatic retry behavior for the following status codes:

        - 403 Forbidden, if the request failed due to CSRF protection
        - 408 Request Timeout
        - 429 CloudFlare-style rate limiting
        - 502 Bad Gateway
        - 503 Service unavailable
        - Any additional status codes provided in `PREFECT_CLIENT_RETRY_EXTRA_CODES`
        """
        super_send = super().send
        response = await self._send_with_retry(
            request=request,
            send=super_send,
            send_args=args,
            send_kwargs=kwargs,
            retry_codes={
                status.HTTP_429_TOO_MANY_REQUESTS,
                status.HTTP_503_SERVICE_UNAVAILABLE,
                status.HTTP_502_BAD_GATEWAY,
                status.HTTP_408_REQUEST_TIMEOUT,
                *PREFECT_CLIENT_RETRY_EXTRA_CODES.value(),
            },
            retry_exceptions=(
                httpx.ReadTimeout,
                httpx.PoolTimeout,
                httpx.ConnectTimeout,
                # `ConnectionResetError` when reading socket raises as a `ReadError`
                httpx.ReadError,
                # Sockets can be closed during writes resulting in a `WriteError`
                httpx.WriteError,
                # Uvicorn bug, see https://github.com/PrefectHQ/prefect/issues/7512
                httpx.RemoteProtocolError,
                # HTTP2 bug, see https://github.com/PrefectHQ/prefect/issues/7442
                httpx.LocalProtocolError,
            ),
        )

        # Convert to a Prefect response to add nicer error messages
        response = PrefectResponse.from_httpx_response(response)

        if self.raise_on_all_errors:
            response.raise_for_status()

        return response

    async def _add_csrf_headers(self, request: Request):
        now = datetime.now(timezone.utc)

        if not self.enable_csrf_support:
            return

        if not self.csrf_token or (
            self.csrf_token_expiration and now > self.csrf_token_expiration
        ):
            token_request = self.build_request(
                "GET", f"/csrf-token?client={self.csrf_client_id}"
            )

            try:
                token_response = await self.send(token_request)
            except PrefectHTTPStatusError as exc:
                old_server = exc.response.status_code == status.HTTP_404_NOT_FOUND
                unconfigured_server = (
                    exc.response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
                    and "CSRF protection is disabled." in exc.response.text
                )

                if old_server or unconfigured_server:
                    # The token endpoint is either unavailable, suggesting an
                    # older server, or CSRF protection is disabled. In either
                    # case we should disable CSRF support.
                    self.enable_csrf_support = False
                    return

                raise

            token: CsrfToken = CsrfToken.model_validate(token_response.json())
            self.csrf_token = token.token
            self.csrf_token_expiration = token.expiration

        request.headers["Prefect-Csrf-Token"] = self.csrf_token
        request.headers["Prefect-Csrf-Client"] = str(self.csrf_client_id)
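
# Illustrative sketch (not part of the upstream module): using the retrying async
# client directly. The base URL and the `/health` path are placeholders; in practice
# the higher-level Prefect clients construct and configure this client for you.
async def example_async_client_usage() -> None:
    async with PrefectHttpxAsyncClient(
        base_url="http://127.0.0.1:4200/api",  # placeholder URL
        enable_csrf_support=True,
        raise_on_all_errors=True,
    ) as client:
        # `send` transparently retries 408/429/502/503 responses and transient
        # transport errors, and raises `PrefectHTTPStatusError` on failures.
        response = await client.get("/health")
        print(response.status_code)
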
class PrefectHttpxSyncClient(httpx.Client):
    """
    A Prefect wrapper for the sync httpx client with support for retry-after headers
    for the provided status codes (typically 429, 502 and 503).

    Additionally, this client will always call `raise_for_status` on responses.

    For more details on rate limit headers, see:
    [Configuring Cloudflare Rate Limiting](https://support.cloudflare.com/hc/en-us/articles/115001635128-Configuring-Rate-Limiting-from-UI)
    """

    def __init__(
        self,
        *args: Any,
        enable_csrf_support: bool = False,
        raise_on_all_errors: bool = True,
        **kwargs: Any,
    ):
        self.enable_csrf_support: bool = enable_csrf_support
        self.csrf_token: Optional[str] = None
        self.csrf_token_expiration: Optional[datetime] = None
        self.csrf_client_id: uuid.UUID = uuid.uuid4()
        self.raise_on_all_errors: bool = raise_on_all_errors

        super().__init__(*args, **kwargs)

        user_agent = (
            f"prefect/{prefect.__version__} (API {constants.SERVER_API_VERSION})"
        )
        self.headers["User-Agent"] = user_agent

        # Add custom headers from settings
        custom_headers = get_current_settings().client.custom_headers
        for header_name, header_value in custom_headers.items():
            # Prevent overriding critical headers
            if header_name.lower() in {
                "user-agent",
                "prefect-csrf-token",
                "prefect-csrf-client",
            }:
                logger.warning(
                    f"Custom header '{header_name}' is ignored because it conflicts with "
                    f"a protected header managed by Prefect. Protected headers include: "
                    f"User-Agent, Prefect-Csrf-Token, Prefect-Csrf-Client"
                )
            else:
                self.headers[header_name] = header_value

    def _send_with_retry(
        self,
        request: Request,
        send: Callable[[Request], Response],
        send_args: tuple[Any, ...],
        send_kwargs: dict[str, Any],
        retry_codes: set[int] = set(),
        retry_exceptions: tuple[type[Exception], ...] = tuple(),
    ):
        """
        Send a request and retry it if it fails.

        Sends the provided request and retries it up to PREFECT_CLIENT_MAX_RETRIES times
        if the request either raises an exception listed in `retry_exceptions` or
        receives a response with a status code listed in `retry_codes`.

        Retries are not counted against the limit if the response headers contain
        a reserved value, indicating that the server is undergoing maintenance. These
        requests will retry indefinitely until the header is no longer returned.

        Retries will be delayed based on either the retry header (preferred) or
        exponential backoff if a retry header is not provided.
        """
        try_count = 0
        response = None

        if TYPE_CHECKING:
            # older httpx versions type method as str | bytes | Unknown
            # but in reality it is always a string.
            assert isinstance(request.method, str)  # type: ignore

        is_change_request = request.method.lower() in {"post", "put", "patch", "delete"}

        if self.enable_csrf_support and is_change_request:
            self._add_csrf_headers(request=request)

        while try_count <= PREFECT_CLIENT_MAX_RETRIES.value():
            retry_seconds = None
            exc_info = None

            try:
                response = send(request, *send_args, **send_kwargs)
            except retry_exceptions:  # type: ignore
                try_count += 1
                if try_count > PREFECT_CLIENT_MAX_RETRIES.value():
                    raise
                # Otherwise, we will ignore this error but capture the info for logging
                exc_info = sys.exc_info()
            else:
                if response.headers.get("Prefect-Maintenance") != "true":
                    try_count += 1

                # We got a response; check if it's a CSRF error, otherwise
                # return immediately if it is not retryable
                if (
                    response.status_code == status.HTTP_403_FORBIDDEN
                    and "Invalid CSRF token" in response.text
                ):
                    # We got a CSRF error, clear the token and try again
                    self.csrf_token = None
                    self._add_csrf_headers(request)
                elif response.status_code not in retry_codes:
                    return response

                if "Retry-After" in response.headers:
                    retry_seconds = float(response.headers["Retry-After"])

            # Use an exponential back-off if not set in a header
            if retry_seconds is None:
                retry_seconds = 2**try_count

            # Add jitter
            jitter_factor = PREFECT_CLIENT_RETRY_JITTER_FACTOR.value()
            if retry_seconds > 0 and jitter_factor > 0:
                if response is not None and "Retry-After" in response.headers:
                    # Always wait for _at least_ retry seconds if requested by the API
                    retry_seconds = bounded_poisson_interval(
                        retry_seconds, retry_seconds * (1 + jitter_factor)
                    )
                else:
                    # Otherwise, use a symmetrical jitter
                    retry_seconds = clamped_poisson_interval(
                        retry_seconds, jitter_factor
                    )

            logger.debug(
                (
                    "Encountered retryable exception during request. "
                    if exc_info
                    else (
                        "Received response with retryable status code"
                        f" {response.status_code if response else 'unknown'}. "
                    )
                )
                + f"Another attempt will be made in {retry_seconds}s. "
                "This is attempt"
                f" {try_count}/{PREFECT_CLIENT_MAX_RETRIES.value() + 1}.",
                exc_info=exc_info,
            )
            time.sleep(retry_seconds)

        assert response is not None, (
            "Retry handling ended without response or exception"
        )

        # We ran out of retries, return the failed response
        return response

    def send(self, request: Request, *args: Any, **kwargs: Any) -> Response:
        """
        Send a request with automatic retry behavior for the following status codes:

        - 403 Forbidden, if the request failed due to CSRF protection
        - 408 Request Timeout
        - 429 CloudFlare-style rate limiting
        - 502 Bad Gateway
        - 503 Service unavailable
        - Any additional status codes provided in `PREFECT_CLIENT_RETRY_EXTRA_CODES`
        """
        super_send = super().send
        response = self._send_with_retry(
            request=request,
            send=super_send,
            send_args=args,
            send_kwargs=kwargs,
            retry_codes={
                status.HTTP_429_TOO_MANY_REQUESTS,
                status.HTTP_503_SERVICE_UNAVAILABLE,
                status.HTTP_502_BAD_GATEWAY,
                status.HTTP_408_REQUEST_TIMEOUT,
                *PREFECT_CLIENT_RETRY_EXTRA_CODES.value(),
            },
            retry_exceptions=(
                httpx.ReadTimeout,
                httpx.PoolTimeout,
                httpx.ConnectTimeout,
                # `ConnectionResetError` when reading socket raises as a `ReadError`
                httpx.ReadError,
                # Sockets can be closed during writes resulting in a `WriteError`
                httpx.WriteError,
                # Uvicorn bug, see https://github.com/PrefectHQ/prefect/issues/7512
                httpx.RemoteProtocolError,
                # HTTP2 bug, see https://github.com/PrefectHQ/prefect/issues/7442
                httpx.LocalProtocolError,
            ),
        )

        # Convert to a Prefect response to add nicer error messages
        response = PrefectResponse.from_httpx_response(response)

        if self.raise_on_all_errors:
            response.raise_for_status()

        return response

    def _add_csrf_headers(self, request: Request):
        now = datetime.now(timezone.utc)

        if not self.enable_csrf_support:
            return

        if not self.csrf_token or (
            self.csrf_token_expiration and now > self.csrf_token_expiration
        ):
            token_request = self.build_request(
                "GET", f"/csrf-token?client={self.csrf_client_id}"
            )

            try:
                token_response = self.send(token_request)
            except PrefectHTTPStatusError as exc:
                old_server = exc.response.status_code == status.HTTP_404_NOT_FOUND
                unconfigured_server = (
                    exc.response.status_code == status.HTTP_422_UNPROCESSABLE_ENTITY
                    and "CSRF protection is disabled." in exc.response.text
                )

                if old_server or unconfigured_server:
                    # The token endpoint is either unavailable, suggesting an
                    # older server, or CSRF protection is disabled. In either
                    # case we should disable CSRF support.
                    self.enable_csrf_support = False
                    return

                raise

            token: CsrfToken = CsrfToken.model_validate(token_response.json())
            self.csrf_token = token.token
            self.csrf_token_expiration = token.expiration

        request.headers["Prefect-Csrf-Token"] = self.csrf_token
        request.headers["Prefect-Csrf-Client"] = str(self.csrf_client_id)
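
# Illustrative sketch (not part of the upstream module): how a single retry delay is
# derived, mirroring the logic inside `_send_with_retry`. The `retry_after` argument
# stands in for a `Retry-After` header value, if one was returned.
def example_retry_delay(try_count: int, retry_after: Optional[float] = None) -> float:
    jitter_factor = PREFECT_CLIENT_RETRY_JITTER_FACTOR.value()
    retry_seconds = retry_after if retry_after is not None else float(2**try_count)
    if retry_seconds > 0 and jitter_factor > 0:
        if retry_after is not None:
            # Wait at least as long as the server requested, plus bounded jitter.
            return bounded_poisson_interval(
                retry_seconds, retry_seconds * (1 + jitter_factor)
            )
        # Otherwise, apply symmetrical jitter around the exponential back-off.
        return clamped_poisson_interval(retry_seconds, jitter_factor)
    return retry_seconds
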
class ServerType(AutoEnum):
    EPHEMERAL = AutoEnum.auto()
    SERVER = AutoEnum.auto()
    CLOUD = AutoEnum.auto()
    UNCONFIGURED = AutoEnum.auto()

def determine_server_type() -> ServerType:
    """
    Determine the server type based on the current settings.

    Returns:
        - `ServerType.EPHEMERAL` if the ephemeral server is enabled
        - `ServerType.SERVER` if an API URL is configured and it is not a cloud URL
        - `ServerType.CLOUD` if an API URL is configured and it is a cloud URL
        - `ServerType.UNCONFIGURED` if no API URL is configured and ephemeral mode is
          not enabled
    """
    api_url = PREFECT_API_URL.value()
    if api_url is None:
        if PREFECT_SERVER_ALLOW_EPHEMERAL_MODE.value():
            return ServerType.EPHEMERAL
        else:
            return ServerType.UNCONFIGURED
    if api_url.startswith(PREFECT_CLOUD_API_URL.value()):
        return ServerType.CLOUD
    else:
        return ServerType.SERVER
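
# Illustrative sketch (not part of the upstream module): branching on the detected
# server type, e.g. to decide whether an ephemeral server needs to be started or to
# fail fast when the client is unconfigured.
def example_server_type_check() -> bool:
    server_type = determine_server_type()
    if server_type == ServerType.UNCONFIGURED:
        raise RuntimeError("No API URL is configured and ephemeral mode is disabled.")
    return server_type == ServerType.EPHEMERAL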