Coverage for /usr/local/lib/python3.12/site-packages/prefect/events/cli/automations.py: 12%
227 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1"""
2Command line interface for working with automations.
3"""
5import asyncio 1a
6import functools 1a
7from pathlib import Path 1a
8from typing import Any, Callable, Optional, Type 1a
9from uuid import UUID 1a
11import orjson 1a
12import typer 1a
13import yaml as pyyaml 1a
14from pydantic import BaseModel 1a
15from rich.pretty import Pretty 1a
16from rich.table import Table 1a
17from rich.text import Text 1a
19from prefect.cli._types import PrefectTyper 1a
20from prefect.cli._utilities import exit_with_error, exit_with_success 1a
21from prefect.cli.root import app, is_interactive 1a
22from prefect.client.orchestration import get_client 1a
23from prefect.events.schemas.automations import Automation, AutomationCore 1a
24from prefect.exceptions import PrefectHTTPStatusError 1a
# Typer sub-application that the automation commands in this module register on.
automations_app: PrefectTyper = PrefectTyper(name="automation", help="Manage automations.")
# Mount it on the root CLI app; "automations" is registered as an alias.
app.add_typer(automations_app, aliases=["automations"])
def requires_automations(func: Callable[..., Any]) -> Callable[..., Any]:
    """Wrap an async callable so "Enable experimental" errors exit cleanly.

    The returned coroutine function awaits ``func`` and hands back its result.
    A ``RuntimeError`` whose message contains ``"Enable experimental"`` is
    reported through ``exit_with_error``; any other ``RuntimeError`` is
    re-raised untouched.
    """

    @functools.wraps(func)
    async def wrapper(*args: Any, **kwargs: Any) -> Any:
        try:
            result = await func(*args, **kwargs)
        except RuntimeError as exc:
            message = str(exc)
            if "Enable experimental" in message:
                exit_with_error(message)
            # Reached for unrelated RuntimeErrors (and if the call above returns).
            raise
        return result

    return wrapper
@automations_app.command()
@requires_automations
async def ls():
    """List all automations."""
    async with get_client() as client:
        automations = await client.read_automations()

    table = Table(title="Automations", show_lines=True)
    for heading in ("Automation", "Enabled", "Trigger", "Actions"):
        table.add_column(heading)

    for automation in automations:
        # First cell: name, id and (when present) description, one per line.
        identity = Text()
        identity.append(automation.name, style="white bold")
        identity.append("\n")
        identity.append(str(automation.id), style="cyan")
        identity.append("\n")
        if automation.description:
            identity.append(automation.description, style="white")
            identity.append("\n")

        if automation.actions_on_trigger or automation.actions_on_resolve:
            # Phase-specific actions exist, so every line is labelled with its
            # phase. The general `actions` are listed under both phases, ahead
            # of the phase-specific ones.
            action_lines = []
            phases = (
                ("trigger", automation.actions_on_trigger),
                ("resolve", automation.actions_on_resolve),
            )
            for phase, phase_actions in phases:
                for action in automation.actions:
                    action_lines.append(f"({phase}) {action.describe_for_cli()}")
                for action in phase_actions:
                    action_lines.append(f"({phase}) {action.describe_for_cli()}")
        else:
            action_lines = []
            for action in automation.actions:
                action_lines.append(action.describe_for_cli())

        table.add_row(
            identity,
            str(automation.enabled),
            automation.trigger.describe_for_cli(),
            "\n".join(action_lines),
        )

    app.console.print(table)
@automations_app.command()
@requires_automations
async def inspect(
    name: Optional[str] = typer.Argument(None, help="An automation's name"),
    id: Optional[str] = typer.Option(None, "--id", help="An automation's id"),
    yaml: bool = typer.Option(False, "--yaml", help="Output as YAML"),
    json: bool = typer.Option(False, "--json", help="Output as JSON"),
    output: Optional[str] = typer.Option(
        None,
        "--output",
        "-o",
        help="Specify an output format. Currently supports: json, yaml",
    ),
):
    """
    Inspect an automation.

    Arguments:

        name: the name of the automation to inspect

        id: the id of the automation to inspect

        yaml: output as YAML

        json: output as JSON

    Examples:

        `$ prefect automation inspect "my-automation"`

        `$ prefect automation inspect --id "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"`

        `$ prefect automation inspect "my-automation" --yaml`

        `$ prefect automation inspect "my-automation" --output json`
        `$ prefect automation inspect "my-automation" --output yaml`
    """
    # Normalize `--output` once; an empty/missing value means "not requested".
    output_format = output.lower() if output else None
    if output_format is not None and output_format not in ("json", "yaml"):
        exit_with_error("Only 'json' and 'yaml' output formats are supported.")

    if not id and not name:
        exit_with_error("Please provide either a name or an id.")

    # A name takes precedence over `--id` when both are supplied. The name
    # lookup yields a list (several automations may share a name); the id
    # lookup yields a single automation.
    if name:
        async with get_client() as client:
            automation = await client.read_automations_by_name(name=name)
            if not automation:
                exit_with_error(f"Automation {name!r} not found.")

    elif id:
        async with get_client() as client:
            try:
                uuid_id = UUID(id)
                automation = await client.read_automation(uuid_id)
            except (PrefectHTTPStatusError, ValueError):
                # ValueError: `id` is not a well-formed UUID.
                exit_with_error(f"Automation with id {id!r} not found.")
            # NOTE(review): `delete` in this file treats a falsy result from
            # `read_automation` as "not found"; do the same here instead of
            # printing `None` / `null`. Confirm the client can return None.
            if automation is None:
                exit_with_error(f"Automation with id {id!r} not found.")

    def no_really_json(obj: BaseModel) -> Any:
        # Working around a weird bug where pydantic isn't rendering enums as strings
        #
        # automation.trigger.model_dump(mode="json")
        # {..., 'posture': 'Reactive', ...}
        #
        # automation.model_dump(mode="json")
        # {..., 'posture': Posture.Reactive, ...}
        return orjson.loads(obj.model_dump_json())

    # `--yaml` wins over `--json` when both are requested.
    as_yaml = yaml or output_format == "yaml"
    as_json = json or output_format == "json"

    if as_yaml or as_json:
        # Convert models to plain JSON-compatible data before serializing.
        if isinstance(automation, list):
            automation = [no_really_json(a) for a in automation]
        elif isinstance(automation, Automation):
            automation = no_really_json(automation)

    if as_yaml:
        app.console.print(pyyaml.dump(automation, sort_keys=False))
    elif as_json:
        app.console.print(
            orjson.dumps(automation, option=orjson.OPT_INDENT_2).decode()
        )
    else:
        app.console.print(Pretty(automation))
@automations_app.command(aliases=["enable"])
@requires_automations
async def resume(
    name: Optional[str] = typer.Argument(None, help="An automation's name"),
    id: Optional[str] = typer.Option(None, "--id", help="An automation's id"),
):
    """
    Resume an automation.

    Arguments:

        name: the name of the automation to resume

        id: the id of the automation to resume

    Examples:

        `$ prefect automation resume "my-automation"`

        `$ prefect automation resume --id "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"`
    """
    if not id and not name:
        exit_with_error("Please provide either a name or an id.")

    # A name takes precedence over `--id` when both are supplied.
    if name:
        async with get_client() as client:
            automation = await client.read_automations_by_name(name=name)
            if not automation:
                exit_with_error(
                    f"Automation with name {name!r} not found. You can also specify an id with the `--id` flag."
                )
            # Several automations may share a name; ask before touching all of them.
            if len(automation) > 1 and not typer.confirm(
                f"Multiple automations found with name {name!r}. Do you want to resume all of them?",
                default=False,
            ):
                exit_with_error("Resume aborted.")

            for a in automation:
                await client.resume_automation(a.id)
            resumed_ids = ", ".join(repr(str(a.id)) for a in automation)
            exit_with_success(
                f"Resumed automation(s) with name {name!r} and id(s) {resumed_ids}."
            )

    elif id:
        async with get_client() as client:
            try:
                uuid_id = UUID(id)
                automation = await client.read_automation(uuid_id)
            except (PrefectHTTPStatusError, ValueError):
                # ValueError: `id` is not a well-formed UUID.
                exit_with_error(f"Automation with id {id!r} not found.")
            # NOTE(review): `delete` in this file treats a falsy result from
            # `read_automation` as "not found"; without this guard a None
            # result would fail with AttributeError on `automation.id`.
            # Confirm the client can return None.
            if automation is None:
                exit_with_error(f"Automation with id {id!r} not found.")
            await client.resume_automation(automation.id)
            exit_with_success(f"Resumed automation with id {str(automation.id)!r}.")
@automations_app.command(aliases=["disable"])
@requires_automations
async def pause(
    name: Optional[str] = typer.Argument(None, help="An automation's name"),
    id: Optional[str] = typer.Option(None, "--id", help="An automation's id"),
):
    """
    Pause an automation.

    Arguments:

        name: the name of the automation to pause

        id: the id of the automation to pause

    Examples:

        `$ prefect automation pause "my-automation"`

        `$ prefect automation pause --id "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"`
    """
    if not id and not name:
        exit_with_error("Please provide either a name or an id.")

    # A name takes precedence over `--id` when both are supplied.
    if name:
        async with get_client() as client:
            automation = await client.read_automations_by_name(name=name)
            if not automation:
                exit_with_error(
                    f"Automation with name {name!r} not found. You can also specify an id with the `--id` flag."
                )
            # Several automations may share a name; ask before touching all of them.
            if len(automation) > 1 and not typer.confirm(
                f"Multiple automations found with name {name!r}. Do you want to pause all of them?",
                default=False,
            ):
                exit_with_error("Pause aborted.")

            for a in automation:
                await client.pause_automation(a.id)
            paused_ids = ", ".join(repr(str(a.id)) for a in automation)
            exit_with_success(
                f"Paused automation(s) with name {name!r} and id(s) {paused_ids}."
            )

    elif id:
        async with get_client() as client:
            try:
                uuid_id = UUID(id)
                automation = await client.read_automation(uuid_id)
            except (PrefectHTTPStatusError, ValueError):
                # ValueError: `id` is not a well-formed UUID.
                exit_with_error(f"Automation with id {id!r} not found.")
            # NOTE(review): `delete` in this file treats a falsy result from
            # `read_automation` as "not found"; without this guard a None
            # result would fail with AttributeError on `automation.id`.
            # Confirm the client can return None.
            if automation is None:
                exit_with_error(f"Automation with id {id!r} not found.")
            await client.pause_automation(automation.id)
            exit_with_success(f"Paused automation with id {str(automation.id)!r}.")
@automations_app.command()
@requires_automations
async def delete(
    name: Optional[str] = typer.Argument(None, help="An automation's name"),
    id: Optional[str] = typer.Option(None, "--id", help="An automation's id"),
    _all: bool = typer.Option(False, "--all", help="Delete all automations"),
):
    """Delete an automation.

    Arguments:
        name: the name of the automation to delete
        id: the id of the automation to delete

    Examples:
        `$ prefect automation delete "my-automation"`
        `$ prefect automation delete --id "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"`
        `$ prefect automation delete --all`
    """

    async with get_client() as client:
        if _all:
            # `--all` is mutually exclusive with selecting a single automation.
            if name is not None or id is not None:
                exit_with_error(
                    "Cannot provide an automation name or id when deleting all automations."
                )
            automations = await client.read_automations()
            if not automations:
                exit_with_success("No automations found.")
            # Only prompt when a user is present to answer.
            if is_interactive() and not typer.confirm(
                f"Are you sure you want to delete all {len(automations)} automations?",
                default=False,
            ):
                exit_with_error("Deletion aborted.")

            # Cap the number of delete requests in flight at once.
            semaphore = asyncio.Semaphore(10)

            async def limited_delete(automation_id):
                async with semaphore:
                    await client.delete_automation(automation_id)

            await asyncio.gather(
                *[limited_delete(automation.id) for automation in automations]
            )

            plural = "" if len(automations) == 1 else "s"
            exit_with_success(f"Deleted {len(automations)} automation{plural}.")

        if not id and not name:
            exit_with_error("Please provide either a name or an id.")

        # Unlike the other commands, `--id` takes precedence over a name here.
        if id:
            # Validate the id and translate lookup failures into a clean
            # "not found" message, matching `inspect`, `resume` and `pause`
            # rather than surfacing a traceback for a malformed id.
            try:
                automation_id = UUID(id)
                automation = await client.read_automation(automation_id)
            except (PrefectHTTPStatusError, ValueError):
                exit_with_error(f"Automation with id {id!r} not found.")
            if not automation:
                exit_with_error(f"Automation with id {id!r} not found.")
            if is_interactive() and not typer.confirm(
                (f"Are you sure you want to delete automation with id {id!r}?"),
                default=False,
            ):
                exit_with_error("Deletion aborted.")
            await client.delete_automation(automation_id)
            exit_with_success(f"Deleted automation with id {id!r}")

        elif name:
            automation = await client.read_automations_by_name(name=name)
            if not automation:
                exit_with_error(
                    f"Automation {name!r} not found. You can also specify an id with the `--id` flag."
                )
            elif len(automation) > 1:
                # Refuse to guess which of the same-named automations is meant.
                exit_with_error(
                    f"Multiple automations found with name {name!r}. Please specify an id with the `--id` flag instead."
                )
            if is_interactive() and not typer.confirm(
                (f"Are you sure you want to delete automation with name {name!r}?"),
                default=False,
            ):
                exit_with_error("Deletion aborted.")
            await client.delete_automation(automation[0].id)
            exit_with_success(f"Deleted automation with name {name!r}")
@automations_app.command()
@requires_automations
async def create(
    from_file: Optional[str] = typer.Option(
        None,
        "--from-file",
        "-f",
        help="Path to YAML or JSON file containing automation(s)",
    ),
    from_json: Optional[str] = typer.Option(
        None,
        "--from-json",
        "-j",
        help="JSON string containing automation(s)",
    ),
):
    """Create one or more automations from a file or JSON string.

    Examples:
        `$ prefect automation create --from-file automation.yaml`
        `$ prefect automation create -f automation.json`
        `$ prefect automation create --from-json '{"name": "my-automation", "trigger": {...}, "actions": [...]}'`
        `$ prefect automation create -j '[{"name": "auto1", ...}, {"name": "auto2", ...}]'`
    """
    if from_file and from_json:
        exit_with_error("Please provide either --from-file or --from-json, not both.")

    if not from_file and not from_json:
        exit_with_error("Please provide either --from-file or --from-json.")

    if from_file:
        file_path = Path(from_file)
        if not file_path.exists():
            exit_with_error(f"File not found: {from_file}")

        with open(file_path, "r") as f:
            content = f.read()

        suffix = file_path.suffix.lower()
        if suffix not in (".yaml", ".yml", ".json"):
            exit_with_error(
                "File extension not recognized. Please use .yaml, .yml, or .json"
            )
        # Report malformed file content the same way a malformed --from-json
        # string is reported, instead of letting the parser error escape.
        try:
            if suffix == ".json":
                data = orjson.loads(content)
            else:
                data = pyyaml.safe_load(content)
        except orjson.JSONDecodeError as e:
            exit_with_error(f"Invalid JSON: {e}")
        except pyyaml.YAMLError as e:
            exit_with_error(f"Invalid YAML: {e}")
    else:  # from_json
        try:
            data = orjson.loads(from_json)
        except orjson.JSONDecodeError as e:
            exit_with_error(f"Invalid JSON: {e}")

    # Accepted shapes: {"automations": [...]}, a bare list, or a single
    # automation mapping.
    if isinstance(data, dict) and "automations" in data:
        automations_data = data["automations"]
    elif isinstance(data, list):
        automations_data = data
    else:
        automations_data = [data]
    # A non-list under "automations" (e.g. a single mapping or null) is
    # handled as one entry so it is validated and reported like any other.
    if not isinstance(automations_data, list):
        automations_data = [automations_data]

    created = []
    failed = []
    async with get_client() as client:
        for i, automation_data in enumerate(automations_data):
            try:
                automation = AutomationCore.model_validate(automation_data)
                automation_id = await client.create_automation(automation)
                created.append((automation.name, automation_id))
            except Exception as e:
                # Deliberate best-effort: record the failure and keep going so
                # one bad entry does not block the rest. Entries that are not
                # mappings (e.g. None from an empty YAML file) have no `.get`,
                # so fall back to a positional label for them.
                label = f"automation at index {i}"
                if isinstance(automation_data, dict):
                    label = automation_data.get("name", label)
                failed.append((label, str(e)))

    if failed:
        app.console.print(f"[red]Failed to create {len(failed)} automation(s):[/red]")
        for name, error in failed:
            app.console.print(f"  - {name}: {error}")

    if created:
        if len(created) == 1 and not failed:
            name, automation_id = created[0]
            exit_with_success(f"Created automation '{name}' with id {automation_id}")
        else:
            app.console.print(f"[green]Created {len(created)} automation(s):[/green]")
            for name, automation_id in created:
                app.console.print(f"  - '{name}' with id {automation_id}")

    # A non-zero exit status signals that at least one automation failed.
    raise typer.Exit(1 if failed else 0)