Coverage for /usr/local/lib/python3.12/site-packages/prefect/events/cli/automations.py: 12%

227 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1""" 

2Command line interface for working with automations. 

3""" 

4 

5import asyncio 1a

6import functools 1a

7from pathlib import Path 1a

8from typing import Any, Callable, Optional, Type 1a

9from uuid import UUID 1a

10 

11import orjson 1a

12import typer 1a

13import yaml as pyyaml 1a

14from pydantic import BaseModel 1a

15from rich.pretty import Pretty 1a

16from rich.table import Table 1a

17from rich.text import Text 1a

18 

19from prefect.cli._types import PrefectTyper 1a

20from prefect.cli._utilities import exit_with_error, exit_with_success 1a

21from prefect.cli.root import app, is_interactive 1a

22from prefect.client.orchestration import get_client 1a

23from prefect.events.schemas.automations import Automation, AutomationCore 1a

24from prefect.exceptions import PrefectHTTPStatusError 1a

25 

26automations_app: PrefectTyper = PrefectTyper( 1a

27 name="automation", 

28 help="Manage automations.", 

29) 

30app.add_typer(automations_app, aliases=["automations"]) 1a

31 

32 

33def requires_automations(func: Callable[..., Any]) -> Callable[..., Any]: 1a

34 @functools.wraps(func) 1a

35 async def wrapper(*args: Any, **kwargs: Any) -> Any: 1a

36 try: 

37 return await func(*args, **kwargs) 

38 except RuntimeError as exc: 

39 if "Enable experimental" in str(exc): 

40 exit_with_error(str(exc)) 

41 raise 

42 

43 return wrapper 1a

44 

45 

46@automations_app.command() 1a

47@requires_automations 1a

48async def ls(): 1a

49 """List all automations.""" 

50 async with get_client() as client: 

51 automations = await client.read_automations() 

52 

53 table = Table(title="Automations", show_lines=True) 

54 

55 table.add_column("Automation") 

56 table.add_column("Enabled") 

57 table.add_column("Trigger") 

58 table.add_column("Actions") 

59 

60 for automation in automations: 

61 identifier_column = Text() 

62 

63 identifier_column.append(automation.name, style="white bold") 

64 identifier_column.append("\n") 

65 

66 identifier_column.append(str(automation.id), style="cyan") 

67 identifier_column.append("\n") 

68 

69 if automation.description: 

70 identifier_column.append(automation.description, style="white") 

71 identifier_column.append("\n") 

72 

73 if automation.actions_on_trigger or automation.actions_on_resolve: 

74 actions = ( 

75 [ 

76 f"(trigger) {action.describe_for_cli()}" 

77 for action in automation.actions 

78 ] 

79 + [ 

80 f"(trigger) {action.describe_for_cli()}" 

81 for action in automation.actions_on_trigger 

82 ] 

83 + [ 

84 f"(resolve) {action.describe_for_cli()}" 

85 for action in automation.actions 

86 ] 

87 + [ 

88 f"(resolve) {action.describe_for_cli()}" 

89 for action in automation.actions_on_resolve 

90 ] 

91 ) 

92 else: 

93 actions = [action.describe_for_cli() for action in automation.actions] 

94 

95 table.add_row( 

96 identifier_column, 

97 str(automation.enabled), 

98 automation.trigger.describe_for_cli(), 

99 "\n".join(actions), 

100 ) 

101 

102 app.console.print(table) 

103 

104 

105@automations_app.command() 1a

106@requires_automations 1a

107async def inspect( 1a

108 name: Optional[str] = typer.Argument(None, help="An automation's name"), 

109 id: Optional[str] = typer.Option(None, "--id", help="An automation's id"), 

110 yaml: bool = typer.Option(False, "--yaml", help="Output as YAML"), 

111 json: bool = typer.Option(False, "--json", help="Output as JSON"), 

112 output: Optional[str] = typer.Option( 

113 None, 

114 "--output", 

115 "-o", 

116 help="Specify an output format. Currently supports: json, yaml", 

117 ), 

118): 

119 """ 

120 Inspect an automation. 

121 

122 Arguments: 

123 

124 name: the name of the automation to inspect 

125 

126 id: the id of the automation to inspect 

127 

128 yaml: output as YAML 

129 

130 json: output as JSON 

131 

132 Examples: 

133 

134 `$ prefect automation inspect "my-automation"` 

135 

136 `$ prefect automation inspect --id "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"` 

137 

138 `$ prefect automation inspect "my-automation" --yaml` 

139 

140 `$ prefect automation inspect "my-automation" --output json` 

141 `$ prefect automation inspect "my-automation" --output yaml` 

142 """ 

143 if output and output.lower() not in ["json", "yaml"]: 

144 exit_with_error("Only 'json' and 'yaml' output formats are supported.") 

145 

146 if not id and not name: 

147 exit_with_error("Please provide either a name or an id.") 

148 

149 if name: 

150 async with get_client() as client: 

151 automation = await client.read_automations_by_name(name=name) 

152 if not automation: 

153 exit_with_error(f"Automation {name!r} not found.") 

154 

155 elif id: 

156 async with get_client() as client: 

157 try: 

158 uuid_id = UUID(id) 

159 automation = await client.read_automation(uuid_id) 

160 except (PrefectHTTPStatusError, ValueError): 

161 exit_with_error(f"Automation with id {id!r} not found.") 

162 

163 if yaml or json or (output and output.lower() in ["json", "yaml"]): 

164 

165 def no_really_json(obj: Type[BaseModel]): 

166 # Working around a weird bug where pydantic isn't rendering enums as strings 

167 # 

168 # automation.trigger.model_dump(mode="json") 

169 # {..., 'posture': 'Reactive', ...} 

170 # 

171 # automation.model_dump(mode="json") 

172 # {..., 'posture': Posture.Reactive, ...} 

173 return orjson.loads(obj.model_dump_json()) 

174 

175 if isinstance(automation, list): 

176 automation = [no_really_json(a) for a in automation] 

177 elif isinstance(automation, Automation): 

178 automation = no_really_json(automation) 

179 

180 if yaml or (output and output.lower() == "yaml"): 

181 app.console.print(pyyaml.dump(automation, sort_keys=False)) 

182 elif json or (output and output.lower() == "json"): 

183 app.console.print( 

184 orjson.dumps(automation, option=orjson.OPT_INDENT_2).decode() 

185 ) 

186 else: 

187 app.console.print(Pretty(automation)) 

188 

189 

190@automations_app.command(aliases=["enable"]) 1a

191@requires_automations 1a

192async def resume( 1a

193 name: Optional[str] = typer.Argument(None, help="An automation's name"), 

194 id: Optional[str] = typer.Option(None, "--id", help="An automation's id"), 

195): 

196 """ 

197 Resume an automation. 

198 

199 Arguments: 

200 

201 name: the name of the automation to resume 

202 

203 id: the id of the automation to resume 

204 

205 Examples: 

206 

207 `$ prefect automation resume "my-automation"` 

208 

209 `$ prefect automation resume --id "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"` 

210 """ 

211 if not id and not name: 

212 exit_with_error("Please provide either a name or an id.") 

213 

214 if name: 

215 async with get_client() as client: 

216 automation = await client.read_automations_by_name(name=name) 

217 if not automation: 

218 exit_with_error( 

219 f"Automation with name {name!r} not found. You can also specify an id with the `--id` flag." 

220 ) 

221 if len(automation) > 1: 

222 if not typer.confirm( 

223 f"Multiple automations found with name {name!r}. Do you want to resume all of them?", 

224 default=False, 

225 ): 

226 exit_with_error("Resume aborted.") 

227 

228 for a in automation: 

229 await client.resume_automation(a.id) 

230 exit_with_success( 

231 f"Resumed automation(s) with name {name!r} and id(s) {', '.join([repr(str(a.id)) for a in automation])}." 

232 ) 

233 

234 elif id: 

235 async with get_client() as client: 

236 try: 

237 uuid_id = UUID(id) 

238 automation = await client.read_automation(uuid_id) 

239 except (PrefectHTTPStatusError, ValueError): 

240 exit_with_error(f"Automation with id {id!r} not found.") 

241 await client.resume_automation(automation.id) 

242 exit_with_success(f"Resumed automation with id {str(automation.id)!r}.") 

243 

244 

245@automations_app.command(aliases=["disable"]) 1a

246@requires_automations 1a

247async def pause( 1a

248 name: Optional[str] = typer.Argument(None, help="An automation's name"), 

249 id: Optional[str] = typer.Option(None, "--id", help="An automation's id"), 

250): 

251 """ 

252 Pause an automation. 

253 

254 Arguments: 

255 

256 name: the name of the automation to pause 

257 

258 id: the id of the automation to pause 

259 

260 Examples: 

261 

262 `$ prefect automation pause "my-automation"` 

263 

264 `$ prefect automation pause --id "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"` 

265 """ 

266 if not id and not name: 

267 exit_with_error("Please provide either a name or an id.") 

268 

269 if name: 

270 async with get_client() as client: 

271 automation = await client.read_automations_by_name(name=name) 

272 if not automation: 

273 exit_with_error( 

274 f"Automation with name {name!r} not found. You can also specify an id with the `--id` flag." 

275 ) 

276 if len(automation) > 1: 

277 if not typer.confirm( 

278 f"Multiple automations found with name {name!r}. Do you want to pause all of them?", 

279 default=False, 

280 ): 

281 exit_with_error("Pause aborted.") 

282 

283 for a in automation: 

284 await client.pause_automation(a.id) 

285 exit_with_success( 

286 f"Paused automation(s) with name {name!r} and id(s) {', '.join([repr(str(a.id)) for a in automation])}." 

287 ) 

288 

289 elif id: 

290 async with get_client() as client: 

291 try: 

292 uuid_id = UUID(id) 

293 automation = await client.read_automation(uuid_id) 

294 except (PrefectHTTPStatusError, ValueError): 

295 exit_with_error(f"Automation with id {id!r} not found.") 

296 await client.pause_automation(automation.id) 

297 exit_with_success(f"Paused automation with id {str(automation.id)!r}.") 

298 

299 

300@automations_app.command() 1a

301@requires_automations 1a

302async def delete( 1a

303 name: Optional[str] = typer.Argument(None, help="An automation's name"), 

304 id: Optional[str] = typer.Option(None, "--id", help="An automation's id"), 

305 _all: bool = typer.Option(False, "--all", help="Delete all automations"), 

306): 

307 """Delete an automation. 

308 

309 Arguments: 

310 name: the name of the automation to delete 

311 id: the id of the automation to delete 

312 

313 Examples: 

314 `$ prefect automation delete "my-automation"` 

315 `$ prefect automation delete --id "aaaaaaaa-aaaa-aaaa-aaaa-aaaaaaaaaaaa"` 

316 `$ prefect automation delete --all` 

317 """ 

318 

319 async with get_client() as client: 

320 if _all: 

321 if name is not None or id is not None: 

322 exit_with_error( 

323 "Cannot provide an automation name or id when deleting all automations." 

324 ) 

325 automations = await client.read_automations() 

326 if len(automations) == 0: 

327 exit_with_success("No automations found.") 

328 if is_interactive() and not typer.confirm( 

329 f"Are you sure you want to delete all {len(automations)} automations?", 

330 default=False, 

331 ): 

332 exit_with_error("Deletion aborted.") 

333 

334 semaphore = asyncio.Semaphore(10) 

335 

336 async def limited_delete(automation_id): 

337 async with semaphore: 

338 await client.delete_automation(automation_id) 

339 

340 await asyncio.gather( 

341 *[limited_delete(automation.id) for automation in automations] 

342 ) 

343 

344 plural = "" if len(automations) == 1 else "s" 

345 exit_with_success(f"Deleted {len(automations)} automation{plural}.") 

346 

347 if not id and not name: 

348 exit_with_error("Please provide either a name or an id.") 

349 

350 if id: 

351 automation = await client.read_automation(id) 

352 if not automation: 

353 exit_with_error(f"Automation with id {id!r} not found.") 

354 if is_interactive() and not typer.confirm( 

355 (f"Are you sure you want to delete automation with id {id!r}?"), 

356 default=False, 

357 ): 

358 exit_with_error("Deletion aborted.") 

359 await client.delete_automation(id) 

360 exit_with_success(f"Deleted automation with id {id!r}") 

361 

362 elif name: 

363 automation = await client.read_automations_by_name(name=name) 

364 if not automation: 

365 exit_with_error( 

366 f"Automation {name!r} not found. You can also specify an id with the `--id` flag." 

367 ) 

368 elif len(automation) > 1: 

369 exit_with_error( 

370 f"Multiple automations found with name {name!r}. Please specify an id with the `--id` flag instead." 

371 ) 

372 if is_interactive() and not typer.confirm( 

373 (f"Are you sure you want to delete automation with name {name!r}?"), 

374 default=False, 

375 ): 

376 exit_with_error("Deletion aborted.") 

377 await client.delete_automation(automation[0].id) 

378 exit_with_success(f"Deleted automation with name {name!r}") 

379 

380 

381@automations_app.command() 1a

382@requires_automations 1a

383async def create( 1a

384 from_file: Optional[str] = typer.Option( 

385 None, 

386 "--from-file", 

387 "-f", 

388 help="Path to YAML or JSON file containing automation(s)", 

389 ), 

390 from_json: Optional[str] = typer.Option( 

391 None, 

392 "--from-json", 

393 "-j", 

394 help="JSON string containing automation(s)", 

395 ), 

396): 

397 """Create one or more automations from a file or JSON string. 

398 

399 Examples: 

400 `$ prefect automation create --from-file automation.yaml` 

401 `$ prefect automation create -f automation.json` 

402 `$ prefect automation create --from-json '{"name": "my-automation", "trigger": {...}, "actions": [...]}'` 

403 `$ prefect automation create -j '[{"name": "auto1", ...}, {"name": "auto2", ...}]'` 

404 """ 

405 if from_file and from_json: 

406 exit_with_error("Please provide either --from-file or --from-json, not both.") 

407 

408 if not from_file and not from_json: 

409 exit_with_error("Please provide either --from-file or --from-json.") 

410 

411 if from_file: 

412 file_path = Path(from_file) 

413 if not file_path.exists(): 

414 exit_with_error(f"File not found: {from_file}") 

415 

416 with open(file_path, "r") as f: 

417 content = f.read() 

418 

419 if file_path.suffix.lower() in [".yaml", ".yml"]: 

420 data = pyyaml.safe_load(content) 

421 elif file_path.suffix.lower() == ".json": 

422 data = orjson.loads(content) 

423 else: 

424 exit_with_error( 

425 "File extension not recognized. Please use .yaml, .yml, or .json" 

426 ) 

427 else: # from_json 

428 try: 

429 data = orjson.loads(from_json) 

430 except orjson.JSONDecodeError as e: 

431 exit_with_error(f"Invalid JSON: {e}") 

432 

433 automations_data = [] 

434 if isinstance(data, dict) and "automations" in data: 

435 automations_data = data["automations"] 

436 elif isinstance(data, list): 

437 automations_data = data 

438 else: 

439 automations_data = [data] 

440 

441 created = [] 

442 failed = [] 

443 async with get_client() as client: 

444 for i, automation_data in enumerate(automations_data): 

445 try: 

446 automation = AutomationCore.model_validate(automation_data) 

447 automation_id = await client.create_automation(automation) 

448 created.append((automation.name, automation_id)) 

449 except Exception as e: 

450 name = automation_data.get("name", f"automation at index {i}") 

451 failed.append((name, str(e))) 

452 

453 if failed: 

454 app.console.print(f"[red]Failed to create {len(failed)} automation(s):[/red]") 

455 for name, error in failed: 

456 app.console.print(f" - {name}: {error}") 

457 

458 if created: 

459 if len(created) == 1 and not failed: 

460 name, automation_id = created[0] 

461 exit_with_success(f"Created automation '{name}' with id {automation_id}") 

462 else: 

463 app.console.print(f"[green]Created {len(created)} automation(s):[/green]") 

464 for name, automation_id in created: 

465 app.console.print(f" - '{name}' with id {automation_id}") 

466 

467 if failed: 

468 raise typer.Exit(1) 

469 else: 

470 raise typer.Exit(0)