Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/work_pool.py: 16%

342 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1""" 

2Command line interface for working with work queues. 

3""" 

4 

5from __future__ import annotations 1a

6 

7import datetime 1a

8import json 1a

9import textwrap 1a

10from typing import Annotated, Any, Optional 1a

11 

12import orjson 1a

13import typer 1a

14from rich.pretty import Pretty 1a

15from rich.table import Table 1a

16 

17from prefect.cli._prompts import prompt_select_from_table 1a

18from prefect.cli._types import PrefectTyper 1a

19from prefect.cli._utilities import ( 1a

20 exit_with_error, 

21 exit_with_success, 

22) 

23from prefect.cli.root import app, is_interactive 1a

24from prefect.client.collections import get_collections_metadata_client 1a

25from prefect.client.orchestration import PrefectClient, get_client 1a

26from prefect.client.schemas.actions import ( 1a

27 BlockDocumentCreate, 

28 BlockDocumentUpdate, 

29 WorkPoolCreate, 

30 WorkPoolUpdate, 

31) 

32from prefect.client.schemas.objects import ( 1a

33 BlockDocument, 

34 FlowRun, 

35 WorkPool, 

36 WorkPoolStorageConfiguration, 

37) 

38from prefect.exceptions import ObjectAlreadyExists, ObjectNotFound 1a

39from prefect.infrastructure import provisioners 1a

40from prefect.settings import update_current_profile 1a

41from prefect.types._datetime import now as now_fn 1a

42from prefect.utilities import urls 1a

43from prefect.workers.utilities import ( 1a

44 get_available_work_pool_types, 

45 get_default_base_job_template_for_infrastructure_type, 

46) 

47 

48work_pool_app: PrefectTyper = PrefectTyper(name="work-pool", help="Manage work pools.") 1a

49app.add_typer(work_pool_app, aliases=["work-pool"]) 1a

50 

51 

52def set_work_pool_as_default(name: str) -> None: 1a

53 profile = update_current_profile({"PREFECT_DEFAULT_WORK_POOL_NAME": name}) 

54 app.console.print( 

55 f"Set {name!r} as default work pool for profile {profile.name!r}\n", 

56 style="green", 

57 ) 

58 app.console.print( 

59 ( 

60 "To change your default work pool, run:\n\n\t[blue]prefect config set" 

61 " PREFECT_DEFAULT_WORK_POOL_NAME=<work-pool-name>[/]\n" 

62 ), 

63 ) 

64 

65 

66def has_provisioner_for_type(work_pool_type: str) -> bool: 1a

67 """ 

68 Check if there is a provisioner for the given work pool type. 

69 

70 Args: 

71 work_pool_type (str): The type of the work pool. 

72 

73 Returns: 

74 bool: True if a provisioner exists for the given type, False otherwise. 

75 """ 

76 return work_pool_type in provisioners._provisioners 

77 

78 

79@work_pool_app.command() 1a

80async def create( 1a

81 name: str = typer.Argument(..., help="The name of the work pool."), 

82 base_job_template: typer.FileText = typer.Option( 

83 None, 

84 "--base-job-template", 

85 help=( 

86 "The path to a JSON file containing the base job template to use. If" 

87 " unspecified, Prefect will use the default base job template for the given" 

88 " worker type." 

89 ), 

90 ), 

91 paused: bool = typer.Option( 

92 False, 

93 "--paused", 

94 help="Whether or not to create the work pool in a paused state.", 

95 ), 

96 type: str = typer.Option( 

97 None, "-t", "--type", help="The type of work pool to create." 

98 ), 

99 set_as_default: bool = typer.Option( 

100 False, 

101 "--set-as-default", 

102 help=( 

103 "Whether or not to use the created work pool as the local default for" 

104 " deployment." 

105 ), 

106 ), 

107 provision_infrastructure: bool = typer.Option( 

108 False, 

109 "--provision-infrastructure", 

110 "--provision-infra", 

111 help=( 

112 "Whether or not to provision infrastructure for the work pool if supported" 

113 " for the given work pool type." 

114 ), 

115 ), 

116 overwrite: bool = typer.Option( 

117 False, 

118 "--overwrite", 

119 help=("Whether or not to overwrite an existing work pool with the same name."), 

120 ), 

121): 

122 """ 

123 Create a new work pool or update an existing one. 

124 

125 \b 

126 Examples: 

127 \b 

128 Create a Kubernetes work pool in a paused state: 

129 \b 

130 $ prefect work-pool create "my-pool" --type kubernetes --paused 

131 \b 

132 Create a Docker work pool with a custom base job template: 

133 \b 

134 $ prefect work-pool create "my-pool" --type docker --base-job-template ./base-job-template.json 

135 \b 

136 Update an existing work pool: 

137 \b 

138 $ prefect work-pool create "existing-pool" --base-job-template ./base-job-template.json --overwrite 

139 

140 """ 

141 if not name.lower().strip("'\" "): 

142 exit_with_error("Work pool name cannot be empty.") 

143 async with get_client() as client: 

144 try: 

145 existing_pool = await client.read_work_pool(work_pool_name=name) 

146 if not overwrite: 

147 exit_with_error( 

148 f"Work pool named {name!r} already exists. Use --overwrite to update it." 

149 ) 

150 except ObjectNotFound: 

151 existing_pool = None 

152 

153 if type is None and existing_pool is None: 

154 async with get_collections_metadata_client() as collections_client: 

155 if not is_interactive(): 

156 exit_with_error( 

157 "When not using an interactive terminal, you must supply a" 

158 " `--type` value." 

159 ) 

160 worker_metadata = await collections_client.read_worker_metadata() 

161 

162 # Retrieve only push pools if provisioning infrastructure 

163 data = [ 

164 worker 

165 for collection in worker_metadata.values() 

166 for worker in collection.values() 

167 if provision_infrastructure 

168 and has_provisioner_for_type(worker["type"]) 

169 or not provision_infrastructure 

170 ] 

171 worker = prompt_select_from_table( 

172 app.console, 

173 "What type of work pool infrastructure would you like to use?", 

174 columns=[ 

175 {"header": "Infrastructure Type", "key": "display_name"}, 

176 {"header": "Description", "key": "description"}, 

177 ], 

178 data=data, 

179 table_kwargs={"show_lines": True}, 

180 ) 

181 type = worker["type"] 

182 elif existing_pool: 

183 type = existing_pool.type 

184 

185 available_work_pool_types = await get_available_work_pool_types() 

186 if type not in available_work_pool_types: 

187 exit_with_error( 

188 f"Unknown work pool type {type!r}. " 

189 "Please choose from" 

190 f" {', '.join(available_work_pool_types)}." 

191 ) 

192 

193 if base_job_template is None: 

194 template_contents = ( 

195 await get_default_base_job_template_for_infrastructure_type(type) 

196 ) 

197 else: 

198 template_contents = json.load(base_job_template) 

199 

200 if provision_infrastructure: 

201 try: 

202 provisioner = ( 

203 provisioners.get_infrastructure_provisioner_for_work_pool_type(type) 

204 ) 

205 provisioner.console = app.console 

206 template_contents = await provisioner.provision( 

207 work_pool_name=name, base_job_template=template_contents 

208 ) 

209 except ValueError as exc: 

210 print(exc) 

211 app.console.print( 

212 ( 

213 "Automatic infrastructure provisioning is not supported for" 

214 f" {type!r} work pools." 

215 ), 

216 style="yellow", 

217 ) 

218 except RuntimeError as exc: 

219 exit_with_error(f"Failed to provision infrastructure: {exc}") 

220 

221 try: 

222 wp = WorkPoolCreate( 

223 name=name, 

224 type=type, 

225 base_job_template=template_contents, 

226 is_paused=paused, 

227 ) 

228 work_pool = await client.create_work_pool(work_pool=wp, overwrite=overwrite) 

229 action = "Updated" if overwrite and existing_pool else "Created" 

230 app.console.print( 

231 f"{action} work pool {work_pool.name!r}!\n", style="green" 

232 ) 

233 if ( 

234 not work_pool.is_paused 

235 and not work_pool.is_managed_pool 

236 and not work_pool.is_push_pool 

237 ): 

238 app.console.print("To start a worker for this work pool, run:\n") 

239 app.console.print( 

240 f"\t[blue]prefect worker start --pool {work_pool.name}[/]\n" 

241 ) 

242 if set_as_default: 

243 set_work_pool_as_default(work_pool.name) 

244 

245 url = urls.url_for(work_pool) 

246 pool_url = url if url else "<no dashboard available>" 

247 

248 app.console.print( 

249 textwrap.dedent( 

250 f""" 

251 └── UUID: {work_pool.id} 

252 └── Type: {work_pool.type} 

253 └── Description: {work_pool.description} 

254 └── Status: {work_pool.status.display_name} 

255 └── URL: {pool_url} 

256 """ 

257 ).strip(), 

258 soft_wrap=True, 

259 ) 

260 exit_with_success("") 

261 except ObjectAlreadyExists: 

262 exit_with_error( 

263 f"Work pool named {name!r} already exists. Please use --overwrite to update it." 

264 ) 

265 

266 

267@work_pool_app.command() 1a

268async def ls( 1a

269 verbose: bool = typer.Option( 

270 False, 

271 "--verbose", 

272 "-v", 

273 help="Show additional information about work pools.", 

274 ), 

275): 

276 """ 

277 List work pools. 

278 

279 \b 

280 Examples: 

281 $ prefect work-pool ls 

282 """ 

283 table = Table( 

284 title="Work Pools", caption="(**) denotes a paused pool", caption_style="red" 

285 ) 

286 table.add_column("Name", style="green", no_wrap=True) 

287 table.add_column("Type", style="magenta", no_wrap=True) 

288 table.add_column("ID", justify="right", style="cyan", no_wrap=True) 

289 table.add_column("Concurrency Limit", style="blue", no_wrap=True) 

290 if verbose: 

291 table.add_column("Base Job Template", style="magenta", no_wrap=True) 

292 

293 async with get_client() as client: 

294 pools = await client.read_work_pools() 

295 

296 def sort_by_created_key(q: WorkPool) -> datetime.timedelta: 

297 assert q.created is not None 

298 return now_fn("UTC") - q.created 

299 

300 for pool in sorted(pools, key=sort_by_created_key): 

301 row = [ 

302 f"{pool.name} [red](**)" if pool.is_paused else pool.name, 

303 str(pool.type), 

304 str(pool.id), 

305 ( 

306 f"[red]{pool.concurrency_limit}" 

307 if pool.concurrency_limit is not None 

308 else "[blue]None" 

309 ), 

310 ] 

311 if verbose: 

312 row.append(str(pool.base_job_template)) 

313 table.add_row(*row) 

314 

315 app.console.print(table) 

316 

317 

318@work_pool_app.command() 1a

319async def inspect( 1a

320 name: str = typer.Argument(..., help="The name of the work pool to inspect."), 

321 output: Optional[str] = typer.Option( 

322 None, 

323 "--output", 

324 "-o", 

325 help="Specify an output format. Currently supports: json", 

326 ), 

327): 

328 """ 

329 Inspect a work pool. 

330 

331 \b 

332 Examples: 

333 $ prefect work-pool inspect "my-pool" 

334 $ prefect work-pool inspect "my-pool" --output json 

335 

336 """ 

337 if output and output.lower() != "json": 

338 exit_with_error("Only 'json' output format is supported.") 

339 

340 async with get_client() as client: 

341 try: 

342 pool = await client.read_work_pool(work_pool_name=name) 

343 if output and output.lower() == "json": 

344 pool_json = pool.model_dump(mode="json") 

345 json_output = orjson.dumps( 

346 pool_json, option=orjson.OPT_INDENT_2 

347 ).decode() 

348 app.console.print(json_output) 

349 else: 

350 app.console.print(Pretty(pool)) 

351 except ObjectNotFound: 

352 exit_with_error(f"Work pool {name!r} not found!") 

353 

354 

355@work_pool_app.command() 1a

356async def pause( 1a

357 name: str = typer.Argument(..., help="The name of the work pool to pause."), 

358): 

359 """ 

360 Pause a work pool. 

361 

362 \b 

363 Examples: 

364 $ prefect work-pool pause "my-pool" 

365 

366 """ 

367 async with get_client() as client: 

368 try: 

369 await client.update_work_pool( 

370 work_pool_name=name, 

371 work_pool=WorkPoolUpdate( 

372 is_paused=True, 

373 ), 

374 ) 

375 except ObjectNotFound as exc: 

376 exit_with_error(exc) 

377 

378 exit_with_success(f"Paused work pool {name!r}") 

379 

380 

381@work_pool_app.command() 1a

382async def resume( 1a

383 name: str = typer.Argument(..., help="The name of the work pool to resume."), 

384): 

385 """ 

386 Resume a work pool. 

387 

388 \b 

389 Examples: 

390 $ prefect work-pool resume "my-pool" 

391 

392 """ 

393 async with get_client() as client: 

394 try: 

395 await client.update_work_pool( 

396 work_pool_name=name, 

397 work_pool=WorkPoolUpdate( 

398 is_paused=False, 

399 ), 

400 ) 

401 except ObjectNotFound as exc: 

402 exit_with_error(exc) 

403 

404 exit_with_success(f"Resumed work pool {name!r}") 

405 

406 

407@work_pool_app.command() 1a

408async def update( 1a

409 name: str = typer.Argument(..., help="The name of the work pool to update."), 

410 base_job_template: typer.FileText = typer.Option( 

411 None, 

412 "--base-job-template", 

413 help=( 

414 "The path to a JSON file containing the base job template to use. If" 

415 " unspecified, Prefect will use the default base job template for the given" 

416 " worker type. If None, the base job template will not be modified." 

417 ), 

418 ), 

419 concurrency_limit: int = typer.Option( 

420 None, 

421 "--concurrency-limit", 

422 help=( 

423 "The concurrency limit for the work pool. If None, the concurrency limit" 

424 " will not be modified." 

425 ), 

426 ), 

427 description: str = typer.Option( 

428 None, 

429 "--description", 

430 help=( 

431 "The description for the work pool. If None, the description will not be" 

432 " modified." 

433 ), 

434 ), 

435): 

436 """ 

437 Update a work pool. 

438 

439 \b 

440 Examples: 

441 $ prefect work-pool update "my-pool" 

442 

443 """ 

444 wp = WorkPoolUpdate() 

445 if base_job_template: 

446 wp.base_job_template = json.load(base_job_template) 

447 if concurrency_limit: 

448 wp.concurrency_limit = concurrency_limit 

449 if description: 

450 wp.description = description 

451 

452 async with get_client() as client: 

453 try: 

454 await client.update_work_pool( 

455 work_pool_name=name, 

456 work_pool=wp, 

457 ) 

458 except ObjectNotFound: 

459 exit_with_error(f"Work pool named {name!r} does not exist.") 

460 

461 exit_with_success(f"Updated work pool {name!r}") 

462 

463 

464@work_pool_app.command(aliases=["provision-infra"]) 1a

465async def provision_infrastructure( 1a

466 name: str = typer.Argument( 

467 ..., help="The name of the work pool to provision infrastructure for." 

468 ), 

469): 

470 """ 

471 Provision infrastructure for a work pool. 

472 

473 \b 

474 Examples: 

475 $ prefect work-pool provision-infrastructure "my-pool" 

476 

477 $ prefect work-pool provision-infra "my-pool" 

478 

479 """ 

480 async with get_client() as client: 

481 try: 

482 work_pool = await client.read_work_pool(work_pool_name=name) 

483 if not work_pool.is_push_pool: 

484 exit_with_error( 

485 f"Work pool {name!r} is not a push pool type. " 

486 "Please try provisioning infrastructure for a push pool." 

487 ) 

488 except ObjectNotFound: 

489 exit_with_error(f"Work pool {name!r} does not exist.") 

490 except Exception as exc: 

491 exit_with_error(f"Failed to read work pool {name!r}: {exc}") 

492 

493 try: 

494 provisioner = ( 

495 provisioners.get_infrastructure_provisioner_for_work_pool_type( 

496 work_pool.type 

497 ) 

498 ) 

499 provisioner.console = app.console 

500 new_base_job_template = await provisioner.provision( 

501 work_pool_name=name, base_job_template=work_pool.base_job_template 

502 ) 

503 

504 await client.update_work_pool( 

505 work_pool_name=name, 

506 work_pool=WorkPoolUpdate( 

507 base_job_template=new_base_job_template, 

508 ), 

509 ) 

510 

511 except ValueError as exc: 

512 app.console.print(f"Error: {exc}") 

513 app.console.print( 

514 ( 

515 "Automatic infrastructure provisioning is not supported for" 

516 f" {work_pool.type!r} work pools." 

517 ), 

518 style="yellow", 

519 ) 

520 except RuntimeError as exc: 

521 exit_with_error( 

522 f"Failed to provision infrastructure for '{name}' work pool: {exc}" 

523 ) 

524 

525 

526@work_pool_app.command() 1a

527async def delete( 1a

528 name: str = typer.Argument(..., help="The name of the work pool to delete."), 

529): 

530 """ 

531 Delete a work pool. 

532 

533 \b 

534 Examples: 

535 $ prefect work-pool delete "my-pool" 

536 

537 """ 

538 async with get_client() as client: 

539 try: 

540 work_pool = await client.read_work_pool(work_pool_name=name) 

541 if is_interactive() and not typer.confirm( 

542 ( 

543 f"Are you sure you want to delete work pool with name {work_pool.name!r}?" 

544 ), 

545 default=False, 

546 ): 

547 exit_with_error("Deletion aborted.") 

548 await client.delete_work_pool(work_pool_name=name) 

549 except ObjectNotFound: 

550 exit_with_error(f"Work pool {name!r} does not exist.") 

551 

552 exit_with_success(f"Deleted work pool {name!r}") 

553 

554 

555@work_pool_app.command() 1a

556async def set_concurrency_limit( 1a

557 name: str = typer.Argument(..., help="The name of the work pool to update."), 

558 concurrency_limit: int = typer.Argument( 

559 ..., help="The new concurrency limit for the work pool." 

560 ), 

561): 

562 """ 

563 Set the concurrency limit for a work pool. 

564 

565 \b 

566 Examples: 

567 $ prefect work-pool set-concurrency-limit "my-pool" 10 

568 

569 """ 

570 async with get_client() as client: 

571 try: 

572 await client.update_work_pool( 

573 work_pool_name=name, 

574 work_pool=WorkPoolUpdate( 

575 concurrency_limit=concurrency_limit, 

576 ), 

577 ) 

578 except ObjectNotFound as exc: 

579 exit_with_error(exc) 

580 

581 exit_with_success( 

582 f"Set concurrency limit for work pool {name!r} to {concurrency_limit}" 

583 ) 

584 

585 

586@work_pool_app.command() 1a

587async def clear_concurrency_limit( 1a

588 name: str = typer.Argument(..., help="The name of the work pool to update."), 

589): 

590 """ 

591 Clear the concurrency limit for a work pool. 

592 

593 \b 

594 Examples: 

595 $ prefect work-pool clear-concurrency-limit "my-pool" 

596 

597 """ 

598 async with get_client() as client: 

599 try: 

600 await client.update_work_pool( 

601 work_pool_name=name, 

602 work_pool=WorkPoolUpdate( 

603 concurrency_limit=None, 

604 ), 

605 ) 

606 except ObjectNotFound as exc: 

607 exit_with_error(exc) 

608 

609 exit_with_success(f"Cleared concurrency limit for work pool {name!r}") 

610 

611 

612@work_pool_app.command() 1a

613async def get_default_base_job_template( 1a

614 type: str = typer.Option( 

615 None, 

616 "-t", 

617 "--type", 

618 help="The type of work pool for which to get the default base job template.", 

619 ), 

620 file: str = typer.Option( 

621 None, "-f", "--file", help="If set, write the output to a file." 

622 ), 

623): 

624 """ 

625 Get the default base job template for a given work pool type. 

626 

627 \b 

628 Examples: 

629 $ prefect work-pool get-default-base-job-template --type kubernetes 

630 """ 

631 base_job_template = await get_default_base_job_template_for_infrastructure_type( 

632 type 

633 ) 

634 if base_job_template is None: 

635 exit_with_error( 

636 f"Unknown work pool type {type!r}. " 

637 "Please choose from" 

638 f" {', '.join(await get_available_work_pool_types())}." 

639 ) 

640 

641 if file is None: 

642 print(json.dumps(base_job_template, indent=2)) 

643 else: 

644 with open(file, mode="w") as f: 

645 json.dump(base_job_template, fp=f, indent=2) 

646 

647 

648@work_pool_app.command() 1a

649async def preview( 1a

650 name: str = typer.Argument(None, help="The name or ID of the work pool to preview"), 

651 hours: int = typer.Option( 

652 None, 

653 "-h", 

654 "--hours", 

655 help="The number of hours to look ahead; defaults to 1 hour", 

656 ), 

657): 

658 """ 

659 Preview the work pool's scheduled work for all queues. 

660 

661 \b 

662 Examples: 

663 $ prefect work-pool preview "my-pool" --hours 24 

664 

665 """ 

666 if hours is None: 

667 hours = 1 

668 

669 async with get_client() as client: 

670 try: 

671 responses = await client.get_scheduled_flow_runs_for_work_pool( 

672 work_pool_name=name, 

673 ) 

674 except ObjectNotFound as exc: 

675 exit_with_error(exc) 

676 

677 runs = [response.flow_run for response in responses] 

678 table = Table(caption="(**) denotes a late run", caption_style="red") 

679 

680 table.add_column( 

681 "Scheduled Start Time", justify="left", style="yellow", no_wrap=True 

682 ) 

683 table.add_column("Run ID", justify="left", style="cyan", no_wrap=True) 

684 table.add_column("Name", style="green", no_wrap=True) 

685 table.add_column("Deployment ID", style="blue", no_wrap=True) 

686 

687 now = now_fn("UTC") 

688 

689 def sort_by_created_key(r: FlowRun) -> datetime.timedelta: 

690 assert r.created is not None 

691 return now - r.created 

692 

693 for run in sorted(runs, key=sort_by_created_key): 

694 table.add_row( 

695 ( 

696 f"{run.expected_start_time} [red](**)" 

697 if run.expected_start_time and run.expected_start_time < now 

698 else f"{run.expected_start_time}" 

699 ), 

700 str(run.id), 

701 run.name, 

702 str(run.deployment_id), 

703 ) 

704 

705 if runs: 

706 app.console.print(table) 

707 else: 

708 app.console.print( 

709 ( 

710 "No runs found - try increasing how far into the future you preview" 

711 " with the --hours flag" 

712 ), 

713 style="yellow", 

714 ) 

715 

716 

717# -------------------------------------------------------------------------- 

718# Work Pool Storage Configuration 

719# -------------------------------------------------------------------------- 

720 

721work_pool_storage_app: PrefectTyper = PrefectTyper( 1a

722 name="storage", help="EXPERIMENTAL: Manage work pool storage." 

723) 

724work_pool_app.add_typer(work_pool_storage_app) 1a

725 

726 

727def _determine_storage_type(storage_config: WorkPoolStorageConfiguration) -> str | None: 1a

728 if storage_config.bundle_upload_step is None: 

729 return None 

730 if storage_config.bundle_upload_step and any( 

731 "prefect_aws" in step for step in storage_config.bundle_upload_step.keys() 

732 ): 

733 return "S3" 

734 if storage_config.bundle_upload_step and any( 

735 "prefect_gcp" in step for step in storage_config.bundle_upload_step.keys() 

736 ): 

737 return "GCS" 

738 if storage_config.bundle_upload_step and any( 

739 "prefect_azure" in step for step in storage_config.bundle_upload_step.keys() 

740 ): 

741 return "Azure Blob Storage" 

742 return "Unknown" 

743 

744 

745@work_pool_storage_app.command(name="inspect") 1a

746async def storage_inspect( 1a

747 work_pool_name: Annotated[ 

748 str, 

749 typer.Argument( 

750 ..., help="The name of the work pool to display storage configuration for." 

751 ), 

752 ], 

753 output: Optional[str] = typer.Option( 

754 None, 

755 "--output", 

756 "-o", 

757 help="Specify an output format. Currently supports: json", 

758 ), 

759): 

760 """ 

761 EXPERIMENTAL: Inspect the storage configuration for a work pool. 

762 

763 Examples: 

764 $ prefect work-pool storage inspect "my-pool" 

765 $ prefect work-pool storage inspect "my-pool" --output json 

766 """ 

767 if output and output.lower() != "json": 

768 exit_with_error("Only 'json' output format is supported.") 

769 

770 async with get_client() as client: 

771 try: 

772 work_pool = await client.read_work_pool(work_pool_name=work_pool_name) 

773 from rich.panel import Panel 

774 from rich.table import Table 

775 

776 storage_table = Table(show_header=True, header_style="bold") 

777 storage_table.add_column("Setting", style="cyan") 

778 storage_table.add_column("Value") 

779 

780 storage_type = _determine_storage_type(work_pool.storage_configuration) 

781 if not storage_type: 

782 if output and output.lower() == "json": 

783 app.console.print("{}") 

784 else: 

785 app.console.print( 

786 f"No storage configuration found for work pool {work_pool_name!r}", 

787 style="yellow", 

788 ) 

789 return 

790 

791 if output and output.lower() == "json": 

792 storage_data = {"type": storage_type} 

793 if work_pool.storage_configuration.bundle_upload_step is not None: 

794 fqn = list( 

795 work_pool.storage_configuration.bundle_upload_step.keys() 

796 )[0] 

797 config_values = work_pool.storage_configuration.bundle_upload_step[ 

798 fqn 

799 ] 

800 storage_data.update(config_values) 

801 

802 json_output = orjson.dumps( 

803 storage_data, option=orjson.OPT_INDENT_2 

804 ).decode() 

805 app.console.print(json_output) 

806 else: 

807 storage_table.add_row("type", storage_type) 

808 

809 # Add other storage settings, filtering out None values 

810 if work_pool.storage_configuration.bundle_upload_step is not None: 

811 fqn = list( 

812 work_pool.storage_configuration.bundle_upload_step.keys() 

813 )[0] 

814 config_values = work_pool.storage_configuration.bundle_upload_step[ 

815 fqn 

816 ] 

817 for key, value in config_values.items(): 

818 storage_table.add_row(key, str(value)) 

819 

820 panel = Panel( 

821 storage_table, 

822 title=f"[bold]Storage Configuration for {work_pool_name}[/bold]", 

823 expand=False, 

824 ) 

825 

826 app.console.print(panel) 

827 

828 except ObjectNotFound: 

829 exit_with_error(f"Work pool {work_pool_name!r} does not exist.") 

830 

831 

832async def _create_or_update_result_storage_block( 1a

833 client: PrefectClient, 

834 block_document_name: str, 

835 block_document_data: dict[str, Any], 

836 block_type_slug: str, 

837 missing_block_definition_error: str, 

838) -> BlockDocument: 

839 try: 

840 existing_block_document = await client.read_block_document_by_name( 

841 name=block_document_name, block_type_slug=block_type_slug 

842 ) 

843 except ObjectNotFound: 

844 existing_block_document = None 

845 

846 if existing_block_document is not None: 

847 await client.update_block_document( 

848 block_document_id=existing_block_document.id, 

849 block_document=BlockDocumentUpdate( 

850 data=block_document_data, 

851 ), 

852 ) 

853 block_document = existing_block_document 

854 else: 

855 try: 

856 block_type = await client.read_block_type_by_slug(slug=block_type_slug) 

857 block_schema = await client.get_most_recent_block_schema_for_block_type( 

858 block_type_id=block_type.id 

859 ) 

860 except ObjectNotFound: 

861 exit_with_error(missing_block_definition_error) 

862 else: 

863 if block_schema is None: 

864 exit_with_error(missing_block_definition_error) 

865 

866 block_document = await client.create_block_document( 

867 block_document=BlockDocumentCreate( 

868 name=block_document_name, 

869 block_type_id=block_type.id, 

870 block_schema_id=block_schema.id, 

871 data=block_document_data, 

872 ) 

873 ) 

874 

875 return block_document 

876 

877 

878work_pool_storage_configure_app: PrefectTyper = PrefectTyper( 1a

879 name="configure", help="EXPERIMENTAL: Configure work pool storage." 

880) 

881work_pool_storage_app.add_typer(work_pool_storage_configure_app) 1a

882 

883 

884@work_pool_storage_configure_app.command() 1a

885async def s3( 1a

886 work_pool_name: str = typer.Argument( 

887 ..., 

888 help="The name of the work pool to configure storage for.", 

889 show_default=False, 

890 ), 

891 bucket: str = typer.Option( 

892 ..., 

893 "--bucket", 

894 help="The name of the S3 bucket to use.", 

895 show_default=False, 

896 prompt="Enter the name of the S3 bucket to use", 

897 ), 

898 credentials_block_name: str = typer.Option( 

899 ..., 

900 "--aws-credentials-block-name", 

901 help="The name of the AWS credentials block to use.", 

902 show_default=False, 

903 prompt="Enter the name of the AWS credentials block to use", 

904 ), 

905): 

906 """ 

907 EXPERIMENTAL: Configure AWS S3 storage for a work pool. 

908 

909 \b 

910 Examples: 

911 $ prefect work-pool storage configure s3 "my-pool" --bucket my-bucket --aws-credentials-block-name my-credentials 

912 """ 

913 # TODO: Allow passing in AWS keys and creating a block for the user. 

914 async with get_client() as client: 

915 try: 

916 credentials_block_document = await client.read_block_document_by_name( 

917 name=credentials_block_name, block_type_slug="aws-credentials" 

918 ) 

919 except ObjectNotFound: 

920 exit_with_error( 

921 f"AWS credentials block {credentials_block_name!r} does not exist. Please create one using `prefect block create aws-credentials`." 

922 ) 

923 

924 result_storage_block_document_name = f"default-{work_pool_name}-result-storage" 

925 block_data = { 

926 "bucket_name": bucket, 

927 "bucket_folder": "results", 

928 "credentials": { 

929 "$ref": {"block_document_id": credentials_block_document.id} 

930 }, 

931 } 

932 

933 block_document = await _create_or_update_result_storage_block( 

934 client=client, 

935 block_document_name=result_storage_block_document_name, 

936 block_document_data=block_data, 

937 block_type_slug="s3-bucket", 

938 missing_block_definition_error="S3 bucket block definition does not exist server-side. Please install `prefect-aws` and run `prefect blocks register -m prefect_aws`.", 

939 ) 

940 

941 try: 

942 await client.update_work_pool( 

943 work_pool_name=work_pool_name, 

944 work_pool=WorkPoolUpdate( 

945 storage_configuration=WorkPoolStorageConfiguration( 

946 bundle_upload_step={ 

947 "prefect_aws.experimental.bundles.upload": { 

948 "requires": "prefect-aws", 

949 "bucket": bucket, 

950 "aws_credentials_block_name": credentials_block_name, 

951 } 

952 }, 

953 bundle_execution_step={ 

954 "prefect_aws.experimental.bundles.execute": { 

955 "requires": "prefect-aws", 

956 "bucket": bucket, 

957 "aws_credentials_block_name": credentials_block_name, 

958 } 

959 }, 

960 default_result_storage_block_id=block_document.id, 

961 ), 

962 ), 

963 ) 

964 except ObjectNotFound: 

965 exit_with_error(f"Work pool {work_pool_name!r} does not exist.") 

966 

967 exit_with_success(f"Configured S3 storage for work pool {work_pool_name!r}") 

968 

969 

970@work_pool_storage_configure_app.command() 1a

971async def gcs( 1a

972 work_pool_name: str = typer.Argument( 

973 ..., 

974 help="The name of the work pool to configure storage for.", 

975 show_default=False, 

976 ), 

977 bucket: str = typer.Option( 

978 ..., 

979 "--bucket", 

980 help="The name of the Google Cloud Storage bucket to use.", 

981 show_default=False, 

982 prompt="Enter the name of the Google Cloud Storage bucket to use", 

983 ), 

984 credentials_block_name: str = typer.Option( 

985 ..., 

986 "--gcp-credentials-block-name", 

987 help="The name of the Google Cloud credentials block to use.", 

988 show_default=False, 

989 prompt="Enter the name of the Google Cloud credentials block to use", 

990 ), 

991): 

992 """ 

993 EXPERIMENTAL: Configure Google Cloud storage for a work pool. 

994 

995 \b 

996 Examples: 

997 $ prefect work-pool storage configure gcs "my-pool" --bucket my-bucket --gcp-credentials-block-name my-credentials 

998 """ 

999 async with get_client() as client: 

1000 try: 

1001 credentials_block_document = await client.read_block_document_by_name( 

1002 name=credentials_block_name, block_type_slug="gcp-credentials" 

1003 ) 

1004 except ObjectNotFound: 

1005 exit_with_error( 

1006 f"GCS credentials block {credentials_block_name!r} does not exist. Please create one using `prefect block create gcp-credentials`." 

1007 ) 

1008 

1009 result_storage_block_document_name = f"default-{work_pool_name}-result-storage" 

1010 block_data = { 

1011 "bucket_name": bucket, 

1012 "bucket_folder": "results", 

1013 "credentials": { 

1014 "$ref": {"block_document_id": credentials_block_document.id} 

1015 }, 

1016 } 

1017 

1018 block_document = await _create_or_update_result_storage_block( 

1019 client=client, 

1020 block_document_name=result_storage_block_document_name, 

1021 block_document_data=block_data, 

1022 block_type_slug="gcs-bucket", 

1023 missing_block_definition_error="GCS bucket block definition does not exist server-side. Please install `prefect-gcp` and run `prefect blocks register -m prefect_gcp`.", 

1024 ) 

1025 

1026 try: 

1027 await client.update_work_pool( 

1028 work_pool_name=work_pool_name, 

1029 work_pool=WorkPoolUpdate( 

1030 storage_configuration=WorkPoolStorageConfiguration( 

1031 bundle_upload_step={ 

1032 "prefect_gcp.experimental.bundles.upload": { 

1033 "requires": "prefect-gcp", 

1034 "bucket": bucket, 

1035 "gcp_credentials_block_name": credentials_block_name, 

1036 } 

1037 }, 

1038 bundle_execution_step={ 

1039 "prefect_gcp.experimental.bundles.execute": { 

1040 "requires": "prefect-gcp", 

1041 "bucket": bucket, 

1042 "gcp_credentials_block_name": credentials_block_name, 

1043 } 

1044 }, 

1045 default_result_storage_block_id=block_document.id, 

1046 ), 

1047 ), 

1048 ) 

1049 except ObjectNotFound: 

1050 exit_with_error(f"Work pool {work_pool_name!r} does not exist.") 

1051 

1052 exit_with_success(f"Configured GCS storage for work pool {work_pool_name!r}") 

1053 

1054 

1055@work_pool_storage_configure_app.command() 1a

1056async def azure_blob_storage( 1a

1057 work_pool_name: str = typer.Argument( 

1058 ..., 

1059 help="The name of the work pool to configure storage for.", 

1060 show_default=False, 

1061 ), 

1062 container: str = typer.Option( 

1063 ..., 

1064 "--container", 

1065 help="The name of the Azure Blob Storage container to use.", 

1066 show_default=False, 

1067 prompt="Enter the name of the Azure Blob Storage container to use", 

1068 ), 

1069 credentials_block_name: str = typer.Option( 

1070 ..., 

1071 "--azure-blob-storage-credentials-block-name", 

1072 help="The name of the Azure Blob Storage credentials block to use.", 

1073 show_default=False, 

1074 prompt="Enter the name of the Azure Blob Storage credentials block to use", 

1075 ), 

1076): 

1077 """ 

1078 EXPERIMENTAL: Configure Azure Blob Storage for a work pool. 

1079 

1080 \b 

1081 Examples: 

1082 $ prefect work-pool storage configure azure-blob-storage "my-pool" --container my-container --azure-blob-storage-credentials-block-name my-credentials 

1083 """ 

1084 async with get_client() as client: 

1085 try: 

1086 credentials_block_document = await client.read_block_document_by_name( 

1087 name=credentials_block_name, 

1088 block_type_slug="azure-blob-storage-credentials", 

1089 ) 

1090 except ObjectNotFound: 

1091 exit_with_error( 

1092 f"Azure Blob Storage credentials block {credentials_block_name!r} does not exist. Please create one using `prefect block create azure-blob-storage-credentials`." 

1093 ) 

1094 

1095 result_storage_block_document_name = f"default-{work_pool_name}-result-storage" 

1096 block_data = { 

1097 "container_name": container, 

1098 "credentials": { 

1099 "$ref": {"block_document_id": credentials_block_document.id} 

1100 }, 

1101 } 

1102 

1103 block_document = await _create_or_update_result_storage_block( 

1104 client=client, 

1105 block_document_name=result_storage_block_document_name, 

1106 block_document_data=block_data, 

1107 block_type_slug="azure-blob-storage-container", 

1108 missing_block_definition_error="Azure Blob Storage container block definition does not exist server-side. Please install `prefect-azure[storage]` and run `prefect blocks register -m prefect_azure`.", 

1109 ) 

1110 

1111 try: 

1112 await client.update_work_pool( 

1113 work_pool_name=work_pool_name, 

1114 work_pool=WorkPoolUpdate( 

1115 storage_configuration=WorkPoolStorageConfiguration( 

1116 bundle_upload_step={ 

1117 "prefect_azure.experimental.bundles.upload": { 

1118 "requires": "prefect-azure", 

1119 "container": container, 

1120 "azure_blob_storage_credentials_block_name": credentials_block_name, 

1121 } 

1122 }, 

1123 bundle_execution_step={ 

1124 "prefect_azure.experimental.bundles.execute": { 

1125 "requires": "prefect-azure", 

1126 "container": container, 

1127 "azure_blob_storage_credentials_block_name": credentials_block_name, 

1128 } 

1129 }, 

1130 default_result_storage_block_id=block_document.id, 

1131 ), 

1132 ), 

1133 ) 

1134 except ObjectNotFound: 

1135 exit_with_error(f"Work pool {work_pool_name!r} does not exist.") 

1136 

1137 exit_with_success( 

1138 f"Configured Azure Blob Storage for work pool {work_pool_name!r}" 

1139 )