Coverage for /usr/local/lib/python3.12/site-packages/prefect/client/orchestration/_deployments/client.py: 10%
369 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1from __future__ import annotations 1a
3from collections.abc import Iterable 1a
4from typing import TYPE_CHECKING, Any, Union 1a
5from uuid import UUID 1a
7from httpx import HTTPStatusError, RequestError 1a
9from prefect._internal.compatibility.deprecated import deprecated_callable 1a
10from prefect.client.orchestration.base import BaseAsyncClient, BaseClient 1a
11from prefect.exceptions import ObjectAlreadyExists, ObjectLimitReached, ObjectNotFound 1a
13if TYPE_CHECKING: 13 ↛ 14line 13 didn't jump to line 14 because the condition on line 13 was never true1a
14 import datetime
16 from prefect.client.schemas import FlowRun
17 from prefect.client.schemas.actions import (
18 DeploymentCreate,
19 DeploymentScheduleCreate,
20 DeploymentUpdate,
21 )
22 from prefect.client.schemas.filters import (
23 DeploymentFilter,
24 FlowFilter,
25 FlowRunFilter,
26 TaskRunFilter,
27 WorkPoolFilter,
28 WorkQueueFilter,
29 )
30 from prefect.client.schemas.objects import (
31 ConcurrencyOptions,
32 DeploymentBranchingOptions,
33 DeploymentSchedule,
34 VersionInfo,
35 )
36 from prefect.client.schemas.responses import (
37 DeploymentResponse,
38 FlowRunResponse,
39 )
40 from prefect.client.schemas.schedules import SCHEDULE_TYPES
41 from prefect.client.schemas.sorting import (
42 DeploymentSort,
43 )
44 from prefect.states import State
45 from prefect.types import KeyValueLabelsField
48class DeploymentClient(BaseClient): 1a
49 def create_deployment( 1a
50 self,
51 flow_id: UUID,
52 name: str,
53 version: str | None = None,
54 version_info: "VersionInfo | None" = None,
55 schedules: list["DeploymentScheduleCreate"] | None = None,
56 concurrency_limit: int | None = None,
57 concurrency_options: "ConcurrencyOptions | None" = None,
58 parameters: dict[str, Any] | None = None,
59 description: str | None = None,
60 work_queue_name: str | None = None,
61 work_pool_name: str | None = None,
62 tags: list[str] | None = None,
63 storage_document_id: UUID | None = None,
64 path: str | None = None,
65 entrypoint: str | None = None,
66 infrastructure_document_id: UUID | None = None,
67 parameter_openapi_schema: dict[str, Any] | None = None,
68 paused: bool | None = None,
69 pull_steps: list[dict[str, Any]] | None = None,
70 enforce_parameter_schema: bool | None = None,
71 job_variables: dict[str, Any] | None = None,
72 branch: str | None = None,
73 base: UUID | None = None,
74 root: UUID | None = None,
75 ) -> UUID:
76 """
77 Create a deployment.
79 Args:
80 flow_id: the flow ID to create a deployment for
81 name: the name of the deployment
82 version: an optional version string for the deployment
83 tags: an optional list of tags to apply to the deployment
84 storage_document_id: an reference to the storage block document
85 used for the deployed flow
86 infrastructure_document_id: an reference to the infrastructure block document
87 to use for this deployment
88 job_variables: A dictionary of dot delimited infrastructure overrides that
89 will be applied at runtime; for example `env.CONFIG_KEY=config_value` or
90 `namespace='prefect'`. This argument was previously named `infra_overrides`.
91 Both arguments are supported for backwards compatibility.
93 Raises:
94 RequestError: if the deployment was not created for any reason
96 Returns:
97 the ID of the deployment in the backend
98 """
100 from prefect.client.schemas.actions import DeploymentCreate
102 deployment_create = DeploymentCreate(
103 flow_id=flow_id,
104 name=name,
105 version=version,
106 version_info=version_info,
107 parameters=dict(parameters or {}),
108 tags=list(tags or []),
109 work_queue_name=work_queue_name,
110 description=description,
111 storage_document_id=storage_document_id,
112 path=path,
113 entrypoint=entrypoint,
114 infrastructure_document_id=infrastructure_document_id,
115 job_variables=dict(job_variables or {}),
116 parameter_openapi_schema=parameter_openapi_schema or {},
117 paused=paused,
118 schedules=schedules or [],
119 concurrency_limit=concurrency_limit,
120 concurrency_options=concurrency_options,
121 pull_steps=pull_steps,
122 enforce_parameter_schema=enforce_parameter_schema,
123 branch=branch,
124 base=base,
125 root=root,
126 )
128 if work_pool_name is not None:
129 deployment_create.work_pool_name = work_pool_name
131 # Exclude newer fields that are not set to avoid compatibility issues
132 exclude = {
133 field
134 for field in [
135 "work_pool_name",
136 "work_queue_name",
137 ]
138 if field not in deployment_create.model_fields_set
139 }
141 exclude_if_none = [
142 "paused",
143 "pull_steps",
144 "enforce_parameter_schema",
145 "version_info",
146 "branch",
147 "base",
148 "root",
149 ]
151 for field in exclude_if_none:
152 if getattr(deployment_create, field) is None:
153 exclude.add(field)
155 payload = deployment_create.model_dump(mode="json", exclude=exclude)
156 if deployment_create.version_info:
157 payload["version_info"] = deployment_create.version_info.model_dump(
158 mode="json"
159 )
161 try:
162 response = self.request("POST", "/deployments/", json=payload)
163 except HTTPStatusError as e:
164 if e.response.status_code == 403 and "maximum number" in str(e):
165 raise ObjectLimitReached(http_exc=e) from e
166 if e.response.status_code == 409:
167 raise ObjectAlreadyExists(http_exc=e) from e
168 else:
169 raise
171 deployment_id = response.json().get("id")
172 if not deployment_id:
173 raise RequestError(f"Malformed response: {response}")
175 return UUID(deployment_id)
177 def _set_deployment_paused_state(self, deployment_id: UUID, paused: bool) -> None: 1a
178 self.request(
179 "PATCH",
180 "/deployments/{id}",
181 path_params={"id": deployment_id},
182 json={"paused": paused},
183 )
185 @deprecated_callable( 1a
186 start_date="Jun 2025",
187 help="Use pause_deployment or resume_deployment instead.",
188 )
189 def set_deployment_paused_state(self, deployment_id: UUID, paused: bool) -> None: 1a
190 """
191 DEPRECATED: Use pause_deployment or resume_deployment instead.
193 Set the paused state of a deployment.
195 Args:
196 deployment_id: the deployment ID to update
197 paused: whether the deployment should be paused
198 """
199 self._set_deployment_paused_state(deployment_id, paused)
201 def pause_deployment(self, deployment_id: Union[UUID, str]) -> None: 1a
202 """
203 Pause a deployment by ID.
205 Args:
206 deployment_id: The deployment ID of interest (can be a UUID or a string).
208 Raises:
209 ObjectNotFound: If request returns 404
210 RequestError: If request fails
211 """
212 if not isinstance(deployment_id, UUID):
213 try:
214 deployment_id = UUID(deployment_id)
215 except ValueError:
216 raise ValueError(f"Invalid deployment ID: {deployment_id}")
218 try:
219 self._set_deployment_paused_state(deployment_id, paused=True)
220 except HTTPStatusError as e:
221 if e.response.status_code == 404:
222 raise ObjectNotFound(http_exc=e) from e
223 else:
224 raise
226 def resume_deployment(self, deployment_id: Union[UUID, str]) -> None: 1a
227 """
228 Resume (unpause) a deployment by ID.
230 Args:
231 deployment_id: The deployment ID of interest (can be a UUID or a string).
233 Raises:
234 ObjectNotFound: If request returns 404
235 RequestError: If request fails
236 """
237 if not isinstance(deployment_id, UUID):
238 try:
239 deployment_id = UUID(deployment_id)
240 except ValueError:
241 raise ValueError(f"Invalid deployment ID: {deployment_id}")
243 try:
244 self._set_deployment_paused_state(deployment_id, paused=False)
245 except HTTPStatusError as e:
246 if e.response.status_code == 404:
247 raise ObjectNotFound(http_exc=e) from e
248 else:
249 raise
251 def update_deployment( 1a
252 self,
253 deployment_id: UUID,
254 deployment: "DeploymentUpdate",
255 ) -> None:
256 exclude_if_none = [
257 "version_info",
258 ]
260 exclude = {"name", "flow_name", "triggers"}
261 for field in exclude_if_none:
262 if getattr(deployment, field) is None:
263 exclude.add(field)
265 payload = deployment.model_dump(
266 mode="json",
267 exclude_unset=True,
268 exclude=exclude,
269 )
270 if deployment.version_info:
271 payload["version_info"] = deployment.version_info.model_dump(mode="json")
273 self.request(
274 "PATCH",
275 "/deployments/{id}",
276 path_params={"id": deployment_id},
277 json=payload,
278 )
280 def _create_deployment_from_schema(self, schema: "DeploymentCreate") -> UUID: 1a
281 """
282 Create a deployment from a prepared `DeploymentCreate` schema.
283 """
284 # TODO: We are likely to remove this method once we have considered the
285 # packaging interface for deployments further.
286 response = self.request(
287 "POST", "/deployments/", json=schema.model_dump(mode="json")
288 )
289 deployment_id = response.json().get("id")
290 if not deployment_id:
291 raise RequestError(f"Malformed response: {response}")
293 return UUID(deployment_id)
295 def read_deployment( 1a
296 self,
297 deployment_id: Union[UUID, str],
298 ) -> "DeploymentResponse":
299 """
300 Query the Prefect API for a deployment by id.
302 Args:
303 deployment_id: the deployment ID of interest
305 Returns:
306 a Deployment model representation of the deployment
307 """
309 from prefect.client.schemas.responses import DeploymentResponse
311 if not isinstance(deployment_id, UUID):
312 try:
313 deployment_id = UUID(deployment_id)
314 except ValueError:
315 raise ValueError(f"Invalid deployment ID: {deployment_id}")
317 try:
318 response = self.request(
319 "GET",
320 "/deployments/{id}",
321 path_params={"id": deployment_id},
322 )
323 except HTTPStatusError as e:
324 if e.response.status_code == 404:
325 raise ObjectNotFound(http_exc=e) from e
326 else:
327 raise
328 return DeploymentResponse.model_validate(response.json())
330 def read_deployment_by_name( 1a
331 self,
332 name: str,
333 ) -> "DeploymentResponse":
334 """
335 Query the Prefect API for a deployment by name.
337 Args:
338 name: A deployed flow's name: <FLOW_NAME>/<DEPLOYMENT_NAME>
340 Raises:
341 ObjectNotFound: If request returns 404
342 RequestError: If request fails
344 Returns:
345 a Deployment model representation of the deployment
346 """
347 from prefect.client.schemas.responses import DeploymentResponse
349 try:
350 flow_name, deployment_name = name.split("/")
351 response = self.request(
352 "GET",
353 "/deployments/name/{flow_name}/{deployment_name}",
354 path_params={
355 "flow_name": flow_name,
356 "deployment_name": deployment_name,
357 },
358 )
359 except (HTTPStatusError, ValueError) as e:
360 if isinstance(e, HTTPStatusError) and e.response.status_code == 404:
361 raise ObjectNotFound(http_exc=e) from e
362 elif isinstance(e, ValueError):
363 raise ValueError(
364 f"Invalid deployment name format: {name}. Expected format: <FLOW_NAME>/<DEPLOYMENT_NAME>"
365 ) from e
366 else:
367 raise
369 return DeploymentResponse.model_validate(response.json())
371 def read_deployments( 1a
372 self,
373 *,
374 flow_filter: "FlowFilter | None" = None,
375 flow_run_filter: "FlowRunFilter | None" = None,
376 task_run_filter: "TaskRunFilter | None" = None,
377 deployment_filter: "DeploymentFilter | None" = None,
378 work_pool_filter: "WorkPoolFilter | None" = None,
379 work_queue_filter: "WorkQueueFilter | None" = None,
380 limit: int | None = None,
381 sort: "DeploymentSort | None" = None,
382 offset: int = 0,
383 ) -> list["DeploymentResponse"]:
384 """
385 Query the Prefect API for deployments. Only deployments matching all
386 the provided criteria will be returned.
388 Args:
389 flow_filter: filter criteria for flows
390 flow_run_filter: filter criteria for flow runs
391 task_run_filter: filter criteria for task runs
392 deployment_filter: filter criteria for deployments
393 work_pool_filter: filter criteria for work pools
394 work_queue_filter: filter criteria for work pool queues
395 limit: a limit for the deployment query
396 offset: an offset for the deployment query
398 Returns:
399 a list of Deployment model representations
400 of the deployments
401 """
402 from prefect.client.schemas.responses import DeploymentResponse
404 body: dict[str, Any] = {
405 "flows": flow_filter.model_dump(mode="json") if flow_filter else None,
406 "flow_runs": (
407 flow_run_filter.model_dump(mode="json", exclude_unset=True)
408 if flow_run_filter
409 else None
410 ),
411 "task_runs": (
412 task_run_filter.model_dump(mode="json") if task_run_filter else None
413 ),
414 "deployments": (
415 deployment_filter.model_dump(mode="json") if deployment_filter else None
416 ),
417 "work_pools": (
418 work_pool_filter.model_dump(mode="json") if work_pool_filter else None
419 ),
420 "work_pool_queues": (
421 work_queue_filter.model_dump(mode="json") if work_queue_filter else None
422 ),
423 "limit": limit,
424 "offset": offset,
425 "sort": sort,
426 }
428 response = self.request("POST", "/deployments/filter", json=body)
429 return DeploymentResponse.model_validate_list(response.json())
431 def delete_deployment( 1a
432 self,
433 deployment_id: UUID,
434 ) -> None:
435 """
436 Delete deployment by id.
438 Args:
439 deployment_id: The deployment id of interest.
440 Raises:
441 ObjectNotFound: If request returns 404
442 RequestError: If requests fails
443 """
444 try:
445 self.request(
446 "DELETE",
447 "/deployments/{id}",
448 path_params={"id": deployment_id},
449 )
450 except HTTPStatusError as e:
451 if e.response.status_code == 404:
452 raise ObjectNotFound(http_exc=e) from e
453 else:
454 raise
456 def create_deployment_schedules( 1a
457 self,
458 deployment_id: UUID,
459 schedules: list[tuple["SCHEDULE_TYPES", bool]],
460 ) -> list["DeploymentSchedule"]:
461 """
462 Create deployment schedules.
464 Args:
465 deployment_id: the deployment ID
466 schedules: a list of tuples containing the schedule to create
467 and whether or not it should be active.
469 Raises:
470 RequestError: if the schedules were not created for any reason
472 Returns:
473 the list of schedules created in the backend
474 """
475 from prefect.client.schemas.actions import DeploymentScheduleCreate
476 from prefect.client.schemas.objects import DeploymentSchedule
478 deployment_schedule_create = [
479 DeploymentScheduleCreate(schedule=schedule[0], active=schedule[1])
480 for schedule in schedules
481 ]
483 json = [
484 deployment_schedule_create.model_dump(mode="json")
485 for deployment_schedule_create in deployment_schedule_create
486 ]
487 response = self.request(
488 "POST",
489 "/deployments/{id}/schedules",
490 path_params={"id": deployment_id},
491 json=json,
492 )
493 return DeploymentSchedule.model_validate_list(response.json())
495 def read_deployment_schedules( 1a
496 self,
497 deployment_id: UUID,
498 ) -> list["DeploymentSchedule"]:
499 """
500 Query the Prefect API for a deployment's schedules.
502 Args:
503 deployment_id: the deployment ID
505 Returns:
506 a list of DeploymentSchedule model representations of the deployment schedules
507 """
508 from prefect.client.schemas.objects import DeploymentSchedule
510 try:
511 response = self.request(
512 "GET",
513 "/deployments/{id}/schedules",
514 path_params={"id": deployment_id},
515 )
516 except HTTPStatusError as e:
517 if e.response.status_code == 404:
518 raise ObjectNotFound(http_exc=e) from e
519 else:
520 raise
521 return DeploymentSchedule.model_validate_list(response.json())
523 def update_deployment_schedule( 1a
524 self,
525 deployment_id: UUID,
526 schedule_id: UUID,
527 active: bool | None = None,
528 schedule: "SCHEDULE_TYPES | None" = None,
529 ) -> None:
530 """
531 Update a deployment schedule by ID.
533 Args:
534 deployment_id: the deployment ID
535 schedule_id: the deployment schedule ID of interest
536 active: whether or not the schedule should be active
537 schedule: the cron, rrule, or interval schedule this deployment schedule should use
538 """
539 from prefect.client.schemas.actions import DeploymentScheduleUpdate
541 kwargs: dict[str, Any] = {}
542 if active is not None:
543 kwargs["active"] = active
544 if schedule is not None:
545 kwargs["schedule"] = schedule
547 deployment_schedule_update = DeploymentScheduleUpdate(**kwargs)
548 json = deployment_schedule_update.model_dump(mode="json", exclude_unset=True)
550 try:
551 self.request(
552 "PATCH",
553 "/deployments/{id}/schedules/{schedule_id}",
554 path_params={"id": deployment_id, "schedule_id": schedule_id},
555 json=json,
556 )
557 except HTTPStatusError as e:
558 if e.response.status_code == 404:
559 raise ObjectNotFound(http_exc=e) from e
560 else:
561 raise
563 def delete_deployment_schedule( 1a
564 self,
565 deployment_id: UUID,
566 schedule_id: UUID,
567 ) -> None:
568 """
569 Delete a deployment schedule.
571 Args:
572 deployment_id: the deployment ID
573 schedule_id: the ID of the deployment schedule to delete.
575 Raises:
576 RequestError: if the schedules were not deleted for any reason
577 """
578 try:
579 self.request(
580 "DELETE",
581 "/deployments/{id}/schedules/{schedule_id}",
582 path_params={"id": deployment_id, "schedule_id": schedule_id},
583 )
584 except HTTPStatusError as e:
585 if e.response.status_code == 404:
586 raise ObjectNotFound(http_exc=e) from e
587 else:
588 raise
590 def get_scheduled_flow_runs_for_deployments( 1a
591 self,
592 deployment_ids: list[UUID],
593 scheduled_before: "datetime.datetime | None" = None,
594 limit: int | None = None,
595 ) -> list["FlowRunResponse"]:
596 from prefect.client.schemas.responses import FlowRunResponse
598 body: dict[str, Any] = dict(deployment_ids=[str(id) for id in deployment_ids])
599 if scheduled_before:
600 body["scheduled_before"] = str(scheduled_before)
601 if limit:
602 body["limit"] = limit
604 response = self.request(
605 "POST",
606 "/deployments/get_scheduled_flow_runs",
607 json=body,
608 )
610 return FlowRunResponse.model_validate_list(response.json())
612 def create_flow_run_from_deployment( 1a
613 self,
614 deployment_id: UUID,
615 *,
616 parameters: dict[str, Any] | None = None,
617 context: dict[str, Any] | None = None,
618 state: State[Any] | None = None,
619 name: str | None = None,
620 tags: Iterable[str] | None = None,
621 idempotency_key: str | None = None,
622 parent_task_run_id: UUID | None = None,
623 work_queue_name: str | None = None,
624 job_variables: dict[str, Any] | None = None,
625 labels: "KeyValueLabelsField | None" = None,
626 ) -> "FlowRun":
627 """
628 Create a flow run for a deployment.
630 Args:
631 deployment_id: The deployment ID to create the flow run from
632 parameters: Parameter overrides for this flow run. Merged with the
633 deployment defaults
634 context: Optional run context data
635 state: The initial state for the run. If not provided, defaults to
636 `Scheduled` for now. Should always be a `Scheduled` type.
637 name: An optional name for the flow run. If not provided, the server will
638 generate a name.
639 tags: An optional iterable of tags to apply to the flow run; these tags
640 are merged with the deployment's tags.
641 idempotency_key: Optional idempotency key for creation of the flow run.
642 If the key matches the key of an existing flow run, the existing run will
643 be returned instead of creating a new one.
644 parent_task_run_id: if a subflow run is being created, the placeholder task
645 run identifier in the parent flow
646 work_queue_name: An optional work queue name to add this run to. If not provided,
647 will default to the deployment's set work queue. If one is provided that does not
648 exist, a new work queue will be created within the deployment's work pool.
649 job_variables: Optional variables that will be supplied to the flow run job.
651 Raises:
652 RequestError: if the Prefect API does not successfully create a run for any reason
654 Returns:
655 The flow run model
656 """
657 from prefect.client.schemas.actions import DeploymentFlowRunCreate
658 from prefect.client.schemas.objects import FlowRun
659 from prefect.states import Scheduled, to_state_create
661 parameters = parameters or {}
662 context = context or {}
663 state = state or Scheduled()
664 tags = tags or []
665 labels = labels or {}
667 flow_run_create = DeploymentFlowRunCreate(
668 parameters=parameters,
669 context=context,
670 state=to_state_create(state),
671 tags=list(tags),
672 name=name,
673 idempotency_key=idempotency_key,
674 parent_task_run_id=parent_task_run_id,
675 job_variables=job_variables,
676 labels=labels,
677 )
679 # done separately to avoid including this field in payloads sent to older API versions
680 if work_queue_name:
681 flow_run_create.work_queue_name = work_queue_name
683 response = self.request(
684 "POST",
685 "/deployments/{id}/create_flow_run",
686 path_params={"id": deployment_id},
687 json=flow_run_create.model_dump(mode="json", exclude_unset=True),
688 )
689 return FlowRun.model_validate(response.json())
691 def create_deployment_branch( 1a
692 self,
693 deployment_id: UUID,
694 branch: str,
695 options: "DeploymentBranchingOptions | None" = None,
696 overrides: "DeploymentUpdate | None" = None,
697 ) -> UUID:
698 from prefect.client.schemas.actions import DeploymentBranch
699 from prefect.client.schemas.objects import DeploymentBranchingOptions
701 response = self.request(
702 "POST",
703 "/deployments/{id}/branch",
704 path_params={"id": deployment_id},
705 json=DeploymentBranch(
706 branch=branch,
707 options=options or DeploymentBranchingOptions(),
708 overrides=overrides,
709 ).model_dump(mode="json", exclude_unset=True),
710 )
711 return UUID(response.json().get("id"))
714class DeploymentAsyncClient(BaseAsyncClient): 1a
715 async def create_deployment( 1a
716 self,
717 flow_id: UUID,
718 name: str,
719 version: str | None = None,
720 version_info: "VersionInfo | None" = None,
721 schedules: list["DeploymentScheduleCreate"] | None = None,
722 concurrency_limit: int | None = None,
723 concurrency_options: "ConcurrencyOptions | None" = None,
724 parameters: dict[str, Any] | None = None,
725 description: str | None = None,
726 work_queue_name: str | None = None,
727 work_pool_name: str | None = None,
728 tags: list[str] | None = None,
729 storage_document_id: UUID | None = None,
730 path: str | None = None,
731 entrypoint: str | None = None,
732 infrastructure_document_id: UUID | None = None,
733 parameter_openapi_schema: dict[str, Any] | None = None,
734 paused: bool | None = None,
735 pull_steps: list[dict[str, Any]] | None = None,
736 enforce_parameter_schema: bool | None = None,
737 job_variables: dict[str, Any] | None = None,
738 branch: str | None = None,
739 base: UUID | None = None,
740 root: UUID | None = None,
741 ) -> UUID:
742 """
743 Create a deployment.
745 Args:
746 flow_id: the flow ID to create a deployment for
747 name: the name of the deployment
748 version: an optional version string for the deployment
749 tags: an optional list of tags to apply to the deployment
750 storage_document_id: an reference to the storage block document
751 used for the deployed flow
752 infrastructure_document_id: an reference to the infrastructure block document
753 to use for this deployment
754 job_variables: A dictionary of dot delimited infrastructure overrides that
755 will be applied at runtime; for example `env.CONFIG_KEY=config_value` or
756 `namespace='prefect'`. This argument was previously named `infra_overrides`.
757 Both arguments are supported for backwards compatibility.
759 Raises:
760 RequestError: if the deployment was not created for any reason
762 Returns:
763 the ID of the deployment in the backend
764 """
766 from prefect.client.schemas.actions import DeploymentCreate
768 deployment_create = DeploymentCreate(
769 flow_id=flow_id,
770 name=name,
771 version=version,
772 version_info=version_info,
773 parameters=dict(parameters or {}),
774 tags=list(tags or []),
775 work_queue_name=work_queue_name,
776 description=description,
777 storage_document_id=storage_document_id,
778 path=path,
779 entrypoint=entrypoint,
780 infrastructure_document_id=infrastructure_document_id,
781 job_variables=dict(job_variables or {}),
782 parameter_openapi_schema=parameter_openapi_schema or {},
783 paused=paused,
784 schedules=schedules or [],
785 concurrency_limit=concurrency_limit,
786 concurrency_options=concurrency_options,
787 pull_steps=pull_steps,
788 enforce_parameter_schema=enforce_parameter_schema,
789 branch=branch,
790 base=base,
791 root=root,
792 )
794 if work_pool_name is not None:
795 deployment_create.work_pool_name = work_pool_name
797 # Exclude newer fields that are not set to avoid compatibility issues
798 exclude = {
799 field
800 for field in [
801 "work_pool_name",
802 "work_queue_name",
803 ]
804 if field not in deployment_create.model_fields_set
805 }
807 exclude_if_none = [
808 "paused",
809 "pull_steps",
810 "enforce_parameter_schema",
811 "version_info",
812 "branch",
813 "base",
814 "root",
815 ]
817 for field in exclude_if_none:
818 if getattr(deployment_create, field) is None:
819 exclude.add(field)
821 payload = deployment_create.model_dump(mode="json", exclude=exclude)
822 if deployment_create.version_info:
823 payload["version_info"] = deployment_create.version_info.model_dump(
824 mode="json"
825 )
827 try:
828 response = await self.request("POST", "/deployments/", json=payload)
829 except HTTPStatusError as e:
830 if e.response.status_code == 403 and "maximum number of deployments" in str(
831 e
832 ):
833 raise ObjectLimitReached(http_exc=e) from e
834 if e.response.status_code == 409:
835 raise ObjectAlreadyExists(http_exc=e) from e
836 else:
837 raise
839 deployment_id = response.json().get("id")
840 if not deployment_id:
841 raise RequestError(f"Malformed response: {response}")
843 return UUID(deployment_id)
845 async def _set_deployment_paused_state( 1a
846 self, deployment_id: UUID, paused: bool
847 ) -> None:
848 await self.request(
849 "PATCH",
850 "/deployments/{id}",
851 path_params={"id": deployment_id},
852 json={"paused": paused},
853 )
855 @deprecated_callable( 1a
856 start_date="Jun 2025",
857 help="Use pause_deployment or resume_deployment instead.",
858 )
859 async def set_deployment_paused_state( 1a
860 self, deployment_id: UUID, paused: bool
861 ) -> None:
862 """
863 DEPRECATED: Use pause_deployment or resume_deployment instead.
865 Set the paused state of a deployment.
867 Args:
868 deployment_id: the deployment ID to update
869 paused: whether the deployment should be paused
870 """
871 await self._set_deployment_paused_state(deployment_id, paused)
873 async def pause_deployment(self, deployment_id: Union[UUID, str]) -> None: 1a
874 """
875 Pause a deployment by ID.
877 Args:
878 deployment_id: The deployment ID of interest (can be a UUID or a string).
880 Raises:
881 ObjectNotFound: If request returns 404
882 RequestError: If request fails
883 """
884 if not isinstance(deployment_id, UUID):
885 try:
886 deployment_id = UUID(deployment_id)
887 except ValueError:
888 raise ValueError(f"Invalid deployment ID: {deployment_id}")
890 try:
891 await self._set_deployment_paused_state(deployment_id, paused=True)
892 except HTTPStatusError as e:
893 if e.response.status_code == 404:
894 raise ObjectNotFound(http_exc=e) from e
895 else:
896 raise
898 async def resume_deployment(self, deployment_id: Union[UUID, str]) -> None: 1a
899 """
900 Resume (unpause) a deployment by ID.
902 Args:
903 deployment_id: The deployment ID of interest (can be a UUID or a string).
905 Raises:
906 ObjectNotFound: If request returns 404
907 RequestError: If request fails
908 """
909 if not isinstance(deployment_id, UUID):
910 try:
911 deployment_id = UUID(deployment_id)
912 except ValueError:
913 raise ValueError(f"Invalid deployment ID: {deployment_id}")
915 try:
916 await self._set_deployment_paused_state(deployment_id, paused=False)
917 except HTTPStatusError as e:
918 if e.response.status_code == 404:
919 raise ObjectNotFound(http_exc=e) from e
920 else:
921 raise
923 async def update_deployment( 1a
924 self,
925 deployment_id: UUID,
926 deployment: "DeploymentUpdate",
927 ) -> None:
928 exclude_if_none = [
929 "version_info",
930 ]
932 exclude = {"name", "flow_name", "triggers"}
933 for field in exclude_if_none:
934 if getattr(deployment, field) is None:
935 exclude.add(field)
937 payload = deployment.model_dump(
938 mode="json",
939 exclude_unset=True,
940 exclude=exclude,
941 )
942 if deployment.version_info:
943 payload["version_info"] = deployment.version_info.model_dump(mode="json")
945 await self.request(
946 "PATCH",
947 "/deployments/{id}",
948 path_params={"id": deployment_id},
949 json=payload,
950 )
952 async def _create_deployment_from_schema(self, schema: "DeploymentCreate") -> UUID: 1a
953 """
954 Create a deployment from a prepared `DeploymentCreate` schema.
955 """
957 # TODO: We are likely to remove this method once we have considered the
958 # packaging interface for deployments further.
959 response = await self.request(
960 "POST", "/deployments/", json=schema.model_dump(mode="json")
961 )
962 deployment_id = response.json().get("id")
963 if not deployment_id:
964 raise RequestError(f"Malformed response: {response}")
966 return UUID(deployment_id)
968 async def read_deployment( 1a
969 self,
970 deployment_id: Union[UUID, str],
971 ) -> "DeploymentResponse":
972 """
973 Query the Prefect API for a deployment by id.
975 Args:
976 deployment_id: the deployment ID of interest
978 Returns:
979 a Deployment model representation of the deployment
980 """
982 from prefect.client.schemas.responses import DeploymentResponse
984 if not isinstance(deployment_id, UUID):
985 try:
986 deployment_id = UUID(deployment_id)
987 except ValueError:
988 raise ValueError(f"Invalid deployment ID: {deployment_id}")
990 try:
991 response = await self.request(
992 "GET",
993 "/deployments/{id}",
994 path_params={"id": deployment_id},
995 )
996 except HTTPStatusError as e:
997 if e.response.status_code == 404:
998 raise ObjectNotFound(http_exc=e) from e
999 else:
1000 raise
1001 return DeploymentResponse.model_validate(response.json())
1003 async def read_deployment_by_name( 1a
1004 self,
1005 name: str,
1006 ) -> "DeploymentResponse":
1007 """
1008 Query the Prefect API for a deployment by name.
1010 Args:
1011 name: A deployed flow's name: <FLOW_NAME>/<DEPLOYMENT_NAME>
1013 Raises:
1014 ObjectNotFound: If request returns 404
1015 RequestError: If request fails
1017 Returns:
1018 a Deployment model representation of the deployment
1019 """
1020 from prefect.client.schemas.responses import DeploymentResponse
1022 try:
1023 flow_name, deployment_name = name.split("/")
1024 response = await self.request(
1025 "GET",
1026 "/deployments/name/{flow_name}/{deployment_name}",
1027 path_params={
1028 "flow_name": flow_name,
1029 "deployment_name": deployment_name,
1030 },
1031 )
1032 except (HTTPStatusError, ValueError) as e:
1033 if isinstance(e, HTTPStatusError) and e.response.status_code == 404:
1034 raise ObjectNotFound(http_exc=e) from e
1035 elif isinstance(e, ValueError):
1036 raise ValueError(
1037 f"Invalid deployment name format: {name}. Expected format: <FLOW_NAME>/<DEPLOYMENT_NAME>"
1038 ) from e
1039 else:
1040 raise
1042 return DeploymentResponse.model_validate(response.json())
1044 async def read_deployments( 1a
1045 self,
1046 *,
1047 flow_filter: "FlowFilter | None" = None,
1048 flow_run_filter: "FlowRunFilter | None" = None,
1049 task_run_filter: "TaskRunFilter | None" = None,
1050 deployment_filter: "DeploymentFilter | None" = None,
1051 work_pool_filter: "WorkPoolFilter | None" = None,
1052 work_queue_filter: "WorkQueueFilter | None" = None,
1053 limit: int | None = None,
1054 sort: "DeploymentSort | None" = None,
1055 offset: int = 0,
1056 ) -> list["DeploymentResponse"]:
1057 """
1058 Query the Prefect API for deployments. Only deployments matching all
1059 the provided criteria will be returned.
1061 Args:
1062 flow_filter: filter criteria for flows
1063 flow_run_filter: filter criteria for flow runs
1064 task_run_filter: filter criteria for task runs
1065 deployment_filter: filter criteria for deployments
1066 work_pool_filter: filter criteria for work pools
1067 work_queue_filter: filter criteria for work pool queues
1068 limit: a limit for the deployment query
1069 offset: an offset for the deployment query
1071 Returns:
1072 a list of Deployment model representations
1073 of the deployments
1074 """
1075 from prefect.client.schemas.responses import DeploymentResponse
1077 body: dict[str, Any] = {
1078 "flows": flow_filter.model_dump(mode="json") if flow_filter else None,
1079 "flow_runs": (
1080 flow_run_filter.model_dump(mode="json", exclude_unset=True)
1081 if flow_run_filter
1082 else None
1083 ),
1084 "task_runs": (
1085 task_run_filter.model_dump(mode="json") if task_run_filter else None
1086 ),
1087 "deployments": (
1088 deployment_filter.model_dump(mode="json") if deployment_filter else None
1089 ),
1090 "work_pools": (
1091 work_pool_filter.model_dump(mode="json") if work_pool_filter else None
1092 ),
1093 "work_pool_queues": (
1094 work_queue_filter.model_dump(mode="json") if work_queue_filter else None
1095 ),
1096 "limit": limit,
1097 "offset": offset,
1098 "sort": sort,
1099 }
1101 response = await self.request("POST", "/deployments/filter", json=body)
1102 return DeploymentResponse.model_validate_list(response.json())
1104 async def delete_deployment( 1a
1105 self,
1106 deployment_id: UUID,
1107 ) -> None:
1108 """
1109 Delete deployment by id.
1111 Args:
1112 deployment_id: The deployment id of interest.
1113 Raises:
1114 ObjectNotFound: If request returns 404
1115 RequestError: If requests fails
1116 """
1117 try:
1118 await self.request(
1119 "DELETE",
1120 "/deployments/{id}",
1121 path_params={"id": deployment_id},
1122 )
1123 except HTTPStatusError as e:
1124 if e.response.status_code == 404:
1125 raise ObjectNotFound(http_exc=e) from e
1126 else:
1127 raise
1129 async def create_deployment_schedules( 1a
1130 self,
1131 deployment_id: UUID,
1132 schedules: list[tuple["SCHEDULE_TYPES", bool]],
1133 ) -> list["DeploymentSchedule"]:
1134 """
1135 Create deployment schedules.
1137 Args:
1138 deployment_id: the deployment ID
1139 schedules: a list of tuples containing the schedule to create
1140 and whether or not it should be active.
1142 Raises:
1143 RequestError: if the schedules were not created for any reason
1145 Returns:
1146 the list of schedules created in the backend
1147 """
1148 from prefect.client.schemas.actions import DeploymentScheduleCreate
1149 from prefect.client.schemas.objects import DeploymentSchedule
1151 deployment_schedule_create = [
1152 DeploymentScheduleCreate(schedule=schedule[0], active=schedule[1])
1153 for schedule in schedules
1154 ]
1156 json = [
1157 deployment_schedule_create.model_dump(mode="json")
1158 for deployment_schedule_create in deployment_schedule_create
1159 ]
1160 response = await self.request(
1161 "POST",
1162 "/deployments/{id}/schedules",
1163 path_params={"id": deployment_id},
1164 json=json,
1165 )
1166 return DeploymentSchedule.model_validate_list(response.json())
1168 async def read_deployment_schedules( 1a
1169 self,
1170 deployment_id: UUID,
1171 ) -> list["DeploymentSchedule"]:
1172 """
1173 Query the Prefect API for a deployment's schedules.
1175 Args:
1176 deployment_id: the deployment ID
1178 Returns:
1179 a list of DeploymentSchedule model representations of the deployment schedules
1180 """
1181 from prefect.client.schemas.objects import DeploymentSchedule
1183 try:
1184 response = await self.request(
1185 "GET",
1186 "/deployments/{id}/schedules",
1187 path_params={"id": deployment_id},
1188 )
1189 except HTTPStatusError as e:
1190 if e.response.status_code == 404:
1191 raise ObjectNotFound(http_exc=e) from e
1192 else:
1193 raise
1194 return DeploymentSchedule.model_validate_list(response.json())
1196 async def update_deployment_schedule( 1a
1197 self,
1198 deployment_id: UUID,
1199 schedule_id: UUID,
1200 active: bool | None = None,
1201 schedule: "SCHEDULE_TYPES | None" = None,
1202 ) -> None:
1203 """
1204 Update a deployment schedule by ID.
1206 Args:
1207 deployment_id: the deployment ID
1208 schedule_id: the deployment schedule ID of interest
1209 active: whether or not the schedule should be active
1210 schedule: the cron, rrule, or interval schedule this deployment schedule should use
1211 """
1212 from prefect.client.schemas.actions import DeploymentScheduleUpdate
1214 kwargs: dict[str, Any] = {}
1215 if active is not None:
1216 kwargs["active"] = active
1217 if schedule is not None:
1218 kwargs["schedule"] = schedule
1220 deployment_schedule_update = DeploymentScheduleUpdate(**kwargs)
1221 json = deployment_schedule_update.model_dump(mode="json", exclude_unset=True)
1223 try:
1224 await self.request(
1225 "PATCH",
1226 "/deployments/{id}/schedules/{schedule_id}",
1227 path_params={"id": deployment_id, "schedule_id": schedule_id},
1228 json=json,
1229 )
1230 except HTTPStatusError as e:
1231 if e.response.status_code == 404:
1232 raise ObjectNotFound(http_exc=e) from e
1233 else:
1234 raise
1236 async def delete_deployment_schedule( 1a
1237 self,
1238 deployment_id: UUID,
1239 schedule_id: UUID,
1240 ) -> None:
1241 """
1242 Delete a deployment schedule.
1244 Args:
1245 deployment_id: the deployment ID
1246 schedule_id: the ID of the deployment schedule to delete.
1248 Raises:
1249 RequestError: if the schedules were not deleted for any reason
1250 """
1251 try:
1252 await self.request(
1253 "DELETE",
1254 "/deployments/{id}/schedules/{schedule_id}",
1255 path_params={"id": deployment_id, "schedule_id": schedule_id},
1256 )
1257 except HTTPStatusError as e:
1258 if e.response.status_code == 404:
1259 raise ObjectNotFound(http_exc=e) from e
1260 else:
1261 raise
1263 async def get_scheduled_flow_runs_for_deployments( 1a
1264 self,
1265 deployment_ids: list[UUID],
1266 scheduled_before: "datetime.datetime | None" = None,
1267 limit: int | None = None,
1268 ) -> list["FlowRun"]:
1269 from prefect.client.schemas.objects import FlowRun
1271 body: dict[str, Any] = dict(deployment_ids=[str(id) for id in deployment_ids])
1272 if scheduled_before:
1273 body["scheduled_before"] = str(scheduled_before)
1274 if limit:
1275 body["limit"] = limit
1277 response = await self.request(
1278 "POST",
1279 "/deployments/get_scheduled_flow_runs",
1280 json=body,
1281 )
1283 return FlowRun.model_validate_list(response.json())
1285 async def create_flow_run_from_deployment( 1a
1286 self,
1287 deployment_id: UUID,
1288 *,
1289 parameters: dict[str, Any] | None = None,
1290 context: dict[str, Any] | None = None,
1291 state: State[Any] | None = None,
1292 name: str | None = None,
1293 tags: Iterable[str] | None = None,
1294 idempotency_key: str | None = None,
1295 parent_task_run_id: UUID | None = None,
1296 work_queue_name: str | None = None,
1297 job_variables: dict[str, Any] | None = None,
1298 labels: "KeyValueLabelsField | None" = None,
1299 ) -> "FlowRun":
1300 """
1301 Create a flow run for a deployment.
1303 Args:
1304 deployment_id: The deployment ID to create the flow run from
1305 parameters: Parameter overrides for this flow run. Merged with the
1306 deployment defaults
1307 context: Optional run context data
1308 state: The initial state for the run. If not provided, defaults to
1309 `Scheduled` for now. Should always be a `Scheduled` type.
1310 name: An optional name for the flow run. If not provided, the server will
1311 generate a name.
1312 tags: An optional iterable of tags to apply to the flow run; these tags
1313 are merged with the deployment's tags.
1314 idempotency_key: Optional idempotency key for creation of the flow run.
1315 If the key matches the key of an existing flow run, the existing run will
1316 be returned instead of creating a new one.
1317 parent_task_run_id: if a subflow run is being created, the placeholder task
1318 run identifier in the parent flow
1319 work_queue_name: An optional work queue name to add this run to. If not provided,
1320 will default to the deployment's set work queue. If one is provided that does not
1321 exist, a new work queue will be created within the deployment's work pool.
1322 job_variables: Optional variables that will be supplied to the flow run job.
1324 Raises:
1325 RequestError: if the Prefect API does not successfully create a run for any reason
1327 Returns:
1328 The flow run model
1329 """
1330 from prefect.client.schemas.actions import DeploymentFlowRunCreate
1331 from prefect.client.schemas.objects import FlowRun
1332 from prefect.states import Scheduled, to_state_create
1334 parameters = parameters or {}
1335 context = context or {}
1336 state = state or Scheduled()
1337 tags = tags or []
1338 labels = labels or {}
1340 flow_run_create = DeploymentFlowRunCreate(
1341 parameters=parameters,
1342 context=context,
1343 state=to_state_create(state),
1344 tags=list(tags),
1345 name=name,
1346 idempotency_key=idempotency_key,
1347 parent_task_run_id=parent_task_run_id,
1348 job_variables=job_variables,
1349 labels=labels,
1350 )
1352 # done separately to avoid including this field in payloads sent to older API versions
1353 if work_queue_name:
1354 flow_run_create.work_queue_name = work_queue_name
1356 response = await self.request(
1357 "POST",
1358 "/deployments/{id}/create_flow_run",
1359 path_params={"id": deployment_id},
1360 json=flow_run_create.model_dump(mode="json", exclude_unset=True),
1361 )
1362 return FlowRun.model_validate(response.json())
1364 async def create_deployment_branch( 1a
1365 self,
1366 deployment_id: UUID,
1367 branch: str,
1368 options: "DeploymentBranchingOptions | None" = None,
1369 overrides: "DeploymentUpdate | None" = None,
1370 ) -> UUID:
1371 from prefect.client.schemas.actions import DeploymentBranch
1372 from prefect.client.schemas.objects import DeploymentBranchingOptions
1374 response = await self.request(
1375 "POST",
1376 "/deployments/{id}/branch",
1377 path_params={"id": deployment_id},
1378 json=DeploymentBranch(
1379 branch=branch,
1380 options=options or DeploymentBranchingOptions(),
1381 overrides=overrides,
1382 ).model_dump(mode="json", exclude_unset=True),
1383 )
1384 return UUID(response.json().get("id"))