Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/_prompts.py: 20%

329 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1""" 

2Utilities for prompting the user for input 

3""" 

4 

5from __future__ import annotations 1a

6 

7import asyncio 1a

8import os 1a

9import shutil 1a

10from datetime import timedelta 1a

11from getpass import GetPassWarning 1a

12from typing import ( 1a

13 TYPE_CHECKING, 

14 Any, 

15 Coroutine, 

16 Optional, 

17 TypeVar, 

18 Union, 

19 overload, 

20) 

21 

22import anyio 1a

23import readchar 1a

24from rich.console import Console, Group, RenderableType 1a

25from rich.live import Live 1a

26from rich.progress import Progress, SpinnerColumn, TextColumn 1a

27from rich.prompt import Confirm, InvalidResponse, Prompt, PromptBase 1a

28from rich.table import Table 1a

29 

30from prefect._internal.installation import ainstall_packages 1a

31from prefect.cli._utilities import exit_with_error 1a

32from prefect.client.collections import get_collections_metadata_client 1a

33from prefect.client.schemas.actions import ( 1a

34 BlockDocumentCreate, 

35 WorkPoolCreate, 

36) 

37from prefect.client.schemas.schedules import ( 1a

38 CronSchedule, 

39 IntervalSchedule, 

40 RRuleSchedule, 

41 is_valid_timezone, 

42) 

43from prefect.client.utilities import client_injector 1a

44from prefect.exceptions import ObjectAlreadyExists, ObjectNotFound 1a

45from prefect.flows import load_flow_from_entrypoint 1a

46from prefect.logging.loggers import get_logger 1a

47from prefect.utilities import urls 1a

48from prefect.utilities._ast import find_flow_functions_in_file 1a

49from prefect.utilities._git import get_git_remote_origin_url 1a

50from prefect.utilities.filesystem import filter_files 1a

51from prefect.utilities.slugify import slugify 1a

52 

53if TYPE_CHECKING: 53 ↛ 54line 53 didn't jump to line 54 because the condition on line 53 was never true1a

54 from prefect.client.orchestration import PrefectClient 

55 

# Type variable for table cell values; bounded to objects `rich` can render.
T = TypeVar("T", bound=RenderableType)

# Maps a remote storage provider slug to the slug of the credentials block
# type used for that provider.
STORAGE_PROVIDER_TO_CREDS_BLOCK = {
    "s3": "aws-credentials",
    "gcs": "gcp-credentials",
    "azure_blob_storage": "azure-blob-storage-credentials",
}

# For each credentials block type slug, the block schema fields the user is
# prompted for when creating a new credentials block interactively.
REQUIRED_FIELDS_FOR_CREDS_BLOCK = {
    "aws-credentials": ["aws_access_key_id", "aws_secret_access_key"],
    "gcp-credentials": ["project", "service_account_file"],
    "azure-blob-storage-credentials": ["account_url", "connection_string"],
}


# Module-level logger named after this module.
logger = get_logger(__name__)

72 

73 

async def search_for_flow_functions(
    directory: str = ".", exclude_patterns: list[str] | None = None
) -> list[dict[str, str]]:
    """
    Search for flow functions in the provided directory. If no directory is provided,
    the current working directory is used.

    Args:
        directory: The directory to search in
        exclude_patterns: List of patterns to exclude from the search, defaults to
            ["**/site-packages/**"]

    Returns:
        A flat list with one dict per flow function found across all scanned
        files. An empty list is returned if listing the files raises
        `PermissionError` or `OSError` (the error is logged).
    """
    path = anyio.Path(directory)
    exclude_patterns = exclude_patterns or ["**/site-packages/**"]
    # `Coroutine` is parameterised as [YieldType, SendType, ReturnType]; the
    # per-file result list is the return type, so it belongs in the last slot.
    coros: list[Coroutine[Any, Any, list[dict[str, str]]]] = []

    try:
        # Ignore everything, re-include Python files, then apply the
        # caller-supplied exclusions on top.
        for file in filter_files(
            root=str(path),
            ignore_patterns=["*", "!**/*.py", *exclude_patterns],
            include_dirs=False,
        ):
            coros.append(find_flow_functions_in_file(anyio.Path(str(path / file))))

    except (PermissionError, OSError) as e:
        logger.error("Error searching for flow functions: %s", e)
        return []

    # Scan all files concurrently and flatten the per-file lists.
    return [fn for file_fns in await asyncio.gather(*coros) for fn in file_fns]

106 

107 

def prompt(message: str, **kwargs: Any) -> str:
    """Ask the user for free-form input using the CLI's standard prompt styling.

    Extra keyword arguments are forwarded unchanged to `rich.prompt.Prompt.ask`.
    """
    styled_message = f"[bold][green]?[/] {message}[/]"
    return Prompt.ask(styled_message, **kwargs)

111 

112 

def confirm(message: str, **kwargs: Any) -> bool:
    """Ask the user a yes/no question using the CLI's standard prompt styling.

    Extra keyword arguments are forwarded unchanged to `rich.prompt.Confirm.ask`.
    """
    styled_message = f"[bold][green]?[/] {message}[/]"
    return Confirm.ask(styled_message, **kwargs)

116 

117 

@overload
def prompt_select_from_table(
    console: Console,
    prompt: str,
    columns: list[dict[str, str]],
    data: list[dict[str, T]],
    table_kwargs: dict[str, Any] | None = None,
    opt_out_message: None = None,
    opt_out_response: Any = None,
) -> dict[str, T]: ...


@overload
def prompt_select_from_table(
    console: Console,
    prompt: str,
    columns: list[dict[str, str]],
    data: list[dict[str, T]],
    table_kwargs: dict[str, Any] | None = None,
    opt_out_message: str = "",
    opt_out_response: Any = None,
) -> dict[str, T] | None: ...


def prompt_select_from_table(
    console: Console,
    prompt: str,
    columns: list[dict[str, str]],
    data: list[dict[str, T]],
    table_kwargs: dict[str, Any] | None = None,
    opt_out_message: str | None = None,
    opt_out_response: Any = None,
) -> dict[str, T] | None:
    """
    Given a list of columns and some data, display options to user in a table
    and prompt them to select one.

    The table is redrawn in place after every key press. Up/down arrows move
    the highlighted row (wrapping around), enter selects it, and Ctrl-C calls
    `exit_with_error("")`.

    Args:
        console: The console the live table and instructions are rendered to.
            Its height bounds how many rows are shown at once.
        prompt: A prompt to display to the user before the table.
        columns: A list of dicts with keys `header` and `key` to display in
            the table. The `header` value will be displayed in the table header
            and the `key` value will be used to lookup the value for each row
            in the provided data.
        data: A list of dicts with keys corresponding to the `key` values in
            the `columns` argument.
        table_kwargs: Additional kwargs to pass to the `rich.Table` constructor.
        opt_out_message: If truthy, an extra final row with this text is shown;
            selecting it, or pressing `n`, returns `opt_out_response`.
        opt_out_response: The value returned when the user opts out.

    Returns:
        dict: Data representation of the selected row, or `opt_out_response`
            when the opt-out row is chosen.
    """
    current_idx = 0
    selected_row = None
    table_kwargs = table_kwargs or {}
    # Show at most 10 rows, leaving 4 console lines for chrome. Clamp to at
    # least one row: on a very short console the raw value would be zero or
    # negative, which breaks both the slice and the scroll arithmetic below.
    visible_rows = max(1, min(10, console.height - 4))
    scroll_offset = 0
    total_options = len(data) + (1 if opt_out_message else 0)

    def build_table() -> Table:
        nonlocal scroll_offset
        table = Table(**table_kwargs)
        # Leading unnamed column holds the ">" selection marker.
        table.add_column()
        for column in columns:
            table.add_column(column.get("header", ""))

        # Adjust scroll_offset so the highlighted row stays inside the window.
        if current_idx < scroll_offset:
            scroll_offset = current_idx
        elif current_idx >= scroll_offset + visible_rows:
            scroll_offset = current_idx - visible_rows + 1

        for i, item in enumerate(data[scroll_offset : scroll_offset + visible_rows]):
            row = [item.get(column.get("key", "")) for column in columns]
            if i + scroll_offset == current_idx:
                table.add_row(
                    "[bold][blue]>", *[f"[bold][blue]{cell}[/]" for cell in row]
                )
            else:
                table.add_row(" ", *row)

        if opt_out_message:
            # The opt-out text goes in the last column; the others stay blank.
            opt_out_row = [""] * (len(columns) - 1) + [opt_out_message]
            if current_idx == len(data):
                table.add_row(
                    "[bold][blue]>", *[f"[bold][blue]{cell}[/]" for cell in opt_out_row]
                )
            else:
                table.add_row(" ", *opt_out_row)

        return table

    with Live(build_table(), auto_refresh=False, console=console) as live:
        instructions_message = (
            f"[bold][green]?[/] {prompt} [bright_blue][Use arrows to move; enter to"
            " select"
        )
        if opt_out_message:
            instructions_message += "; n to select none"
        instructions_message += "]"
        live.console.print(instructions_message)
        while selected_row is None:
            key = readchar.readkey()

            if key == readchar.key.UP:
                # With nothing to select, moving is a no-op rather than a
                # modulo-by-zero error.
                if total_options:
                    current_idx = (current_idx - 1) % total_options
            elif key == readchar.key.DOWN:
                if total_options:
                    current_idx = (current_idx + 1) % total_options
            elif key == readchar.key.CTRL_C:
                # gracefully exit with no message
                exit_with_error("")
            elif key == readchar.key.ENTER or key == readchar.key.CR:
                # Index len(data) is the opt-out row.
                if current_idx == len(data):
                    return opt_out_response
                else:
                    selected_row = data[current_idx]
            elif key == "n" and opt_out_message:
                return opt_out_response

            live.update(build_table(), refresh=True)

        return selected_row

237 

238 

239# Interval schedule prompting utilities 

class IntervalValuePrompt(PromptBase[timedelta]):
    """Prompt that parses a positive whole number of seconds into a `timedelta`."""

    response_type: type[timedelta] = timedelta
    validate_error_message = (
        "[prompt.invalid]Please enter a valid interval denoted in seconds"
    )

    def process_response(self, value: str) -> timedelta:
        """Convert the raw answer to a `timedelta`.

        Raises:
            InvalidResponse: if the answer is not an integer, or is not
                strictly positive.
        """
        try:
            seconds = int(value)
        except ValueError:
            raise InvalidResponse(self.validate_error_message)

        if seconds <= 0:
            raise InvalidResponse("[prompt.invalid]Interval must be greater than 0")
        return timedelta(seconds=seconds)

254 

255 

def prompt_interval_schedule(console: Console) -> IntervalSchedule:
    """
    Ask the user for a run interval in seconds and build an `IntervalSchedule`.

    The default interval is 3600 seconds.
    """
    fallback_seconds = 3600

    # The default is spelled out in the message text and `show_default` is off:
    # `rich` would render the timedelta default in hours, which is confusing
    # next to a question that asks for seconds.
    question = f"[bold][green]?[/] Seconds between scheduled runs ({fallback_seconds})"
    chosen_interval = IntervalValuePrompt.ask(
        question,
        console=console,
        default=timedelta(seconds=fallback_seconds),
        show_default=False,
    )
    return IntervalSchedule(interval=chosen_interval)

273 

274 

275# Cron schedule prompting utilities 

276 

277 

class CronStringPrompt(PromptBase[str]):
    """Prompt that accepts only answers `CronSchedule.valid_cron_string` allows."""

    response_type: type[str] = str
    validate_error_message = "[prompt.invalid]Please enter a valid cron string"

    def process_response(self, value: str) -> str:
        """Return `value` unchanged, or raise `InvalidResponse` if validation
        raises `ValueError`."""
        try:
            CronSchedule.valid_cron_string(value)
        except ValueError:
            raise InvalidResponse(self.validate_error_message)
        else:
            return value

288 

289 

class CronTimezonePrompt(PromptBase[str]):
    """Prompt that accepts only answers `CronSchedule.valid_timezone` allows."""

    response_type: type[str] = str
    validate_error_message = "[prompt.invalid]Please enter a valid timezone."

    def process_response(self, value: str) -> str:
        """Return `value` unchanged, or raise `InvalidResponse` if validation
        raises `ValueError`."""
        try:
            CronSchedule.valid_timezone(value)
        except ValueError:
            raise InvalidResponse(self.validate_error_message)
        else:
            return value

300 

301 

def prompt_cron_schedule(console: Console) -> CronSchedule:
    """
    Ask the user for a cron expression and a timezone, then build a `CronSchedule`.

    Defaults are "0 0 * * *" for the cron string and "UTC" for the timezone.
    """
    cron_expression = CronStringPrompt.ask(
        "[bold][green]?[/] Cron string", console=console, default="0 0 * * *"
    )
    tz_name = CronTimezonePrompt.ask(
        "[bold][green]?[/] Timezone",
        console=console,
        default="UTC",
    )
    return CronSchedule(cron=cron_expression, timezone=tz_name)

315 

316 

317# RRule schedule prompting utilities 

318 

319 

class RRuleStringPrompt(PromptBase[str]):
    """Prompt that accepts only answers `RRuleSchedule.validate_rrule_str` allows."""

    response_type: type[str] = str
    validate_error_message = "[prompt.invalid]Please enter a valid RRule string"

    def process_response(self, value: str) -> str:
        """Return `value` unchanged, or raise `InvalidResponse` if validation
        raises `ValueError`."""
        try:
            RRuleSchedule.validate_rrule_str(value)
        except ValueError:
            raise InvalidResponse(self.validate_error_message)
        else:
            return value

330 

331 

class RRuleTimezonePrompt(PromptBase[str]):
    """Prompt that accepts only timezones `is_valid_timezone` approves."""

    response_type: type[str] = str
    validate_error_message = "[prompt.invalid]Please enter a valid timezone."

    def process_response(self, value: str) -> str:
        """Return `value` unchanged if it is a valid timezone.

        Raises:
            InvalidResponse: if `is_valid_timezone` returns `False` or raises
                `ValueError`.
        """
        try:
            verdict = is_valid_timezone(value)
        except ValueError:
            raise InvalidResponse(self.validate_error_message)
        # The helper's result must not be discarded: a `False` answer means the
        # timezone was rejected. Comparing with `is False` (rather than `not`)
        # keeps a validator that signals failure only by raising working too.
        if verdict is False:
            raise InvalidResponse(self.validate_error_message)
        return value


def prompt_rrule_schedule(console: Console) -> RRuleSchedule:
    """
    Prompts the user to enter an RRule string and timezone.

    Defaults are "RRULE:FREQ=DAILY;INTERVAL=1" for the rule and "UTC" for the
    timezone.
    """
    rrule = RRuleStringPrompt.ask(
        "[bold][green]?[/] RRule string",
        console=console,
        default="RRULE:FREQ=DAILY;INTERVAL=1",
    )
    # Use the RRule-specific timezone prompt rather than the cron one.
    # NOTE(review): presumably `RRuleSchedule` validates its timezone with
    # `is_valid_timezone` as well - confirm against the schedule schema.
    timezone = RRuleTimezonePrompt.ask(
        "[bold][green]?[/] Timezone", console=console, default="UTC"
    )
    return RRuleSchedule(rrule=rrule, timezone=timezone)

357 

358 

359# Schedule type prompting utilities 

360 

361 

def prompt_schedule_type(console: Console) -> str:
    """
    Show the available schedule kinds in a table and return the chosen one.

    Returns:
        The `type` value of the selected row: "Interval", "Cron" or "RRule".
    """
    table_columns = [
        {"header": "Schedule Type", "key": "type"},
        {"header": "Description", "key": "description"},
    ]
    schedule_kinds = [
        {
            "type": "Interval",
            "description": (
                "Allows you to set flow runs to be executed at fixed time"
                " intervals."
            ),
        },
        {
            "type": "Cron",
            "description": (
                "Allows you to define recurring flow runs based on a specified"
                " pattern using cron syntax."
            ),
        },
        {
            "type": "RRule",
            "description": (
                "Allows you to define recurring flow runs using RFC 2445 recurrence"
                " rules."
            ),
        },
    ]
    chosen_row = prompt_select_from_table(
        console,
        "What type of schedule would you like to use?",
        table_columns,
        schedule_kinds,
    )
    return chosen_row["type"]

398 

399 

def prompt_schedules(console: Console) -> list[dict[str, Any]]:
    """
    Interactively collect zero or more schedules for a deployment.

    Returns:
        A list of dicts, each with a `schedule` object and an `active` flag.
        The list is empty if the user declines to configure schedules.

    Raises:
        Exception: if the selected schedule type is not one of the known kinds.
    """
    collected: list[dict[str, Any]] = []

    wants_schedules = confirm(
        "Would you like to configure schedules for this deployment?", default=True
    )
    if not wants_schedules:
        return collected

    # One builder per schedule kind offered by `prompt_schedule_type`.
    builders = {
        "Cron": prompt_cron_schedule,
        "Interval": prompt_interval_schedule,
        "RRule": prompt_rrule_schedule,
    }

    while True:
        kind = prompt_schedule_type(console)
        build = builders.get(kind)
        if build is None:
            raise Exception("Invalid schedule type")
        new_schedule = build(console)

        activate = confirm("Would you like to activate this schedule?", default=True)
        collected.append({"schedule": new_schedule, "active": activate})

        if not confirm("Would you like to add another schedule?", default=False):
            break

    return collected

432 

433 

@client_injector
async def prompt_select_work_pool(
    client: "PrefectClient",
    console: Console,
    prompt: str = "Which work pool would you like to deploy this flow to?",
) -> str:
    """
    Let the user pick a work pool, or create one if none is eligible.

    Work pools of type "prefect-agent" are never offered.

    Returns:
        The name of the selected (or newly created) work pool.
    """
    eligible_rows = []
    for pool in await client.read_work_pools():
        if pool.type != "prefect-agent":
            eligible_rows.append(pool.model_dump())

    # Nothing to choose from: fall through to the creation flow.
    if not eligible_rows:
        created_pool = await prompt_create_work_pool(console)
        return created_pool.name

    table_columns = [
        {"header": "Work Pool Name", "key": "name"},
        {"header": "Infrastructure Type", "key": "type"},
        {"header": "Description", "key": "description"},
    ]
    chosen_row = prompt_select_from_table(
        console, prompt, table_columns, eligible_rows
    )
    return chosen_row["name"]

461 

462 

async def prompt_build_custom_docker_image(
    console: Console,
    deployment_config: dict[str, Any],
) -> dict[str, Any] | None:
    """
    Interactively assemble a `build_docker_image` deployment step.

    If a file named "Dockerfile" exists in the current directory the user may
    use it, or rename it so an auto-generated one can take its place.

    Args:
        console: Console used for the confirmations that receive it and for
            the final summary line.
        deployment_config: Deployment configuration; its `name` is the default
            image name.

    Returns:
        A single-entry dict mapping the build step's import path to its
        settings, or `None` if the user declines to build a custom image.

    Raises:
        ValueError: if a Dockerfile exists and the user neither uses nor
            renames it.
    """
    wants_custom_image = confirm(
        "Would you like to build a custom Docker image for this deployment?",
        console=console,
        default=False,
    )
    if not wants_custom_image:
        return None

    build_step = {
        "requires": "prefect-docker>=0.3.1",
        "id": "build-image",
    }

    if not os.path.exists("Dockerfile"):
        dockerfile_setting = "auto"
    elif confirm(
        "Would you like to use the Dockerfile in the current directory?",
        console=console,
        default=True,
    ):
        dockerfile_setting = "Dockerfile"
    elif confirm(
        "A Dockerfile exists. You chose not to use it. A temporary Dockerfile"
        " will be automatically built during the deployment build step. If"
        " another file named 'Dockerfile' already exists at that time, the"
        " build step will fail. Would you like to rename your existing"
        " Dockerfile?"
    ):
        backup_name = prompt("New Dockerfile name", default="Dockerfile.backup")
        shutil.move("Dockerfile", backup_name)
        dockerfile_setting = "auto"
    else:
        # Fail now: the auto-build feature used by the build_docker_image step
        # creates a temporary Dockerfile and would otherwise raise when the
        # build steps are run.
        raise ValueError(
            "A Dockerfile already exists. Please remove or rename the existing"
            " one."
        )
    build_step["dockerfile"] = dockerfile_setting

    repository = prompt("Repository name (e.g. your Docker Hub username)").rstrip("/")
    image = prompt("Image name", default=deployment_config["name"])
    build_step["image_name"] = f"{repository}/{image}"
    build_step["tag"] = prompt("Image tag", default="latest")

    console.print(
        "Image"
        f" [bold][yellow]{build_step['image_name']}:{build_step['tag']}[/yellow][/bold]"
        " will be built."
    )

    return {"prefect_docker.deployments.steps.build_docker_image": build_step}

521 

522 

async def prompt_push_custom_docker_image(
    console: Console,
    deployment_config: dict[str, Any],
    build_docker_image_step: dict[str, Any],
) -> tuple[dict[str, Any] | None, dict[str, Any]]:
    """
    Interactively assemble a `push_docker_image` deployment step.

    Args:
        console: Console used for prompts and status messages.
        deployment_config: Deployment configuration; its `name` and
            `work_pool.name` are slugified into the credentials block name.
        build_docker_image_step: The build step dict; it is modified in place
            so that its `image_name` is prefixed with the registry URL.

    Returns:
        A `(push_step, build_step)` tuple. `push_step` is `None` if the user
        declines to push; otherwise it is a single-entry dict mapping the push
        step's import path to its settings. `build_step` is the same
        `build_docker_image_step` object that was passed in.
    """
    if not confirm(
        "Would you like to push this image to a remote registry?",
        console=console,
        default=False,
    ):
        return None, build_docker_image_step

    # Image name and tag are templated from the build step's outputs.
    push_step = {
        "requires": "prefect-docker>=0.3.1",
        "image_name": "{{ build-image.image_name }}",
        "tag": "{{ build-image.tag }}",
    }

    registry_url = prompt("Registry URL", default="docker.io").rstrip("/")

    # Prefix the build step's image name with the registry URL (in place).
    repo_and_image_name = build_docker_image_step[
        "prefect_docker.deployments.steps.build_docker_image"
    ]["image_name"]
    full_image_name = f"{registry_url}/{repo_and_image_name}"
    build_docker_image_step["prefect_docker.deployments.steps.build_docker_image"][
        "image_name"
    ] = full_image_name

    if confirm("Is this a private registry?", console=console):
        docker_credentials = {}
        docker_credentials["registry_url"] = registry_url

        # NOTE(review): the prompt text below reads "Would you like use";
        # "to" appears to be missing - confirm before changing, since tests
        # may match on this string.
        if confirm(
            "Would you like use prefect-docker to manage Docker registry credentials?",
            console=console,
            default=False,
        ):
            # Install prefect-docker on demand if it is not importable.
            try:
                import prefect_docker
            except ImportError:
                console.print("Installing prefect-docker...")
                await ainstall_packages(["prefect[docker]"], stream_output=True)
                import prefect_docker

            credentials_block = prefect_docker.DockerRegistryCredentials
            # NOTE(review): this is a plain string, so the template contains
            # the literal text "docker_registry_creds_name" rather than the
            # block name computed on the next line - confirm this is intended.
            push_step["credentials"] = (
                "{{ prefect_docker.docker-registry-credentials.docker_registry_creds_name }}"
            )
            docker_registry_creds_name = f"deployment-{slugify(deployment_config['name'])}-{slugify(deployment_config['work_pool']['name'])}-registry-creds"
            create_new_block = False
            try:
                # Offer to reuse the block if it loads; a ValueError from the
                # load is treated as "block does not exist".
                await credentials_block.aload(docker_registry_creds_name)
                if not confirm(
                    (
                        "Would you like to use the existing Docker registry credentials"
                        f" block {docker_registry_creds_name}?"
                    ),
                    console=console,
                    default=True,
                ):
                    create_new_block = True
            except ValueError:
                create_new_block = True

            if create_new_block:
                docker_credentials["username"] = prompt(
                    "Docker registry username", console=console
                )
                try:
                    docker_credentials["password"] = prompt(
                        "Docker registry password",
                        console=console,
                        password=True,
                    )
                except GetPassWarning:
                    # Hidden input raised GetPassWarning; ask again without
                    # `password=True`.
                    docker_credentials["password"] = prompt(
                        "Docker registry password",
                        console=console,
                    )

                new_creds_block = credentials_block(
                    username=docker_credentials["username"],
                    password=docker_credentials["password"],
                    registry_url=docker_credentials["registry_url"],
                )
                # Saved under the computed name, overwriting any existing block.
                coro = new_creds_block.save(
                    name=docker_registry_creds_name, overwrite=True
                )
                # Type-checker-only narrowing; never executed at runtime.
                if TYPE_CHECKING:
                    assert asyncio.iscoroutine(coro)
                await coro
    return {
        "prefect_docker.deployments.steps.push_docker_image": push_step
    }, build_docker_image_step

617 

618 

@client_injector
async def prompt_create_work_pool(
    client: "PrefectClient",
    console: Console,
):
    """
    Walk the user through creating a new work pool.

    The user picks an infrastructure type from the worker metadata returned by
    the collections metadata client (entries of type "prefect-agent" are
    hidden) and then names the pool.

    Returns:
        The work pool object returned by `client.create_work_pool`.

    Raises:
        ValueError: if the user declines to create a work pool.
    """
    wants_new_pool = confirm(
        (
            "Looks like you don't have any work pools this flow can be deployed to."
            " Would you like to create one?"
        ),
        default=True,
        console=console,
    )
    if not wants_new_pool:
        raise ValueError(
            "A work pool is required to deploy this flow. Please specify a work pool"
            " name via the '--pool' flag or in your prefect.yaml file."
        )

    async with get_collections_metadata_client() as collections_client:
        worker_metadata = await collections_client.read_worker_metadata()

    # Flatten the nested metadata mapping into one row per worker type.
    worker_rows = []
    for collection in worker_metadata.values():
        for worker in collection.values():
            if worker["type"] != "prefect-agent":
                worker_rows.append(worker)

    chosen_worker = prompt_select_from_table(
        console,
        prompt="What infrastructure type would you like to use for your new work pool?",
        columns=[
            {"header": "Type", "key": "type"},
            {"header": "Description", "key": "description"},
        ],
        data=worker_rows,
        table_kwargs={"show_lines": True},
    )

    pool_name = prompt("Work pool name")
    pool_spec = WorkPoolCreate(name=pool_name, type=chosen_worker["type"])
    work_pool = await client.create_work_pool(pool_spec)
    console.print(f"Your work pool {work_pool.name!r} has been created!", style="green")
    return work_pool

659 

660 

class EntrypointPrompt(PromptBase[str]):
    """Prompt that accepts only entrypoints from which a flow can be loaded."""

    response_type: type[str] = str
    validate_error_message = "[prompt.invalid]Please enter a valid flow entrypoint."

    def process_response(self, value: str) -> str:
        """Return `value` unchanged if `load_flow_from_entrypoint` accepts it.

        Raises:
            InvalidResponse: if loading the flow raises any exception.
        """
        # No separate format check is done here: `str.rsplit(":", 1)` with a
        # non-empty separator never raises ValueError, so loading the flow is
        # the only effective validation.
        try:
            load_flow_from_entrypoint(value)
        except Exception as exc:
            raise InvalidResponse(
                f"[prompt.invalid]Failed to load flow from entrypoint {value!r}."
                f" {self.validate_error_message}"
            ) from exc
        return value

679 

680 

async def prompt_entrypoint(console: Console) -> str:
    """
    Ask the user which flow to deploy and return its entrypoint.

    Flow functions are searched for with `search_for_flow_functions()` (using
    its defaults) while a transient spinner is shown. Discovered flows are
    offered in a table with an extra option to type an entrypoint by hand. If
    nothing is discovered, the user is asked to type an entrypoint directly.

    Returns:
        An entrypoint string of the form `filepath:function_name`, or the
        manually entered entrypoint.
    """

    def ask_for_manual_entrypoint() -> str:
        # Shared by the "no flows found" and "opted out" paths.
        return EntrypointPrompt.ask(
            (
                "[bold][green]?[/] Flow entrypoint (expected format"
                " path/to/file.py:function_name)"
            ),
            console=console,
        )

    spinner = Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        transient=True,
    )
    with spinner as progress:
        scan_task = progress.add_task(
            description="Scanning for flows...",
            total=1,
        )
        discovered_flows = await search_for_flow_functions()
        progress.update(scan_task, completed=1)

    if not discovered_flows:
        return ask_for_manual_entrypoint()

    chosen_flow = prompt_select_from_table(
        console,
        prompt="Select a flow to deploy",
        columns=[
            {"header": "Flow Name", "key": "flow_name"},
            {"header": "Location", "key": "filepath"},
        ],
        data=discovered_flows,
        opt_out_message="Enter a flow entrypoint manually",
    )
    if chosen_flow is not None:
        return f"{chosen_flow['filepath']}:{chosen_flow['function_name']}"
    return ask_for_manual_entrypoint()

726 

727 

@client_injector
async def prompt_select_remote_flow_storage(
    client: "PrefectClient", console: Console
) -> Optional[str]:
    """
    Let the user choose where the flow's code is stored remotely.

    A blob storage provider is offered only if its credentials block type can
    be read from the API (lookups raising `ObjectNotFound` are skipped). Git
    is offered only if `get_git_remote_origin_url()` returns a truthy value.

    Returns:
        The `slug` of the selected storage option.
    """
    available_slugs: set[str] = set()

    for provider_slug, block_type_slug in STORAGE_PROVIDER_TO_CREDS_BLOCK.items():
        try:
            # The lookup result is unused; only its success matters.
            await client.read_block_type_by_slug(block_type_slug)
        except ObjectNotFound:
            continue
        else:
            available_slugs.add(provider_slug)

    if get_git_remote_origin_url():
        available_slugs.add("git")

    all_storage_rows = [
        {
            "type": "Git Repo",
            "slug": "git",
            "description": "Use a Git repository [bold](recommended).",
        },
        {
            "type": "S3",
            "slug": "s3",
            "description": "Use an AWS S3 bucket.",
        },
        {
            "type": "GCS",
            "slug": "gcs",
            "description": "Use a Google Cloud Storage bucket.",
        },
        {
            "type": "Azure Blob Storage",
            "slug": "azure_blob_storage",
            "description": "Use an Azure Blob Storage bucket.",
        },
    ]

    # Keep the display order above, dropping anything not available here.
    offered_rows = []
    for storage_row in all_storage_rows:
        if storage_row["slug"] in available_slugs:
            offered_rows.append(storage_row)

    chosen_row = prompt_select_from_table(
        console,
        prompt="Please select a remote code storage option.",
        columns=[
            {"header": "Storage Type", "key": "type"},
            {"header": "Description", "key": "description"},
        ],
        data=offered_rows,
    )

    return chosen_row["slug"]

787 

788 

@client_injector
async def prompt_select_blob_storage_credentials(
    client: "PrefectClient",
    console: Console,
    storage_provider: str,
) -> str:
    """
    Let the user pick an existing credentials block for a blob storage
    provider, or create a new one.

    Args:
        client: Client used to read block types, schemas and documents, and to
            create the new block document.
        console: Console used for the selection table and status messages.
        storage_provider: A key of `STORAGE_PROVIDER_TO_CREDS_BLOCK`.

    Returns:
        A jinja template string that references the chosen or newly created
        credentials block.

    Raises:
        ValueError: if no block schema is found for the credentials block type.
    """
    provider_dashed = storage_provider.replace("_", "-")
    provider_label = storage_provider.replace("_", " ").upper()

    creds_block_type_slug = STORAGE_PROVIDER_TO_CREDS_BLOCK[storage_provider]
    block_type_label = creds_block_type_slug.replace("-", " ").title()

    existing_blocks = await client.read_block_documents_by_type(
        block_type_slug=creds_block_type_slug
    )

    if existing_blocks:
        # Blocks without a name cannot be referenced, so they are not offered.
        named_rows = []
        for block in existing_blocks:
            if block.name is not None:
                named_rows.append({"name": block.name})

        chosen_row = prompt_select_from_table(
            console,
            prompt=(
                f"Select from your existing {block_type_label} credential blocks"
            ),
            columns=[
                {
                    "header": f"{provider_label} Credentials Blocks",
                    "key": "name",
                }
            ],
            data=named_rows,
            opt_out_message="Create a new credentials block",
        )

        selected_block = chosen_row.get("name") if chosen_row else None
        if selected_block:
            return f"{{{{ prefect.blocks.{creds_block_type_slug}.{selected_block} }}}}"

    # No existing block was chosen: create a new one.
    credentials_block_type = await client.read_block_type_by_slug(creds_block_type_slug)

    credentials_block_schema = await client.get_most_recent_block_schema_for_block_type(
        block_type_id=credentials_block_type.id
    )
    if credentials_block_schema is None:
        raise ValueError(f"No schema found for {block_type_label} block")

    console.print(f"\nProvide details on your new {provider_label} credentials:")

    # Prompt only for the fields listed as required for this block type, in
    # the order the schema declares them.
    required_fields = REQUIRED_FIELDS_FOR_CREDS_BLOCK[creds_block_type_slug]
    schema_properties = credentials_block_schema.fields.get("properties", {})
    hydrated_fields = {}
    for field_name, props in schema_properties.items():
        if field_name in required_fields:
            hydrated_fields[field_name] = prompt(
                f"{field_name} [yellow]({props.get('type')})[/]"
            )

    console.print(f"[blue]\n{provider_label} credentials specified![/]\n")

    # Keep asking for a name until one is accepted.
    new_block_document = None
    while new_block_document is None:
        credentials_block_name = prompt(
            "Give a name to your new credentials block",
            default=f"{provider_dashed}-storage-credentials",
        )
        document_spec = BlockDocumentCreate(
            name=credentials_block_name,
            data=hydrated_fields,
            block_schema_id=credentials_block_schema.id,
            block_type_id=credentials_block_type.id,
        )
        try:
            new_block_document = await client.create_block_document(
                block_document=document_spec
            )
        except ObjectAlreadyExists:
            console.print(
                f"A {block_type_label!r} block named"
                f" {credentials_block_name!r} already exists. Please choose another"
                " name"
            )

    url = urls.url_for(new_block_document)
    if url:
        console.print(
            f"\nView/Edit your new credentials block in the UI:\n[blue]{url}[/]\n",
            soft_wrap=True,
        )
    return f"{{{{ prefect.blocks.{creds_block_type_slug}.{new_block_document.name} }}}}"