Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/deployment.py: 11% of 462 statements
coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1"""
2Command line interface for working with deployments.
3"""
5from __future__ import annotations 1a
7import json 1a
8import sys 1a
9import textwrap 1a
10import warnings 1a
11from asyncio import gather, iscoroutine 1a
12from datetime import datetime, timedelta, timezone 1a
13from typing import TYPE_CHECKING, Any, Optional, TypedDict 1a
14from uuid import UUID 1a
16import orjson 1a
17import typer 1a
18import yaml 1a
19from rich.console import Console 1a
20from rich.pretty import Pretty 1a
21from rich.table import Table 1a
23import prefect.types._datetime 1a
24from prefect.blocks.core import Block 1a
25from prefect.cli._types import PrefectTyper 1a
26from prefect.cli._utilities import exit_with_error, exit_with_success 1a
27from prefect.cli.flow_runs_watching import watch_flow_run 1a
28from prefect.cli.root import app, is_interactive 1a
29from prefect.client.orchestration import get_client 1a
30from prefect.client.schemas.filters import ( 1a
31 DeploymentFilter,
32 FlowFilter,
33 FlowFilterId,
34 FlowFilterName,
35)
36from prefect.client.schemas.objects import DeploymentSchedule 1a
37from prefect.client.schemas.responses import DeploymentResponse 1a
38from prefect.client.schemas.schedules import ( 1a
39 CronSchedule,
40 IntervalSchedule,
41 RRuleSchedule,
42)
43from prefect.client.utilities import inject_client 1a
44from prefect.exceptions import ( 1a
45 ObjectAlreadyExists,
46 ObjectNotFound,
47 PrefectHTTPStatusError,
48)
49from prefect.states import Scheduled 1a
50from prefect.types._datetime import ( 1a
51 DateTime,
52 create_datetime_instance,
53 human_friendly_diff,
54 in_local_tz,
55 parse_datetime,
56 to_datetime_string,
57)
58from prefect.utilities import urls 1a
59from prefect.utilities.collections import listrepr 1a
61if TYPE_CHECKING: 1a
62 from prefect.client.orchestration import PrefectClient
65def str_presenter( 1a
66 dumper: yaml.Dumper | yaml.representer.SafeRepresenter, data: str
67) -> yaml.ScalarNode:
68 """
69 configures yaml for dumping multiline strings
70 Ref: https://stackoverflow.com/questions/8640959/how-can-i-control-what-scalar-form-pyyaml-uses-for-my-data
71 """
72 if len(data.splitlines()) > 1: # check for multiline string
73 return dumper.represent_scalar("tag:yaml.org,2002:str", data, style="|") # type: ignore[reportUnknownMemberType] incomplete type stubs
74 return dumper.represent_scalar("tag:yaml.org,2002:str", data) # type: ignore[reportUnknownMemberType] incomplete type stubs
77yaml.add_representer(str, str_presenter) 1a
78yaml.representer.SafeRepresenter.add_representer(str, str_presenter) 1a
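# Illustrative sketch (not part of this module): with str_presenter registered above,
# yaml.dump renders multiline strings in block style ("|") instead of escaped "\n"s.
# _example_multiline_yaml_dump is a hypothetical helper for demonstration only.
def _example_multiline_yaml_dump() -> None:
    doc = {"description": "line one\nline two", "name": "my-deployment"}
    # Expected shape of the output:
    #   description: |-
    #     line one
    #     line two
    #   name: my-deployment
    print(yaml.dump(doc, sort_keys=False))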
80deployment_app: PrefectTyper = PrefectTyper( 1a
81 name="deployment", help="Manage deployments."
82)
83schedule_app: PrefectTyper = PrefectTyper( 1a
84 name="schedule", help="Manage deployment schedules."
85)
87deployment_app.add_typer(schedule_app, aliases=["schedule"]) 1a
88app.add_typer(deployment_app, aliases=["deployments"]) 1a
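# Illustrative sketch (hypothetical, standalone): the add_typer calls above nest the
# schedule commands under `deployment`, so they resolve as `prefect deployment schedule ...`.
# Plain typer.Typer shows the same composition; PrefectTyper layers Prefect-specific
# behavior (such as the `aliases` option) on top.
def _example_nested_typer_app() -> typer.Typer:
    root = typer.Typer()
    deployment = typer.Typer(name="deployment", help="Manage deployments.")
    schedule = typer.Typer(name="schedule", help="Manage deployment schedules.")

    @schedule.command("ls")
    def _ls(deployment_name: str) -> None:
        # Invoked as: <prog> deployment schedule ls <deployment-name>
        print(f"schedules for {deployment_name}")

    deployment.add_typer(schedule)
    root.add_typer(deployment)
    return root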
91def assert_deployment_name_format(name: str) -> None: 1a
92 if "/" not in name:
93 exit_with_error(
94 "Invalid deployment name. Expected '<flow-name>/<deployment-name>'"
95 )
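# A small sketch of the accepted format: names must look like '<flow-name>/<deployment-name>'.
# _example_split_deployment_name is hypothetical and only demonstrates the check above.
def _example_split_deployment_name(name: str) -> tuple[str, str]:
    assert_deployment_name_format(name)  # exits the CLI for names without a '/'
    flow_name, _, deployment_name = name.partition("/")
    return flow_name, deployment_name  # e.g. ("hello-world", "my-deployment")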
98async def get_deployment( 1a
99 client: "PrefectClient", name: str | None, deployment_id: str | None
100) -> DeploymentResponse:
101 if name is None and deployment_id is not None:
102 try:
103 deployment = await client.read_deployment(deployment_id)
104 except PrefectHTTPStatusError:
105 exit_with_error(f"Deployment {deployment_id!r} not found!")
106 elif name is not None and deployment_id is None:
107 try:
108 deployment = await client.read_deployment_by_name(name)
109 except ObjectNotFound:
110 exit_with_error(f"Deployment {name!r} not found!")
111 elif name is None and deployment_id is None:
112 exit_with_error("Must provide a deployed flow's name or id")
113 else:
114 exit_with_error("Only provide a deployed flow's name or id")
116 return deployment
119async def create_work_queue_and_set_concurrency_limit( 1a
120 work_queue_name: str,
121 work_pool_name: str | None,
122 work_queue_concurrency: int | None,
123) -> None:
124 async with get_client() as client:
125 if work_queue_concurrency is not None and work_queue_name:
126 try:
127 try:
128 await check_work_pool_exists(work_pool_name)
129 res = await client.create_work_queue(
130 name=work_queue_name, work_pool_name=work_pool_name
131 )
132 except ObjectAlreadyExists:
133 res = await client.read_work_queue_by_name(
134 name=work_queue_name, work_pool_name=work_pool_name
135 )
136 if res.concurrency_limit != work_queue_concurrency:
137 if work_pool_name is None:
138 app.console.print(
139 (
140 f"Work queue {work_queue_name!r} already exists"
141 " with a concurrency limit of"
142 f" {res.concurrency_limit}, this limit is being"
143 " updated..."
144 ),
145 style="red",
146 )
147 else:
148 app.console.print(
149 (
150 f"Work queue {work_queue_name!r} in work pool"
151 f" {work_pool_name!r} already exists with a"
152 f" concurrency limit of {res.concurrency_limit},"
153 " this limit is being updated..."
154 ),
155 style="red",
156 )
157 await client.update_work_queue(
158 res.id, concurrency_limit=work_queue_concurrency
159 )
160 if work_pool_name is None:
161 app.console.print(
162 (
163 "Updated concurrency limit on work queue"
164 f" {work_queue_name!r} to {work_queue_concurrency}"
165 ),
166 style="green",
167 )
168 else:
169 app.console.print(
170 (
171 "Updated concurrency limit on work queue"
172 f" {work_queue_name!r} in work pool {work_pool_name!r} to"
173 f" {work_queue_concurrency}"
174 ),
175 style="green",
176 )
177 except Exception:
178 exit_with_error(
179 "Failed to set concurrency limit on work queue"
180 f" {work_queue_name!r} in work pool {work_pool_name!r}."
181 )
182 elif work_queue_concurrency:
183 app.console.print(
184 "No work queue set! The concurrency limit cannot be updated."
185 )
188@inject_client 1a
189async def check_work_pool_exists( 1a
190 work_pool_name: str | None, client: "PrefectClient | None" = None
191):
192 if TYPE_CHECKING:
193 assert client is not None
194 if work_pool_name is not None:
195 try:
196 await client.read_work_pool(work_pool_name=work_pool_name)
197 except ObjectNotFound:
198 app.console.print(
199 (
200 "\nThis deployment specifies a work pool name of"
201 f" {work_pool_name!r}, but no such work pool exists.\n"
202 ),
203 style="red ",
204 )
205 app.console.print("To create a work pool via the CLI:\n")
206 app.console.print(
207 f"$ prefect work-pool create {work_pool_name!r}\n", style="blue"
208 )
209 exit_with_error("Work pool not found!")
212class RichTextIO: 1a
213 def __init__(self, console: Console, prefix: str | None = None) -> None: 1a
214 self.console = console
215 self.prefix = prefix
217 def write(self, content: str) -> None: 1a
218 if self.prefix:
219 content = self.prefix + content
220 self.console.print(content)
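# Illustrative sketch: RichTextIO acts as a minimal file-like sink whose write() prefixes
# each message and routes it through a Rich console. The values below are examples only.
def _example_rich_text_io() -> None:
    sink = RichTextIO(Console(), prefix="deploy | ")
    sink.write("building image...")  # prints: deploy | building image...
    sink.write("pushing image...")   # prints: deploy | pushing image...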
223@deployment_app.command() 1a
224async def inspect( 1a
225 name: str,
226 output: Optional[str] = typer.Option(
227 None,
228 "--output",
229 "-o",
230 help="Specify an output format. Currently supports: json",
231 ),
232):
233 """
234 View details about a deployment.
236 Examples:
237 `$ prefect deployment inspect "hello-world/my-deployment"`
239 ```python
240 {
241 'id': '610df9c3-0fb4-4856-b330-67f588d20201',
242 'created': '2022-08-01T18:36:25.192102+00:00',
243 'updated': '2022-08-01T18:36:25.188166+00:00',
244 'name': 'my-deployment',
245 'description': None,
246 'flow_id': 'b57b0aa2-ef3a-479e-be49-381fb0483b4e',
247 'schedules': None,
248 'parameters': {'name': 'Marvin'},
249 'tags': ['test'],
250 'parameter_openapi_schema': {
251 'title': 'Parameters',
252 'type': 'object',
253 'properties': {
254 'name': {
255 'title': 'name',
256 'type': 'string'
257 }
258 },
259 'required': ['name']
260 },
261 'storage_document_id': '63ef008f-1e5d-4e07-a0d4-4535731adb32',
262 'infrastructure_document_id': '6702c598-7094-42c8-9785-338d2ec3a028',
263 'infrastructure': {
264 'type': 'process',
265 'env': {},
266 'labels': {},
267 'name': None,
268 'command': ['python', '-m', 'prefect.engine'],
269 'stream_output': True
270 }
271 }
272 ```
273 """
274 if output and output.lower() != "json":
275 exit_with_error("Only 'json' output format is supported.")
277 assert_deployment_name_format(name)
279 async with get_client() as client:
280 try:
281 deployment = await client.read_deployment_by_name(name)
282 except ObjectNotFound:
283 exit_with_error(f"Deployment {name!r} not found!")
285 deployment_json = deployment.model_dump(mode="json")
287 if deployment.infrastructure_document_id:
288 coro = Block.load_from_ref(deployment.infrastructure_document_id)
289 if TYPE_CHECKING:
290 assert iscoroutine(coro)
291 infrastructure = await coro
292 deployment_json["infrastructure"] = infrastructure.model_dump(
293 exclude={"_block_document_id", "_block_document_name", "_is_anonymous"}
294 )
296 deployment_json["automations"] = [
297 a.model_dump()
298 for a in await client.read_resource_related_automations(
299 f"prefect.deployment.{deployment.id}"
300 )
301 ]
303 if output and output.lower() == "json":
304 json_output = orjson.dumps(deployment_json, option=orjson.OPT_INDENT_2).decode()
305 app.console.print(json_output)
306 else:
307 app.console.print(Pretty(deployment_json))
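# A minimal sketch of the `--output json` path above: orjson.OPT_INDENT_2 pretty-prints
# the model_dump(mode="json") payload. The dict here is a stand-in, not a real deployment.
def _example_inspect_json_output() -> None:
    deployment_json = {"name": "my-deployment", "parameters": {"name": "Marvin"}, "tags": ["test"]}
    print(orjson.dumps(deployment_json, option=orjson.OPT_INDENT_2).decode())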
310@schedule_app.command("create") 1a
311async def create_schedule( 1a
312 name: str,
313 interval: Optional[float] = typer.Option(
314 None,
315 "--interval",
316 help="An interval to schedule on, specified in seconds",
317 min=0.0001,
318 ),
319 interval_anchor: Optional[str] = typer.Option(
320 None,
321 "--anchor-date",
322 help="The anchor date for an interval schedule",
323 ),
324 rrule_string: Optional[str] = typer.Option(
325 None, "--rrule", help="Deployment schedule rrule string"
326 ),
327 cron_string: Optional[str] = typer.Option(
328 None, "--cron", help="Deployment schedule cron string"
329 ),
330 cron_day_or: bool = typer.Option(
331 True,
332 "--day_or",
333 help="Control how croniter handles `day` and `day_of_week` entries",
334 ),
335 timezone: Optional[str] = typer.Option(
336 None,
337 "--timezone",
338 help="Deployment schedule timezone string e.g. 'America/New_York'",
339 ),
340 active: bool = typer.Option(
341 True,
342 "--active",
343 help="Whether the schedule is active. Defaults to True.",
344 ),
345 replace: Optional[bool] = typer.Option(
346 False,
347 "--replace",
348 help="Replace the deployment's current schedule(s) with this new schedule.",
349 ),
350 assume_yes: Optional[bool] = typer.Option(
351 False,
352 "--accept-yes",
353 "-y",
354 help="Accept the confirmation prompt without prompting",
355 ),
356):
357 """
358 Create a schedule for a given deployment.
359 """
360 assert_deployment_name_format(name)
362 if sum(option is not None for option in [interval, rrule_string, cron_string]) != 1:
363 exit_with_error(
364 "Exactly one of `--interval`, `--rrule`, or `--cron` must be provided."
365 )
367 schedule = None
369 if interval_anchor and not interval:
370 exit_with_error("An anchor date can only be provided with an interval schedule")
372 if interval is not None:
373 if interval_anchor:
374 try:
375 parsed_interval_anchor = parse_datetime(interval_anchor)
376 except ValueError:
377 return exit_with_error("The anchor date must be a valid date string.")
379 IntervalScheduleOptions = TypedDict(
380 "IntervalScheduleOptions",
381 {
382 "interval": timedelta,
383 "anchor_date": str,
384 "timezone": str,
385 },
386 total=False,
387 )
388 interval_schedule: IntervalScheduleOptions = {
389 "interval": timedelta(seconds=interval),
390 }
391 if interval_anchor:
392 interval_schedule["anchor_date"] = parsed_interval_anchor
393 if timezone:
394 interval_schedule["timezone"] = timezone
395 schedule = IntervalSchedule(**interval_schedule)
397 if cron_string is not None:
398 CronScheduleOptions = TypedDict(
399 "CronScheduleOptions",
400 {
401 "cron": str,
402 "day_or": bool,
403 "timezone": str,
404 },
405 total=False,
406 )
407 cron_schedule: CronScheduleOptions = {
408 "cron": cron_string,
409 "day_or": cron_day_or,
410 }
411 if timezone:
412 cron_schedule["timezone"] = timezone
414 schedule = CronSchedule(**cron_schedule)
416 if rrule_string is not None:
417 # a timezone in the `rrule_string` gets ignored by the RRuleSchedule constructor
418 if "TZID" in rrule_string and not timezone:
419 exit_with_error(
420 "You can provide a timezone by providing a dict with a `timezone` key"
421 " to the --rrule option. E.g. {'rrule': 'FREQ=MINUTELY;INTERVAL=5',"
422 " 'timezone': 'America/New_York'}.\nAlternatively, you can provide a"
423 " timezone by passing in a --timezone argument."
424 )
425 try:
426 schedule = RRuleSchedule(**json.loads(rrule_string))
427 if timezone:
428 # override timezone if specified via CLI argument
429 schedule.timezone = timezone
430 except json.JSONDecodeError:
431 schedule = RRuleSchedule(rrule=rrule_string, timezone=timezone)
433 if schedule is None:
434 return exit_with_success(
435 "Could not create a valid schedule from the provided options."
436 )
438 async with get_client() as client:
439 try:
440 deployment = await client.read_deployment_by_name(name)
441 except ObjectNotFound:
442 return exit_with_error(f"Deployment {name!r} not found!")
444 num_schedules = len(deployment.schedules)
445 noun = "schedule" if num_schedules == 1 else "schedules"
447 if replace and num_schedules > 0:
448 if not assume_yes and not typer.confirm(
449 f"Are you sure you want to replace {num_schedules} {noun} for {name}?"
450 ):
451 return exit_with_error("Schedule replacement cancelled.")
453 for existing_schedule in deployment.schedules:
454 try:
455 await client.delete_deployment_schedule(
456 deployment.id, existing_schedule.id
457 )
458 except ObjectNotFound:
459 pass
461 await client.create_deployment_schedules(deployment.id, [(schedule, active)])
463 if replace and num_schedules > 0:
464 exit_with_success(f"Replaced existing deployment {noun} with new schedule!")
465 else:
466 exit_with_success("Created deployment schedule!")
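# A sketch of the three schedule shapes the command above can build, using the same
# keyword arguments the CLI passes through; the concrete values are illustrative.
def _example_schedule_objects() -> None:
    every_hour = IntervalSchedule(interval=timedelta(hours=1), timezone="UTC")
    weekdays_9am = CronSchedule(cron="0 9 * * 1-5", timezone="America/New_York")
    every_5_min = RRuleSchedule(rrule="FREQ=MINUTELY;INTERVAL=5", timezone="UTC")
    for schedule in (every_hour, weekdays_9am, every_5_min):
        print(type(schedule).__name__, schedule)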
469@schedule_app.command("delete") 1a
470async def delete_schedule( 1a
471 deployment_name: str,
472 schedule_id: UUID,
473 assume_yes: bool = typer.Option(
474 False,
475 "--accept-yes",
476 "-y",
477 help="Accept the confirmation prompt without prompting",
478 ),
479):
480 """
481 Delete a deployment schedule.
482 """
483 assert_deployment_name_format(deployment_name)
485 async with get_client() as client:
486 try:
487 deployment = await client.read_deployment_by_name(deployment_name)
488 except ObjectNotFound:
489 return exit_with_error(f"Deployment {deployment_name} not found!")
491 try:
492 schedule = [s for s in deployment.schedules if s.id == schedule_id][0]
493 except IndexError:
494 return exit_with_error("Deployment schedule not found!")
496 if not assume_yes and not typer.confirm(
497 f"Are you sure you want to delete this schedule: {schedule.schedule}",
498 ):
499 return exit_with_error("Deletion cancelled.")
501 try:
502 await client.delete_deployment_schedule(deployment.id, schedule_id)
503 except ObjectNotFound:
504 exit_with_error("Deployment schedule not found!")
506 exit_with_success(f"Deleted deployment schedule {schedule_id}")
509async def _set_schedule_activation( 1a
510 deployment_name: Optional[str],
511 schedule_id: Optional[UUID],
512 _all: bool,
513 activate: bool,
514) -> None:
515 """Enable or disable deployment schedules for one or all deployments."""
516 past_tense = "resumed" if activate else "paused"
517 present_tense = "resume" if activate else "pause"
519 # Early argument validation
520 if _all and (deployment_name is not None or schedule_id is not None):
521 return exit_with_error(
522 "Cannot specify deployment name or schedule ID with --all"
523 )
524 if not _all and (deployment_name is None or schedule_id is None):
525 return exit_with_error(
526 "Must provide deployment name and schedule ID, or use --all"
527 )
529 if _all:
530 async with get_client() as client:
531 # Read all deployments with pagination to avoid truncation at default limits
532 deployments: list[DeploymentResponse] = []
533 page_limit = 200
534 offset = 0
535 while True:
536 page = await client.read_deployments(
537 deployment_filter=DeploymentFilter(),
538 limit=page_limit,
539 offset=offset,
540 )
541 if not page:
542 break
543 deployments.extend(page)
544 if len(page) < page_limit:
545 break
546 offset += page_limit
548 if not deployments:
549 return exit_with_success("No deployments found.")
551 schedules_to_update = sum(
552 1 for d in deployments for s in d.schedules if s.active != activate
553 )
555 if schedules_to_update == 0:
556 state_msg = "inactive" if activate else "active"
557 return exit_with_success(
558 f"No {state_msg} schedules found to {present_tense}."
559 )
561 if is_interactive() and not typer.confirm(
562 f"Are you sure you want to {present_tense} {schedules_to_update} schedule(s) across all deployments?",
563 default=False,
564 ):
565 return exit_with_error("Operation cancelled.")
567 update_tasks = []
568 deployment_names = []
569 for deployment in deployments:
570 if deployment.schedules:
571 for schedule in deployment.schedules:
572 if schedule.active != activate:
573 update_tasks.append((deployment.id, schedule.id))
574 deployment_names.append(deployment.name)
576 if update_tasks:
577 import asyncio
579 semaphore = asyncio.Semaphore(10)
581 async def limited_update(dep_id: UUID, sched_id: UUID):
582 async with semaphore:
583 await client.update_deployment_schedule(
584 dep_id, sched_id, active=activate
585 )
587 await gather(*[limited_update(did, sid) for did, sid in update_tasks])
589 # Display progress after all updates complete
590 for name in deployment_names:
591 app.console.print(
592 f"{past_tense.capitalize()} schedule for deployment [cyan]{name}[/cyan]"
593 )
595 exit_with_success(
596 f"{past_tense.capitalize()} {len(update_tasks)} deployment schedule(s)."
597 )
599 else:
600 assert_deployment_name_format(deployment_name)
602 async with get_client() as client:
603 try:
604 deployment = await client.read_deployment_by_name(deployment_name)
605 except ObjectNotFound:
606 return exit_with_error(f"Deployment {deployment_name!r} not found!")
608 schedule = next(
609 (s for s in deployment.schedules if s.id == schedule_id), None
610 )
611 if schedule is None:
612 return exit_with_error("Deployment schedule not found!")
614 if schedule.active == activate:
615 state = "active" if activate else "inactive"
616 return exit_with_error(
617 f"Deployment schedule {schedule_id} is already {state}"
618 )
620 await client.update_deployment_schedule(
621 deployment.id, schedule_id, active=activate
622 )
623 exit_with_success(
624 f"{past_tense.capitalize()} schedule {schedule.schedule} for deployment {deployment_name}"
625 )
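# A standalone sketch of the bounded-concurrency pattern used for `--all` above: a
# semaphore caps in-flight updates at 10 while gather awaits them all. `update_one`
# stands in for client.update_deployment_schedule.
async def _example_bounded_schedule_updates(schedule_ids: list[UUID]) -> None:
    import asyncio

    semaphore = asyncio.Semaphore(10)

    async def update_one(schedule_id: UUID) -> None:
        async with semaphore:
            await asyncio.sleep(0)  # placeholder for the real API call

    await gather(*[update_one(schedule_id) for schedule_id in schedule_ids])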
628@schedule_app.command("pause") 1a
629async def pause_schedule( 1a
630 deployment_name: Optional[str] = typer.Argument(None),
631 schedule_id: Optional[UUID] = typer.Argument(None),
632 _all: bool = typer.Option(False, "--all", help="Pause all deployment schedules"),
633):
634 """
635 Pause deployment schedules.
637 Examples:
638 Pause a specific schedule:
639 $ prefect deployment schedule pause my-flow/my-deployment abc123-...
641 Pause all schedules:
642 $ prefect deployment schedule pause --all
643 """
644 await _set_schedule_activation(deployment_name, schedule_id, _all, activate=False)
647@schedule_app.command("resume") 1a
648async def resume_schedule( 1a
649 deployment_name: Optional[str] = typer.Argument(None),
650 schedule_id: Optional[UUID] = typer.Argument(None),
651 _all: bool = typer.Option(False, "--all", help="Resume all deployment schedules"),
652):
653 """
654 Resume deployment schedules.
656 Examples:
657 Resume a specific schedule:
658 $ prefect deployment schedule resume my-flow/my-deployment abc123-...
660 Resume all schedules:
661 $ prefect deployment schedule resume --all
662 """
663 await _set_schedule_activation(deployment_name, schedule_id, _all, activate=True)
666@schedule_app.command("ls") 1a
667async def list_schedules(deployment_name: str): 1a
668 """
669 View all schedules for a deployment.
670 """
671 assert_deployment_name_format(deployment_name)
672 async with get_client() as client:
673 try:
674 deployment = await client.read_deployment_by_name(deployment_name)
675 except ObjectNotFound:
676 return exit_with_error(f"Deployment {deployment_name!r} not found!")
678 def sort_by_created_key(schedule: DeploymentSchedule): # type: ignore
679 assert schedule.created is not None, "All schedules should have a created time."
680 return prefect.types._datetime.now("UTC") - schedule.created
682 def schedule_details(schedule: DeploymentSchedule) -> str:
683 if isinstance(schedule.schedule, IntervalSchedule):
684 return f"interval: {schedule.schedule.interval}s"
685 elif isinstance(schedule.schedule, CronSchedule):
686 return f"cron: {schedule.schedule.cron}"
687 elif isinstance(schedule.schedule, RRuleSchedule):
688 return f"rrule: {schedule.schedule.rrule}"
689 else:
690 return "unknown"
692 table = Table(
693 title="Deployment Schedules",
694 )
695 table.add_column("ID", style="blue", no_wrap=True)
696 table.add_column("Schedule", style="cyan", no_wrap=False)
697 table.add_column("Active", style="purple", no_wrap=True)
699 for schedule in sorted(deployment.schedules, key=sort_by_created_key):
700 table.add_row(
701 str(schedule.id),
702 schedule_details(schedule),
703 str(schedule.active),
704 )
706 app.console.print(table)
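# A minimal sketch of the table layout above, runnable on its own: the column styles
# match `prefect deployment schedule ls`, and the row values are placeholders.
def _example_schedule_table() -> None:
    table = Table(title="Deployment Schedules")
    table.add_column("ID", style="blue", no_wrap=True)
    table.add_column("Schedule", style="cyan", no_wrap=False)
    table.add_column("Active", style="purple", no_wrap=True)
    table.add_row("5d1fcbf2-0000-0000-0000-000000000000", "cron: 0 9 * * 1-5", "True")
    Console().print(table)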
709@schedule_app.command("clear") 1a
710async def clear_schedules( 1a
711 deployment_name: str,
712 assume_yes: bool = typer.Option(
713 False,
714 "--accept-yes",
715 "-y",
716 help="Accept the confirmation prompt without prompting",
717 ),
718):
719 """
720 Clear all schedules for a deployment.
721 """
722 assert_deployment_name_format(deployment_name)
723 async with get_client() as client:
724 try:
725 deployment = await client.read_deployment_by_name(deployment_name)
726 except ObjectNotFound:
727 return exit_with_error(f"Deployment {deployment_name!r} not found!")
729 await client.read_flow(deployment.flow_id)
731 # Get input from user: confirm removal of all schedules
732 if not assume_yes and not typer.confirm(
733 "Are you sure you want to clear all schedules for this deployment?",
734 ):
735 exit_with_error("Clearing schedules cancelled.")
737 for schedule in deployment.schedules:
738 try:
739 await client.delete_deployment_schedule(deployment.id, schedule.id)
740 except ObjectNotFound:
741 pass
743 exit_with_success(f"Cleared all schedules for deployment {deployment_name}")
746@deployment_app.command() 1a
747async def ls(flow_name: Optional[list[str]] = None, by_created: bool = False): 1a
748 """
749 View all deployments or deployments for specific flows.
750 """
751 async with get_client() as client:
752 deployments = await client.read_deployments(
753 flow_filter=FlowFilter(name=FlowFilterName(any_=flow_name))
754 if flow_name
755 else None
756 )
757 flows = {
758 flow.id: flow
759 for flow in await client.read_flows(
760 flow_filter=FlowFilter(
761 id=FlowFilterId(any_=[d.flow_id for d in deployments])
762 )
763 )
764 }
766 def sort_by_name_keys(d: DeploymentResponse):
767 return flows[d.flow_id].name, d.name
769 def sort_by_created_key(d: DeploymentResponse):
770 assert d.created is not None, "All deployments should have a created time."
771 return DateTime.now("utc") - d.created
773 table = Table(
774 title="Deployments",
775 expand=True,
776 )
777 table.add_column("Name", style="blue", no_wrap=True, ratio=40)
778 table.add_column("ID", style="cyan", no_wrap=True, ratio=40)
779 table.add_column(
780 "Work Pool", style="green", no_wrap=True, ratio=20, overflow="crop"
781 )
783 for deployment in sorted(
784 deployments, key=sort_by_created_key if by_created else sort_by_name_keys
785 ):
786 table.add_row(
787 f"{flows[deployment.flow_id].name}/[bold]{deployment.name}[/]",
788 str(deployment.id),
789 deployment.work_pool_name or "",
790 )
792 app.console.print(table)
795@deployment_app.command() 1a
796async def run( 1a
797 name: Optional[str] = typer.Argument(
798 None, help="A deployed flow's name: <FLOW_NAME>/<DEPLOYMENT_NAME>"
799 ),
800 deployment_id: Optional[str] = typer.Option(
801 None,
802 "--id",
803 help=("A deployment id to search for if no name is given"),
804 ),
805 job_variables: list[str] = typer.Option(
806 None,
807 "-jv",
808 "--job-variable",
809 help=(
810 "A key, value pair (key=value) specifying a flow run job variable. The value will"
811 " be interpreted as JSON. May be passed multiple times to specify multiple"
812 " job variable values."
813 ),
814 ),
815 params: list[str] = typer.Option(
816 None,
817 "-p",
818 "--param",
819 help=(
820 "A key, value pair (key=value) specifying a flow parameter. The value will"
821 " be interpreted as JSON. May be passed multiple times to specify multiple"
822 " parameter values."
823 ),
824 ),
825 multiparams: Optional[str] = typer.Option(
826 None,
827 "--params",
828 help=(
829 "A mapping of parameters to values. To use a stdin, pass '-'. Any "
830 "parameters passed with `--param` will take precedence over these values."
831 ),
832 ),
833 start_in: Optional[str] = typer.Option(
834 None,
835 "--start-in",
836 help=(
837 "A human-readable string specifying a time interval to wait before starting"
838 " the flow run. E.g. 'in 5 minutes', 'in 1 hour', 'in 2 days'."
839 ),
840 ),
841 start_at: Optional[str] = typer.Option(
842 None,
843 "--start-at",
844 help=(
845 "A human-readable string specifying a time to start the flow run. E.g."
846 " 'at 5:30pm', 'at 2022-08-01 17:30', 'at 2022-08-01 17:30:00'."
847 ),
848 ),
849 tags: list[str] = typer.Option(
850 None,
851 "--tag",
852 help=("Tag(s) to be applied to flow run."),
853 ),
854 watch: bool = typer.Option(
855 False,
856 "--watch",
857 help=("Whether to poll the flow run until a terminal state is reached."),
858 ),
859 watch_interval: Optional[int] = typer.Option(
860 None,
861 "--watch-interval",
862 help=("How often to poll the flow run for state changes (in seconds)."),
863 ),
864 watch_timeout: Optional[int] = typer.Option(
865 None,
866 "--watch-timeout",
867 help=("Timeout for --watch."),
868 ),
869 flow_run_name: Optional[str] = typer.Option(
870 None, "--flow-run-name", help="Custom name to give the flow run."
871 ),
872):
873 """
874 Create a flow run for the given flow and deployment.
876 The flow run will be scheduled to run immediately unless `--start-in` or `--start-at` is specified.
877 The flow run will not execute until a worker starts.
878 To watch the flow run until it reaches a terminal state, use the `--watch` flag.
879 """
880 import dateparser
882 now = prefect.types._datetime.now("UTC")
884 multi_params: dict[str, Any] = {}
885 if multiparams:
886 if multiparams == "-":
887 multiparams = sys.stdin.read()
888 if not multiparams:
889 exit_with_error("No data passed to stdin")
891 try:
892 multi_params = json.loads(multiparams)
893 except ValueError as exc:
894 exit_with_error(f"Failed to parse JSON: {exc}")
895 cli_params: dict[str, Any] = _load_json_key_values(params or [], "parameter")
896 conflicting_keys = set(cli_params.keys()).intersection(multi_params.keys())
897 if conflicting_keys:
898 app.console.print(
899 "The following parameters were specified by `--param` and `--params`, the "
900 f"`--param` value will be used: {conflicting_keys}"
901 )
902 parameters: dict[str, Any] = {**multi_params, **cli_params}
904 job_vars: dict[str, Any] = _load_json_key_values(
905 job_variables or [], "job variable"
906 )
907 if start_in and start_at:
908 exit_with_error(
909 "Only one of `--start-in` or `--start-at` can be set, not both."
910 )
912 elif start_in is None and start_at is None:
913 scheduled_start_time = now
914 human_dt_diff = " (now)"
915 else:
916 if start_in:
917 start_time_raw = "in " + start_in
918 elif start_at:
919 start_time_raw = "at " + start_at
920 else:
921 exit_with_error("No start time specified")
923 with warnings.catch_warnings():
924 # PyTZ throws a warning based on dateparser usage of the library
925 # See https://github.com/scrapinghub/dateparser/issues/1089
926 warnings.filterwarnings("ignore", module="dateparser")
928 try:
929 start_time_parsed = dateparser.parse( # type: ignore[reportUnknownMemberType]
930 start_time_raw,
931 settings={
932 "TO_TIMEZONE": "UTC",
933 "RETURN_AS_TIMEZONE_AWARE": False,
934 "PREFER_DATES_FROM": "future",
935 "RELATIVE_BASE": datetime.fromtimestamp(
936 now.timestamp(), tz=timezone.utc
937 ),
938 },
939 )
941 except Exception as exc:
942 exit_with_error(f"Failed to parse '{start_time_raw!r}': {exc!s}")
944 if start_time_parsed is None:
945 exit_with_error(f"Unable to parse scheduled start time {start_time_raw!r}.")
947 scheduled_start_time = create_datetime_instance(start_time_parsed)
948 human_dt_diff = " (" + human_friendly_diff(scheduled_start_time) + ")"
950 async with get_client() as client:
951 deployment = await get_deployment(client, name, deployment_id)
952 flow = await client.read_flow(deployment.flow_id)
954 if TYPE_CHECKING:
955 assert deployment.parameter_openapi_schema is not None
956 deployment_parameters = (
957 deployment.parameter_openapi_schema.get("properties", {}).keys()
958 if deployment.parameter_openapi_schema
959 else []
960 )
961 unknown_keys = set(parameters.keys()).difference(deployment_parameters)
962 if unknown_keys:
963 available_parameters = (
964 (
965 "The following parameters are available on the deployment: "
966 + listrepr(deployment_parameters, sep=", ")
967 )
968 if deployment_parameters
969 else "This deployment does not accept parameters."
970 )
972 exit_with_error(
973 "The following parameters were specified but not found on the "
974 f"deployment: {listrepr(unknown_keys, sep=', ')}"
975 f"\n{available_parameters}"
976 )
977 templating_parameters = {**(deployment.parameters or {}), **(parameters or {})}
978 if flow_run_name:
979 try:
980 flow_run_name = flow_run_name.format(**templating_parameters)
981 except KeyError as e:
982 exit_with_error(
983 f"Missing parameter for flow run name: '{e.args[0]}' is undefined"
984 )
985 except Exception as e:
986 exit_with_error(f"Failed to format flow run name: {e}")
988 app.console.print(
989 f"Creating flow run for deployment '{flow.name}/{deployment.name}'...",
990 )
992 try:
993 flow_run = await client.create_flow_run_from_deployment(
994 deployment.id,
995 parameters=parameters,
996 state=Scheduled(scheduled_time=scheduled_start_time),
997 tags=tags,
998 job_variables=job_vars,
999 name=flow_run_name,
1000 )
1001 except PrefectHTTPStatusError as exc:
1002 detail = exc.response.json().get("detail")
1003 if detail:
1004 exit_with_error(
1005 exc.response.json()["detail"],
1006 )
1007 else:
1008 raise
1010 run_url = urls.url_for(flow_run) or "<no dashboard available>"
1011 datetime_local_tz = in_local_tz(scheduled_start_time)
1012 scheduled_display = to_datetime_string(datetime_local_tz)
1013 scheduled_display += human_dt_diff
1015 app.console.print(f"Created flow run {flow_run.name!r}.")
1016 app.console.print(
1017 textwrap.dedent(
1018 f"""
1019 └── UUID: {flow_run.id}
1020 └── Parameters: {flow_run.parameters}
1021 └── Job Variables: {flow_run.job_variables}
1022 └── Scheduled start time: {scheduled_display}
1023 └── URL: {run_url}
1024 """
1025 ).strip(),
1026 soft_wrap=True,
1027 )
1028 if watch:
1029 if watch_interval is not None:
1030 warnings.warn(
1031 "The --watch-interval flag is deprecated and will be removed in a future release. "
1032 "Flow run watching now uses real-time event streaming.",
1033 DeprecationWarning,
1034 stacklevel=2,
1035 )
1037 finished_flow_run = await watch_flow_run(
1038 flow_run.id, app.console, timeout=watch_timeout
1039 )
1040 finished_flow_run_state = finished_flow_run.state
1041 if finished_flow_run_state is None:
1042 exit_with_error("Flow run finished in an unknown state.")
1043 if finished_flow_run_state.is_completed():
1044 exit_with_success(
1045 f"Flow run finished successfully in {finished_flow_run_state.name!r}."
1046 )
1047 exit_with_error(
1048 f"Flow run finished in state {finished_flow_run_state.name!r}.",
1049 code=1,
1050 )
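# A sketch of how `--start-in` / `--start-at` are resolved above: dateparser turns a
# phrase like "in 5 minutes" into a concrete datetime, anchored at `now` and preferring
# future dates. Uses the same settings dict as the command; the phrase is an example.
def _example_parse_start_time() -> None:
    import dateparser

    now = datetime.now(timezone.utc)
    parsed = dateparser.parse(
        "in 5 minutes",
        settings={
            "TO_TIMEZONE": "UTC",
            "RETURN_AS_TIMEZONE_AWARE": False,
            "PREFER_DATES_FROM": "future",
            "RELATIVE_BASE": datetime.fromtimestamp(now.timestamp(), tz=timezone.utc),
        },
    )
    print(parsed)  # naive UTC datetime roughly five minutes from now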
1053@deployment_app.command() 1a
1054async def delete( 1a
1055 name: Optional[str] = typer.Argument(
1056 None, help="A deployed flow's name: <FLOW_NAME>/<DEPLOYMENT_NAME>"
1057 ),
1058 deployment_id: Optional[UUID] = typer.Option(
1059 None, "--id", help="A deployment id to search for if no name is given"
1060 ),
1061 _all: bool = typer.Option(False, "--all", help="Delete all deployments"),
1062):
1063 """
1064 Delete a deployment.
1066 Examples:
1067 ```bash
1068 $ prefect deployment delete test_flow/test_deployment
1069 $ prefect deployment delete --id dfd3e220-a130-4149-9af6-8d487e02fea6
1070 ```
1071 """
1072 async with get_client() as client:
1073 if _all:
1074 if name is not None or deployment_id is not None:
1075 exit_with_error(
1076 "Cannot provide a deployment name or id when deleting all deployments."
1077 )
1078 deployments = await client.read_deployments()
1079 if len(deployments) == 0:
1080 exit_with_success("No deployments found.")
1081 if is_interactive() and not typer.confirm(
1082 f"Are you sure you want to delete all {len(deployments)} deployments?",
1083 default=False,
1084 ):
1085 exit_with_error("Deletion aborted.")
1086 for deployment in deployments:
1087 await client.delete_deployment(deployment.id)
1088 plural = "" if len(deployments) == 1 else "s"
1089 exit_with_success(f"Deleted {len(deployments)} deployment{plural}.")
1091 if name is None and deployment_id is not None:
1092 try:
1093 if is_interactive() and not typer.confirm(
1094 (
1095 f"Are you sure you want to delete deployment with id {deployment_id!r}?"
1096 ),
1097 default=False,
1098 ):
1099 exit_with_error("Deletion aborted.")
1100 await client.delete_deployment(deployment_id)
1101 exit_with_success(f"Deleted deployment '{deployment_id}'.")
1102 except ObjectNotFound:
1103 exit_with_error(f"Deployment {deployment_id!r} not found!")
1104 elif name is not None:
1105 try:
1106 deployment = await client.read_deployment_by_name(name)
1107 if is_interactive() and not typer.confirm(
1108 (f"Are you sure you want to delete deployment with name {name!r}?"),
1109 default=False,
1110 ):
1111 exit_with_error("Deletion aborted.")
1112 await client.delete_deployment(deployment.id)
1113 exit_with_success(f"Deleted deployment '{name}'.")
1114 except ObjectNotFound:
1115 exit_with_error(f"Deployment {name!r} not found!")
1116 else:
1117 exit_with_error("Must provide a deployment name or id")
1120def _load_json_key_values( 1a
1121 cli_input: list[str], display_name: str
1122) -> dict[str, dict[str, Any] | str | int]:
1123 """
1124 Parse a list of strings formatted as "key=value" where the value is loaded as JSON.
1126 We do the best here to display a helpful JSON parsing message, e.g.
1127 ```
1128 Error: Failed to parse JSON for parameter 'name' with value
1130 foo
1132 JSON Error: Expecting value: line 1 column 1 (char 0)
1133 Did you forget to include quotes? You may need to escape so your shell does not remove them, e.g. \"
1134 ```
1136 Args:
1137 cli_input: A list of "key=value" strings to parse
1138 display_name: A name to display in exceptions
1140 Returns:
1141 A mapping of keys -> parsed values
1142 """
1143 parsed: dict[str, dict[str, Any] | str | int] = {}
1145 def cast_value(value: str) -> Any:
1146 """Cast the value from a string to a valid JSON type; add quotes for the user
1147 if necessary
1148 """
1149 try:
1150 return json.loads(value)
1151 except ValueError as exc:
1152 if (
1153 "Extra data" in str(exc) or "Expecting value" in str(exc)
1154 ) and '"' not in value:
1155 return cast_value(f'"{value}"')
1156 raise exc
1158 for spec in cli_input:
1159 try:
1160 key, _, value = spec.partition("=")
1161 except ValueError:
1162 exit_with_error(
1163 f"Invalid {display_name} option {spec!r}. Expected format 'key=value'."
1164 )
1166 try:
1167 parsed[key] = cast_value(value)
1168 except ValueError as exc:
1169 indented_value = textwrap.indent(value, prefix="\t")
1170 exit_with_error(
1171 f"Failed to parse JSON for {display_name} {key!r} with value"
1172 f"\n\n{indented_value}\n\n"
1173 f"JSON Error: {exc}"
1174 )
1176 return parsed
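# A small usage sketch for _load_json_key_values: values parse as JSON where possible,
# and bare words are re-quoted so `name=Marvin` behaves like `name="Marvin"`.
def _example_load_json_key_values() -> None:
    parsed = _load_json_key_values(
        ["name=Marvin", "retries=3", 'nested={"key": true}'], "parameter"
    )
    print(parsed)  # {'name': 'Marvin', 'retries': 3, 'nested': {'key': True}}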