Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/_prompts.py: 20%
329 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1"""
2Utilities for prompting the user for input
3"""
5from __future__ import annotations 1a
7import asyncio 1a
8import os 1a
9import shutil 1a
10from datetime import timedelta 1a
11from getpass import GetPassWarning 1a
12from typing import ( 1a
13 TYPE_CHECKING,
14 Any,
15 Coroutine,
16 Optional,
17 TypeVar,
18 Union,
19 overload,
20)
22import anyio 1a
23import readchar 1a
24from rich.console import Console, Group, RenderableType 1a
25from rich.live import Live 1a
26from rich.progress import Progress, SpinnerColumn, TextColumn 1a
27from rich.prompt import Confirm, InvalidResponse, Prompt, PromptBase 1a
28from rich.table import Table 1a
30from prefect._internal.installation import ainstall_packages 1a
31from prefect.cli._utilities import exit_with_error 1a
32from prefect.client.collections import get_collections_metadata_client 1a
33from prefect.client.schemas.actions import ( 1a
34 BlockDocumentCreate,
35 WorkPoolCreate,
36)
37from prefect.client.schemas.schedules import ( 1a
38 CronSchedule,
39 IntervalSchedule,
40 RRuleSchedule,
41 is_valid_timezone,
42)
43from prefect.client.utilities import client_injector 1a
44from prefect.exceptions import ObjectAlreadyExists, ObjectNotFound 1a
45from prefect.flows import load_flow_from_entrypoint 1a
46from prefect.logging.loggers import get_logger 1a
47from prefect.utilities import urls 1a
48from prefect.utilities._ast import find_flow_functions_in_file 1a
49from prefect.utilities._git import get_git_remote_origin_url 1a
50from prefect.utilities.filesystem import filter_files 1a
51from prefect.utilities.slugify import slugify 1a
53if TYPE_CHECKING: 53 ↛ 54line 53 didn't jump to line 54 because the condition on line 53 was never true1a
54 from prefect.client.orchestration import PrefectClient
56T = TypeVar("T", bound=RenderableType) 1a
58STORAGE_PROVIDER_TO_CREDS_BLOCK = { 1a
59 "s3": "aws-credentials",
60 "gcs": "gcp-credentials",
61 "azure_blob_storage": "azure-blob-storage-credentials",
62}
64REQUIRED_FIELDS_FOR_CREDS_BLOCK = { 1a
65 "aws-credentials": ["aws_access_key_id", "aws_secret_access_key"],
66 "gcp-credentials": ["project", "service_account_file"],
67 "azure-blob-storage-credentials": ["account_url", "connection_string"],
68}
71logger = get_logger(__name__) 1a
74async def search_for_flow_functions( 1a
75 directory: str = ".", exclude_patterns: list[str] | None = None
76) -> list[dict[str, str]]:
77 """
78 Search for flow functions in the provided directory. If no directory is provided,
79 the current working directory is used.
81 Args:
82 directory: The directory to search in
83 exclude_patterns: List of patterns to exclude from the search, defaults to
84 ["**/site-packages/**"]
86 Returns:
87 List[Dict]: the flow name, function name, and filepath of all flow functions found
88 """
89 path = anyio.Path(directory)
90 exclude_patterns = exclude_patterns or ["**/site-packages/**"]
91 coros: list[Coroutine[list[dict[str, str]], Any, Any]] = []
93 try:
94 for file in filter_files(
95 root=str(path),
96 ignore_patterns=["*", "!**/*.py", *exclude_patterns],
97 include_dirs=False,
98 ):
99 coros.append(find_flow_functions_in_file(anyio.Path(str(path / file))))
101 except (PermissionError, OSError) as e:
102 logger.error(f"Error searching for flow functions: {e}")
103 return []
105 return [fn for file_fns in await asyncio.gather(*coros) for fn in file_fns]
108def prompt(message: str, **kwargs: Any) -> str: 1a
109 """Utility to prompt the user for input with consistent styling"""
110 return Prompt.ask(f"[bold][green]?[/] {message}[/]", **kwargs)
113def confirm(message: str, **kwargs: Any) -> bool: 1a
114 """Utility to prompt the user for confirmation with consistent styling"""
115 return Confirm.ask(f"[bold][green]?[/] {message}[/]", **kwargs)
118@overload 1a
119def prompt_select_from_table( 119 ↛ exitline 119 didn't return from function 'prompt_select_from_table' because 1a
120 console: Console,
121 prompt: str,
122 columns: list[dict[str, str]],
123 data: list[dict[str, T]],
124 table_kwargs: dict[str, Any] | None = None,
125 opt_out_message: None = None,
126 opt_out_response: Any = None,
127) -> dict[str, T]: ...
130@overload 1a
131def prompt_select_from_table( 131 ↛ exitline 131 didn't return from function 'prompt_select_from_table' because 1a
132 console: Console,
133 prompt: str,
134 columns: list[dict[str, str]],
135 data: list[dict[str, T]],
136 table_kwargs: dict[str, Any] | None = None,
137 opt_out_message: str = "",
138 opt_out_response: Any = None,
139) -> dict[str, T] | None: ...
142def prompt_select_from_table( 1a
143 console: Console,
144 prompt: str,
145 columns: list[dict[str, str]],
146 data: list[dict[str, T]],
147 table_kwargs: dict[str, Any] | None = None,
148 opt_out_message: str | None = None,
149 opt_out_response: Any = None,
150) -> dict[str, T] | None:
151 """
152 Given a list of columns and some data, display options to user in a table
153 and prompt them to select one.
155 Args:
156 prompt: A prompt to display to the user before the table.
157 columns: A list of dicts with keys `header` and `key` to display in
158 the table. The `header` value will be displayed in the table header
159 and the `key` value will be used to lookup the value for each row
160 in the provided data.
161 data: A list of dicts with keys corresponding to the `key` values in
162 the `columns` argument.
163 table_kwargs: Additional kwargs to pass to the `rich.Table` constructor.
164 Returns:
165 dict: Data representation of the selected row
166 """
167 current_idx = 0
168 selected_row = None
169 table_kwargs = table_kwargs or {}
170 visible_rows = min(10, console.height - 4) # Adjust number of visible rows
171 scroll_offset = 0
172 total_options = len(data) + (1 if opt_out_message else 0)
174 def build_table() -> Union[Table, Group]:
175 nonlocal scroll_offset
176 table = Table(**table_kwargs)
177 table.add_column()
178 for column in columns:
179 table.add_column(column.get("header", ""))
181 # Adjust scroll_offset if necessary
182 if current_idx < scroll_offset:
183 scroll_offset = current_idx
184 elif current_idx >= scroll_offset + visible_rows:
185 scroll_offset = current_idx - visible_rows + 1
187 for i, item in enumerate(data[scroll_offset : scroll_offset + visible_rows]):
188 row = [item.get(column.get("key", "")) for column in columns]
189 if i + scroll_offset == current_idx:
190 table.add_row(
191 "[bold][blue]>", *[f"[bold][blue]{cell}[/]" for cell in row]
192 )
193 else:
194 table.add_row(" ", *row)
196 if opt_out_message:
197 opt_out_row = [""] * (len(columns) - 1) + [opt_out_message]
198 if current_idx == len(data):
199 table.add_row(
200 "[bold][blue]>", *[f"[bold][blue]{cell}[/]" for cell in opt_out_row]
201 )
202 else:
203 table.add_row(" ", *opt_out_row)
205 return table
207 with Live(build_table(), auto_refresh=False, console=console) as live:
208 instructions_message = (
209 f"[bold][green]?[/] {prompt} [bright_blue][Use arrows to move; enter to"
210 " select"
211 )
212 if opt_out_message:
213 instructions_message += "; n to select none"
214 instructions_message += "]"
215 live.console.print(instructions_message)
216 while selected_row is None:
217 key = readchar.readkey()
219 if key == readchar.key.UP:
220 current_idx = (current_idx - 1) % total_options
221 elif key == readchar.key.DOWN:
222 current_idx = (current_idx + 1) % total_options
223 elif key == readchar.key.CTRL_C:
224 # gracefully exit with no message
225 exit_with_error("")
226 elif key == readchar.key.ENTER or key == readchar.key.CR:
227 if current_idx == len(data):
228 return opt_out_response
229 else:
230 selected_row = data[current_idx]
231 elif key == "n" and opt_out_message:
232 return opt_out_response
234 live.update(build_table(), refresh=True)
236 return selected_row
239# Interval schedule prompting utilities
240class IntervalValuePrompt(PromptBase[timedelta]): 1a
241 response_type: type[timedelta] = timedelta 1a
242 validate_error_message = ( 1a
243 "[prompt.invalid]Please enter a valid interval denoted in seconds"
244 )
246 def process_response(self, value: str) -> timedelta: 1a
247 try:
248 int_value = int(value)
249 if int_value <= 0:
250 raise InvalidResponse("[prompt.invalid]Interval must be greater than 0")
251 return timedelta(seconds=int_value)
252 except ValueError:
253 raise InvalidResponse(self.validate_error_message)
256def prompt_interval_schedule(console: Console) -> IntervalSchedule: 1a
257 """
258 Prompt the user for an interval in seconds.
259 """
260 default_seconds = 3600
261 default_duration = timedelta(seconds=default_seconds)
263 # We show the default in the prompt message rather than enabling `show_default=True` here because `rich` displays timedeltas in hours
264 # rather than seconds, which would confuse users since we ask them to enter the interval in seconds.
265 interval = IntervalValuePrompt.ask(
266 f"[bold][green]?[/] Seconds between scheduled runs ({default_seconds})",
267 console=console,
268 default=default_duration,
269 show_default=False,
270 )
272 return IntervalSchedule(interval=interval)
275# Cron schedule prompting utilities
278class CronStringPrompt(PromptBase[str]): 1a
279 response_type: type[str] = str 1a
280 validate_error_message = "[prompt.invalid]Please enter a valid cron string" 1a
282 def process_response(self, value: str) -> str: 1a
283 try:
284 CronSchedule.valid_cron_string(value)
285 return value
286 except ValueError:
287 raise InvalidResponse(self.validate_error_message)
290class CronTimezonePrompt(PromptBase[str]): 1a
291 response_type: type[str] = str 1a
292 validate_error_message = "[prompt.invalid]Please enter a valid timezone." 1a
294 def process_response(self, value: str) -> str: 1a
295 try:
296 CronSchedule.valid_timezone(value)
297 return value
298 except ValueError:
299 raise InvalidResponse(self.validate_error_message)
302def prompt_cron_schedule(console: Console) -> CronSchedule: 1a
303 """
304 Prompt the user for a cron string and timezone.
305 """
306 cron = CronStringPrompt.ask(
307 "[bold][green]?[/] Cron string",
308 console=console,
309 default="0 0 * * *",
310 )
311 timezone = CronTimezonePrompt.ask(
312 "[bold][green]?[/] Timezone", console=console, default="UTC"
313 )
314 return CronSchedule(cron=cron, timezone=timezone)
317# RRule schedule prompting utilities
320class RRuleStringPrompt(PromptBase[str]): 1a
321 response_type: type[str] = str 1a
322 validate_error_message = "[prompt.invalid]Please enter a valid RRule string" 1a
324 def process_response(self, value: str) -> str: 1a
325 try:
326 RRuleSchedule.validate_rrule_str(value)
327 return value
328 except ValueError:
329 raise InvalidResponse(self.validate_error_message)
332class RRuleTimezonePrompt(PromptBase[str]): 1a
333 response_type: type[str] = str 1a
334 validate_error_message = "[prompt.invalid]Please enter a valid timezone." 1a
336 def process_response(self, value: str) -> str: 1a
337 try:
338 is_valid_timezone(value)
339 return value
340 except ValueError:
341 raise InvalidResponse(self.validate_error_message)
344def prompt_rrule_schedule(console: Console) -> RRuleSchedule: 1a
345 """
346 Prompts the user to enter an RRule string and timezone.
347 """
348 rrule = RRuleStringPrompt.ask(
349 "[bold][green]?[/] RRule string",
350 console=console,
351 default="RRULE:FREQ=DAILY;INTERVAL=1",
352 )
353 timezone = CronTimezonePrompt.ask(
354 "[bold][green]?[/] Timezone", console=console, default="UTC"
355 )
356 return RRuleSchedule(rrule=rrule, timezone=timezone)
359# Schedule type prompting utilities
362def prompt_schedule_type(console: Console) -> str: 1a
363 """
364 Prompts the user to select a schedule type from a list of options.
365 """
366 selection = prompt_select_from_table(
367 console,
368 "What type of schedule would you like to use?",
369 [
370 {"header": "Schedule Type", "key": "type"},
371 {"header": "Description", "key": "description"},
372 ],
373 [
374 {
375 "type": "Interval",
376 "description": (
377 "Allows you to set flow runs to be executed at fixed time"
378 " intervals."
379 ),
380 },
381 {
382 "type": "Cron",
383 "description": (
384 "Allows you to define recurring flow runs based on a specified"
385 " pattern using cron syntax."
386 ),
387 },
388 {
389 "type": "RRule",
390 "description": (
391 "Allows you to define recurring flow runs using RFC 2445 recurrence"
392 " rules."
393 ),
394 },
395 ],
396 )
397 return selection["type"]
400def prompt_schedules(console: Console) -> list[dict[str, Any]]: 1a
401 """
402 Prompt the user to configure schedules for a deployment.
403 """
404 schedules: list[dict[str, Any]] = []
406 if confirm(
407 "Would you like to configure schedules for this deployment?", default=True
408 ):
409 add_schedule = True
410 while add_schedule:
411 schedule_type = prompt_schedule_type(console)
412 if schedule_type == "Cron":
413 schedule = prompt_cron_schedule(console)
414 elif schedule_type == "Interval":
415 schedule = prompt_interval_schedule(console)
416 elif schedule_type == "RRule":
417 schedule = prompt_rrule_schedule(console)
418 else:
419 raise Exception("Invalid schedule type")
421 is_schedule_active = confirm(
422 "Would you like to activate this schedule?", default=True
423 )
425 schedules.append({"schedule": schedule, "active": is_schedule_active})
427 add_schedule = confirm(
428 "Would you like to add another schedule?", default=False
429 )
431 return schedules
434@client_injector 1a
435async def prompt_select_work_pool( 1a
436 client: "PrefectClient",
437 console: Console,
438 prompt: str = "Which work pool would you like to deploy this flow to?",
439) -> str:
440 work_pools = await client.read_work_pools()
441 work_pool_options = [
442 work_pool.model_dump()
443 for work_pool in work_pools
444 if work_pool.type != "prefect-agent"
445 ]
446 if not work_pool_options:
447 work_pool = await prompt_create_work_pool(console)
448 return work_pool.name
449 else:
450 selected_work_pool_row = prompt_select_from_table(
451 console,
452 prompt,
453 [
454 {"header": "Work Pool Name", "key": "name"},
455 {"header": "Infrastructure Type", "key": "type"},
456 {"header": "Description", "key": "description"},
457 ],
458 work_pool_options,
459 )
460 return selected_work_pool_row["name"]
463async def prompt_build_custom_docker_image( 1a
464 console: Console,
465 deployment_config: dict[str, Any],
466) -> dict[str, Any] | None:
467 if not confirm(
468 "Would you like to build a custom Docker image for this deployment?",
469 console=console,
470 default=False,
471 ):
472 return
474 build_step = {
475 "requires": "prefect-docker>=0.3.1",
476 "id": "build-image",
477 }
479 if os.path.exists("Dockerfile"):
480 if confirm(
481 "Would you like to use the Dockerfile in the current directory?",
482 console=console,
483 default=True,
484 ):
485 build_step["dockerfile"] = "Dockerfile"
486 else:
487 if confirm(
488 "A Dockerfile exists. You chose not to use it. A temporary Dockerfile"
489 " will be automatically built during the deployment build step. If"
490 " another file named 'Dockerfile' already exists at that time, the"
491 " build step will fail. Would you like to rename your existing"
492 " Dockerfile?"
493 ):
494 new_dockerfile_name = prompt(
495 "New Dockerfile name", default="Dockerfile.backup"
496 )
497 shutil.move("Dockerfile", new_dockerfile_name)
498 build_step["dockerfile"] = "auto"
499 else:
500 # this will otherwise raise when build steps are run as the auto-build feature
501 # executed in the build_docker_image step will create a temporary Dockerfile
502 raise ValueError(
503 "A Dockerfile already exists. Please remove or rename the existing"
504 " one."
505 )
506 else:
507 build_step["dockerfile"] = "auto"
509 repo_name = prompt("Repository name (e.g. your Docker Hub username)").rstrip("/")
510 image_name = prompt("Image name", default=deployment_config["name"])
511 build_step["image_name"] = f"{repo_name}/{image_name}"
512 build_step["tag"] = prompt("Image tag", default="latest")
514 console.print(
515 "Image"
516 f" [bold][yellow]{build_step['image_name']}:{build_step['tag']}[/yellow][/bold]"
517 " will be built."
518 )
520 return {"prefect_docker.deployments.steps.build_docker_image": build_step}
523async def prompt_push_custom_docker_image( 1a
524 console: Console,
525 deployment_config: dict[str, Any],
526 build_docker_image_step: dict[str, Any],
527) -> tuple[dict[str, Any] | None, dict[str, Any]]:
528 if not confirm(
529 "Would you like to push this image to a remote registry?",
530 console=console,
531 default=False,
532 ):
533 return None, build_docker_image_step
535 push_step = {
536 "requires": "prefect-docker>=0.3.1",
537 "image_name": "{{ build-image.image_name }}",
538 "tag": "{{ build-image.tag }}",
539 }
541 registry_url = prompt("Registry URL", default="docker.io").rstrip("/")
543 repo_and_image_name = build_docker_image_step[
544 "prefect_docker.deployments.steps.build_docker_image"
545 ]["image_name"]
546 full_image_name = f"{registry_url}/{repo_and_image_name}"
547 build_docker_image_step["prefect_docker.deployments.steps.build_docker_image"][
548 "image_name"
549 ] = full_image_name
551 if confirm("Is this a private registry?", console=console):
552 docker_credentials = {}
553 docker_credentials["registry_url"] = registry_url
555 if confirm(
556 "Would you like use prefect-docker to manage Docker registry credentials?",
557 console=console,
558 default=False,
559 ):
560 try:
561 import prefect_docker
562 except ImportError:
563 console.print("Installing prefect-docker...")
564 await ainstall_packages(["prefect[docker]"], stream_output=True)
565 import prefect_docker
567 credentials_block = prefect_docker.DockerRegistryCredentials
568 push_step["credentials"] = (
569 "{{ prefect_docker.docker-registry-credentials.docker_registry_creds_name }}"
570 )
571 docker_registry_creds_name = f"deployment-{slugify(deployment_config['name'])}-{slugify(deployment_config['work_pool']['name'])}-registry-creds"
572 create_new_block = False
573 try:
574 await credentials_block.aload(docker_registry_creds_name)
575 if not confirm(
576 (
577 "Would you like to use the existing Docker registry credentials"
578 f" block {docker_registry_creds_name}?"
579 ),
580 console=console,
581 default=True,
582 ):
583 create_new_block = True
584 except ValueError:
585 create_new_block = True
587 if create_new_block:
588 docker_credentials["username"] = prompt(
589 "Docker registry username", console=console
590 )
591 try:
592 docker_credentials["password"] = prompt(
593 "Docker registry password",
594 console=console,
595 password=True,
596 )
597 except GetPassWarning:
598 docker_credentials["password"] = prompt(
599 "Docker registry password",
600 console=console,
601 )
603 new_creds_block = credentials_block(
604 username=docker_credentials["username"],
605 password=docker_credentials["password"],
606 registry_url=docker_credentials["registry_url"],
607 )
608 coro = new_creds_block.save(
609 name=docker_registry_creds_name, overwrite=True
610 )
611 if TYPE_CHECKING:
612 assert asyncio.iscoroutine(coro)
613 await coro
614 return {
615 "prefect_docker.deployments.steps.push_docker_image": push_step
616 }, build_docker_image_step
619@client_injector 1a
620async def prompt_create_work_pool( 1a
621 client: "PrefectClient",
622 console: Console,
623):
624 if not confirm(
625 (
626 "Looks like you don't have any work pools this flow can be deployed to."
627 " Would you like to create one?"
628 ),
629 default=True,
630 console=console,
631 ):
632 raise ValueError(
633 "A work pool is required to deploy this flow. Please specify a work pool"
634 " name via the '--pool' flag or in your prefect.yaml file."
635 )
636 async with get_collections_metadata_client() as collections_client:
637 worker_metadata = await collections_client.read_worker_metadata()
638 selected_worker_row = prompt_select_from_table(
639 console,
640 prompt="What infrastructure type would you like to use for your new work pool?",
641 columns=[
642 {"header": "Type", "key": "type"},
643 {"header": "Description", "key": "description"},
644 ],
645 data=[
646 worker
647 for collection in worker_metadata.values()
648 for worker in collection.values()
649 if worker["type"] != "prefect-agent"
650 ],
651 table_kwargs={"show_lines": True},
652 )
653 work_pool_name = prompt("Work pool name")
654 work_pool = await client.create_work_pool(
655 WorkPoolCreate(name=work_pool_name, type=selected_worker_row["type"])
656 )
657 console.print(f"Your work pool {work_pool.name!r} has been created!", style="green")
658 return work_pool
661class EntrypointPrompt(PromptBase[str]): 1a
662 response_type: type[str] = str 1a
663 validate_error_message = "[prompt.invalid]Please enter a valid flow entrypoint." 1a
665 def process_response(self, value: str) -> str: 1a
666 try:
667 value.rsplit(":", 1)
668 except ValueError:
669 raise InvalidResponse(self.validate_error_message)
671 try:
672 load_flow_from_entrypoint(value)
673 except Exception:
674 raise InvalidResponse(
675 f"[prompt.invalid]Failed to load flow from entrypoint {value!r}."
676 f" {self.validate_error_message}"
677 )
678 return value
681async def prompt_entrypoint(console: Console) -> str: 1a
682 """
683 Prompt the user for a flow entrypoint. Will search for flow functions in the
684 current working directory and nested subdirectories to prompt the user to select
685 from a list of discovered flows. If no flows are found, the user will be prompted
686 to enter a flow entrypoint manually.
687 """
688 with Progress(
689 SpinnerColumn(),
690 TextColumn("[progress.description]{task.description}"),
691 transient=True,
692 ) as progress:
693 task_id = progress.add_task(
694 description="Scanning for flows...",
695 total=1,
696 )
697 discovered_flows = await search_for_flow_functions()
698 progress.update(task_id, completed=1)
699 if not discovered_flows:
700 return EntrypointPrompt.ask(
701 (
702 "[bold][green]?[/] Flow entrypoint (expected format"
703 " path/to/file.py:function_name)"
704 ),
705 console=console,
706 )
707 selected_flow = prompt_select_from_table(
708 console,
709 prompt="Select a flow to deploy",
710 columns=[
711 {"header": "Flow Name", "key": "flow_name"},
712 {"header": "Location", "key": "filepath"},
713 ],
714 data=discovered_flows,
715 opt_out_message="Enter a flow entrypoint manually",
716 )
717 if selected_flow is None:
718 return EntrypointPrompt.ask(
719 (
720 "[bold][green]?[/] Flow entrypoint (expected format"
721 " path/to/file.py:function_name)"
722 ),
723 console=console,
724 )
725 return f"{selected_flow['filepath']}:{selected_flow['function_name']}"
728@client_injector 1a
729async def prompt_select_remote_flow_storage( 1a
730 client: "PrefectClient", console: Console
731) -> Optional[str]:
732 valid_slugs_for_context: set[str] = set()
734 for (
735 storage_provider,
736 creds_block_type_slug,
737 ) in STORAGE_PROVIDER_TO_CREDS_BLOCK.items():
738 try:
739 # only return storage options for which the user has a credentials
740 # block type
741 await client.read_block_type_by_slug(creds_block_type_slug)
742 valid_slugs_for_context.add(storage_provider)
743 except ObjectNotFound:
744 pass
746 if get_git_remote_origin_url():
747 valid_slugs_for_context.add("git")
749 flow_storage_options = [
750 {
751 "type": "Git Repo",
752 "slug": "git",
753 "description": "Use a Git repository [bold](recommended).",
754 },
755 {
756 "type": "S3",
757 "slug": "s3",
758 "description": "Use an AWS S3 bucket.",
759 },
760 {
761 "type": "GCS",
762 "slug": "gcs",
763 "description": "Use a Google Cloud Storage bucket.",
764 },
765 {
766 "type": "Azure Blob Storage",
767 "slug": "azure_blob_storage",
768 "description": "Use an Azure Blob Storage bucket.",
769 },
770 ]
772 valid_storage_options_for_context = [
773 row for row in flow_storage_options if row["slug"] in valid_slugs_for_context
774 ]
776 selected_flow_storage_row = prompt_select_from_table(
777 console,
778 prompt="Please select a remote code storage option.",
779 columns=[
780 {"header": "Storage Type", "key": "type"},
781 {"header": "Description", "key": "description"},
782 ],
783 data=valid_storage_options_for_context,
784 )
786 return selected_flow_storage_row["slug"]
789@client_injector 1a
790async def prompt_select_blob_storage_credentials( 1a
791 client: "PrefectClient",
792 console: Console,
793 storage_provider: str,
794) -> str:
795 """
796 Prompt the user for blob storage credentials.
798 Returns a jinja template string that references a credentials block.
799 """
801 storage_provider_slug = storage_provider.replace("_", "-")
802 pretty_storage_provider = storage_provider.replace("_", " ").upper()
804 creds_block_type_slug = STORAGE_PROVIDER_TO_CREDS_BLOCK[storage_provider]
805 pretty_creds_block_type = creds_block_type_slug.replace("-", " ").title()
807 existing_credentials_blocks = await client.read_block_documents_by_type(
808 block_type_slug=creds_block_type_slug
809 )
811 if existing_credentials_blocks:
812 selected_credentials_block = prompt_select_from_table(
813 console,
814 prompt=(
815 f"Select from your existing {pretty_creds_block_type} credential blocks"
816 ),
817 columns=[
818 {
819 "header": f"{pretty_storage_provider} Credentials Blocks",
820 "key": "name",
821 }
822 ],
823 data=[
824 {"name": block.name}
825 for block in existing_credentials_blocks
826 if block.name is not None
827 ],
828 opt_out_message="Create a new credentials block",
829 )
831 if selected_credentials_block and (
832 selected_block := selected_credentials_block.get("name")
833 ):
834 return f"{{{{ prefect.blocks.{creds_block_type_slug}.{selected_block} }}}}"
836 credentials_block_type = await client.read_block_type_by_slug(creds_block_type_slug)
838 credentials_block_schema = await client.get_most_recent_block_schema_for_block_type(
839 block_type_id=credentials_block_type.id
840 )
841 if credentials_block_schema is None:
842 raise ValueError(f"No schema found for {pretty_creds_block_type} block")
844 console.print(
845 f"\nProvide details on your new {pretty_storage_provider} credentials:"
846 )
848 hydrated_fields = {
849 field_name: prompt(f"{field_name} [yellow]({props.get('type')})[/]")
850 for field_name, props in credentials_block_schema.fields.get(
851 "properties", {}
852 ).items()
853 if field_name in REQUIRED_FIELDS_FOR_CREDS_BLOCK[creds_block_type_slug]
854 }
856 console.print(f"[blue]\n{pretty_storage_provider} credentials specified![/]\n")
858 while True:
859 credentials_block_name = prompt(
860 "Give a name to your new credentials block",
861 default=f"{storage_provider_slug}-storage-credentials",
862 )
864 try:
865 new_block_document = await client.create_block_document(
866 block_document=BlockDocumentCreate(
867 name=credentials_block_name,
868 data=hydrated_fields,
869 block_schema_id=credentials_block_schema.id,
870 block_type_id=credentials_block_type.id,
871 )
872 )
873 break
874 except ObjectAlreadyExists:
875 console.print(
876 f"A {pretty_creds_block_type!r} block named"
877 f" {credentials_block_name!r} already exists. Please choose another"
878 " name"
879 )
881 url = urls.url_for(new_block_document)
882 if url:
883 console.print(
884 f"\nView/Edit your new credentials block in the UI:\n[blue]{url}[/]\n",
885 soft_wrap=True,
886 )
887 return f"{{{{ prefect.blocks.{creds_block_type_slug}.{new_block_document.name} }}}}"