Coverage for /usr/local/lib/python3.12/site-packages/prefect/client/orchestration/__init__.py: 16%
612 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1from __future__ import annotations 1a
2import asyncio 1a
3import base64 1a
4import datetime 1a
5import ssl 1a
6from collections.abc import Iterable 1a
7from contextlib import AsyncExitStack 1a
8from logging import Logger 1a
9from typing import TYPE_CHECKING, Any, Literal, NoReturn, Optional, Union, overload 1a
10from uuid import UUID 1a
12import certifi 1a
13import httpcore 1a
14import httpx 1a
16import pydantic 1a
17from asgi_lifespan import LifespanManager 1a
18from packaging import version 1a
19from starlette import status 1a
20from typing_extensions import ParamSpec, Self, TypeVar 1a
22from prefect.client.orchestration._artifacts.client import ( 1a
23 ArtifactClient,
24 ArtifactAsyncClient,
25 ArtifactCollectionClient,
26 ArtifactCollectionAsyncClient,
27)
29from prefect.client.orchestration._concurrency_limits.client import ( 1a
30 ConcurrencyLimitAsyncClient,
31 ConcurrencyLimitClient,
32)
34from prefect.client.orchestration._logs.client import ( 1a
35 LogClient,
36 LogAsyncClient,
37)
38from prefect.client.orchestration._variables.client import ( 1a
39 VariableClient,
40 VariableAsyncClient,
41)
43from prefect.client.orchestration._deployments.client import ( 1a
44 DeploymentClient,
45 DeploymentAsyncClient,
46)
47from prefect.client.orchestration._automations.client import ( 1a
48 AutomationClient,
49 AutomationAsyncClient,
50)
52from prefect.client.orchestration._work_pools.client import ( 1a
53 WorkPoolClient,
54 WorkPoolAsyncClient,
55)
57from prefect._experimental.sla.client import SlaClient, SlaAsyncClient 1a
59from prefect.client.orchestration._flows.client import ( 1a
60 FlowClient,
61 FlowAsyncClient,
62)
63from prefect.client.orchestration._flow_runs.client import ( 1a
64 FlowRunClient,
65 FlowRunAsyncClient,
66)
68from prefect.client.orchestration._blocks_documents.client import ( 1a
69 BlocksDocumentClient,
70 BlocksDocumentAsyncClient,
71)
73from prefect.client.orchestration._blocks_schemas.client import ( 1a
74 BlocksSchemaClient,
75 BlocksSchemaAsyncClient,
76)
78from prefect.client.orchestration._blocks_types.client import ( 1a
79 BlocksTypeClient,
80 BlocksTypeAsyncClient,
81)
83from prefect.client.orchestration._events.client import ( 1a
84 EventClient,
85 EventAsyncClient,
86)
88import prefect 1a
89import prefect.exceptions 1a
90from prefect.logging.loggers import get_run_logger 1a
91import prefect.states 1a
92from prefect.client.constants import SERVER_API_VERSION 1a
93from prefect.client.schemas import FlowRun, OrchestrationResult, TaskRun 1a
94from prefect.client.schemas.actions import ( 1a
95 TaskRunCreate,
96 TaskRunUpdate,
97 WorkQueueCreate,
98 WorkQueueUpdate,
99)
100from prefect.client.schemas.filters import ( 1a
101 DeploymentFilter,
102 FlowFilter,
103 FlowRunFilter,
104 TaskRunFilter,
105 WorkQueueFilter,
106 WorkQueueFilterName,
107)
108from prefect.client.schemas.objects import ( 1a
109 TaskRunResult,
110 FlowRunResult,
111 Parameter,
112 Constant,
113 TaskRunPolicy,
114 WorkQueue,
115 WorkQueueStatusDetail,
116)
118from prefect.client.schemas.sorting import ( 1a
119 TaskRunSort,
120)
121from prefect.logging import get_logger 1a
122from prefect.settings import ( 1a
123 PREFECT_API_AUTH_STRING,
124 PREFECT_API_DATABASE_CONNECTION_URL,
125 PREFECT_API_ENABLE_HTTP2,
126 PREFECT_API_KEY,
127 PREFECT_API_REQUEST_TIMEOUT,
128 PREFECT_API_SSL_CERT_FILE,
129 PREFECT_API_TLS_INSECURE_SKIP_VERIFY,
130 PREFECT_API_URL,
131 PREFECT_CLIENT_CSRF_SUPPORT_ENABLED,
132 PREFECT_CLOUD_API_URL,
133 PREFECT_SERVER_ALLOW_EPHEMERAL_MODE,
134 PREFECT_TESTING_UNIT_TEST_MODE,
135)
136from prefect.types._datetime import now 1a
138if TYPE_CHECKING: 138 ↛ 139line 138 didn't jump to line 139 because the condition on line 138 was never true1a
139 from prefect.tasks import Task as TaskObject
141from prefect.client.base import ( 1a
142 ASGIApp,
143 PrefectHttpxAsyncClient,
144 PrefectHttpxSyncClient,
145 ServerType,
146 app_lifespan_context,
147)
# Type variables used throughout the client's generic method signatures.
P = ParamSpec("P")
R = TypeVar("R", infer_variance=True)
T = TypeVar("T")

# Cache for TypeAdapter instances to avoid repeated instantiation
_TYPE_ADAPTER_CACHE: dict[type, pydantic.TypeAdapter[Any]] = {}
def _get_type_adapter(type_: type) -> pydantic.TypeAdapter[Any]:
    """Get or create a cached TypeAdapter for the given type.

    TypeAdapter construction is comparatively expensive, so instances are
    memoized in the module-level ``_TYPE_ADAPTER_CACHE`` keyed by type.
    Uses EAFP so the common cache-hit path performs a single dict lookup
    (the original did a membership test followed by a second fetch).
    """
    try:
        return _TYPE_ADAPTER_CACHE[type_]
    except KeyError:
        adapter = _TYPE_ADAPTER_CACHE[type_] = pydantic.TypeAdapter(type_)
        return adapter
# Overloads let type checkers infer the concrete client type from the
# `sync_client` argument: False (the default) resolves to the async
# `PrefectClient`, True resolves to the blocking `SyncPrefectClient`.
@overload
def get_client(
    httpx_settings: Optional[dict[str, Any]],
    sync_client: Literal[False],
) -> "PrefectClient": ...


@overload
def get_client(*, httpx_settings: Optional[dict[str, Any]]) -> "PrefectClient": ...


@overload
def get_client(*, sync_client: Literal[False] = False) -> "PrefectClient": ...


@overload
def get_client(
    httpx_settings: Optional[dict[str, Any]], sync_client: Literal[True]
) -> "SyncPrefectClient": ...


@overload
def get_client(*, sync_client: Literal[True]) -> "SyncPrefectClient": ...
def get_client(
    httpx_settings: Optional[dict[str, Any]] = None, sync_client: bool = False
) -> Union["SyncPrefectClient", "PrefectClient"]:
    """
    Retrieve a HTTP client for communicating with the Prefect REST API.

    The client must be context managed; for example:

    ```python
    async with get_client() as client:
        await client.hello()
    ```

    To return a synchronous client, pass sync_client=True:

    ```python
    with get_client(sync_client=True) as client:
        client.hello()
    ```

    Args:
        httpx_settings: settings forwarded to the underlying httpx client.
        sync_client: when True, return a `SyncPrefectClient` instead of the
            async `PrefectClient`.

    Raises:
        ValueError: if no API URL is configured and ephemeral mode is disabled.
    """
    # Local import to avoid a circular import at module load time.
    import prefect.context

    # try to load clients from a client context, if possible
    # only load clients that match the provided config / loop
    try:
        loop = asyncio.get_running_loop()
    except RuntimeError:
        # No running event loop (e.g. called from synchronous code).
        loop = None

    if sync_client:
        if client_ctx := prefect.context.SyncClientContext.get():
            if (
                client_ctx.client
                and getattr(client_ctx, "_httpx_settings", None) == httpx_settings
            ):
                return client_ctx.client
    else:
        if client_ctx := prefect.context.AsyncClientContext.get():
            if (
                client_ctx.client
                and getattr(client_ctx, "_httpx_settings", None) == httpx_settings
                # only reuse the cached client when it is bound to this event
                # loop (or to no loop at all)
                and loop in (getattr(client_ctx.client, "_loop", None), None)
            ):
                return client_ctx.client

    api: str = PREFECT_API_URL.value()
    server_type = None

    if not api and PREFECT_SERVER_ALLOW_EPHEMERAL_MODE:
        # create an ephemeral API if none was provided
        from prefect.server.api.server import SubprocessASGIServer

        server = SubprocessASGIServer()
        server.start()
        assert server.server_process is not None, "Server process did not start"

        api = server.api_url
        server_type = ServerType.EPHEMERAL
    elif not api and not PREFECT_SERVER_ALLOW_EPHEMERAL_MODE:
        raise ValueError(
            "No Prefect API URL provided. Please set PREFECT_API_URL to the address of a running Prefect server."
        )

    if sync_client:
        return SyncPrefectClient(
            api,
            auth_string=PREFECT_API_AUTH_STRING.value(),
            api_key=PREFECT_API_KEY.value(),
            httpx_settings=httpx_settings,
            server_type=server_type,
        )
    else:
        return PrefectClient(
            api,
            auth_string=PREFECT_API_AUTH_STRING.value(),
            api_key=PREFECT_API_KEY.value(),
            httpx_settings=httpx_settings,
            server_type=server_type,
        )
class PrefectClient(
    # Each mixin contributes the async API surface for one resource type.
    ArtifactAsyncClient,
    ArtifactCollectionAsyncClient,
    LogAsyncClient,
    VariableAsyncClient,
    ConcurrencyLimitAsyncClient,
    DeploymentAsyncClient,
    AutomationAsyncClient,
    SlaAsyncClient,
    FlowRunAsyncClient,
    FlowAsyncClient,
    BlocksDocumentAsyncClient,
    BlocksSchemaAsyncClient,
    BlocksTypeAsyncClient,
    WorkPoolAsyncClient,
    EventAsyncClient,
):
    """
    An asynchronous client for interacting with the [Prefect REST API](https://docs.prefect.io/v3/api-ref/rest-api/).

    Args:
        api: the REST API URL or FastAPI application to connect to
        api_key: An optional API key for authentication.
        api_version: The API version this client is compatible with.
        httpx_settings: An optional dictionary of settings to pass to the underlying
            `httpx.AsyncClient`

    Examples:

        Say hello to a Prefect REST API

        ```python
        async with get_client() as client:
            response = await client.hello()

            print(response.json())
        👋
        ```
    """
    def __init__(
        self,
        api: Union[str, ASGIApp],
        *,
        auth_string: Optional[str] = None,
        api_key: Optional[str] = None,
        api_version: Optional[str] = None,
        httpx_settings: Optional[dict[str, Any]] = None,
        server_type: Optional[ServerType] = None,
    ) -> None:
        """
        Configure the underlying httpx client for either an external API URL
        or an in-process (ephemeral) ASGI application.

        Args:
            api: the REST API URL, or an ASGI app to run requests against
                in-process.
            auth_string: credentials sent as HTTP Basic auth; takes precedence
                over `api_key` when both are provided.
            api_key: API key sent as a Bearer token when `auth_string` is
                absent.
            api_version: value for the `X-PREFECT-API-VERSION` header;
                defaults to `SERVER_API_VERSION`.
            httpx_settings: extra settings for `httpx.AsyncClient`; copied so
                the caller's dict is never mutated.
            server_type: explicit server type; inferred from `api` when
                omitted.
        """
        # Copy so the defaults set below never mutate the caller's dict.
        httpx_settings = httpx_settings.copy() if httpx_settings else {}
        httpx_settings.setdefault("headers", {})

        # Build a TLS context unless the caller already supplied an SSLContext.
        tls_verify = httpx_settings.get("verify")
        if not tls_verify or not isinstance(tls_verify, ssl.SSLContext):
            if PREFECT_API_TLS_INSECURE_SKIP_VERIFY:
                # Create an unverified context for insecure connections
                ctx = ssl.create_default_context()
                ctx.check_hostname = False
                ctx.verify_mode = ssl.CERT_NONE
                httpx_settings.setdefault("verify", ctx)
            else:
                cert_file = PREFECT_API_SSL_CERT_FILE.value()
                if not cert_file:
                    cert_file = certifi.where()
                # Create a verified context with the certificate file
                ctx = ssl.create_default_context(cafile=cert_file)
                httpx_settings.setdefault("verify", ctx)

        if api_version is None:
            api_version = SERVER_API_VERSION
        httpx_settings["headers"].setdefault("X-PREFECT-API-VERSION", api_version)
        # Prioritize auth_string if provided, otherwise use api_key
        if auth_string:
            token = base64.b64encode(auth_string.encode("utf-8")).decode("utf-8")
            httpx_settings["headers"]["Authorization"] = (
                f"Basic {token}"  # Overwrite if exists
            )
        elif api_key:
            httpx_settings["headers"]["Authorization"] = (
                f"Bearer {api_key}"  # Set if auth_string is not present
            )

        # Context management
        self._context_stack: int = 0
        self._exit_stack = AsyncExitStack()
        self._ephemeral_app: Optional[ASGIApp] = None
        self.manage_lifespan = True
        self.server_type: ServerType

        # Only set if this client started the lifespan of the application
        self._ephemeral_lifespan: Optional[LifespanManager] = None

        self._closed = False
        self._started = False

        # Connect to an external application
        if isinstance(api, str):
            if httpx_settings.get("app"):
                raise ValueError(
                    "Invalid httpx settings: `app` cannot be set when providing an "
                    "api url. `app` is only for use with ephemeral instances. Provide "
                    "it as the `api` parameter instead."
                )
            httpx_settings.setdefault("base_url", api)

            # See https://www.python-httpx.org/advanced/#pool-limit-configuration
            httpx_settings.setdefault(
                "limits",
                httpx.Limits(
                    # We see instability when allowing the client to open many connections at once.
                    # Limiting concurrency results in more stable performance.
                    max_connections=16,
                    max_keepalive_connections=8,
                    # The Prefect Cloud LB will keep connections alive for 30s.
                    # Only allow the client to keep connections alive for 25s.
                    keepalive_expiry=25,
                ),
            )

            # See https://www.python-httpx.org/http2/
            # Enabling HTTP/2 support on the client does not necessarily mean that your requests
            # and responses will be transported over HTTP/2, since both the client and the server
            # need to support HTTP/2. If you connect to a server that only supports HTTP/1.1 the
            # client will use a standard HTTP/1.1 connection instead.
            httpx_settings.setdefault("http2", PREFECT_API_ENABLE_HTTP2.value())

            if server_type:
                self.server_type = server_type
            else:
                # Infer Cloud vs. self-hosted server from the URL prefix.
                self.server_type = (
                    ServerType.CLOUD
                    if api.startswith(PREFECT_CLOUD_API_URL.value())
                    else ServerType.SERVER
                )

        # Connect to an in-process application
        else:
            self._ephemeral_app = api
            self.server_type = ServerType.EPHEMERAL

            # When using an ephemeral server, server-side exceptions can be raised
            # client-side breaking all of our response error code handling. To work
            # around this, we create an ASGI transport with application exceptions
            # disabled instead of using the application directly.
            # refs:
            # - https://github.com/PrefectHQ/prefect/pull/9637
            # - https://github.com/encode/starlette/blob/d3a11205ed35f8e5a58a711db0ff59c86fa7bb31/starlette/middleware/errors.py#L184
            # - https://github.com/tiangolo/fastapi/blob/8cc967a7605d3883bd04ceb5d25cc94ae079612f/fastapi/applications.py#L163-L164
            httpx_settings.setdefault(
                "transport",
                httpx.ASGITransport(
                    app=self._ephemeral_app, raise_app_exceptions=False
                ),
            )
            httpx_settings.setdefault("base_url", "http://ephemeral-prefect/api")

        # See https://www.python-httpx.org/advanced/#timeout-configuration
        httpx_settings.setdefault(
            "timeout",
            httpx.Timeout(
                connect=PREFECT_API_REQUEST_TIMEOUT.value(),
                read=PREFECT_API_REQUEST_TIMEOUT.value(),
                write=PREFECT_API_REQUEST_TIMEOUT.value(),
                pool=PREFECT_API_REQUEST_TIMEOUT.value(),
            ),
        )

        if not PREFECT_TESTING_UNIT_TEST_MODE:
            httpx_settings.setdefault("follow_redirects", True)

        # CSRF support is only relevant against a non-Cloud Prefect server.
        enable_csrf_support = (
            self.server_type != ServerType.CLOUD
            and PREFECT_CLIENT_CSRF_SUPPORT_ENABLED.value()
        )

        self._client = PrefectHttpxAsyncClient(
            **httpx_settings, enable_csrf_support=enable_csrf_support
        )
        self._loop = None

        # See https://www.python-httpx.org/advanced/#custom-transports
        #
        # If we're using an HTTP/S client (not the ephemeral client), adjust the
        # transport to add retries _after_ it is instantiated. If we alter the transport
        # before instantiation, the transport will not be aware of proxies unless we
        # reproduce all of the logic to make it so.
        #
        # Only alter the transport to set our default of 3 retries, don't modify any
        # transport a user may have provided via httpx_settings.
        #
        # Making liberal use of getattr and isinstance checks here to avoid any
        # surprises if the internals of httpx or httpcore change on us
        if isinstance(api, str) and not httpx_settings.get("transport"):
            transport_for_url = getattr(self._client, "_transport_for_url", None)
            if callable(transport_for_url):
                server_transport = transport_for_url(httpx.URL(api))
                if isinstance(server_transport, httpx.AsyncHTTPTransport):
                    pool = getattr(server_transport, "_pool", None)
                    if isinstance(pool, httpcore.AsyncConnectionPool):
                        setattr(pool, "_retries", 3)

        self.logger: Logger = get_logger("client")
474 @property 1a
475 def api_url(self) -> httpx.URL: 1a
476 """
477 Get the base URL for the API.
478 """
479 return self._client.base_url
481 # API methods ----------------------------------------------------------------------
483 async def api_healthcheck(self) -> Optional[Exception]: 1a
484 """
485 Attempts to connect to the API and returns the encountered exception if not
486 successful.
488 If successful, returns `None`.
489 """
490 try:
491 await self._client.get("/health")
492 return None
493 except Exception as exc:
494 return exc
496 async def hello(self) -> httpx.Response: 1a
497 """
498 Send a GET request to /hello for testing purposes.
499 """
500 return await self._client.get("/hello")
502 async def create_work_queue( 1a
503 self,
504 name: str,
505 description: Optional[str] = None,
506 is_paused: Optional[bool] = None,
507 concurrency_limit: Optional[int] = None,
508 priority: Optional[int] = None,
509 work_pool_name: Optional[str] = None,
510 ) -> WorkQueue:
511 """
512 Create a work queue.
514 Args:
515 name: a unique name for the work queue
516 description: An optional description for the work queue.
517 is_paused: Whether or not the work queue is paused.
518 concurrency_limit: An optional concurrency limit for the work queue.
519 priority: The queue's priority. Lower values are higher priority (1 is the highest).
520 work_pool_name: The name of the work pool to use for this queue.
522 Raises:
523 prefect.exceptions.ObjectAlreadyExists: If request returns 409
524 httpx.RequestError: If request fails
526 Returns:
527 The created work queue
528 """
529 create_model = WorkQueueCreate(name=name, filter=None)
530 if description is not None:
531 create_model.description = description
532 if is_paused is not None:
533 create_model.is_paused = is_paused
534 if concurrency_limit is not None:
535 create_model.concurrency_limit = concurrency_limit
536 if priority is not None:
537 create_model.priority = priority
539 data = create_model.model_dump(mode="json")
540 try:
541 if work_pool_name is not None:
542 response = await self._client.post(
543 f"/work_pools/{work_pool_name}/queues", json=data
544 )
545 else:
546 response = await self._client.post("/work_queues/", json=data)
547 except httpx.HTTPStatusError as e:
548 if e.response.status_code == status.HTTP_409_CONFLICT:
549 raise prefect.exceptions.ObjectAlreadyExists(http_exc=e) from e
550 elif e.response.status_code == status.HTTP_404_NOT_FOUND:
551 raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
552 else:
553 raise
554 return WorkQueue.model_validate(response.json())
556 async def read_work_queue_by_name( 1a
557 self,
558 name: str,
559 work_pool_name: Optional[str] = None,
560 ) -> WorkQueue:
561 """
562 Read a work queue by name.
564 Args:
565 name (str): a unique name for the work queue
566 work_pool_name (str, optional): the name of the work pool
567 the queue belongs to.
569 Raises:
570 prefect.exceptions.ObjectNotFound: if no work queue is found
571 httpx.HTTPStatusError: other status errors
573 Returns:
574 WorkQueue: a work queue API object
575 """
576 try:
577 if work_pool_name is not None:
578 response = await self._client.get(
579 f"/work_pools/{work_pool_name}/queues/{name}"
580 )
581 else:
582 response = await self._client.get(f"/work_queues/name/{name}")
583 except httpx.HTTPStatusError as e:
584 if e.response.status_code == status.HTTP_404_NOT_FOUND:
585 raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
586 else:
587 raise
589 return WorkQueue.model_validate(response.json())
591 async def update_work_queue(self, id: UUID, **kwargs: Any) -> None: 1a
592 """
593 Update properties of a work queue.
595 Args:
596 id: the ID of the work queue to update
597 **kwargs: the fields to update
599 Raises:
600 ValueError: if no kwargs are provided
601 prefect.exceptions.ObjectNotFound: if request returns 404
602 httpx.RequestError: if the request fails
604 """
605 if not kwargs:
606 raise ValueError("No fields provided to update.")
608 data = WorkQueueUpdate(**kwargs).model_dump(mode="json", exclude_unset=True)
609 try:
610 await self._client.patch(f"/work_queues/{id}", json=data)
611 except httpx.HTTPStatusError as e:
612 if e.response.status_code == status.HTTP_404_NOT_FOUND:
613 raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
614 else:
615 raise
617 async def get_runs_in_work_queue( 1a
618 self,
619 id: UUID,
620 limit: int = 10,
621 scheduled_before: Optional[datetime.datetime] = None,
622 ) -> list[FlowRun]:
623 """
624 Read flow runs off a work queue.
626 Args:
627 id: the id of the work queue to read from
628 limit: a limit on the number of runs to return
629 scheduled_before: a timestamp; only runs scheduled before this time will be returned.
630 Defaults to now.
632 Raises:
633 prefect.exceptions.ObjectNotFound: If request returns 404
634 httpx.RequestError: If request fails
636 Returns:
637 List[FlowRun]: a list of FlowRun objects read from the queue
638 """
639 if scheduled_before is None:
640 scheduled_before = now("UTC")
642 try:
643 response = await self._client.post(
644 f"/work_queues/{id}/get_runs",
645 json={
646 "limit": limit,
647 "scheduled_before": scheduled_before.isoformat(),
648 },
649 )
650 except httpx.HTTPStatusError as e:
651 if e.response.status_code == status.HTTP_404_NOT_FOUND:
652 raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
653 else:
654 raise
655 return _get_type_adapter(list[FlowRun]).validate_python(response.json())
657 async def read_work_queue( 1a
658 self,
659 id: UUID,
660 ) -> WorkQueue:
661 """
662 Read a work queue.
664 Args:
665 id: the id of the work queue to load
667 Raises:
668 prefect.exceptions.ObjectNotFound: If request returns 404
669 httpx.RequestError: If request fails
671 Returns:
672 WorkQueue: an instantiated WorkQueue object
673 """
674 try:
675 response = await self._client.get(f"/work_queues/{id}")
676 except httpx.HTTPStatusError as e:
677 if e.response.status_code == status.HTTP_404_NOT_FOUND:
678 raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
679 else:
680 raise
681 return WorkQueue.model_validate(response.json())
683 async def read_work_queue_status( 1a
684 self,
685 id: UUID,
686 ) -> WorkQueueStatusDetail:
687 """
688 Read a work queue status.
690 Args:
691 id: the id of the work queue to load
693 Raises:
694 prefect.exceptions.ObjectNotFound: If request returns 404
695 httpx.RequestError: If request fails
697 Returns:
698 WorkQueueStatus: an instantiated WorkQueueStatus object
699 """
700 try:
701 response = await self._client.get(f"/work_queues/{id}/status")
702 except httpx.HTTPStatusError as e:
703 if e.response.status_code == status.HTTP_404_NOT_FOUND:
704 raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
705 else:
706 raise
707 return WorkQueueStatusDetail.model_validate(response.json())
709 async def match_work_queues( 1a
710 self,
711 prefixes: list[str],
712 work_pool_name: Optional[str] = None,
713 ) -> list[WorkQueue]:
714 """
715 Query the Prefect API for work queues with names with a specific prefix.
717 Args:
718 prefixes: a list of strings used to match work queue name prefixes
719 work_pool_name: an optional work pool name to scope the query to
721 Returns:
722 a list of WorkQueue model representations
723 of the work queues
724 """
725 page_length = 100
726 current_page = 0
727 work_queues: list[WorkQueue] = []
729 while True:
730 new_queues = await self.read_work_queues(
731 work_pool_name=work_pool_name,
732 offset=current_page * page_length,
733 limit=page_length,
734 work_queue_filter=WorkQueueFilter(
735 name=WorkQueueFilterName(startswith_=prefixes)
736 ),
737 )
738 if not new_queues:
739 break
740 work_queues += new_queues
741 current_page += 1
743 return work_queues
745 async def delete_work_queue_by_id( 1a
746 self,
747 id: UUID,
748 ) -> None:
749 """
750 Delete a work queue by its ID.
752 Args:
753 id: the id of the work queue to delete
755 Raises:
756 prefect.exceptions.ObjectNotFound: If request returns 404
757 httpx.RequestError: If requests fails
758 """
759 try:
760 await self._client.delete(
761 f"/work_queues/{id}",
762 )
763 except httpx.HTTPStatusError as e:
764 if e.response.status_code == status.HTTP_404_NOT_FOUND:
765 raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
766 else:
767 raise
769 async def set_task_run_name(self, task_run_id: UUID, name: str) -> httpx.Response: 1a
770 task_run_data = TaskRunUpdate(name=name)
771 return await self._client.patch(
772 f"/task_runs/{task_run_id}",
773 json=task_run_data.model_dump(mode="json", exclude_unset=True),
774 )
776 async def create_task_run( 1a
777 self,
778 task: "TaskObject[P, R]",
779 flow_run_id: Optional[UUID],
780 dynamic_key: str,
781 id: Optional[UUID] = None,
782 name: Optional[str] = None,
783 extra_tags: Optional[Iterable[str]] = None,
784 state: Optional[prefect.states.State[R]] = None,
785 task_inputs: Optional[
786 dict[
787 str,
788 list[Union[TaskRunResult, FlowRunResult, Parameter, Constant]],
789 ]
790 ] = None,
791 ) -> TaskRun:
792 """
793 Create a task run
795 Args:
796 task: The Task to run
797 flow_run_id: The flow run id with which to associate the task run
798 dynamic_key: A key unique to this particular run of a Task within the flow
799 id: An optional ID for the task run. If not provided, one will be generated
800 server-side.
801 name: An optional name for the task run
802 extra_tags: an optional list of extra tags to apply to the task run in
803 addition to `task.tags`
804 state: The initial state for the run. If not provided, defaults to
805 `Pending` for now. Should always be a `Scheduled` type.
806 task_inputs: the set of inputs passed to the task
808 Returns:
809 The created task run.
810 """
811 tags = set(task.tags).union(extra_tags or [])
813 if state is None:
814 state = prefect.states.Pending()
816 retry_delay = task.retry_delay_seconds
817 if isinstance(retry_delay, list):
818 retry_delay = [int(rd) for rd in retry_delay]
819 elif isinstance(retry_delay, float):
820 retry_delay = int(retry_delay)
822 task_run_data = TaskRunCreate(
823 id=id,
824 name=name,
825 flow_run_id=flow_run_id,
826 task_key=task.task_key,
827 dynamic_key=str(dynamic_key),
828 tags=list(tags),
829 task_version=task.version,
830 empirical_policy=TaskRunPolicy(
831 retries=task.retries,
832 retry_delay=retry_delay,
833 retry_jitter_factor=task.retry_jitter_factor,
834 ),
835 state=prefect.states.to_state_create(state),
836 task_inputs=task_inputs or {},
837 )
838 content = task_run_data.model_dump_json(exclude={"id"} if id is None else None)
840 response = await self._client.post("/task_runs/", content=content)
841 return TaskRun.model_validate(response.json())
843 async def read_task_run(self, task_run_id: UUID) -> TaskRun: 1a
844 """
845 Query the Prefect API for a task run by id.
847 Args:
848 task_run_id: the task run ID of interest
850 Returns:
851 a Task Run model representation of the task run
852 """
853 try:
854 response = await self._client.get(f"/task_runs/{task_run_id}")
855 return TaskRun.model_validate(response.json())
856 except httpx.HTTPStatusError as e:
857 if e.response.status_code == status.HTTP_404_NOT_FOUND:
858 raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
859 else:
860 raise
862 async def read_task_runs( 1a
863 self,
864 *,
865 flow_filter: Optional[FlowFilter] = None,
866 flow_run_filter: Optional[FlowRunFilter] = None,
867 task_run_filter: Optional[TaskRunFilter] = None,
868 deployment_filter: Optional[DeploymentFilter] = None,
869 sort: Optional[TaskRunSort] = None,
870 limit: Optional[int] = None,
871 offset: int = 0,
872 ) -> list[TaskRun]:
873 """
874 Query the Prefect API for task runs. Only task runs matching all criteria will
875 be returned.
877 Args:
878 flow_filter: filter criteria for flows
879 flow_run_filter: filter criteria for flow runs
880 task_run_filter: filter criteria for task runs
881 deployment_filter: filter criteria for deployments
882 sort: sort criteria for the task runs
883 limit: a limit for the task run query
884 offset: an offset for the task run query
886 Returns:
887 a list of Task Run model representations
888 of the task runs
889 """
890 body: dict[str, Any] = {
891 "flows": flow_filter.model_dump(mode="json") if flow_filter else None,
892 "flow_runs": (
893 flow_run_filter.model_dump(mode="json", exclude_unset=True)
894 if flow_run_filter
895 else None
896 ),
897 "task_runs": (
898 task_run_filter.model_dump(mode="json") if task_run_filter else None
899 ),
900 "deployments": (
901 deployment_filter.model_dump(mode="json") if deployment_filter else None
902 ),
903 "sort": sort,
904 "limit": limit,
905 "offset": offset,
906 }
907 response = await self._client.post("/task_runs/filter", json=body)
908 return _get_type_adapter(list[TaskRun]).validate_python(response.json())
910 async def delete_task_run(self, task_run_id: UUID) -> None: 1a
911 """
912 Delete a task run by id.
914 Args:
915 task_run_id: the task run ID of interest
916 Raises:
917 prefect.exceptions.ObjectNotFound: If request returns 404
918 httpx.RequestError: If requests fails
919 """
920 try:
921 await self._client.delete(f"/task_runs/{task_run_id}")
922 except httpx.HTTPStatusError as e:
923 if e.response.status_code == 404:
924 raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
925 else:
926 raise
928 async def set_task_run_state( 1a
929 self,
930 task_run_id: UUID,
931 state: prefect.states.State[T],
932 force: bool = False,
933 ) -> OrchestrationResult[T]:
934 """
935 Set the state of a task run.
937 Args:
938 task_run_id: the id of the task run
939 state: the state to set
940 force: if True, disregard orchestration logic when setting the state,
941 forcing the Prefect API to accept the state
943 Returns:
944 an OrchestrationResult model representation of state orchestration output
945 """
946 state_create = prefect.states.to_state_create(state)
947 state_create.state_details.task_run_id = task_run_id
948 response = await self._client.post(
949 f"/task_runs/{task_run_id}/set_state",
950 json=dict(state=state_create.model_dump(mode="json"), force=force),
951 )
952 result: OrchestrationResult[T] = OrchestrationResult.model_validate(
953 response.json()
954 )
955 return result
957 async def read_task_run_states( 1a
958 self, task_run_id: UUID
959 ) -> list[prefect.states.State]:
960 """
961 Query for the states of a task run
963 Args:
964 task_run_id: the id of the task run
966 Returns:
967 a list of State model representations of the task run states
968 """
969 response = await self._client.get(
970 "/task_run_states/", params=dict(task_run_id=str(task_run_id))
971 )
972 return _get_type_adapter(list[prefect.states.State]).validate_python(
973 response.json()
974 )
976 async def read_work_queues( 1a
977 self,
978 work_pool_name: Optional[str] = None,
979 work_queue_filter: Optional[WorkQueueFilter] = None,
980 limit: Optional[int] = None,
981 offset: Optional[int] = None,
982 ) -> list[WorkQueue]:
983 """
984 Retrieves queues for a work pool.
986 Args:
987 work_pool_name: Name of the work pool for which to get queues.
988 work_queue_filter: Criteria by which to filter queues.
989 limit: Limit for the queue query.
990 offset: Limit for the queue query.
992 Returns:
993 List of queues for the specified work pool.
994 """
995 json: dict[str, Any] = {
996 "work_queues": (
997 work_queue_filter.model_dump(mode="json", exclude_unset=True)
998 if work_queue_filter
999 else None
1000 ),
1001 "limit": limit,
1002 "offset": offset,
1003 }
1005 if work_pool_name:
1006 try:
1007 response = await self._client.post(
1008 f"/work_pools/{work_pool_name}/queues/filter",
1009 json=json,
1010 )
1011 except httpx.HTTPStatusError as e:
1012 if e.response.status_code == status.HTTP_404_NOT_FOUND:
1013 raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
1014 else:
1015 raise
1016 else:
1017 response = await self._client.post("/work_queues/filter", json=json)
1019 return _get_type_adapter(list[WorkQueue]).validate_python(response.json())
1021 async def read_worker_metadata(self) -> dict[str, Any]: 1a
1022 """Reads worker metadata stored in Prefect collection registry."""
1023 response = await self._client.get("collections/views/aggregate-worker-metadata")
1024 response.raise_for_status()
1025 return response.json()
1027 async def api_version(self) -> str: 1a
1028 res = await self._client.get("/admin/version")
1029 return res.json()
1031 def client_version(self) -> str: 1a
1032 return prefect.__version__
1034 @property 1a
1035 def loop(self) -> asyncio.AbstractEventLoop | None: 1a
1036 return self._loop
1038 async def raise_for_api_version_mismatch(self) -> None: 1a
1039 # Cloud is always compatible as a server
1040 if self.server_type == ServerType.CLOUD:
1041 return
1043 try:
1044 api_version = await self.api_version()
1045 except Exception as e:
1046 if "Unauthorized" in str(e):
1047 raise e
1048 raise RuntimeError(f"Failed to reach API at {self.api_url}") from e
1050 api_version = version.parse(api_version)
1051 client_version = version.parse(self.client_version())
1053 if api_version.major != client_version.major:
1054 raise RuntimeError(
1055 f"Found incompatible versions: client: {client_version}, server: {api_version}. "
1056 "Major versions must match."
1057 )
1058 if api_version < client_version:
1059 warning_message = (
1060 "Your Prefect server is running an older version of Prefect than your client which may result in unexpected behavior. "
1061 f"Please upgrade your Prefect server from version {api_version} to version {client_version} or higher."
1062 )
1063 try:
1064 get_run_logger().warning(warning_message)
1065 except prefect.context.MissingContextError:
1066 self.logger.warning(warning_message)
    async def __aenter__(self) -> Self:
        """
        Start the client.

        Entering is reentrant: if the client is already started, the context
        counter is incremented and the same instance is returned.

        If the client is already closed, this will raise an exception. Use a new
        client instance instead.
        """
        if self._closed:
            # httpx.AsyncClient does not allow reuse so we will not either.
            raise RuntimeError(
                "The client cannot be started again after closing. "
                "Retrieve a new client with `get_client()` instead."
            )

        # Track nesting depth so __aexit__ only tears down on the outermost exit.
        self._context_stack += 1

        if self._started:
            # allow reentrancy
            return self

        # Remember which loop we started on; exposed via the `loop` property.
        self._loop = asyncio.get_running_loop()
        await self._exit_stack.__aenter__()

        # Enter a lifespan context if using an ephemeral application.
        # See https://github.com/encode/httpx/issues/350
        if self._ephemeral_app and self.manage_lifespan:
            self._ephemeral_lifespan = await self._exit_stack.enter_async_context(
                app_lifespan_context(self._ephemeral_app)
            )

        if self._ephemeral_app:
            self.logger.debug(
                "Using ephemeral application with database at "
                f"{PREFECT_API_DATABASE_CONNECTION_URL.value()}"
            )
        else:
            self.logger.debug(f"Connecting to API at {self.api_url}")

        # Enter the httpx client's context
        await self._exit_stack.enter_async_context(self._client)

        self._started = True

        return self
1115 async def __aexit__(self, *exc_info: Any) -> Optional[bool]: 1a
1116 """
1117 Shutdown the client.
1118 """
1120 self._context_stack -= 1
1121 if self._context_stack > 0:
1122 return
1123 self._closed = True
1124 return await self._exit_stack.__aexit__(*exc_info)
1126 def __enter__(self) -> NoReturn: 1a
1127 raise RuntimeError(
1128 "The `PrefectClient` must be entered with an async context. Use 'async "
1129 "with PrefectClient(...)' not 'with PrefectClient(...)'"
1130 )
1132 def __exit__(self, *_: object) -> NoReturn: 1a
1133 assert False, "This should never be called but must be defined for __enter__"
1136class SyncPrefectClient( 1a
1137 ArtifactClient,
1138 ArtifactCollectionClient,
1139 LogClient,
1140 VariableClient,
1141 ConcurrencyLimitClient,
1142 DeploymentClient,
1143 AutomationClient,
1144 SlaClient,
1145 FlowRunClient,
1146 FlowClient,
1147 BlocksDocumentClient,
1148 BlocksSchemaClient,
1149 BlocksTypeClient,
1150 WorkPoolClient,
1151 EventClient,
1152):
1153 """
1154 A synchronous client for interacting with the [Prefect REST API](https://docs.prefect.io/v3/api-ref/rest-api/).
1156 Args:
1157 api: the REST API URL or FastAPI application to connect to
1158 api_key: An optional API key for authentication.
1159 api_version: The API version this client is compatible with.
1160 httpx_settings: An optional dictionary of settings to pass to the underlying
1161 `httpx.Client`
1163 Examples:
1165 Say hello to a Prefect REST API
1167 ```python
1168 with get_client(sync_client=True) as client:
1169 response = client.hello()
1171 print(response.json())
1172 👋
1173 ```
1174 """
    def __init__(
        self,
        api: Union[str, ASGIApp],
        *,
        auth_string: Optional[str] = None,
        api_key: Optional[str] = None,
        api_version: Optional[str] = None,
        httpx_settings: Optional[dict[str, Any]] = None,
        server_type: Optional[ServerType] = None,
    ) -> None:
        """
        Build the underlying httpx client and determine the server type.

        Args:
            api: the REST API URL, or an ASGI app for an ephemeral in-process server
            auth_string: basic-auth credentials; takes precedence over `api_key`
            api_key: bearer-token API key used when `auth_string` is absent
            api_version: value for the X-PREFECT-API-VERSION header; defaults
                to `SERVER_API_VERSION`
            httpx_settings: extra settings forwarded to the underlying
                `httpx.Client`; copied so the caller's dict is not mutated
            server_type: explicit server type; otherwise inferred from the URL
                (CLOUD when it starts with the configured Cloud API URL)
        """
        httpx_settings = httpx_settings.copy() if httpx_settings else {}
        httpx_settings.setdefault("headers", {})

        # Only build an SSL context when the caller did not already supply one;
        # `setdefault` below means any user-provided "verify" value still wins.
        tls_verify = httpx_settings.get("verify")
        if not tls_verify or not isinstance(tls_verify, ssl.SSLContext):
            if PREFECT_API_TLS_INSECURE_SKIP_VERIFY:
                # Create an unverified context for insecure connections
                ctx = ssl.create_default_context()
                ctx.check_hostname = False
                ctx.verify_mode = ssl.CERT_NONE
                httpx_settings.setdefault("verify", ctx)
            else:
                cert_file = PREFECT_API_SSL_CERT_FILE.value()
                if not cert_file:
                    cert_file = certifi.where()
                # Create a verified context with the certificate file
                ctx = ssl.create_default_context(cafile=cert_file)
                httpx_settings.setdefault("verify", ctx)

        if api_version is None:
            api_version = SERVER_API_VERSION
        httpx_settings["headers"].setdefault("X-PREFECT-API-VERSION", api_version)
        # Prioritize auth_string if provided, otherwise use api_key
        if auth_string:
            token = base64.b64encode(auth_string.encode("utf-8")).decode("utf-8")
            httpx_settings["headers"]["Authorization"] = (
                f"Basic {token}"  # Overwrite if exists
            )
        elif api_key:
            httpx_settings["headers"]["Authorization"] = (
                f"Bearer {api_key}"  # Set if auth_string is not present
            )

        # Context management
        self._context_stack: int = 0
        self._ephemeral_app: Optional[ASGIApp] = None
        self.manage_lifespan = True
        self.server_type: ServerType

        self._closed = False
        self._started = False

        # Connect to an external application
        if isinstance(api, str):
            if httpx_settings.get("app"):
                raise ValueError(
                    "Invalid httpx settings: `app` cannot be set when providing an "
                    "api url. `app` is only for use with ephemeral instances. Provide "
                    "it as the `api` parameter instead."
                )
            httpx_settings.setdefault("base_url", api)

            # See https://www.python-httpx.org/advanced/#pool-limit-configuration
            httpx_settings.setdefault(
                "limits",
                httpx.Limits(
                    # We see instability when allowing the client to open many connections at once.
                    # Limiting concurrency results in more stable performance.
                    max_connections=16,
                    max_keepalive_connections=8,
                    # The Prefect Cloud LB will keep connections alive for 30s.
                    # Only allow the client to keep connections alive for 25s.
                    keepalive_expiry=25,
                ),
            )

            # See https://www.python-httpx.org/http2/
            # Enabling HTTP/2 support on the client does not necessarily mean that your requests
            # and responses will be transported over HTTP/2, since both the client and the server
            # need to support HTTP/2. If you connect to a server that only supports HTTP/1.1 the
            # client will use a standard HTTP/1.1 connection instead.
            httpx_settings.setdefault("http2", PREFECT_API_ENABLE_HTTP2.value())

            if server_type:
                self.server_type = server_type
            else:
                self.server_type = (
                    ServerType.CLOUD
                    if api.startswith(PREFECT_CLOUD_API_URL.value())
                    else ServerType.SERVER
                )

        # Connect to an in-process application
        else:
            self._ephemeral_app = api
            self.server_type = ServerType.EPHEMERAL

        # See https://www.python-httpx.org/advanced/#timeout-configuration
        httpx_settings.setdefault(
            "timeout",
            httpx.Timeout(
                connect=PREFECT_API_REQUEST_TIMEOUT.value(),
                read=PREFECT_API_REQUEST_TIMEOUT.value(),
                write=PREFECT_API_REQUEST_TIMEOUT.value(),
                pool=PREFECT_API_REQUEST_TIMEOUT.value(),
            ),
        )

        if not PREFECT_TESTING_UNIT_TEST_MODE:
            httpx_settings.setdefault("follow_redirects", True)

        # CSRF is a server-side feature; Cloud does not use it.
        enable_csrf_support = (
            self.server_type != ServerType.CLOUD
            and PREFECT_CLIENT_CSRF_SUPPORT_ENABLED.value()
        )

        self._client = PrefectHttpxSyncClient(
            **httpx_settings, enable_csrf_support=enable_csrf_support
        )

        # See https://www.python-httpx.org/advanced/#custom-transports
        #
        # If we're using an HTTP/S client (not the ephemeral client), adjust the
        # transport to add retries _after_ it is instantiated. If we alter the transport
        # before instantiation, the transport will not be aware of proxies unless we
        # reproduce all of the logic to make it so.
        #
        # Only alter the transport to set our default of 3 retries, don't modify any
        # transport a user may have provided via httpx_settings.
        #
        # Making liberal use of getattr and isinstance checks here to avoid any
        # surprises if the internals of httpx or httpcore change on us
        if isinstance(api, str) and not httpx_settings.get("transport"):
            transport_for_url = getattr(self._client, "_transport_for_url", None)
            if callable(transport_for_url):
                server_transport = transport_for_url(httpx.URL(api))
                if isinstance(server_transport, httpx.HTTPTransport):
                    pool = getattr(server_transport, "_pool", None)
                    if isinstance(pool, httpcore.ConnectionPool):
                        setattr(pool, "_retries", 3)

        self.logger: Logger = get_logger("client")
1319 @property 1a
1320 def api_url(self) -> httpx.URL: 1a
1321 """
1322 Get the base URL for the API.
1323 """
1324 return self._client.base_url
1326 # Context management ----------------------------------------------------------------
1328 def __enter__(self) -> "SyncPrefectClient": 1a
1329 """
1330 Start the client.
1332 If the client is already started, this will raise an exception.
1334 If the client is already closed, this will raise an exception. Use a new client
1335 instance instead.
1336 """
1337 if self._closed:
1338 # httpx.Client does not allow reuse so we will not either.
1339 raise RuntimeError(
1340 "The client cannot be started again after closing. "
1341 "Retrieve a new client with `get_client()` instead."
1342 )
1344 self._context_stack += 1
1346 if self._started:
1347 # allow reentrancy
1348 return self
1350 self._client.__enter__()
1351 self._started = True
1353 return self
1355 def __exit__(self, *exc_info: Any) -> None: 1a
1356 """
1357 Shutdown the client.
1358 """
1359 self._context_stack -= 1
1360 if self._context_stack > 0:
1361 return
1362 self._closed = True
1363 self._client.__exit__(*exc_info)
1365 # API methods ----------------------------------------------------------------------
1367 def api_healthcheck(self) -> Optional[Exception]: 1a
1368 """
1369 Attempts to connect to the API and returns the encountered exception if not
1370 successful.
1372 If successful, returns `None`.
1373 """
1374 try:
1375 self._client.get("/health")
1376 return None
1377 except Exception as exc:
1378 return exc
1380 def hello(self) -> httpx.Response: 1a
1381 """
1382 Send a GET request to /hello for testing purposes.
1383 """
1384 return self._client.get("/hello")
1386 def api_version(self) -> str: 1a
1387 res = self._client.get("/admin/version")
1388 return res.json()
1390 def client_version(self) -> str: 1a
1391 return prefect.__version__
1393 def raise_for_api_version_mismatch(self) -> None: 1a
1394 # Cloud is always compatible as a server
1395 if self.server_type == ServerType.CLOUD:
1396 return
1398 try:
1399 api_version = self.api_version()
1400 except Exception as e:
1401 if "Unauthorized" in str(e):
1402 raise e
1403 raise RuntimeError(f"Failed to reach API at {self.api_url}") from e
1405 api_version = version.parse(api_version)
1406 client_version = version.parse(self.client_version())
1408 if api_version.major != client_version.major:
1409 raise RuntimeError(
1410 f"Found incompatible versions: client: {client_version}, server: {api_version}. "
1411 "Major versions must match."
1412 )
1413 if api_version < client_version:
1414 warning_message = (
1415 "Your Prefect server is running an older version of Prefect than your client which may result in unexpected behavior. "
1416 f"Please upgrade your Prefect server from version {api_version} to version {client_version} or higher."
1417 )
1418 try:
1419 get_run_logger().warning(warning_message)
1420 except prefect.context.MissingContextError:
1421 self.logger.warning(warning_message)
1423 def set_task_run_name(self, task_run_id: UUID, name: str) -> httpx.Response: 1a
1424 task_run_data = TaskRunUpdate(name=name)
1425 return self._client.patch(
1426 f"/task_runs/{task_run_id}",
1427 json=task_run_data.model_dump(mode="json", exclude_unset=True),
1428 )
    def create_task_run(
        self,
        task: "TaskObject[P, R]",
        flow_run_id: Optional[UUID],
        dynamic_key: str,
        id: Optional[UUID] = None,
        name: Optional[str] = None,
        extra_tags: Optional[Iterable[str]] = None,
        state: Optional[prefect.states.State[R]] = None,
        task_inputs: Optional[
            dict[
                str,
                list[
                    Union[
                        TaskRunResult,
                        FlowRunResult,
                        Parameter,
                        Constant,
                    ]
                ],
            ]
        ] = None,
    ) -> TaskRun:
        """
        Create a task run

        Args:
            task: The Task to run
            flow_run_id: The flow run id with which to associate the task run
            dynamic_key: A key unique to this particular run of a Task within the flow
            id: An optional ID for the task run. If not provided, one will be generated
                server-side.
            name: An optional name for the task run
            extra_tags: an optional list of extra tags to apply to the task run in
                addition to `task.tags`
            state: The initial state for the run. If not provided, defaults to
                `Pending` for now. Should always be a `Scheduled` type.
            task_inputs: the set of inputs passed to the task

        Returns:
            The created task run.
        """
        # Merge the task's own tags with any caller-supplied extras (deduplicated).
        tags = set(task.tags).union(extra_tags or [])

        if state is None:
            state = prefect.states.Pending()

        # Coerce float retry delays (or lists of them) to ints for the payload.
        retry_delay = task.retry_delay_seconds
        if isinstance(retry_delay, list):
            retry_delay = [int(rd) for rd in retry_delay]
        elif isinstance(retry_delay, float):
            retry_delay = int(retry_delay)

        task_run_data = TaskRunCreate(
            id=id,
            name=name,
            flow_run_id=flow_run_id,
            task_key=task.task_key,
            dynamic_key=dynamic_key,
            tags=list(tags),
            task_version=task.version,
            empirical_policy=TaskRunPolicy(
                retries=task.retries,
                retry_delay=retry_delay,
                retry_jitter_factor=task.retry_jitter_factor,
            ),
            state=prefect.states.to_state_create(state),
            task_inputs=task_inputs or {},
        )

        # When no id was supplied, omit the field so the server generates one.
        content = task_run_data.model_dump_json(exclude={"id"} if id is None else None)

        response = self._client.post("/task_runs/", content=content)
        return TaskRun.model_validate(response.json())
1505 def read_task_run(self, task_run_id: UUID) -> TaskRun: 1a
1506 """
1507 Query the Prefect API for a task run by id.
1509 Args:
1510 task_run_id: the task run ID of interest
1512 Returns:
1513 a Task Run model representation of the task run
1514 """
1515 try:
1516 response = self._client.get(f"/task_runs/{task_run_id}")
1517 return TaskRun.model_validate(response.json())
1518 except httpx.HTTPStatusError as e:
1519 if e.response.status_code == status.HTTP_404_NOT_FOUND:
1520 raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
1521 else:
1522 raise
1524 def read_task_runs( 1a
1525 self,
1526 *,
1527 flow_filter: Optional[FlowFilter] = None,
1528 flow_run_filter: Optional[FlowRunFilter] = None,
1529 task_run_filter: Optional[TaskRunFilter] = None,
1530 deployment_filter: Optional[DeploymentFilter] = None,
1531 sort: Optional[TaskRunSort] = None,
1532 limit: Optional[int] = None,
1533 offset: int = 0,
1534 ) -> list[TaskRun]:
1535 """
1536 Query the Prefect API for task runs. Only task runs matching all criteria will
1537 be returned.
1539 Args:
1540 flow_filter: filter criteria for flows
1541 flow_run_filter: filter criteria for flow runs
1542 task_run_filter: filter criteria for task runs
1543 deployment_filter: filter criteria for deployments
1544 sort: sort criteria for the task runs
1545 limit: a limit for the task run query
1546 offset: an offset for the task run query
1548 Returns:
1549 a list of Task Run model representations
1550 of the task runs
1551 """
1552 body: dict[str, Any] = {
1553 "flows": flow_filter.model_dump(mode="json") if flow_filter else None,
1554 "flow_runs": (
1555 flow_run_filter.model_dump(mode="json", exclude_unset=True)
1556 if flow_run_filter
1557 else None
1558 ),
1559 "task_runs": (
1560 task_run_filter.model_dump(mode="json") if task_run_filter else None
1561 ),
1562 "deployments": (
1563 deployment_filter.model_dump(mode="json") if deployment_filter else None
1564 ),
1565 "sort": sort,
1566 "limit": limit,
1567 "offset": offset,
1568 }
1569 response = self._client.post("/task_runs/filter", json=body)
1570 return _get_type_adapter(list[TaskRun]).validate_python(response.json())
1572 def set_task_run_state( 1a
1573 self,
1574 task_run_id: UUID,
1575 state: prefect.states.State[Any],
1576 force: bool = False,
1577 ) -> OrchestrationResult[Any]:
1578 """
1579 Set the state of a task run.
1581 Args:
1582 task_run_id: the id of the task run
1583 state: the state to set
1584 force: if True, disregard orchestration logic when setting the state,
1585 forcing the Prefect API to accept the state
1587 Returns:
1588 an OrchestrationResult model representation of state orchestration output
1589 """
1590 state_create = prefect.states.to_state_create(state)
1591 state_create.state_details.task_run_id = task_run_id
1592 response = self._client.post(
1593 f"/task_runs/{task_run_id}/set_state",
1594 json=dict(state=state_create.model_dump(mode="json"), force=force),
1595 )
1596 result: OrchestrationResult[Any] = OrchestrationResult.model_validate(
1597 response.json()
1598 )
1599 return result
1601 def read_task_run_states(self, task_run_id: UUID) -> list[prefect.states.State]: 1a
1602 """
1603 Query for the states of a task run
1605 Args:
1606 task_run_id: the id of the task run
1608 Returns:
1609 a list of State model representations of the task run states
1610 """
1611 response = self._client.get(
1612 "/task_run_states/", params=dict(task_run_id=str(task_run_id))
1613 )
1614 return _get_type_adapter(list[prefect.states.State]).validate_python(
1615 response.json()
1616 )
1618 def delete_task_run(self, task_run_id: UUID) -> None: 1a
1619 """
1620 Delete a task run by id.
1622 Args:
1623 task_run_id: the task run ID of interest
1624 Raises:
1625 prefect.exceptions.ObjectNotFound: If request returns 404
1626 httpx.RequestError: If requests fails
1627 """
1628 try:
1629 self._client.delete(f"/task_runs/{task_run_id}")
1630 except httpx.HTTPStatusError as e:
1631 if e.response.status_code == 404:
1632 raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
1633 else:
1634 raise
1636 def create_work_queue( 1a
1637 self,
1638 name: str,
1639 description: Optional[str] = None,
1640 is_paused: Optional[bool] = None,
1641 concurrency_limit: Optional[int] = None,
1642 priority: Optional[int] = None,
1643 work_pool_name: Optional[str] = None,
1644 ) -> WorkQueue:
1645 """
1646 Create a work queue.
1648 Args:
1649 name: a unique name for the work queue
1650 description: An optional description for the work queue.
1651 is_paused: Whether or not the work queue is paused.
1652 concurrency_limit: An optional concurrency limit for the work queue.
1653 priority: The queue's priority. Lower values are higher priority (1 is the highest).
1654 work_pool_name: The name of the work pool to use for this queue.
1656 Raises:
1657 prefect.exceptions.ObjectAlreadyExists: If request returns 409
1658 httpx.RequestError: If request fails
1660 Returns:
1661 The created work queue
1662 """
1663 create_model = WorkQueueCreate(name=name, filter=None)
1664 if description is not None:
1665 create_model.description = description
1666 if is_paused is not None:
1667 create_model.is_paused = is_paused
1668 if concurrency_limit is not None:
1669 create_model.concurrency_limit = concurrency_limit
1670 if priority is not None:
1671 create_model.priority = priority
1673 data = create_model.model_dump(mode="json")
1674 try:
1675 if work_pool_name is not None:
1676 response = self._client.post(
1677 f"/work_pools/{work_pool_name}/queues", json=data
1678 )
1679 else:
1680 response = self._client.post("/work_queues/", json=data)
1681 except httpx.HTTPStatusError as e:
1682 if e.response.status_code == status.HTTP_409_CONFLICT:
1683 raise prefect.exceptions.ObjectAlreadyExists(http_exc=e) from e
1684 elif e.response.status_code == status.HTTP_404_NOT_FOUND:
1685 raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
1686 else:
1687 raise
1688 return WorkQueue.model_validate(response.json())
1690 def read_work_queue_by_name( 1a
1691 self,
1692 name: str,
1693 work_pool_name: Optional[str] = None,
1694 ) -> WorkQueue:
1695 """
1696 Read a work queue by name.
1698 Args:
1699 name (str): a unique name for the work queue
1700 work_pool_name (str, optional): the name of the work pool
1701 the queue belongs to.
1703 Raises:
1704 prefect.exceptions.ObjectNotFound: if no work queue is found
1705 httpx.HTTPStatusError: other status errors
1707 Returns:
1708 WorkQueue: a work queue API object
1709 """
1710 try:
1711 if work_pool_name is not None:
1712 response = self._client.get(
1713 f"/work_pools/{work_pool_name}/queues/{name}"
1714 )
1715 else:
1716 response = self._client.get(f"/work_queues/name/{name}")
1717 except httpx.HTTPStatusError as e:
1718 if e.response.status_code == status.HTTP_404_NOT_FOUND:
1719 raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
1720 else:
1721 raise
1723 return WorkQueue.model_validate(response.json())
1725 def update_work_queue(self, id: UUID, **kwargs: Any) -> None: 1a
1726 """
1727 Update properties of a work queue.
1729 Args:
1730 id: the ID of the work queue to update
1731 **kwargs: the fields to update
1733 Raises:
1734 ValueError: if no kwargs are provided
1735 prefect.exceptions.ObjectNotFound: if request returns 404
1736 httpx.RequestError: if the request fails
1738 """
1739 if not kwargs:
1740 raise ValueError("No fields provided to update.")
1742 data = WorkQueueUpdate(**kwargs).model_dump(mode="json", exclude_unset=True)
1743 try:
1744 self._client.patch(f"/work_queues/{id}", json=data)
1745 except httpx.HTTPStatusError as e:
1746 if e.response.status_code == status.HTTP_404_NOT_FOUND:
1747 raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
1748 else:
1749 raise
1751 def get_runs_in_work_queue( 1a
1752 self,
1753 id: UUID,
1754 limit: int = 10,
1755 scheduled_before: Optional[datetime.datetime] = None,
1756 ) -> list[FlowRun]:
1757 """
1758 Read flow runs off a work queue.
1760 Args:
1761 id: the id of the work queue to read from
1762 limit: a limit on the number of runs to return
1763 scheduled_before: a timestamp; only runs scheduled before this time will be returned.
1764 Defaults to now.
1766 Raises:
1767 prefect.exceptions.ObjectNotFound: If request returns 404
1768 httpx.RequestError: If request fails
1770 Returns:
1771 List[FlowRun]: a list of FlowRun objects read from the queue
1772 """
1773 if scheduled_before is None:
1774 scheduled_before = now("UTC")
1776 try:
1777 response = self._client.post(
1778 f"/work_queues/{id}/get_runs",
1779 json={
1780 "limit": limit,
1781 "scheduled_before": scheduled_before.isoformat(),
1782 },
1783 )
1784 except httpx.HTTPStatusError as e:
1785 if e.response.status_code == status.HTTP_404_NOT_FOUND:
1786 raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
1787 else:
1788 raise
1789 return _get_type_adapter(list[FlowRun]).validate_python(response.json())
1791 def read_work_queue( 1a
1792 self,
1793 id: UUID,
1794 ) -> WorkQueue:
1795 """
1796 Read a work queue.
1798 Args:
1799 id: the id of the work queue to load
1801 Raises:
1802 prefect.exceptions.ObjectNotFound: If request returns 404
1803 httpx.RequestError: If request fails
1805 Returns:
1806 WorkQueue: an instantiated WorkQueue object
1807 """
1808 try:
1809 response = self._client.get(f"/work_queues/{id}")
1810 except httpx.HTTPStatusError as e:
1811 if e.response.status_code == status.HTTP_404_NOT_FOUND:
1812 raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
1813 else:
1814 raise
1815 return WorkQueue.model_validate(response.json())
1817 def read_work_queue_status( 1a
1818 self,
1819 id: UUID,
1820 ) -> WorkQueueStatusDetail:
1821 """
1822 Read a work queue status.
1824 Args:
1825 id: the id of the work queue to load
1827 Raises:
1828 prefect.exceptions.ObjectNotFound: If request returns 404
1829 httpx.RequestError: If request fails
1831 Returns:
1832 WorkQueueStatus: an instantiated WorkQueueStatus object
1833 """
1834 try:
1835 response = self._client.get(f"/work_queues/{id}/status")
1836 except httpx.HTTPStatusError as e:
1837 if e.response.status_code == status.HTTP_404_NOT_FOUND:
1838 raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
1839 else:
1840 raise
1841 return WorkQueueStatusDetail.model_validate(response.json())
1843 def match_work_queues( 1a
1844 self,
1845 prefixes: list[str],
1846 work_pool_name: Optional[str] = None,
1847 ) -> list[WorkQueue]:
1848 """
1849 Query the Prefect API for work queues with names with a specific prefix.
1851 Args:
1852 prefixes: a list of strings used to match work queue name prefixes
1853 work_pool_name: an optional work pool name to scope the query to
1855 Returns:
1856 a list of WorkQueue model representations
1857 of the work queues
1858 """
1859 page_length = 100
1860 current_page = 0
1861 work_queues: list[WorkQueue] = []
1863 while True:
1864 new_queues = self.read_work_queues(
1865 work_pool_name=work_pool_name,
1866 offset=current_page * page_length,
1867 limit=page_length,
1868 work_queue_filter=WorkQueueFilter(
1869 name=WorkQueueFilterName(startswith_=prefixes)
1870 ),
1871 )
1872 if not new_queues:
1873 break
1874 work_queues += new_queues
1875 current_page += 1
1877 return work_queues
1879 def delete_work_queue_by_id( 1a
1880 self,
1881 id: UUID,
1882 ) -> None:
1883 """
1884 Delete a work queue by its ID.
1886 Args:
1887 id: the id of the work queue to delete
1889 Raises:
1890 prefect.exceptions.ObjectNotFound: If request returns 404
1891 httpx.RequestError: If requests fails
1892 """
1893 try:
1894 self._client.delete(
1895 f"/work_queues/{id}",
1896 )
1897 except httpx.HTTPStatusError as e:
1898 if e.response.status_code == status.HTTP_404_NOT_FOUND:
1899 raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
1900 else:
1901 raise
1903 def read_work_queues( 1a
1904 self,
1905 work_pool_name: Optional[str] = None,
1906 work_queue_filter: Optional[WorkQueueFilter] = None,
1907 limit: Optional[int] = None,
1908 offset: Optional[int] = None,
1909 ) -> list[WorkQueue]:
1910 """
1911 Retrieves queues for a work pool.
1913 Args:
1914 work_pool_name: Name of the work pool for which to get queues.
1915 work_queue_filter: Criteria by which to filter queues.
1916 limit: Limit for the queue query.
1917 offset: Limit for the queue query.
1919 Returns:
1920 List of queues for the specified work pool.
1921 """
1922 json: dict[str, Any] = {
1923 "work_queues": (
1924 work_queue_filter.model_dump(mode="json", exclude_unset=True)
1925 if work_queue_filter
1926 else None
1927 ),
1928 "limit": limit,
1929 "offset": offset,
1930 }
1932 if work_pool_name:
1933 try:
1934 response = self._client.post(
1935 f"/work_pools/{work_pool_name}/queues/filter",
1936 json=json,
1937 )
1938 except httpx.HTTPStatusError as e:
1939 if e.response.status_code == status.HTTP_404_NOT_FOUND:
1940 raise prefect.exceptions.ObjectNotFound(http_exc=e) from e
1941 else:
1942 raise
1943 else:
1944 response = self._client.post("/work_queues/filter", json=json)
1946 return _get_type_adapter(list[WorkQueue]).validate_python(response.json())
1948 def read_worker_metadata(self) -> dict[str, Any]: 1a
1949 """Reads worker metadata stored in Prefect collection registry."""
1950 response = self._client.get("collections/views/aggregate-worker-metadata")
1951 response.raise_for_status()
1952 return response.json()