Coverage for /usr/local/lib/python3.12/site-packages/prefect/deployments/runner.py: 20%
380 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1"""
2Objects for creating and configuring deployments for flows using `serve` functionality.
4Example:
5 ```python
6 import time
7 from prefect import flow, serve
10 @flow
11 def slow_flow(sleep: int = 60):
12 "Sleepy flow - sleeps the provided amount of time (in seconds)."
13 time.sleep(sleep)
16 @flow
17 def fast_flow():
18 "Fastest flow this side of the Mississippi."
19 return
22 if __name__ == "__main__":
23 # to_deployment creates RunnerDeployment instances
24 slow_deploy = slow_flow.to_deployment(name="sleeper", interval=45)
25 fast_deploy = fast_flow.to_deployment(name="fast")
27 serve(slow_deploy, fast_deploy)
28 ```
30"""
32from __future__ import annotations 1a
34import importlib 1a
35import tempfile 1a
36from datetime import datetime, timedelta 1a
37from pathlib import Path 1a
38from typing import ( 1a
39 TYPE_CHECKING,
40 Any,
41 ClassVar,
42 Iterable,
43 List,
44 Optional,
45 Union,
46)
47from uuid import UUID 1a
49from pydantic import ( 1a
50 BaseModel,
51 ConfigDict,
52 Field,
53 PrivateAttr,
54 field_validator,
55 model_validator,
56)
57from rich.console import Console 1a
58from rich.progress import Progress, SpinnerColumn, TextColumn, track 1a
59from rich.table import Table 1a
60from typing_extensions import Self 1a
62from prefect._experimental.sla.objects import SlaTypes 1a
63from prefect._internal.compatibility.async_dispatch import async_dispatch 1a
64from prefect._internal.concurrency.api import create_call, from_async 1a
65from prefect._internal.schemas.validators import ( 1a
66 reconcile_paused_deployment,
67 reconcile_schedules_runner,
68)
69from prefect._versioning import VersionType, get_inferred_version_info 1a
70from prefect.client.base import ServerType 1a
71from prefect.client.orchestration import PrefectClient, get_client 1a
72from prefect.client.schemas.actions import ( 1a
73 DeploymentScheduleCreate,
74 DeploymentScheduleUpdate,
75 DeploymentUpdate,
76)
77from prefect.client.schemas.filters import WorkerFilter, WorkerFilterStatus 1a
78from prefect.client.schemas.objects import ( 1a
79 ConcurrencyLimitConfig,
80 ConcurrencyOptions,
81 VersionInfo,
82)
83from prefect.client.schemas.schedules import ( 1a
84 SCHEDULE_TYPES,
85 construct_schedule,
86)
87from prefect.deployments.schedules import ( 1a
88 create_deployment_schedule_create,
89)
90from prefect.docker.docker_image import DockerImage 1a
91from prefect.events import DeploymentTriggerTypes, TriggerTypes 1a
92from prefect.exceptions import ( 1a
93 ObjectNotFound,
94 PrefectHTTPStatusError,
95)
96from prefect.runner.storage import RunnerStorage 1a
97from prefect.schedules import Schedule 1a
98from prefect.settings import ( 1a
99 PREFECT_DEFAULT_WORK_POOL_NAME,
100 PREFECT_UI_URL,
101)
102from prefect.types import ListOfNonEmptyStrings 1a
103from prefect.types.entrypoint import EntrypointType 1a
104from prefect.utilities.annotations import freeze 1a
105from prefect.utilities.asyncutils import run_coro_as_sync, sync_compatible 1a
106from prefect.utilities.callables import ParameterSchema, parameter_schema 1a
107from prefect.utilities.collections import get_from_dict, isiterable 1a
108from prefect.utilities.dockerutils import ( 1a
109 parse_image_tag,
110)
112if TYPE_CHECKING: 112 ↛ 113line 112 didn't jump to line 113 because the condition on line 112 was never true1a
113 from prefect.client.types.flexible_schedule_list import FlexibleScheduleList
114 from prefect.flows import Flow
116__all__ = ["RunnerDeployment"] 1a
class DeploymentApplyError(RuntimeError):
    """
    Raised when an error occurs while applying a deployment.

    Wraps failures from the Prefect API during deployment registration
    (see `RunnerDeployment._create`), preserving the server-provided
    `detail` message when one is available.
    """
class RunnerDeployment(BaseModel):
    """
    A Prefect RunnerDeployment definition, used for specifying and building deployments.

    Attributes:
        name: A name for the deployment (required).
        version: An optional version for the deployment; defaults to the flow's version
        description: An optional description of the deployment; defaults to the flow's
            description
        tags: An optional list of tags to associate with this deployment; note that tags
            are used only for organizational purposes. For delegating work to workers,
            see `work_queue_name`.
        schedule: A schedule to run this deployment on, once registered
        parameters: A dictionary of parameter values to pass to runs created from this
            deployment
        path: The path to the working directory for the workflow, relative to remote
            storage or, if stored on a local filesystem, an absolute path
        entrypoint: The path to the entrypoint for the workflow, always relative to the
            `path`
        parameter_openapi_schema: The parameter schema of the flow, including defaults.
        enforce_parameter_schema: Whether or not the Prefect API should enforce the
            parameter schema for this deployment.
        work_pool_name: The name of the work pool to use for this deployment.
        work_queue_name: The name of the work queue to use for this deployment's scheduled runs.
            If not provided the default work queue for the work pool will be used.
        job_variables: Settings used to override the values specified default base job template
            of the chosen work pool. Refer to the base job template of the chosen work pool for
            available settings.
        _sla: (Experimental) SLA configuration for the deployment. May be removed or modified at any time. Currently only supported on Prefect Cloud.
    """

    # Allow non-pydantic field types (e.g. RunnerStorage) and re-run
    # validators when fields are assigned after construction.
    model_config: ClassVar[ConfigDict] = ConfigDict(
        arbitrary_types_allowed=True, validate_assignment=True
    )

    name: str = Field(..., description="The name of the deployment.")
    flow_name: Optional[str] = Field(
        None, description="The name of the underlying flow; typically inferred."
    )
    description: Optional[str] = Field(
        default=None, description="An optional description of the deployment."
    )
    version: Optional[str] = Field(
        default=None, description="An optional version for the deployment."
    )
    version_type: Optional[VersionType] = Field(
        default=None,
        description=(
            "The type of version information to use for the deployment. The version type"
            " will be inferred if not provided."
        ),
    )
    # NOTE(review): the description reads "One of more" — likely a typo for
    # "One or more" (left unchanged here since it is runtime metadata).
    tags: ListOfNonEmptyStrings = Field(
        default_factory=list,
        description="One of more tags to apply to this deployment.",
    )
    schedules: Optional[
        List[Union[DeploymentScheduleCreate, DeploymentScheduleUpdate]]
    ] = Field(
        default=None,
        description="The schedules that should cause this deployment to run.",
    )
    concurrency_limit: Optional[int] = Field(
        default=None,
        description="The maximum number of concurrent runs of this deployment.",
    )
    concurrency_options: Optional[ConcurrencyOptions] = Field(
        default=None,
        description="The concurrency limit config for the deployment.",
    )
    paused: Optional[bool] = Field(
        default=None, description="Whether or not the deployment is paused."
    )
    parameters: dict[str, Any] = Field(default_factory=dict)
    entrypoint: Optional[str] = Field(
        default=None,
        description=(
            "The path to the entrypoint for the workflow, relative to the `path`."
        ),
    )
    triggers: List[Union[DeploymentTriggerTypes, TriggerTypes]] = Field(
        default_factory=list,
        description="The triggers that should cause this deployment to run.",
    )
    enforce_parameter_schema: bool = Field(
        default=True,
        description=(
            "Whether or not the Prefect API should enforce the parameter schema for"
            " this deployment."
        ),
    )
    storage: Optional[RunnerStorage] = Field(
        default=None,
        description=(
            "The storage object used to retrieve flow code for this deployment."
        ),
    )
    work_pool_name: Optional[str] = Field(
        default=None,
        description=(
            "The name of the work pool to use for this deployment. Only used when"
            " the deployment is registered with a built runner."
        ),
    )
    work_queue_name: Optional[str] = Field(
        default=None,
        description=(
            "The name of the work queue to use for this deployment. Only used when"
            " the deployment is registered with a built runner."
        ),
    )
    job_variables: dict[str, Any] = Field(
        default_factory=dict,
        description=(
            "Job variables used to override the default values of a work pool"
            " base job template. Only used when the deployment is registered with"
            " a built runner."
        ),
    )

    # (Experimental) SLA configuration for the deployment. May be removed or modified at any time. Currently only supported on Prefect Cloud.
    _sla: Optional[Union[SlaTypes, list[SlaTypes]]] = PrivateAttr(
        default=None,
    )
    # How `entrypoint` should be interpreted (file path vs. module path).
    _entrypoint_type: EntrypointType = PrivateAttr(
        default=EntrypointType.FILE_PATH,
    )
    # Working directory for the workflow (the `path` described in the class
    # docstring); kept private rather than exposed as a model field.
    _path: Optional[str] = PrivateAttr(
        default=None,
    )
    # OpenAPI schema of the flow's parameters, including defaults; populated
    # from the flow in `_set_defaults_from_flow`.
    _parameter_openapi_schema: ParameterSchema = PrivateAttr(
        default_factory=ParameterSchema,
    )
    # True when `version` was copied from the flow rather than set explicitly;
    # lets version inference overwrite it later (see
    # `_get_deployment_version_info`).
    _version_from_flow: bool = PrivateAttr(
        default=False,
    )
    @property
    def entrypoint_type(self) -> EntrypointType:
        """How this deployment's entrypoint is interpreted (file path vs. module path)."""
        return self._entrypoint_type
266 @property 1a
267 def full_name(self) -> str: 1a
268 return f"{self.flow_name}/{self.name}"
    def _get_deployment_version_info(
        self, version_type: Optional[VersionType] = None
    ) -> VersionInfo:
        """Resolve the version info to register for this deployment.

        Attempts to infer version information (synchronously, via
        `get_inferred_version_info`); when inference succeeds, the inferred
        version may overwrite `self.version` if no explicit version was set
        (or the current one was copied from the flow). Falls back to a simple
        `VersionInfo` built from `self.version` when nothing can be inferred.
        """
        if inferred_version := run_coro_as_sync(
            get_inferred_version_info(version_type)
        ):
            # Only let inference overwrite the version when the user didn't
            # set one explicitly (or it merely mirrors the flow's version).
            if not self.version or self._version_from_flow:
                self.version = inferred_version.version  # TODO: maybe reconsider

            # Keep the returned info consistent with whatever version the
            # deployment ends up carrying.
            inferred_version.version = self.version
            return inferred_version

        return VersionInfo(version=self.version or "", type="prefect:simple")
284 @field_validator("name", mode="before") 1a
285 @classmethod 1a
286 def validate_name(cls, value: str) -> str: 1a
287 if value.endswith(".py"):
288 return Path(value).stem
289 return value
291 @model_validator(mode="after") 1a
292 def validate_automation_names(self): 1a
293 """Ensure that each trigger has a name for its automation if none is provided."""
294 trigger: Union[DeploymentTriggerTypes, TriggerTypes]
295 for i, trigger in enumerate(self.triggers, start=1):
296 if trigger.name is None:
297 trigger.name = f"{self.name}__automation_{i}"
298 return self
    @model_validator(mode="after")
    def validate_deployment_parameters(self) -> Self:
        """Update the parameter schema to mark frozen parameters as readonly."""
        if not self.parameters:
            return self

        for key, value in self.parameters.items():
            # `freeze`-wrapped values pin a parameter to a single value: mark
            # it readOnly in the OpenAPI schema, constrain its enum to that
            # value, and store the unwrapped value in `parameters`.
            if isinstance(value, freeze):
                raw_value = value.unfreeze()
                if key in self._parameter_openapi_schema.properties:
                    self._parameter_openapi_schema.properties[key]["readOnly"] = True
                    self._parameter_openapi_schema.properties[key]["enum"] = [raw_value]
                self.parameters[key] = raw_value

        return self
    @model_validator(mode="before")
    @classmethod
    def reconcile_paused(cls, values: dict[str, Any]) -> dict[str, Any]:
        """Normalize the `paused` field prior to model construction."""
        return reconcile_paused_deployment(values)
    @model_validator(mode="before")
    @classmethod
    def reconcile_schedules(cls, values: dict[str, Any]) -> dict[str, Any]:
        """Normalize the `schedules` field prior to model construction."""
        return reconcile_schedules_runner(values)
    async def _create(
        self,
        work_pool_name: Optional[str] = None,
        image: Optional[str] = None,
        version_info: VersionInfo | None = None,
    ) -> UUID:
        """Register this deployment with the API for the first time.

        Args:
            work_pool_name: Work pool to register against; falls back to
                `self.work_pool_name`.
            image: Docker image reference; only valid together with a work pool.
            version_info: Version information to attach to the deployment.

        Returns:
            The ID of the created deployment.

        Raises:
            ValueError: If `image`, a work queue, or job variables are given
                without a work pool.
            DeploymentApplyError: If the API rejects the creation request.
        """
        work_pool_name = work_pool_name or self.work_pool_name

        # The following options are only meaningful for worker-based
        # execution, so reject them when no work pool is targeted.
        if image and not work_pool_name:
            raise ValueError(
                "An image can only be provided when registering a deployment with a"
                " work pool."
            )

        if self.work_queue_name and not work_pool_name:
            raise ValueError(
                "A work queue can only be provided when registering a deployment with"
                " a work pool."
            )

        if self.job_variables and not work_pool_name:
            raise ValueError(
                "Job variables can only be provided when registering a deployment"
                " with a work pool."
            )

        async with get_client() as client:
            # Ensure the flow exists server-side and get its ID.
            flow_id = await client.create_flow_from_name(self.flow_name)

            create_payload: dict[str, Any] = dict(
                flow_id=flow_id,
                name=self.name,
                work_queue_name=self.work_queue_name,
                work_pool_name=work_pool_name,
                version=self.version,
                version_info=version_info,
                paused=self.paused,
                schedules=self.schedules,
                concurrency_limit=self.concurrency_limit,
                concurrency_options=self.concurrency_options,
                parameters=self.parameters,
                description=self.description,
                tags=self.tags,
                path=self._path,
                entrypoint=self.entrypoint,
                storage_document_id=None,
                infrastructure_document_id=None,
                parameter_openapi_schema=self._parameter_openapi_schema.model_dump(
                    exclude_unset=True
                ),
                enforce_parameter_schema=self.enforce_parameter_schema,
            )

            if work_pool_name:
                create_payload["job_variables"] = self.job_variables
                if image:
                    create_payload["job_variables"]["image"] = image
                # With remote storage the worker pulls the code, so no local
                # path is sent; pull steps describe how to fetch it.
                create_payload["path"] = None if self.storage else self._path
                if self.storage:
                    pull_steps = self.storage.to_pull_step()
                    if isinstance(pull_steps, list):
                        create_payload["pull_steps"] = pull_steps
                    elif pull_steps:
                        create_payload["pull_steps"] = [pull_steps]
                    else:
                        create_payload["pull_steps"] = []
                else:
                    create_payload["pull_steps"] = []

            try:
                deployment_id = await client.create_deployment(**create_payload)
            except Exception as exc:
                # Surface the server's `detail` message when available;
                # otherwise wrap the raw exception text.
                if isinstance(exc, PrefectHTTPStatusError):
                    detail = exc.response.json().get("detail")
                    if detail:
                        raise DeploymentApplyError(detail) from exc
                raise DeploymentApplyError(
                    f"Error while applying deployment: {str(exc)}"
                ) from exc

            await self._create_triggers(deployment_id, client)

            # We plan to support SLA configuration on the Prefect Server in the future.
            # For now, we only support it on Prefect Cloud.

            # If we're provided with an empty list, we will call the apply endpoint
            # to remove existing SLAs for the deployment. If the argument is not provided,
            # we will not call the endpoint.
            if self._sla or self._sla == []:
                await self._create_slas(deployment_id, client)

            return deployment_id
    async def _update(
        self,
        deployment_id: UUID,
        client: PrefectClient,
        version_info: VersionInfo | None,
    ):
        """Update an existing deployment in place.

        Args:
            deployment_id: The ID of the deployment to update.
            client: The Prefect client to use for the update.
            version_info: Version information to attach to the deployment.

        Returns:
            The ID of the updated deployment.
        """
        parameter_openapi_schema = self._parameter_openapi_schema.model_dump(
            exclude_unset=True
        )

        # Fields that are immutable or handled separately (e.g. triggers,
        # storage) are excluded from the update payload.
        update_payload = self.model_dump(
            mode="json",
            exclude_unset=True,
            exclude={"storage", "name", "flow_name", "triggers", "version_type"},
        )

        if self.storage:
            # Normalize the storage's pull step(s) to a list.
            pull_steps = self.storage.to_pull_step()
            if pull_steps and not isinstance(pull_steps, list):
                pull_steps = [pull_steps]
            update_payload["pull_steps"] = pull_steps
        else:
            update_payload["pull_steps"] = None

        if self.schedules:
            update_payload["schedules"] = [
                schedule.model_dump(mode="json", exclude_unset=True)
                for schedule in self.schedules
            ]

        await client.update_deployment(
            deployment_id,
            deployment=DeploymentUpdate(
                **update_payload,
                version_info=version_info,
                parameter_openapi_schema=parameter_openapi_schema,
            ),
        )

        await self._create_triggers(deployment_id, client)

        # We plan to support SLA configuration on the Prefect Server in the future.
        # For now, we only support it on Prefect Cloud.

        # If we're provided with an empty list, we will call the apply endpoint
        # to remove existing SLAs for the deployment. If the argument is not provided,
        # we will not call the endpoint.
        if self._sla or self._sla == []:
            await self._create_slas(deployment_id, client)

        return deployment_id
    async def _create_triggers(self, deployment_id: UUID, client: PrefectClient):
        """Recreate this deployment's automations from its trigger definitions.

        NOTE(review): returns `deployment_id` only on the 404 early-exit path
        and implicitly `None` otherwise; callers ignore the return value.
        """
        try:
            # The triggers defined in the deployment spec are, essentially,
            # anonymous and attempting truly sync them with cloud is not
            # feasible. Instead, we remove all automations that are owned
            # by the deployment, meaning that they were created via this
            # mechanism below, and then recreate them.
            await client.delete_resource_owned_automations(
                f"prefect.deployment.{deployment_id}"
            )
        except PrefectHTTPStatusError as e:
            if e.response.status_code == 404:
                # This Prefect server does not support automations, so we can safely
                # ignore this 404 and move on.
                return deployment_id
            raise e

        for trigger in self.triggers:
            trigger.set_deployment_id(deployment_id)
            await client.create_automation(trigger.as_automation())
    @sync_compatible
    async def apply(
        self,
        schedules: Optional[List[dict[str, Any]]] = None,
        work_pool_name: Optional[str] = None,
        image: Optional[str] = None,
        version_info: Optional[VersionInfo] = None,
    ) -> UUID:
        """
        Registers this deployment with the API and returns the deployment's ID.

        Args:
            schedules: Schedule definitions (as keyword dicts) that replace the
                deployment's current schedules when provided.
            work_pool_name: The name of the work pool to use for this
                deployment.
            image: The registry, name, and tag of the Docker image to
                use for this deployment. Only used when the deployment is
                deployed to a work pool.
            version_info: The version information to use for the deployment.

        Returns:
            The ID of the created deployment.
        """

        version_info = version_info or self._get_deployment_version_info(
            self.version_type
        )

        async with get_client() as client:
            try:
                deployment = await client.read_deployment_by_name(self.full_name)
            except ObjectNotFound:
                # No deployment with this name yet: create one.
                if schedules:
                    self.schedules = [
                        DeploymentScheduleCreate(**schedule) for schedule in schedules
                    ]
                return await self._create(work_pool_name, image, version_info)
            else:
                # Deployment exists: fold the provided overrides into this
                # object, then push an update.
                if image:
                    self.job_variables["image"] = image
                if work_pool_name:
                    self.work_pool_name = work_pool_name
                if schedules:
                    self.schedules = [
                        DeploymentScheduleUpdate(**schedule) for schedule in schedules
                    ]
                return await self._update(deployment.id, client, version_info)
538 async def _create_slas(self, deployment_id: UUID, client: PrefectClient): 1a
539 if not isinstance(self._sla, list):
540 self._sla = [self._sla]
542 if client.server_type == ServerType.CLOUD:
543 await client.apply_slas_for_deployment(deployment_id, self._sla)
544 else:
545 raise ValueError(
546 "SLA configuration is currently only supported on Prefect Cloud."
547 )
549 @staticmethod 1a
550 def _construct_deployment_schedules( 1a
551 interval: Optional[
552 Union[Iterable[Union[int, float, timedelta]], int, float, timedelta]
553 ] = None,
554 anchor_date: Optional[Union[datetime, str]] = None,
555 cron: Optional[Union[Iterable[str], str]] = None,
556 rrule: Optional[Union[Iterable[str], str]] = None,
557 timezone: Optional[str] = None,
558 schedule: Union[SCHEDULE_TYPES, Schedule, None] = None,
559 schedules: Optional["FlexibleScheduleList"] = None,
560 ) -> Union[List[DeploymentScheduleCreate], "FlexibleScheduleList"]:
561 """
562 Construct a schedule or schedules from the provided arguments.
564 This method serves as a unified interface for creating deployment
565 schedules. If `schedules` is provided, it is directly returned. If
566 `schedule` is provided, it is encapsulated in a list and returned. If
567 `interval`, `cron`, or `rrule` are provided, they are used to construct
568 schedule objects.
570 Args:
571 interval: An interval on which to schedule runs, either as a single
572 value or as a list of values. Accepts numbers (interpreted as
573 seconds) or `timedelta` objects. Each value defines a separate
574 scheduling interval.
575 anchor_date: The anchor date from which interval schedules should
576 start. This applies to all intervals if a list is provided.
577 cron: A cron expression or a list of cron expressions defining cron
578 schedules. Each expression defines a separate cron schedule.
579 rrule: An rrule string or a list of rrule strings for scheduling.
580 Each string defines a separate recurrence rule.
581 timezone: The timezone to apply to the cron or rrule schedules.
582 This is a single value applied uniformly to all schedules.
583 schedule: A singular schedule object, used for advanced scheduling
584 options like specifying a timezone. This is returned as a list
585 containing this single schedule.
586 schedules: A pre-defined list of schedule objects. If provided,
587 this list is returned as-is, bypassing other schedule construction
588 logic.
589 """
590 num_schedules = sum(
591 1
592 for entry in (interval, cron, rrule, schedule, schedules)
593 if entry is not None
594 )
595 if num_schedules > 1:
596 raise ValueError(
597 "Only one of interval, cron, rrule, schedule, or schedules can be provided."
598 )
599 elif num_schedules == 0:
600 return []
602 if schedules is not None:
603 return schedules
604 elif interval or cron or rrule:
605 # `interval`, `cron`, and `rrule` can be lists of values. This
606 # block figures out which one is not None and uses that to
607 # construct the list of schedules via `construct_schedule`.
608 parameters = [("interval", interval), ("cron", cron), ("rrule", rrule)]
609 schedule_type, value = [
610 param for param in parameters if param[1] is not None
611 ][0]
613 if not isiterable(value):
614 value = [value]
616 return [
617 create_deployment_schedule_create(
618 construct_schedule(
619 **{
620 schedule_type: v,
621 "timezone": timezone,
622 "anchor_date": anchor_date,
623 }
624 )
625 )
626 for v in value
627 ]
628 else:
629 return [create_deployment_schedule_create(schedule)]
    def _set_defaults_from_flow(self, flow: "Flow[..., Any]"):
        """Fill unset deployment fields from the flow's own metadata.

        Always refreshes the parameter schema; copies `version` and
        `description` from the flow only when they are unset/empty
        (truthiness check, so `""` also counts as unset).
        """
        self._parameter_openapi_schema = parameter_schema(flow)

        if not self.version:
            self.version = flow.version
            # Remember the version came from the flow so version inference
            # may later overwrite it (see `_get_deployment_version_info`).
            self._version_from_flow = True
        if not self.description:
            self.description = flow.description
    @classmethod
    def from_flow(
        cls,
        flow: "Flow[..., Any]",
        name: str,
        interval: Optional[
            Union[Iterable[Union[int, float, timedelta]], int, float, timedelta]
        ] = None,
        cron: Optional[Union[Iterable[str], str]] = None,
        rrule: Optional[Union[Iterable[str], str]] = None,
        paused: Optional[bool] = None,
        schedule: Optional[Schedule] = None,
        schedules: Optional["FlexibleScheduleList"] = None,
        concurrency_limit: Optional[Union[int, ConcurrencyLimitConfig, None]] = None,
        parameters: Optional[dict[str, Any]] = None,
        triggers: Optional[List[Union[DeploymentTriggerTypes, TriggerTypes]]] = None,
        description: Optional[str] = None,
        tags: Optional[List[str]] = None,
        version: Optional[str] = None,
        version_type: Optional[VersionType] = None,
        enforce_parameter_schema: bool = True,
        work_pool_name: Optional[str] = None,
        work_queue_name: Optional[str] = None,
        job_variables: Optional[dict[str, Any]] = None,
        entrypoint_type: EntrypointType = EntrypointType.FILE_PATH,
        _sla: Optional[Union[SlaTypes, list[SlaTypes]]] = None,  # experimental
    ) -> "RunnerDeployment":
        """
        Configure a deployment for a given flow.

        Args:
            flow: A flow function to deploy
            name: A name for the deployment
            interval: An interval on which to execute the current flow. Accepts either a number
                or a timedelta object. If a number is given, it will be interpreted as seconds.
            cron: A cron schedule of when to execute runs of this flow.
            rrule: An rrule schedule of when to execute runs of this flow.
            paused: Whether or not to set this deployment as paused.
            schedule: A schedule object defining when to execute runs of this deployment.
                Used to provide additional scheduling options like `timezone` or `parameters`.
            schedules: A list of schedule objects defining when to execute runs of this deployment.
                Used to define multiple schedules or additional scheduling options like `timezone`.
            concurrency_limit: The maximum number of concurrent runs this deployment will allow.
            triggers: A list of triggers that should kick of a run of this flow.
            parameters: A dictionary of default parameter values to pass to runs of this flow.
            description: A description for the created deployment. Defaults to the flow's
                description if not provided.
            tags: A list of tags to associate with the created deployment for organizational
                purposes.
            version: A version for the created deployment. Defaults to the flow's version.
            version_type: The type of version information to use for the deployment.
            enforce_parameter_schema: Whether or not the Prefect API should enforce the
                parameter schema for this deployment.
            work_pool_name: The name of the work pool to use for this deployment.
            work_queue_name: The name of the work queue to use for this deployment's scheduled runs.
                If not provided the default work queue for the work pool will be used.
            job_variables: Settings used to override the values specified default base job template
                of the chosen work pool. Refer to the base job template of the chosen work pool for
                available settings.
            entrypoint_type: Whether the entrypoint should be recorded as a file
                path or a module path.
            _sla: (Experimental) SLA configuration for the deployment. May be removed or modified at any time. Currently only supported on Prefect Cloud.
        """
        constructed_schedules = cls._construct_deployment_schedules(
            interval=interval,
            cron=cron,
            rrule=rrule,
            schedules=schedules,
            schedule=schedule,
        )

        job_variables = job_variables or {}

        # A ConcurrencyLimitConfig bundles the numeric limit with a collision
        # strategy; split it into the two fields the model expects.
        if isinstance(concurrency_limit, ConcurrencyLimitConfig):
            concurrency_options = {
                "collision_strategy": concurrency_limit.collision_strategy
            }
            concurrency_limit = concurrency_limit.limit
        else:
            concurrency_options = None

        deployment = cls(
            name=name,
            flow_name=flow.name,
            schedules=constructed_schedules,
            concurrency_limit=concurrency_limit,
            concurrency_options=concurrency_options,
            paused=paused,
            tags=tags or [],
            triggers=triggers or [],
            parameters=parameters or {},
            description=description,
            version=version,
            version_type=version_type,
            enforce_parameter_schema=enforce_parameter_schema,
            work_pool_name=work_pool_name,
            work_queue_name=work_queue_name,
            job_variables=job_variables,
        )
        deployment._sla = _sla

        if not deployment.entrypoint:
            no_file_location_error = (
                "Flows defined interactively cannot be deployed. Check out the"
                " quickstart guide for help getting started:"
                " https://docs.prefect.io/latest/get-started/quickstart"
            )
            ## first see if an entrypoint can be determined
            flow_file = getattr(flow, "__globals__", {}).get("__file__")
            mod_name = getattr(flow, "__module__", None)
            if entrypoint_type == EntrypointType.MODULE_PATH:
                if mod_name:
                    deployment.entrypoint = f"{mod_name}.{flow.__name__}"
                else:
                    raise ValueError(
                        "Unable to determine module path for provided flow."
                    )
            else:
                if not flow_file:
                    # No `__file__` in the flow's globals; fall back to
                    # importing its module and reading the module's `__file__`.
                    if not mod_name:
                        raise ValueError(no_file_location_error)
                    try:
                        module = importlib.import_module(mod_name)
                        flow_file = getattr(module, "__file__", None)
                    except ModuleNotFoundError:
                        raise ValueError(no_file_location_error)
                    if not flow_file:
                        raise ValueError(no_file_location_error)

                # set entrypoint
                # NOTE: raises if the flow file is outside the current working
                # directory (Path.relative_to semantics).
                entry_path = (
                    Path(flow_file).absolute().relative_to(Path.cwd().absolute())
                )
                deployment.entrypoint = (
                    f"{entry_path}:{getattr(flow.fn, '__qualname__', flow.fn.__name__)}"
                )

        if entrypoint_type == EntrypointType.FILE_PATH and not deployment._path:
            deployment._path = "."

        deployment._entrypoint_type = entrypoint_type

        cls._set_defaults_from_flow(deployment, flow)

        return deployment
    @classmethod
    def from_entrypoint(
        cls,
        entrypoint: str,
        name: str,
        flow_name: Optional[str] = None,
        interval: Optional[
            Union[Iterable[Union[int, float, timedelta]], int, float, timedelta]
        ] = None,
        cron: Optional[Union[Iterable[str], str]] = None,
        rrule: Optional[Union[Iterable[str], str]] = None,
        paused: Optional[bool] = None,
        schedule: Optional[Schedule] = None,
        schedules: Optional["FlexibleScheduleList"] = None,
        concurrency_limit: Optional[Union[int, ConcurrencyLimitConfig, None]] = None,
        parameters: Optional[dict[str, Any]] = None,
        triggers: Optional[List[Union[DeploymentTriggerTypes, TriggerTypes]]] = None,
        description: Optional[str] = None,
        tags: Optional[List[str]] = None,
        version: Optional[str] = None,
        enforce_parameter_schema: bool = True,
        work_pool_name: Optional[str] = None,
        work_queue_name: Optional[str] = None,
        job_variables: Optional[dict[str, Any]] = None,
        _sla: Optional[Union[SlaTypes, list[SlaTypes]]] = None,  # experimental
    ) -> "RunnerDeployment":
        """
        Configure a deployment for a given flow located at a given entrypoint.

        Args:
            entrypoint: The path to a file containing a flow and the name of the flow function in
                the format `./path/to/file.py:flow_func_name`.
            name: A name for the deployment
            flow_name: The name of the flow to deploy
            interval: An interval on which to execute the current flow. Accepts either a number
                or a timedelta object. If a number is given, it will be interpreted as seconds.
            cron: A cron schedule of when to execute runs of this flow.
            rrule: An rrule schedule of when to execute runs of this flow.
            paused: Whether or not to set this deployment as paused.
            schedule: A schedule object defining when to execute runs of this deployment.
                Used to provide additional scheduling options like `timezone`.
            schedules: A list of schedule objects defining when to execute runs of this deployment.
                Used to define multiple schedules or additional scheduling options like `timezone`.
            concurrency_limit: The maximum number of concurrent runs this deployment will allow.
            triggers: A list of triggers that should kick of a run of this flow.
            parameters: A dictionary of default parameter values to pass to runs of this flow.
            description: A description for the created deployment. Defaults to the flow's
                description if not provided.
            tags: A list of tags to associate with the created deployment for organizational
                purposes.
            version: A version for the created deployment. Defaults to the flow's version.
            enforce_parameter_schema: Whether or not the Prefect API should enforce the
                parameter schema for this deployment.
            work_pool_name: The name of the work pool to use for this deployment.
            work_queue_name: The name of the work queue to use for this deployment's scheduled runs.
                If not provided the default work queue for the work pool will be used.
            job_variables: Settings used to override the values specified default base job template
                of the chosen work pool. Refer to the base job template of the chosen work pool for
                available settings.
            _sla: (Experimental) SLA configuration for the deployment. May be removed or modified at any time. Currently only supported on Prefect Cloud.
        """
        # Imported locally to avoid a circular import with `prefect.flows`.
        from prefect.flows import load_flow_from_entrypoint

        job_variables = job_variables or {}
        flow = load_flow_from_entrypoint(entrypoint)

        constructed_schedules = cls._construct_deployment_schedules(
            interval=interval,
            cron=cron,
            rrule=rrule,
            schedules=schedules,
            schedule=schedule,
        )

        # A ConcurrencyLimitConfig bundles the numeric limit with a collision
        # strategy; split it into the two fields the model expects.
        if isinstance(concurrency_limit, ConcurrencyLimitConfig):
            concurrency_options = {
                "collision_strategy": concurrency_limit.collision_strategy
            }
            concurrency_limit = concurrency_limit.limit
        else:
            concurrency_options = None

        deployment = cls(
            name=name,
            flow_name=flow_name or flow.name,
            schedules=constructed_schedules,
            concurrency_limit=concurrency_limit,
            concurrency_options=concurrency_options,
            paused=paused,
            tags=tags or [],
            triggers=triggers or [],
            parameters=parameters or {},
            description=description,
            version=version,
            entrypoint=entrypoint,
            enforce_parameter_schema=enforce_parameter_schema,
            work_pool_name=work_pool_name,
            work_queue_name=work_queue_name,
            job_variables=job_variables,
        )
        deployment._sla = _sla
        # Entrypoints are resolved relative to the current working directory.
        deployment._path = str(Path.cwd())

        cls._set_defaults_from_flow(deployment, flow)

        return deployment
888 @classmethod 1a
889 async def afrom_storage( 1a
890 cls,
891 storage: RunnerStorage,
892 entrypoint: str,
893 name: str,
894 flow_name: Optional[str] = None,
895 interval: Optional[
896 Union[Iterable[Union[int, float, timedelta]], int, float, timedelta]
897 ] = None,
898 cron: Optional[Union[Iterable[str], str]] = None,
899 rrule: Optional[Union[Iterable[str], str]] = None,
900 paused: Optional[bool] = None,
901 schedule: Optional[Schedule] = None,
902 schedules: Optional["FlexibleScheduleList"] = None,
903 concurrency_limit: Optional[Union[int, ConcurrencyLimitConfig, None]] = None,
904 parameters: Optional[dict[str, Any]] = None,
905 triggers: Optional[List[Union[DeploymentTriggerTypes, TriggerTypes]]] = None,
906 description: Optional[str] = None,
907 tags: Optional[List[str]] = None,
908 version: Optional[str] = None,
909 version_type: Optional[VersionType] = None,
910 enforce_parameter_schema: bool = True,
911 work_pool_name: Optional[str] = None,
912 work_queue_name: Optional[str] = None,
913 job_variables: Optional[dict[str, Any]] = None,
914 _sla: Optional[Union[SlaTypes, list[SlaTypes]]] = None, # experimental
915 ) -> "RunnerDeployment":
916 """
917 Create a RunnerDeployment from a flow located at a given entrypoint and stored in a
918 local storage location.
920 Args:
921 entrypoint: The path to a file containing a flow and the name of the flow function in
922 the format `./path/to/file.py:flow_func_name`.
923 name: A name for the deployment
924 flow_name: The name of the flow to deploy
925 storage: A storage object to use for retrieving flow code. If not provided, a
926 URL must be provided.
927 interval: An interval on which to execute the current flow. Accepts either a number
928 or a timedelta object. If a number is given, it will be interpreted as seconds.
929 cron: A cron schedule of when to execute runs of this flow.
930 rrule: An rrule schedule of when to execute runs of this flow.
931 paused: Whether or not the deployment is paused.
932 schedule: A schedule object defining when to execute runs of this deployment.
933 Used to provide additional scheduling options like `timezone` or `parameters`.
934 schedules: A list of schedule objects defining when to execute runs of this deployment.
935 Used to provide additional scheduling options like `timezone` or `parameters`.
936 triggers: A list of triggers that should kick of a run of this flow.
937 parameters: A dictionary of default parameter values to pass to runs of this flow.
938 description: A description for the created deployment. Defaults to the flow's
939 description if not provided.
940 tags: A list of tags to associate with the created deployment for organizational
941 purposes.
942 version: A version for the created deployment. Defaults to the flow's version.
943 version_type: The type of version information to use for the deployment. The version type
944 will be inferred if not provided.
945 enforce_parameter_schema: Whether or not the Prefect API should enforce the
946 parameter schema for this deployment.
947 work_pool_name: The name of the work pool to use for this deployment.
948 work_queue_name: The name of the work queue to use for this deployment's scheduled runs.
949 If not provided the default work queue for the work pool will be used.
950 job_variables: Settings used to override the values specified default base job template
951 of the chosen work pool. Refer to the base job template of the chosen work pool for
952 available settings.
953 _sla: (Experimental) SLA configuration for the deployment. May be removed or modified at any time. Currently only supported on Prefect Cloud.
954 """
955 from prefect.flows import load_flow_from_entrypoint
957 constructed_schedules = cls._construct_deployment_schedules(
958 interval=interval,
959 cron=cron,
960 rrule=rrule,
961 schedules=schedules,
962 schedule=schedule,
963 )
965 if isinstance(concurrency_limit, ConcurrencyLimitConfig):
966 concurrency_options = {
967 "collision_strategy": concurrency_limit.collision_strategy
968 }
969 concurrency_limit = concurrency_limit.limit
970 else:
971 concurrency_options = None
973 job_variables = job_variables or {}
975 with tempfile.TemporaryDirectory() as tmpdir:
976 storage.set_base_path(Path(tmpdir))
977 await storage.pull_code()
979 full_entrypoint = str(storage.destination / entrypoint)
980 flow = await from_async.wait_for_call_in_new_thread(
981 create_call(load_flow_from_entrypoint, full_entrypoint)
982 )
984 deployment = cls(
985 name=name,
986 flow_name=flow_name or flow.name,
987 schedules=constructed_schedules,
988 concurrency_limit=concurrency_limit,
989 concurrency_options=concurrency_options,
990 paused=paused,
991 tags=tags or [],
992 triggers=triggers or [],
993 parameters=parameters or {},
994 description=description,
995 version=version,
996 version_type=version_type,
997 entrypoint=entrypoint,
998 enforce_parameter_schema=enforce_parameter_schema,
999 storage=storage,
1000 work_pool_name=work_pool_name,
1001 work_queue_name=work_queue_name,
1002 job_variables=job_variables,
1003 )
1004 deployment._sla = _sla
1005 deployment._path = str(storage.destination).replace(
1006 tmpdir, "$STORAGE_BASE_PATH"
1007 )
1009 cls._set_defaults_from_flow(deployment, flow)
1011 return deployment
1013 @classmethod 1a
1014 @async_dispatch(afrom_storage) 1a
1015 def from_storage( 1a
1016 cls,
1017 storage: RunnerStorage,
1018 entrypoint: str,
1019 name: str,
1020 flow_name: Optional[str] = None,
1021 interval: Optional[
1022 Union[Iterable[Union[int, float, timedelta]], int, float, timedelta]
1023 ] = None,
1024 cron: Optional[Union[Iterable[str], str]] = None,
1025 rrule: Optional[Union[Iterable[str], str]] = None,
1026 paused: Optional[bool] = None,
1027 schedule: Optional[Schedule] = None,
1028 schedules: Optional["FlexibleScheduleList"] = None,
1029 concurrency_limit: Optional[Union[int, ConcurrencyLimitConfig, None]] = None,
1030 parameters: Optional[dict[str, Any]] = None,
1031 triggers: Optional[List[Union[DeploymentTriggerTypes, TriggerTypes]]] = None,
1032 description: Optional[str] = None,
1033 tags: Optional[List[str]] = None,
1034 version: Optional[str] = None,
1035 version_type: Optional[VersionType] = None,
1036 enforce_parameter_schema: bool = True,
1037 work_pool_name: Optional[str] = None,
1038 work_queue_name: Optional[str] = None,
1039 job_variables: Optional[dict[str, Any]] = None,
1040 _sla: Optional[Union[SlaTypes, list[SlaTypes]]] = None, # experimental
1041 ) -> "RunnerDeployment":
1042 """
1043 Create a RunnerDeployment from a flow located at a given entrypoint and stored in a
1044 local storage location.
1046 Args:
1047 entrypoint: The path to a file containing a flow and the name of the flow function in
1048 the format `./path/to/file.py:flow_func_name`.
1049 name: A name for the deployment
1050 flow_name: The name of the flow to deploy
1051 storage: A storage object to use for retrieving flow code. If not provided, a
1052 URL must be provided.
1053 interval: An interval on which to execute the current flow. Accepts either a number
1054 or a timedelta object. If a number is given, it will be interpreted as seconds.
1055 cron: A cron schedule of when to execute runs of this flow.
1056 rrule: An rrule schedule of when to execute runs of this flow.
1057 paused: Whether or not the deployment is paused.
1058 schedule: A schedule object defining when to execute runs of this deployment.
1059 Used to provide additional scheduling options like `timezone` or `parameters`.
1060 schedules: A list of schedule objects defining when to execute runs of this deployment.
1061 Used to provide additional scheduling options like `timezone` or `parameters`.
1062 triggers: A list of triggers that should kick of a run of this flow.
1063 parameters: A dictionary of default parameter values to pass to runs of this flow.
1064 description: A description for the created deployment. Defaults to the flow's
1065 description if not provided.
1066 tags: A list of tags to associate with the created deployment for organizational
1067 purposes.
1068 version: A version for the created deployment. Defaults to the flow's version.
1069 version_type: The type of version information to use for the deployment. The version type
1070 will be inferred if not provided.
1071 enforce_parameter_schema: Whether or not the Prefect API should enforce the
1072 parameter schema for this deployment.
1073 work_pool_name: The name of the work pool to use for this deployment.
1074 work_queue_name: The name of the work queue to use for this deployment's scheduled runs.
1075 If not provided the default work queue for the work pool will be used.
1076 job_variables: Settings used to override the values specified default base job template
1077 of the chosen work pool. Refer to the base job template of the chosen work pool for
1078 available settings.
1079 _sla: (Experimental) SLA configuration for the deployment. May be removed or modified at any time. Currently only supported on Prefect Cloud.
1080 """
1081 from prefect.flows import load_flow_from_entrypoint
1083 constructed_schedules = cls._construct_deployment_schedules(
1084 interval=interval,
1085 cron=cron,
1086 rrule=rrule,
1087 schedules=schedules,
1088 schedule=schedule,
1089 )
1091 if isinstance(concurrency_limit, ConcurrencyLimitConfig):
1092 concurrency_options = {
1093 "collision_strategy": concurrency_limit.collision_strategy
1094 }
1095 concurrency_limit = concurrency_limit.limit
1096 else:
1097 concurrency_options = None
1099 job_variables = job_variables or {}
1101 with tempfile.TemporaryDirectory() as tmpdir:
1102 storage.set_base_path(Path(tmpdir))
1103 run_coro_as_sync(storage.pull_code())
1105 full_entrypoint = str(storage.destination / entrypoint)
1106 flow = load_flow_from_entrypoint(full_entrypoint)
1108 deployment = cls(
1109 name=name,
1110 flow_name=flow_name or flow.name,
1111 schedules=constructed_schedules,
1112 concurrency_limit=concurrency_limit,
1113 concurrency_options=concurrency_options,
1114 paused=paused,
1115 tags=tags or [],
1116 triggers=triggers or [],
1117 parameters=parameters or {},
1118 description=description,
1119 version=version,
1120 version_type=version_type,
1121 entrypoint=entrypoint,
1122 enforce_parameter_schema=enforce_parameter_schema,
1123 storage=storage,
1124 work_pool_name=work_pool_name,
1125 work_queue_name=work_queue_name,
1126 job_variables=job_variables,
1127 )
1128 deployment._sla = _sla
1129 deployment._path = str(storage.destination).replace(
1130 tmpdir, "$STORAGE_BASE_PATH"
1131 )
1133 cls._set_defaults_from_flow(deployment, flow)
1135 return deployment
@sync_compatible
async def deploy(
    *deployments: RunnerDeployment,
    work_pool_name: Optional[str] = None,
    image: Optional[Union[str, DockerImage]] = None,
    build: bool = True,
    push: bool = True,
    print_next_steps_message: bool = True,
    ignore_warnings: bool = False,
) -> List[UUID]:
    """
    Deploy the provided list of deployments to dynamic infrastructure via a
    work pool.

    By default, calling this function will build a Docker image for the deployments, push it to a
    registry, and create each deployment via the Prefect API that will run the corresponding
    flow on the given schedule.

    If you want to use an existing image, you can pass `build=False` to skip building and pushing
    an image.

    Args:
        *deployments: A list of deployments to deploy.
        work_pool_name: The name of the work pool to use for these deployments. Defaults to
            the value of `PREFECT_DEFAULT_WORK_POOL_NAME`.
        image: The name of the Docker image to build, including the registry and
            repository. Pass a DockerImage instance to customize the Dockerfile used
            and build arguments.
        build: Whether or not to build a new image for the flow. If False, the provided
            image will be used as-is and pulled at runtime.
        push: Whether or not to skip pushing the built image to a registry.
        print_next_steps_message: Whether or not to print a message with next steps
            after deploying the deployments.
        ignore_warnings: Whether or not to suppress warnings, such as the one emitted
            when deploying to a process-type work pool.

    Returns:
        A list of deployment IDs for the created/updated deployments.

    Examples:
        Deploy a group of flows to a work pool:

        ```python
        from prefect import deploy, flow

        @flow(log_prints=True)
        def local_flow():
            print("I'm a locally defined flow!")

        if __name__ == "__main__":
            deploy(
                local_flow.to_deployment(name="example-deploy-local-flow"),
                flow.from_source(
                    source="https://github.com/org/repo.git",
                    entrypoint="flows.py:my_flow",
                ).to_deployment(
                    name="example-deploy-remote-flow",
                ),
                work_pool_name="my-work-pool",
                image="my-registry/my-image:dev",
            )
        ```
    """
    # Fall back to the profile's default work pool when none is given explicitly.
    work_pool_name = work_pool_name or PREFECT_DEFAULT_WORK_POOL_NAME.value()

    # Without an image, every deployment must carry its own code location
    # (remote storage or an importable module path) for workers to pull from.
    if not image and not all(
        d.storage or d.entrypoint_type == EntrypointType.MODULE_PATH
        for d in deployments
    ):
        raise ValueError(
            "Either an image or remote storage location must be provided when deploying"
            " a deployment."
        )

    if not work_pool_name:
        raise ValueError(
            "A work pool name must be provided when deploying a deployment. Either"
            " provide a work pool name when calling `deploy` or set"
            " `PREFECT_DEFAULT_WORK_POOL_NAME` in your profile."
        )

    # Normalize a plain string image reference into a DockerImage object.
    if image and isinstance(image, str):
        image_name, image_tag = parse_image_tag(image)
        image = DockerImage(name=image_name, tag=image_tag)

    # Look up the target work pool and its currently-online workers; the
    # worker list is used later to decide whether to print start-up guidance.
    try:
        async with get_client() as client:
            work_pool = await client.read_work_pool(work_pool_name)
            active_workers = await client.read_workers_for_work_pool(
                work_pool_name,
                worker_filter=WorkerFilter(status=WorkerFilterStatus(any_=["ONLINE"])),
            )
    except ObjectNotFound as exc:
        raise ValueError(
            f"Could not find work pool {work_pool_name!r}. Please create it before"
            " deploying this flow."
        ) from exc

    # A pool supports images/blocks if its base job template exposes the
    # corresponding variable.
    is_docker_based_work_pool = get_from_dict(
        work_pool.base_job_template, "variables.properties.image", False
    )
    is_block_based_work_pool = get_from_dict(
        work_pool.base_job_template, "variables.properties.block", False
    )
    # carve out an exception for block based work pools that only have a block in their base job template
    console = Console()
    if not is_docker_based_work_pool and not is_block_based_work_pool:
        if image:
            raise ValueError(
                f"Work pool {work_pool_name!r} does not support custom Docker images."
                " Please use a work pool with an `image` variable in its base job template"
                " or specify a remote storage location for the flow with `.from_source`."
                " If you are attempting to deploy a flow to a local process work pool,"
                " consider using `flow.serve` instead. See the documentation for more"
                " information: https://docs.prefect.io/latest/how-to-guides/deployments/run-flows-in-local-processes"
            )
        elif work_pool.type == "process" and not ignore_warnings:
            console.print(
                "Looks like you're deploying to a process work pool. If you're creating a"
                " deployment for local development, calling `.serve` on your flow is a great"
                " way to get started. See the documentation for more information:"
                " https://docs.prefect.io/latest/how-to-guides/deployments/run-flows-in-local-processes "
                " Set `ignore_warnings=True` to suppress this message.",
                style="yellow",
            )

    # Managed pools handle images on the server side, so never build or push.
    is_managed_pool = work_pool.is_managed_pool
    if is_managed_pool:
        build = False
        push = False

    if image and build:
        with Progress(
            SpinnerColumn(),
            TextColumn(f"Building image {image.reference}..."),
            transient=True,
            console=console,
        ) as progress:
            docker_build_task = progress.add_task("docker_build", total=1)
            image.build()

            progress.update(docker_build_task, completed=1)
            console.print(
                f"Successfully built image {image.reference!r}", style="green"
            )

    if image and build and push:
        with Progress(
            SpinnerColumn(),
            TextColumn("Pushing image..."),
            transient=True,
            console=console,
        ) as progress:
            docker_push_task = progress.add_task("docker_push", total=1)

            image.push()

            progress.update(docker_push_task, completed=1)

        console.print(f"Successfully pushed image {image.reference!r}", style="green")

    deployment_exceptions: list[dict[str, Any]] = []
    deployment_ids: list[UUID] = []
    image_ref = image.reference if image else None
    for deployment in track(
        deployments,
        description="Creating/updating deployments...",
        console=console,
        transient=True,
    ):
        try:
            deployment_ids.append(
                await deployment.apply(image=image_ref, work_pool_name=work_pool_name)
            )
        except Exception as exc:
            # With a single deployment, surface the error directly; otherwise
            # collect failures so the remaining deployments still get applied
            # and the results can be reported together below.
            if len(deployments) == 1:
                raise
            deployment_exceptions.append({"deployment": deployment, "exc": exc})

    if deployment_exceptions:
        console.print(
            "Encountered errors while creating/updating deployments:\n",
            style="orange_red1",
        )
    else:
        console.print("Successfully created/updated all deployments!\n", style="green")

    complete_failure = len(deployment_exceptions) == len(deployments)

    # Render a per-deployment summary table of applied/failed statuses.
    table = Table(
        title="Deployments",
        show_lines=True,
    )

    table.add_column(header="Name", style="blue", no_wrap=True)
    table.add_column(header="Status", style="blue", no_wrap=True)
    table.add_column(header="Details", style="blue")

    for deployment in deployments:
        errored_deployment = next(
            (d for d in deployment_exceptions if d["deployment"] == deployment),
            None,
        )
        if errored_deployment:
            table.add_row(
                f"{deployment.flow_name}/{deployment.name}",
                "failed",
                str(errored_deployment["exc"]),
                style="red",
            )
        else:
            table.add_row(f"{deployment.flow_name}/{deployment.name}", "applied")
    console.print(table)

    if print_next_steps_message and not complete_failure:
        # Only suggest starting a worker when the pool actually needs one
        # (push/managed pools don't) and none are currently online.
        if (
            not work_pool.is_push_pool
            and not work_pool.is_managed_pool
            and not active_workers
        ):
            console.print(
                "\nTo execute flow runs from these deployments, start a worker in a"
                " separate terminal that pulls work from the"
                f" {work_pool_name!r} work pool:"
                f"\n\t[blue]$ prefect worker start --pool {work_pool_name!r}[/]",
            )
        console.print(
            "\nTo trigger any of these deployments, use the"
            " following command:\n[blue]\n\t$ prefect deployment run"
            " [DEPLOYMENT_NAME]\n[/]"
        )

        if PREFECT_UI_URL:
            console.print(
                "\nYou can also trigger your deployments via the Prefect UI:"
                f" [blue]{PREFECT_UI_URL.value()}/deployments[/]\n"
            )

    return deployment_ids