Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/work_pool.py: 16%
342 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1"""
2Command line interface for working with work pools.
3"""
5from __future__ import annotations 1a
7import datetime 1a
8import json 1a
9import textwrap 1a
10from typing import Annotated, Any, Optional 1a
12import orjson 1a
13import typer 1a
14from rich.pretty import Pretty 1a
15from rich.table import Table 1a
17from prefect.cli._prompts import prompt_select_from_table 1a
18from prefect.cli._types import PrefectTyper 1a
19from prefect.cli._utilities import ( 1a
20 exit_with_error,
21 exit_with_success,
22)
23from prefect.cli.root import app, is_interactive 1a
24from prefect.client.collections import get_collections_metadata_client 1a
25from prefect.client.orchestration import PrefectClient, get_client 1a
26from prefect.client.schemas.actions import ( 1a
27 BlockDocumentCreate,
28 BlockDocumentUpdate,
29 WorkPoolCreate,
30 WorkPoolUpdate,
31)
32from prefect.client.schemas.objects import ( 1a
33 BlockDocument,
34 FlowRun,
35 WorkPool,
36 WorkPoolStorageConfiguration,
37)
38from prefect.exceptions import ObjectAlreadyExists, ObjectNotFound 1a
39from prefect.infrastructure import provisioners 1a
40from prefect.settings import update_current_profile 1a
41from prefect.types._datetime import now as now_fn 1a
42from prefect.utilities import urls 1a
43from prefect.workers.utilities import ( 1a
44 get_available_work_pool_types,
45 get_default_base_job_template_for_infrastructure_type,
46)
# Typer sub-application that groups the work pool commands defined in this module.
work_pool_app: PrefectTyper = PrefectTyper(name="work-pool", help="Manage work pools.")
# Register the work pool command group on the root CLI application.
app.add_typer(work_pool_app, aliases=["work-pool"])
def set_work_pool_as_default(name: str) -> None:
    """
    Store `name` as the default work pool of the current profile.

    Prints a confirmation naming the updated profile, followed by a hint
    showing the command that changes the default again.

    Args:
        name: The work pool name written to `PREFECT_DEFAULT_WORK_POOL_NAME`.
    """
    updated_profile = update_current_profile({"PREFECT_DEFAULT_WORK_POOL_NAME": name})

    confirmation = (
        f"Set {name!r} as default work pool for profile {updated_profile.name!r}\n"
    )
    change_hint = (
        "To change your default work pool, run:\n\n\t[blue]prefect config set"
        " PREFECT_DEFAULT_WORK_POOL_NAME=<work-pool-name>[/]\n"
    )

    app.console.print(confirmation, style="green")
    app.console.print(change_hint)
def has_provisioner_for_type(work_pool_type: str) -> bool:
    """
    Report whether a provisioner is registered for a work pool type.

    Args:
        work_pool_type (str): The work pool type to look up.

    Returns:
        bool: True when `work_pool_type` is present in the provisioner
            registry of `prefect.infrastructure.provisioners`, else False.
    """
    registered_provisioners = provisioners._provisioners
    return work_pool_type in registered_provisioners
@work_pool_app.command()
async def create(
    name: str = typer.Argument(..., help="The name of the work pool."),
    base_job_template: typer.FileText = typer.Option(
        None,
        "--base-job-template",
        help=(
            "The path to a JSON file containing the base job template to use. If"
            " unspecified, Prefect will use the default base job template for the given"
            " worker type."
        ),
    ),
    paused: bool = typer.Option(
        False,
        "--paused",
        help="Whether or not to create the work pool in a paused state.",
    ),
    type: str = typer.Option(
        None, "-t", "--type", help="The type of work pool to create."
    ),
    set_as_default: bool = typer.Option(
        False,
        "--set-as-default",
        help=(
            "Whether or not to use the created work pool as the local default for"
            " deployment."
        ),
    ),
    provision_infrastructure: bool = typer.Option(
        False,
        "--provision-infrastructure",
        "--provision-infra",
        help=(
            "Whether or not to provision infrastructure for the work pool if supported"
            " for the given work pool type."
        ),
    ),
    overwrite: bool = typer.Option(
        False,
        "--overwrite",
        help=("Whether or not to overwrite an existing work pool with the same name."),
    ),
):
    """
    Create a new work pool or update an existing one.

    \b
    Examples:
        \b
        Create a Kubernetes work pool in a paused state:
            \b
            $ prefect work-pool create "my-pool" --type kubernetes --paused
        \b
        Create a Docker work pool with a custom base job template:
            \b
            $ prefect work-pool create "my-pool" --type docker --base-job-template ./base-job-template.json
        \b
        Update an existing work pool:
            \b
            $ prefect work-pool create "existing-pool" --base-job-template ./base-job-template.json --overwrite

    """
    # A name made up only of quotes and spaces counts as empty.
    meaningful_name = name.lower().strip("'\" ")
    if not meaningful_name:
        exit_with_error("Work pool name cannot be empty.")

    async with get_client() as client:
        # Step 1: find out whether a pool with this name already exists.
        try:
            existing_pool = await client.read_work_pool(work_pool_name=name)
        except ObjectNotFound:
            existing_pool = None
        else:
            if not overwrite:
                exit_with_error(
                    f"Work pool named {name!r} already exists. Use --overwrite to update it."
                )

        # Step 2: settle on the work pool type. A brand-new pool without an
        # explicit type is chosen interactively; an existing pool keeps its type.
        if existing_pool is None and type is None:
            async with get_collections_metadata_client() as collections_client:
                if not is_interactive():
                    exit_with_error(
                        "When not using an interactive terminal, you must supply a"
                        " `--type` value."
                    )
                worker_metadata = await collections_client.read_worker_metadata()

                # Retrieve only push pools if provisioning infrastructure
                choices = []
                for collection in worker_metadata.values():
                    for candidate in collection.values():
                        if not provision_infrastructure or has_provisioner_for_type(
                            candidate["type"]
                        ):
                            choices.append(candidate)

                selected = prompt_select_from_table(
                    app.console,
                    "What type of work pool infrastructure would you like to use?",
                    columns=[
                        {"header": "Infrastructure Type", "key": "display_name"},
                        {"header": "Description", "key": "description"},
                    ],
                    data=choices,
                    table_kwargs={"show_lines": True},
                )
                type = selected["type"]
        elif existing_pool:
            type = existing_pool.type

        available_work_pool_types = await get_available_work_pool_types()
        if type not in available_work_pool_types:
            exit_with_error(
                f"Unknown work pool type {type!r}. "
                "Please choose from"
                f" {', '.join(available_work_pool_types)}."
            )

        # Step 3: pick the base job template (user-supplied file or the default).
        if base_job_template is not None:
            template_contents = json.load(base_job_template)
        else:
            template_contents = (
                await get_default_base_job_template_for_infrastructure_type(type)
            )

        # Step 4: optionally let a provisioner set up infrastructure; it
        # returns the base job template to use from here on.
        if provision_infrastructure:
            try:
                provisioner = (
                    provisioners.get_infrastructure_provisioner_for_work_pool_type(type)
                )
                provisioner.console = app.console
                template_contents = await provisioner.provision(
                    work_pool_name=name, base_job_template=template_contents
                )
            except ValueError as exc:
                print(exc)
                app.console.print(
                    (
                        "Automatic infrastructure provisioning is not supported for"
                        f" {type!r} work pools."
                    ),
                    style="yellow",
                )
            except RuntimeError as exc:
                exit_with_error(f"Failed to provision infrastructure: {exc}")

        # Step 5: create (or overwrite) the pool and report the result.
        try:
            wp = WorkPoolCreate(
                name=name,
                type=type,
                base_job_template=template_contents,
                is_paused=paused,
            )
            work_pool = await client.create_work_pool(work_pool=wp, overwrite=overwrite)

            if overwrite and existing_pool:
                action = "Updated"
            else:
                action = "Created"
            app.console.print(
                f"{action} work pool {work_pool.name!r}!\n", style="green"
            )

            # Only active pools that are neither managed nor push pools get
            # the "start a worker" hint.
            skip_worker_hint = (
                work_pool.is_paused
                or work_pool.is_managed_pool
                or work_pool.is_push_pool
            )
            if not skip_worker_hint:
                app.console.print("To start a worker for this work pool, run:\n")
                app.console.print(
                    f"\t[blue]prefect worker start --pool {work_pool.name}[/]\n"
                )

            if set_as_default:
                set_work_pool_as_default(work_pool.name)

            url = urls.url_for(work_pool)
            pool_url = url or "<no dashboard available>"

            app.console.print(
                textwrap.dedent(
                    f"""
                └── UUID: {work_pool.id}
                  └── Type: {work_pool.type}
                  └── Description: {work_pool.description}
                  └── Status: {work_pool.status.display_name}
                  └── URL: {pool_url}
                """
                ).strip(),
                soft_wrap=True,
            )
            exit_with_success("")
        except ObjectAlreadyExists:
            exit_with_error(
                f"Work pool named {name!r} already exists. Please use --overwrite to update it."
            )
@work_pool_app.command()
async def ls(
    verbose: bool = typer.Option(
        False,
        "--verbose",
        "-v",
        help="Show additional information about work pools.",
    ),
):
    """
    List work pools.

    \b
    Examples:
        $ prefect work-pool ls
    """
    # Prints a table of all work pools, most recently created first. Paused
    # pools are flagged with "(**)"; `--verbose` adds the base job template.
    table = Table(
        title="Work Pools", caption="(**) denotes a paused pool", caption_style="red"
    )
    table.add_column("Name", style="green", no_wrap=True)
    table.add_column("Type", style="magenta", no_wrap=True)
    table.add_column("ID", justify="right", style="cyan", no_wrap=True)
    table.add_column("Concurrency Limit", style="blue", no_wrap=True)
    if verbose:
        table.add_column("Base Job Template", style="magenta", no_wrap=True)

    async with get_client() as client:
        pools = await client.read_work_pools()

    # Read the clock once so every pool's age is measured against the same
    # instant; reading it per element would compare ages taken at different
    # times.
    now = now_fn("UTC")

    def sort_by_created_key(q: WorkPool) -> datetime.timedelta:
        # Key is the pool's age, so sorting ascending lists newest pools first.
        assert q.created is not None
        return now - q.created

    for pool in sorted(pools, key=sort_by_created_key):
        row = [
            f"{pool.name} [red](**)" if pool.is_paused else pool.name,
            str(pool.type),
            str(pool.id),
            (
                f"[red]{pool.concurrency_limit}"
                if pool.concurrency_limit is not None
                else "[blue]None"
            ),
        ]
        if verbose:
            row.append(str(pool.base_job_template))
        table.add_row(*row)

    app.console.print(table)
@work_pool_app.command()
async def inspect(
    name: str = typer.Argument(..., help="The name of the work pool to inspect."),
    output: Optional[str] = typer.Option(
        None,
        "--output",
        "-o",
        help="Specify an output format. Currently supports: json",
    ),
):
    """
    Inspect a work pool.

    \b
    Examples:
        $ prefect work-pool inspect "my-pool"
        $ prefect work-pool inspect "my-pool" --output json

    """
    # Normalise the requested format once; an absent or empty value means the
    # default pretty-printed representation.
    requested_format = output.lower() if output else None
    if requested_format is not None and requested_format != "json":
        exit_with_error("Only 'json' output format is supported.")

    async with get_client() as client:
        try:
            pool = await client.read_work_pool(work_pool_name=name)
            if requested_format != "json":
                app.console.print(Pretty(pool))
            else:
                serialized = orjson.dumps(
                    pool.model_dump(mode="json"), option=orjson.OPT_INDENT_2
                )
                app.console.print(serialized.decode())
        except ObjectNotFound:
            exit_with_error(f"Work pool {name!r} not found!")
@work_pool_app.command()
async def pause(
    name: str = typer.Argument(..., help="The name of the work pool to pause."),
):
    """
    Pause a work pool.

    \b
    Examples:
        $ prefect work-pool pause "my-pool"

    """
    async with get_client() as client:
        # Only the paused flag is sent in the update.
        pause_request = WorkPoolUpdate(is_paused=True)
        try:
            await client.update_work_pool(work_pool_name=name, work_pool=pause_request)
        except ObjectNotFound as exc:
            exit_with_error(exc)

        exit_with_success(f"Paused work pool {name!r}")
@work_pool_app.command()
async def resume(
    name: str = typer.Argument(..., help="The name of the work pool to resume."),
):
    """
    Resume a work pool.

    \b
    Examples:
        $ prefect work-pool resume "my-pool"

    """
    async with get_client() as client:
        # Only the paused flag is sent in the update.
        resume_request = WorkPoolUpdate(is_paused=False)
        try:
            await client.update_work_pool(work_pool_name=name, work_pool=resume_request)
        except ObjectNotFound as exc:
            exit_with_error(exc)

        exit_with_success(f"Resumed work pool {name!r}")
@work_pool_app.command()
async def update(
    name: str = typer.Argument(..., help="The name of the work pool to update."),
    base_job_template: typer.FileText = typer.Option(
        None,
        "--base-job-template",
        help=(
            "The path to a JSON file containing the base job template to use. If"
            " unspecified, Prefect will use the default base job template for the given"
            " worker type. If None, the base job template will not be modified."
        ),
    ),
    concurrency_limit: int = typer.Option(
        None,
        "--concurrency-limit",
        help=(
            "The concurrency limit for the work pool. If None, the concurrency limit"
            " will not be modified."
        ),
    ),
    description: str = typer.Option(
        None,
        "--description",
        help=(
            "The description for the work pool. If None, the description will not be"
            " modified."
        ),
    ),
):
    """
    Update a work pool.

    \b
    Examples:
        $ prefect work-pool update "my-pool"

    """
    # Only options the caller actually supplied are copied onto the update.
    # The checks are `is not None` rather than truthiness so that explicit
    # falsy values (`--concurrency-limit 0`, `--description ""`) are applied
    # instead of being silently dropped, matching the "If None, ... will not
    # be modified" contract in the option help.
    wp = WorkPoolUpdate()
    if base_job_template is not None:
        wp.base_job_template = json.load(base_job_template)
    if concurrency_limit is not None:
        wp.concurrency_limit = concurrency_limit
    if description is not None:
        wp.description = description

    async with get_client() as client:
        try:
            await client.update_work_pool(
                work_pool_name=name,
                work_pool=wp,
            )
        except ObjectNotFound:
            exit_with_error(f"Work pool named {name!r} does not exist.")

        exit_with_success(f"Updated work pool {name!r}")
@work_pool_app.command(aliases=["provision-infra"])
async def provision_infrastructure(
    name: str = typer.Argument(
        ..., help="The name of the work pool to provision infrastructure for."
    ),
):
    """
    Provision infrastructure for a work pool.

    \b
    Examples:
        $ prefect work-pool provision-infrastructure "my-pool"

        $ prefect work-pool provision-infra "my-pool"

    """
    async with get_client() as client:
        # Keep this `try` limited to the read itself so that only genuine read
        # failures are reported as such.
        try:
            work_pool = await client.read_work_pool(work_pool_name=name)
        except ObjectNotFound:
            exit_with_error(f"Work pool {name!r} does not exist.")
        except Exception as exc:
            exit_with_error(f"Failed to read work pool {name!r}: {exc}")

        # This check must live outside the `try` above: exit_with_error exits
        # by raising typer.Exit, which derives from RuntimeError, so the broad
        # `except Exception` would catch it and print a second, misleading
        # "Failed to read work pool" error.
        if not work_pool.is_push_pool:
            exit_with_error(
                f"Work pool {name!r} is not a push pool type. "
                "Please try provisioning infrastructure for a push pool."
            )

        try:
            provisioner = (
                provisioners.get_infrastructure_provisioner_for_work_pool_type(
                    work_pool.type
                )
            )
            provisioner.console = app.console
            # The provisioner returns the base job template to store on the pool.
            new_base_job_template = await provisioner.provision(
                work_pool_name=name, base_job_template=work_pool.base_job_template
            )

            await client.update_work_pool(
                work_pool_name=name,
                work_pool=WorkPoolUpdate(
                    base_job_template=new_base_job_template,
                ),
            )

        except ValueError as exc:
            # Reported as a warning rather than a failing exit.
            app.console.print(f"Error: {exc}")
            app.console.print(
                (
                    "Automatic infrastructure provisioning is not supported for"
                    f" {work_pool.type!r} work pools."
                ),
                style="yellow",
            )
        except RuntimeError as exc:
            exit_with_error(
                f"Failed to provision infrastructure for '{name}' work pool: {exc}"
            )
@work_pool_app.command()
async def delete(
    name: str = typer.Argument(..., help="The name of the work pool to delete."),
):
    """
    Delete a work pool.

    \b
    Examples:
        $ prefect work-pool delete "my-pool"

    """
    async with get_client() as client:
        try:
            work_pool = await client.read_work_pool(work_pool_name=name)

            # Ask for confirmation only when a user is present to answer.
            if is_interactive():
                question = f"Are you sure you want to delete work pool with name {work_pool.name!r}?"
                confirmed = typer.confirm(question, default=False)
                if not confirmed:
                    exit_with_error("Deletion aborted.")

            await client.delete_work_pool(work_pool_name=name)
        except ObjectNotFound:
            exit_with_error(f"Work pool {name!r} does not exist.")

        exit_with_success(f"Deleted work pool {name!r}")
@work_pool_app.command()
async def set_concurrency_limit(
    name: str = typer.Argument(..., help="The name of the work pool to update."),
    concurrency_limit: int = typer.Argument(
        ..., help="The new concurrency limit for the work pool."
    ),
):
    """
    Set the concurrency limit for a work pool.

    \b
    Examples:
        $ prefect work-pool set-concurrency-limit "my-pool" 10

    """
    async with get_client() as client:
        # Only the concurrency limit is sent in the update.
        limit_update = WorkPoolUpdate(concurrency_limit=concurrency_limit)
        try:
            await client.update_work_pool(work_pool_name=name, work_pool=limit_update)
        except ObjectNotFound as exc:
            exit_with_error(exc)

        success_message = (
            f"Set concurrency limit for work pool {name!r} to {concurrency_limit}"
        )
        exit_with_success(success_message)
@work_pool_app.command()
async def clear_concurrency_limit(
    name: str = typer.Argument(..., help="The name of the work pool to update."),
):
    """
    Clear the concurrency limit for a work pool.

    \b
    Examples:
        $ prefect work-pool clear-concurrency-limit "my-pool"

    """
    async with get_client() as client:
        # The limit is passed explicitly as None to request its removal.
        clear_request = WorkPoolUpdate(concurrency_limit=None)
        try:
            await client.update_work_pool(work_pool_name=name, work_pool=clear_request)
        except ObjectNotFound as exc:
            exit_with_error(exc)

        exit_with_success(f"Cleared concurrency limit for work pool {name!r}")
@work_pool_app.command()
async def get_default_base_job_template(
    type: str = typer.Option(
        None,
        "-t",
        "--type",
        help="The type of work pool for which to get the default base job template.",
    ),
    file: str = typer.Option(
        None, "-f", "--file", help="If set, write the output to a file."
    ),
):
    """
    Get the default base job template for a given work pool type.

    \b
    Examples:
        $ prefect work-pool get-default-base-job-template --type kubernetes
    """
    template = await get_default_base_job_template_for_infrastructure_type(type)

    # The lookup yields None when no default template is found for the type.
    if template is None:
        known_types = await get_available_work_pool_types()
        exit_with_error(
            f"Unknown work pool type {type!r}. Please choose from {', '.join(known_types)}."
        )

    # Write to the requested file when one was given, otherwise to stdout.
    if file is not None:
        with open(file, mode="w") as f:
            json.dump(template, fp=f, indent=2)
        return

    print(json.dumps(template, indent=2))
@work_pool_app.command()
async def preview(
    name: str = typer.Argument(None, help="The name or ID of the work pool to preview"),
    hours: int = typer.Option(
        None,
        "-h",
        "--hours",
        help="The number of hours to look ahead; defaults to 1 hour",
    ),
):
    """
    Preview the work pool's scheduled work for all queues.

    \b
    Examples:
        $ prefect work-pool preview "my-pool" --hours 24

    """
    if hours is None:
        hours = 1

    async with get_client() as client:
        # Apply the look-ahead window. Previously `hours` was defaulted but
        # never sent with the request, so `--hours` had no effect.
        scheduled_before = now_fn("UTC") + datetime.timedelta(hours=hours)
        try:
            responses = await client.get_scheduled_flow_runs_for_work_pool(
                work_pool_name=name,
                scheduled_before=scheduled_before,
            )
        except ObjectNotFound as exc:
            exit_with_error(exc)

    runs = [response.flow_run for response in responses]
    table = Table(caption="(**) denotes a late run", caption_style="red")

    table.add_column(
        "Scheduled Start Time", justify="left", style="yellow", no_wrap=True
    )
    table.add_column("Run ID", justify="left", style="cyan", no_wrap=True)
    table.add_column("Name", style="green", no_wrap=True)
    table.add_column("Deployment ID", style="blue", no_wrap=True)

    # Single reference instant used both for ordering and for late detection.
    now = now_fn("UTC")

    def sort_by_created_key(r: FlowRun) -> datetime.timedelta:
        # Key is the run's age, so sorting ascending lists newest runs first.
        assert r.created is not None
        return now - r.created

    for run in sorted(runs, key=sort_by_created_key):
        table.add_row(
            (
                # A run whose expected start time has already passed is late.
                f"{run.expected_start_time} [red](**)"
                if run.expected_start_time and run.expected_start_time < now
                else f"{run.expected_start_time}"
            ),
            str(run.id),
            run.name,
            str(run.deployment_id),
        )

    if runs:
        app.console.print(table)
    else:
        app.console.print(
            (
                "No runs found - try increasing how far into the future you preview"
                " with the --hours flag"
            ),
            style="yellow",
        )
717# --------------------------------------------------------------------------
718# Work Pool Storage Configuration
719# --------------------------------------------------------------------------
# Typer sub-application grouping the experimental `storage` commands.
work_pool_storage_app: PrefectTyper = PrefectTyper(
    name="storage", help="EXPERIMENTAL: Manage work pool storage."
)
# Nest the storage group under the work pool command group.
work_pool_app.add_typer(work_pool_storage_app)
727def _determine_storage_type(storage_config: WorkPoolStorageConfiguration) -> str | None: 1a
728 if storage_config.bundle_upload_step is None:
729 return None
730 if storage_config.bundle_upload_step and any(
731 "prefect_aws" in step for step in storage_config.bundle_upload_step.keys()
732 ):
733 return "S3"
734 if storage_config.bundle_upload_step and any(
735 "prefect_gcp" in step for step in storage_config.bundle_upload_step.keys()
736 ):
737 return "GCS"
738 if storage_config.bundle_upload_step and any(
739 "prefect_azure" in step for step in storage_config.bundle_upload_step.keys()
740 ):
741 return "Azure Blob Storage"
742 return "Unknown"
@work_pool_storage_app.command(name="inspect")
async def storage_inspect(
    work_pool_name: Annotated[
        str,
        typer.Argument(
            ..., help="The name of the work pool to display storage configuration for."
        ),
    ],
    output: Optional[str] = typer.Option(
        None,
        "--output",
        "-o",
        help="Specify an output format. Currently supports: json",
    ),
):
    """
    EXPERIMENTAL: Inspect the storage configuration for a work pool.

    Examples:
        $ prefect work-pool storage inspect "my-pool"
        $ prefect work-pool storage inspect "my-pool" --output json
    """
    if output and output.lower() != "json":
        exit_with_error("Only 'json' output format is supported.")
    as_json = bool(output) and output.lower() == "json"

    async with get_client() as client:
        try:
            work_pool = await client.read_work_pool(work_pool_name=work_pool_name)
        except ObjectNotFound:
            exit_with_error(f"Work pool {work_pool_name!r} does not exist.")

        storage_type = _determine_storage_type(work_pool.storage_configuration)
        if not storage_type:
            # No upload step configured: emit an empty object or a notice.
            if as_json:
                app.console.print("{}")
            else:
                app.console.print(
                    f"No storage configuration found for work pool {work_pool_name!r}",
                    style="yellow",
                )
            return

        # Settings of the first configured upload step. Fall back to an empty
        # mapping so an upload step with no entries (or an entry without
        # settings) is shown with just its type instead of raising.
        upload_step = work_pool.storage_configuration.bundle_upload_step
        config_values = {}
        if upload_step:
            config_values = next(iter(upload_step.values()), None) or {}

        if as_json:
            storage_data = {"type": storage_type}
            storage_data.update(config_values)
            json_output = orjson.dumps(
                storage_data, option=orjson.OPT_INDENT_2
            ).decode()
            app.console.print(json_output)
            return

        # Imported here because Panel is only needed for the table rendering.
        from rich.panel import Panel

        storage_table = Table(show_header=True, header_style="bold")
        storage_table.add_column("Setting", style="cyan")
        storage_table.add_column("Value")
        storage_table.add_row("type", storage_type)

        # Every setting of the upload step is listed, rendered as text.
        for key, value in config_values.items():
            storage_table.add_row(key, str(value))

        panel = Panel(
            storage_table,
            title=f"[bold]Storage Configuration for {work_pool_name}[/bold]",
            expand=False,
        )
        app.console.print(panel)
async def _create_or_update_result_storage_block(
    client: PrefectClient,
    block_document_name: str,
    block_document_data: dict[str, Any],
    block_type_slug: str,
    missing_block_definition_error: str,
) -> BlockDocument:
    """
    Ensure a block document with the given name and data exists.

    An existing document with that name and block type has its data replaced;
    otherwise a new document is created from the block type's most recent
    schema.

    Args:
        client: Client used for all block API calls.
        block_document_name: Name of the block document to update or create.
        block_document_data: Data to store on the block document.
        block_type_slug: Slug of the block type the document belongs to.
        missing_block_definition_error: Message passed to `exit_with_error`
            when the block type or its schema cannot be found.

    Returns:
        The existing block document as it was read before the update, or the
        newly created block document.
    """
    try:
        current_document = await client.read_block_document_by_name(
            name=block_document_name, block_type_slug=block_type_slug
        )
    except ObjectNotFound:
        current_document = None

    # Update path: the document already exists, so only its data is replaced.
    if current_document is not None:
        data_update = BlockDocumentUpdate(data=block_document_data)
        await client.update_block_document(
            block_document_id=current_document.id,
            block_document=data_update,
        )
        return current_document

    # Create path: the block type and its latest schema must both be known.
    try:
        block_type = await client.read_block_type_by_slug(slug=block_type_slug)
        block_schema = await client.get_most_recent_block_schema_for_block_type(
            block_type_id=block_type.id
        )
    except ObjectNotFound:
        exit_with_error(missing_block_definition_error)

    if block_schema is None:
        exit_with_error(missing_block_definition_error)

    new_document = BlockDocumentCreate(
        name=block_document_name,
        block_type_id=block_type.id,
        block_schema_id=block_schema.id,
        data=block_document_data,
    )
    return await client.create_block_document(block_document=new_document)
# Typer sub-application grouping the experimental `configure` commands
# (one command per storage provider, defined below).
work_pool_storage_configure_app: PrefectTyper = PrefectTyper(
    name="configure", help="EXPERIMENTAL: Configure work pool storage."
)
# Nest the configure group under the storage command group.
work_pool_storage_app.add_typer(work_pool_storage_configure_app)
@work_pool_storage_configure_app.command()
async def s3(
    work_pool_name: str = typer.Argument(
        ...,
        help="The name of the work pool to configure storage for.",
        show_default=False,
    ),
    bucket: str = typer.Option(
        ...,
        "--bucket",
        help="The name of the S3 bucket to use.",
        show_default=False,
        prompt="Enter the name of the S3 bucket to use",
    ),
    credentials_block_name: str = typer.Option(
        ...,
        "--aws-credentials-block-name",
        help="The name of the AWS credentials block to use.",
        show_default=False,
        prompt="Enter the name of the AWS credentials block to use",
    ),
):
    """
    EXPERIMENTAL: Configure AWS S3 storage for a work pool.

    \b
    Examples:
        $ prefect work-pool storage configure s3 "my-pool" --bucket my-bucket --aws-credentials-block-name my-credentials
    """
    # TODO: Allow passing in AWS keys and creating a block for the user.
    async with get_client() as client:
        # The credentials block must already exist; it is referenced by id below.
        try:
            credentials_block_document = await client.read_block_document_by_name(
                name=credentials_block_name, block_type_slug="aws-credentials"
            )
        except ObjectNotFound:
            exit_with_error(
                f"AWS credentials block {credentials_block_name!r} does not exist. Please create one using `prefect block create aws-credentials`."
            )

        # Create or refresh the pool's default result storage block.
        credentials_reference = {
            "$ref": {"block_document_id": credentials_block_document.id}
        }
        block_document = await _create_or_update_result_storage_block(
            client=client,
            block_document_name=f"default-{work_pool_name}-result-storage",
            block_document_data={
                "bucket_name": bucket,
                "bucket_folder": "results",
                "credentials": credentials_reference,
            },
            block_type_slug="s3-bucket",
            missing_block_definition_error="S3 bucket block definition does not exist server-side. Please install `prefect-aws` and run `prefect blocks register -m prefect_aws`.",
        )

        # The upload and execution steps take the same settings.
        step_settings = {
            "requires": "prefect-aws",
            "bucket": bucket,
            "aws_credentials_block_name": credentials_block_name,
        }
        storage_configuration = WorkPoolStorageConfiguration(
            bundle_upload_step={
                "prefect_aws.experimental.bundles.upload": dict(step_settings)
            },
            bundle_execution_step={
                "prefect_aws.experimental.bundles.execute": dict(step_settings)
            },
            default_result_storage_block_id=block_document.id,
        )

        try:
            await client.update_work_pool(
                work_pool_name=work_pool_name,
                work_pool=WorkPoolUpdate(storage_configuration=storage_configuration),
            )
        except ObjectNotFound:
            exit_with_error(f"Work pool {work_pool_name!r} does not exist.")

        exit_with_success(f"Configured S3 storage for work pool {work_pool_name!r}")
@work_pool_storage_configure_app.command()
async def gcs(
    work_pool_name: str = typer.Argument(
        ...,
        help="The name of the work pool to configure storage for.",
        show_default=False,
    ),
    bucket: str = typer.Option(
        ...,
        "--bucket",
        help="The name of the Google Cloud Storage bucket to use.",
        show_default=False,
        prompt="Enter the name of the Google Cloud Storage bucket to use",
    ),
    credentials_block_name: str = typer.Option(
        ...,
        "--gcp-credentials-block-name",
        help="The name of the Google Cloud credentials block to use.",
        show_default=False,
        prompt="Enter the name of the Google Cloud credentials block to use",
    ),
):
    """
    EXPERIMENTAL: Configure Google Cloud storage for a work pool.

    \b
    Examples:
        $ prefect work-pool storage configure gcs "my-pool" --bucket my-bucket --gcp-credentials-block-name my-credentials
    """
    async with get_client() as client:
        # The credentials block must already exist; it is referenced by id below.
        try:
            credentials_block_document = await client.read_block_document_by_name(
                name=credentials_block_name, block_type_slug="gcp-credentials"
            )
        except ObjectNotFound:
            exit_with_error(
                f"GCS credentials block {credentials_block_name!r} does not exist. Please create one using `prefect block create gcp-credentials`."
            )

        # Create or refresh the pool's default result storage block.
        credentials_reference = {
            "$ref": {"block_document_id": credentials_block_document.id}
        }
        block_document = await _create_or_update_result_storage_block(
            client=client,
            block_document_name=f"default-{work_pool_name}-result-storage",
            block_document_data={
                "bucket_name": bucket,
                "bucket_folder": "results",
                "credentials": credentials_reference,
            },
            block_type_slug="gcs-bucket",
            missing_block_definition_error="GCS bucket block definition does not exist server-side. Please install `prefect-gcp` and run `prefect blocks register -m prefect_gcp`.",
        )

        # The upload and execution steps take the same settings.
        step_settings = {
            "requires": "prefect-gcp",
            "bucket": bucket,
            "gcp_credentials_block_name": credentials_block_name,
        }
        storage_configuration = WorkPoolStorageConfiguration(
            bundle_upload_step={
                "prefect_gcp.experimental.bundles.upload": dict(step_settings)
            },
            bundle_execution_step={
                "prefect_gcp.experimental.bundles.execute": dict(step_settings)
            },
            default_result_storage_block_id=block_document.id,
        )

        try:
            await client.update_work_pool(
                work_pool_name=work_pool_name,
                work_pool=WorkPoolUpdate(storage_configuration=storage_configuration),
            )
        except ObjectNotFound:
            exit_with_error(f"Work pool {work_pool_name!r} does not exist.")

        exit_with_success(f"Configured GCS storage for work pool {work_pool_name!r}")
@work_pool_storage_configure_app.command()
async def azure_blob_storage(
    work_pool_name: str = typer.Argument(
        ...,
        help="The name of the work pool to configure storage for.",
        show_default=False,
    ),
    container: str = typer.Option(
        ...,
        "--container",
        help="The name of the Azure Blob Storage container to use.",
        show_default=False,
        prompt="Enter the name of the Azure Blob Storage container to use",
    ),
    credentials_block_name: str = typer.Option(
        ...,
        "--azure-blob-storage-credentials-block-name",
        help="The name of the Azure Blob Storage credentials block to use.",
        show_default=False,
        prompt="Enter the name of the Azure Blob Storage credentials block to use",
    ),
):
    """
    EXPERIMENTAL: Configure Azure Blob Storage for a work pool.

    \b
    Examples:
        $ prefect work-pool storage configure azure-blob-storage "my-pool" --container my-container --azure-blob-storage-credentials-block-name my-credentials
    """
    async with get_client() as client:
        # The credentials block must already exist; it is referenced by id below.
        try:
            credentials_block_document = await client.read_block_document_by_name(
                name=credentials_block_name,
                block_type_slug="azure-blob-storage-credentials",
            )
        except ObjectNotFound:
            exit_with_error(
                f"Azure Blob Storage credentials block {credentials_block_name!r} does not exist. Please create one using `prefect block create azure-blob-storage-credentials`."
            )

        # Create or refresh the pool's default result storage block.
        credentials_reference = {
            "$ref": {"block_document_id": credentials_block_document.id}
        }
        block_document = await _create_or_update_result_storage_block(
            client=client,
            block_document_name=f"default-{work_pool_name}-result-storage",
            block_document_data={
                "container_name": container,
                "credentials": credentials_reference,
            },
            block_type_slug="azure-blob-storage-container",
            missing_block_definition_error="Azure Blob Storage container block definition does not exist server-side. Please install `prefect-azure[storage]` and run `prefect blocks register -m prefect_azure`.",
        )

        # The upload and execution steps take the same settings.
        step_settings = {
            "requires": "prefect-azure",
            "container": container,
            "azure_blob_storage_credentials_block_name": credentials_block_name,
        }
        storage_configuration = WorkPoolStorageConfiguration(
            bundle_upload_step={
                "prefect_azure.experimental.bundles.upload": dict(step_settings)
            },
            bundle_execution_step={
                "prefect_azure.experimental.bundles.execute": dict(step_settings)
            },
            default_result_storage_block_id=block_document.id,
        )

        try:
            await client.update_work_pool(
                work_pool_name=work_pool_name,
                work_pool=WorkPoolUpdate(storage_configuration=storage_configuration),
            )
        except ObjectNotFound:
            exit_with_error(f"Work pool {work_pool_name!r} does not exist.")

        exit_with_success(
            f"Configured Azure Blob Storage for work pool {work_pool_name!r}"
        )