Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/cloud/__init__.py: 17%

329 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1""" 

2Command line interface for interacting with Prefect Cloud 

3""" 

4 

5from __future__ import annotations 1a

6 

7import os 1a

8import traceback 1a

9import uuid 1a

10import urllib.parse 1a

11import warnings 1a

12import webbrowser 1a

13from contextlib import asynccontextmanager 1a

14from typing import ( 1a

15 TYPE_CHECKING, 

16 Iterable, 

17 Optional, 

18 TypeVar, 

19 Union, 

20 overload, 

21) 

22 

23import anyio 1a

24import anyio.abc 1a

25import httpx 1a

26import readchar 1a

27import typer 1a

28import uvicorn 1a

29from fastapi import FastAPI 1a

30from fastapi.middleware.cors import CORSMiddleware 1a

31 

32from rich.console import Console 1a

33from rich.live import Live 1a

34from rich.table import Table 1a

35from typing_extensions import Literal 1a

36 

37import prefect.context 1a

38from prefect.cli._types import PrefectTyper 1a

39from prefect.cli._utilities import exit_with_error, exit_with_success 1a

40from prefect.cli.root import app, is_interactive 1a

41from prefect.client.cloud import CloudUnauthorizedError, get_cloud_client 1a

42from prefect.client.schemas import Workspace 1a

43from prefect.context import get_settings_context 1a

44from prefect.settings import ( 1a

45 PREFECT_API_KEY, 

46 PREFECT_API_URL, 

47 PREFECT_CLOUD_UI_URL, 

48 load_profiles, 

49 save_profiles, 

50 update_current_profile, 

51) 

52from prefect.utilities.asyncutils import run_sync_in_worker_thread 1a

53from prefect.utilities.collections import listrepr 1a

54 

55from pydantic import BaseModel 1a

56 

T = TypeVar("T")

# Typer applications backing `prefect cloud` and `prefect cloud workspace`
cloud_app: PrefectTyper = PrefectTyper(
    name="cloud",
    help="Authenticate and interact with Prefect Cloud",
)
workspace_app: PrefectTyper = PrefectTyper(
    name="workspace",
    help="View and set Prefect Cloud Workspaces",
)

# Nest the workspace commands under `cloud` (also registered under the
# `workspaces` alias), then mount `cloud` on the root CLI application.
cloud_app.add_typer(workspace_app, aliases=["workspaces"])
app.add_typer(cloud_app)

68 

69 

def set_login_api_ready_event() -> None:
    """Set the "ready-event" stored in the login API's `extra` mapping."""
    ready_event = login_api.extra["ready-event"]
    ready_event.set()

72 

73 

@asynccontextmanager
async def lifespan(app: FastAPI):
    """
    Lifespan hook for the login API.

    Sets the ready event on startup and then yields; there is no teardown work.
    """
    set_login_api_ready_event()
    yield

81 

82 

# This small API server is used for data transmission for browser-based log in.
login_api: FastAPI = FastAPI(lifespan=lifespan)

# Requests from any origin, with any method and any header, are accepted.
login_api.add_middleware(
    CORSMiddleware,
    allow_origins=["*"],
    allow_methods=["*"],
    allow_headers=["*"],
)

94 

95 

class LoginSuccess(BaseModel):
    """Payload accepted by the login API's `/success` route."""

    # API key delivered with a successful login
    api_key: str

98 

99 

class LoginFailed(BaseModel):
    """Payload accepted by the login API's `/failure` route."""

    # Explanation of why the login did not succeed
    reason: str

102 

103 

class LoginResult(BaseModel):
    """Outcome of a browser login attempt, stored under `login_api.extra["result"]`."""

    # Discriminates between the two payload kinds carried in `content`
    type: Literal["success", "failure"]
    content: Union[LoginSuccess, LoginFailed]

107 

108 

class ServerExit(Exception):
    """Exception type associated with the login server exiting."""

111 

112 

@login_api.post("/success")
def receive_login(payload: LoginSuccess) -> None:
    """Store a successful login result and set the "result-event"."""
    shared = login_api.extra
    shared["result"] = LoginResult(type="success", content=payload)
    shared["result-event"].set()

117 

118 

@login_api.post("/failure")
def receive_failure(payload: LoginFailed) -> None:
    """Store a failed login result and set the "result-event"."""
    shared = login_api.extra
    shared["result"] = LoginResult(type="failure", content=payload)
    shared["result-event"].set()

123 

124 

async def serve_login_api(
    cancel_scope: anyio.CancelScope, task_status: anyio.abc.TaskStatus[uvicorn.Server]
) -> None:
    """
    Serve the login API with uvicorn until it stops or the task is cancelled.

    Args:
        cancel_scope: Scope that is cancelled once the server has finished
            serving or has failed to start.
        task_status: Receives the `uvicorn.Server` instance through
            `task_status.started` before serving begins.
    """
    # Port 0 is requested here; the caller reads the bound port from the
    # server's socket once it is listening.
    config = uvicorn.Config(login_api, port=0, log_level="critical")
    server = uvicorn.Server(config)

    try:
        # Yield the server object
        task_status.started(server)
        with warnings.catch_warnings():
            # Uvicorn uses the deprecated pieces of websockets, filter out
            # the warnings until uvicorn has its dependencies updated
            for noisy_module in ("websockets", "uvicorn.protocols.websockets"):
                warnings.filterwarnings(
                    "ignore", category=DeprecationWarning, module=noisy_module
                )
            await server.serve()
    except anyio.get_cancelled_exc_class():
        pass  # Already cancelled, do not cancel again
    except SystemExit as exc:
        # If uvicorn is misconfigured, it will throw a system exit and hide the exc
        app.console.print("[red][bold]X Error starting login service!")
        # Prefer the underlying error over the SystemExit that wraps it; fall
        # back to the SystemExit itself when no context was recorded so the
        # cancel below is always reached.
        cause = exc.__context__
        if cause is None:
            cause = exc
        traceback.print_exception(type(cause), value=cause, tb=cause.__traceback__)
        cancel_scope.cancel()
    else:
        # Exit if we are done serving the API
        # Uvicorn overrides signal handlers so without this Ctrl-C is broken
        cancel_scope.cancel()

160 

161 

def confirm_logged_in() -> None:
    """Report an error through `exit_with_error` when `PREFECT_API_KEY` is falsy."""
    if PREFECT_API_KEY:
        return

    active_profile = prefect.context.get_settings_context().profile
    message = (
        f"Currently not authenticated in profile {active_profile.name!r}. "
        "Please log in with `prefect cloud login`."
    )
    exit_with_error(message)

169 

170 

def get_current_workspace(workspaces: Iterable[Workspace]) -> Workspace | None:
    """
    Return the first workspace whose API URL equals the configured `PREFECT_API_URL`.

    Returns `None` when no API URL is configured or no workspace matches.
    """
    active_url = PREFECT_API_URL.value()
    if not active_url:
        return None

    return next(
        (candidate for candidate in workspaces if candidate.api_url() == active_url),
        None,
    )

182 

183 

@overload
def prompt_select_from_list(
    console: Console, prompt: str, options: list[str]
) -> str: ...


@overload
def prompt_select_from_list(
    console: Console, prompt: str, options: list[tuple[T, str]]
) -> T: ...


def prompt_select_from_list(
    console: Console, prompt: str, options: list[str] | list[tuple[T, str]]
) -> str | T:
    """
    Given a list of options, display the values to user in a table and prompt them
    to select one.

    Args:
        console: The console the live table is rendered on.
        prompt: The question shown in the table header.
        options: A list of options to present to the user.
            A list of tuples can be passed as key value pairs. If a value is chosen, the
            key will be returned.

    Returns:
        The selected option: the string itself for plain options, or the key
        (first element) when the options are tuples.
    """
    current_idx = 0
    selected_option = None

    def build_table(highlighted: int) -> Table:
        """
        Generate a table of options with the row at `highlighted` emphasised.
        """
        table = Table(box=None, header_style=None, padding=(0, 0))
        table.add_column(
            f"? [bold]{prompt}[/] [bright_blue][Use arrows to move; enter to select]",
            justify="left",
            no_wrap=True,
        )

        for i, option in enumerate(options):
            label = option[1] if isinstance(option, tuple) else option

            if i == highlighted:
                # Use blue for selected options
                table.add_row("[bold][blue]> " + label)
            else:
                # Two spaces so the label lines up with the "> " marker
                table.add_row("  " + label)
        return table

    with Live(build_table(current_idx), auto_refresh=False, console=console) as live:
        while selected_option is None:
            key = readchar.readkey()

            if key == readchar.key.UP:
                # wrap to bottom if at the top
                current_idx = current_idx - 1 if current_idx > 0 else len(options) - 1
            elif key == readchar.key.DOWN:
                # wrap to top if at the bottom
                current_idx = current_idx + 1 if current_idx + 1 < len(options) else 0
            elif key == readchar.key.CTRL_C:
                # gracefully exit with no message
                exit_with_error("")
            elif key in (readchar.key.ENTER, readchar.key.CR):
                selected_option = options[current_idx]
                # Stop immediately; no further redraw is needed
                break

            live.update(build_table(current_idx), refresh=True)

    # Convert tuple to its first element if needed
    if isinstance(selected_option, tuple):
        selected_option = selected_option[0]

    return selected_option

266 

267 

async def login_with_browser() -> str:
    """
    Perform login using the browser.

    A local login API is served in the background, the Prefect Cloud UI login
    page is opened with a callback pointing at it, and the function waits for
    the UI to post a result.

    On failure (timeout, abort, or a reported login failure) the error is
    reported through `exit_with_error`.
    On success, it will return an API key.
    """
    # `login_api.extra` is module-level state: drop any result left behind by
    # an earlier attempt so it cannot be mistaken for this attempt's outcome.
    login_api.extra.pop("result", None)

    # Set up an event that the login API will toggle on startup
    ready_event = login_api.extra["ready-event"] = anyio.Event()

    # Set up an event that the login API will set when a response comes from the UI
    result_event = login_api.extra["result-event"] = anyio.Event()

    timeout_scope = None
    async with anyio.create_task_group() as tg:
        # Run a server in the background to get payload from the browser
        server = await tg.start(serve_login_api, tg.cancel_scope)

        # Wait for the login server to be ready
        with anyio.fail_after(10):
            await ready_event.wait()

            # The server may not actually be serving as the lifespan is started first
            while not server.started:
                await anyio.sleep(0)

        # Get the port the server is using
        server_port = server.servers[0].sockets[0].getsockname()[1]
        callback = urllib.parse.quote(f"http://localhost:{server_port}")
        ui_login_url = (
            PREFECT_CLOUD_UI_URL.value() + f"/auth/client?callback={callback}"
        )

        # Then open the authorization page in a new browser tab
        app.console.print("Opening browser...")
        await run_sync_in_worker_thread(webbrowser.open_new_tab, ui_login_url)

        # Wait for the response from the browser,
        with anyio.move_on_after(120) as timeout_scope:
            app.console.print("Waiting for response...")
            await result_event.wait()

        # Shut down the background uvicorn server
        tg.cancel_scope.cancel()

    result = login_api.extra.get("result")
    if not result:
        if timeout_scope and timeout_scope.cancel_called:
            exit_with_error("Timed out while waiting for authorization.")
        else:
            exit_with_error("Aborted.")

    if result.type == "success":
        return result.content.api_key
    else:
        exit_with_error(f"Failed to log in. {result.content.reason}")

325 

326 

async def check_key_is_valid_for_login(key: str) -> bool:
    """
    Attempt to use a key to see if it is valid.

    Returns `True` when reading workspaces with the key succeeds and `False`
    when the request raises `CloudUnauthorizedError`; other errors propagate.
    """
    async with get_cloud_client(api_key=key) as client:
        try:
            await client.read_workspaces()
        except CloudUnauthorizedError:
            return False
        else:
            return True

337 

338 

async def _prompt_for_account_and_workspace(
    workspaces: list[Workspace],
) -> tuple[Workspace | None, bool]:
    """
    Interactively pick a workspace, narrowing by account first for long lists.

    When more than ten workspaces are given they are grouped by account; if
    several accounts are present the user chooses one before choosing a
    workspace.

    Returns:
        `(workspace, False)` when a workspace was chosen, or `(None, True)`
        when the "go back" entry was chosen.
    """
    if len(workspaces) > 10:
        # Group workspaces by account_id
        by_account: dict[uuid.UUID, list[Workspace]] = {}
        for ws in workspaces:
            by_account.setdefault(ws.account_id, []).append(ws)

        if len(by_account) == 1:
            # Only one account: nothing to choose between
            (workspaces,) = by_account.values()
        else:
            # Label each account with the handle of its first workspace
            account_options = [
                (account_id, str(members[0].account_handle))
                for account_id, members in by_account.items()
            ]
            chosen_account_id = prompt_select_from_list(
                app.console,
                "Which account would you like to use?",
                options=account_options,
            )
            if TYPE_CHECKING:
                assert isinstance(chosen_account_id, uuid.UUID)
            workspaces = by_account[chosen_account_id]

    menu: list[tuple[Workspace | None, str]] = [(ws, ws.handle) for ws in workspaces]
    # A `None` key marks the "go back" entry
    menu.append((None, "[bold]Go back to account selection[/bold]"))

    selection = prompt_select_from_list(
        app.console,
        "Which workspace would you like to use?",
        options=menu,
    )

    if selection:
        return selection, False
    return None, True

392 

393 

@cloud_app.command()
async def login(
    key: Optional[str] = typer.Option(
        None, "--key", "-k", help="API Key to authenticate with Prefect"
    ),
    workspace_handle: Optional[str] = typer.Option(
        None,
        "--workspace",
        "-w",
        help=(
            "Full handle of workspace, in format '<account_handle>/<workspace_handle>'"
        ),
    ),
):
    """
    Log in to Prefect Cloud.
    Creates a new profile configured to use the specified PREFECT_API_KEY.
    Uses a previously configured profile if it exists.
    """
    # Flow: validate arguments -> find profiles that are already authenticated
    # -> obtain a key (argument, existing profile, prompt or browser) -> read
    # workspaces -> choose a workspace -> store key and API URL in the profile.
    if not is_interactive() and (not key or not workspace_handle):
        exit_with_error(
            "When not using an interactive terminal, you must supply a `--key` and"
            " `--workspace`."
        )

    profiles = load_profiles()
    current_profile = get_settings_context().profile
    env_var_api_key = os.getenv("PREFECT_API_KEY")
    selected_workspace = None

    # Remember validation results per key so profiles sharing a key are only
    # checked against the API once.
    key_validity: dict[str, bool] = {}

    if env_var_api_key and key and env_var_api_key != key:
        exit_with_error(
            "Cannot log in with a key when a different PREFECT_API_KEY is present as an"
            " environment variable that will override it."
        )

    if key and env_var_api_key and env_var_api_key == key:
        is_valid_key = await check_key_is_valid_for_login(key)
        is_correct_key_format = key.startswith(("pnu_", "pnb_"))
        if not is_valid_key:
            help_message = "Please ensure your credentials are correct and unexpired."
            if not is_correct_key_format:
                help_message = "Your key is not in our expected format."
            exit_with_error(
                f"Unable to authenticate with Prefect Cloud. {help_message}"
            )
        key_validity[key] = True

    already_logged_in_profiles: list[str] = []
    for name, profile in profiles.items():
        profile_key = profile.settings.get(PREFECT_API_KEY)
        if key:
            # If a key is provided, only show profiles with the same key
            is_candidate = profile_key == key
        else:
            # Otherwise, show all profiles with a key set
            is_candidate = profile_key is not None
        if not is_candidate:
            continue

        # Check that the key is usable to avoid suggesting unauthenticated
        # profiles. This applies to both cases above.
        if profile_key not in key_validity:
            key_validity[profile_key] = await check_key_is_valid_for_login(
                profile_key
            )
        if key_validity[profile_key]:
            already_logged_in_profiles.append(name)

    current_profile_is_logged_in = current_profile.name in already_logged_in_profiles

    if current_profile_is_logged_in:
        app.console.print("It looks like you're already authenticated on this profile.")
        if is_interactive():
            should_reauth = typer.confirm(
                "? Would you like to reauthenticate?", default=False
            )
        else:
            should_reauth = True

        if not should_reauth:
            app.console.print("Using the existing authentication on this profile.")
            key = PREFECT_API_KEY.value()

    elif already_logged_in_profiles:
        app.console.print(
            "It looks like you're already authenticated with another profile."
        )
        if typer.confirm(
            "? Would you like to switch profiles?",
            default=True,
        ):
            profile_name = prompt_select_from_list(
                app.console,
                "Which authenticated profile would you like to switch to?",
                already_logged_in_profiles,
            )

            profiles.set_active(profile_name)
            save_profiles(profiles)
            exit_with_success(f"Switched to authenticated profile {profile_name!r}.")

    if not key:
        choice = prompt_select_from_list(
            app.console,
            "How would you like to authenticate?",
            [
                ("browser", "Log in with a web browser"),
                ("key", "Paste an API key"),
            ],
        )

        if choice == "key":
            key = typer.prompt("Paste your API key", hide_input=True)
        elif choice == "browser":
            key = await login_with_browser()

    if TYPE_CHECKING:
        assert isinstance(key, str)

    async with get_cloud_client(api_key=key) as client:
        try:
            workspaces = await client.read_workspaces()
            current_workspace = get_current_workspace(workspaces)
            prompt_switch_workspace = False
        except CloudUnauthorizedError:
            if key.startswith("pcu"):
                help_message = (
                    "It looks like you're using API key from Cloud 1"
                    " (https://cloud.prefect.io). Make sure that you generate API key"
                    " using Cloud 2 (https://app.prefect.cloud)"
                )
            elif not key.startswith(("pnu_", "pnb_")):
                help_message = (
                    "Your key is not in our expected format: 'pnu_' or 'pnb_'."
                )
            else:
                help_message = (
                    "Please ensure your credentials are correct and unexpired."
                )
            exit_with_error(
                f"Unable to authenticate with Prefect Cloud. {help_message}"
            )
        except httpx.HTTPStatusError as exc:
            exit_with_error(f"Error connecting to Prefect Cloud: {exc!r}")

    if workspace_handle:
        # Search for the given workspace
        for workspace in workspaces:
            if workspace.handle == workspace_handle:
                selected_workspace = workspace
                break
        else:
            if workspaces:
                hint = (
                    " Available workspaces:"
                    f" {listrepr((w.handle for w in workspaces), ', ')}"
                )
            else:
                hint = ""

            exit_with_error(f"Workspace {workspace_handle!r} not found." + hint)
    else:
        # Prompt a switch if the number of workspaces is greater than one
        prompt_switch_workspace = len(workspaces) > 1

        # Confirm that we want to switch if the current profile is already logged in
        if (
            current_profile_is_logged_in and current_workspace is not None
        ) and prompt_switch_workspace:
            app.console.print(
                f"You are currently using workspace {current_workspace.handle!r}."
            )
            prompt_switch_workspace = typer.confirm(
                "? Would you like to switch workspaces?", default=False
            )

    if prompt_switch_workspace:
        # Keep prompting while the user picks the "go back" entry
        go_back = True
        while go_back:
            selected_workspace, go_back = await _prompt_for_account_and_workspace(
                workspaces
            )
        if selected_workspace is None:
            exit_with_error("No workspace selected.")

    elif not selected_workspace and not workspace_handle:
        # No prompt and no explicit handle: keep the current workspace, or fall
        # back to the first one returned.
        if current_workspace:
            selected_workspace = current_workspace
        elif len(workspaces) > 0:
            selected_workspace = workspaces[0]
        else:
            exit_with_error(
                "No workspaces found! Create a workspace at"
                f" {PREFECT_CLOUD_UI_URL.value()} and try again."
            )

    if TYPE_CHECKING:
        assert isinstance(selected_workspace, Workspace)

    update_current_profile(
        {
            PREFECT_API_KEY: key,
            PREFECT_API_URL: selected_workspace.api_url(),
        }
    )

    exit_with_success(
        f"Authenticated with Prefect Cloud! Using workspace {selected_workspace.handle!r}."
    )

594 

595 

@cloud_app.command()
async def logout():
    """
    Logout the current workspace.
    Reset PREFECT_API_KEY and PREFECT_API_URL to default.
    """
    active_profile = prefect.context.get_settings_context().profile
    stored_key = active_profile.settings.get(PREFECT_API_KEY)

    # Nothing to log out of when the profile holds no API key
    if stored_key is None:
        exit_with_error("Current profile is not logged into Prefect Cloud.")

    # Setting both values to None on the current profile
    cleared_settings = {
        PREFECT_API_URL: None,
        PREFECT_API_KEY: None,
    }
    update_current_profile(cleared_settings)

    exit_with_success("Logged out from Prefect Cloud.")

615 

616 

@workspace_app.command()
async def ls():
    """List available workspaces."""
    # Requires an API key; the active workspace (the one whose API URL matches
    # the configured PREFECT_API_URL) is marked with "*".
    confirm_logged_in()

    async with get_cloud_client() as client:
        try:
            workspaces = await client.read_workspaces()
        except CloudUnauthorizedError:
            exit_with_error(
                "Unable to authenticate. Please ensure your credentials are correct."
            )

    current_workspace = get_current_workspace(workspaces)
    active_handle = current_workspace.handle if current_workspace else None

    table = Table(caption="* active workspace")
    table.add_column(
        "[#024dfd]Workspaces:", justify="left", style="#8ea0ae", no_wrap=True
    )

    for workspace_handle in sorted(workspace.handle for workspace in workspaces):
        if current_workspace and workspace_handle == active_handle:
            table.add_row(f"[green]* {workspace_handle}[/green]")
        else:
            # Two spaces so the handle lines up with the "* " marker
            table.add_row(f"  {workspace_handle}")

    app.console.print(table)

645 

646 

@workspace_app.command()
async def set(
    workspace_handle: Optional[str] = typer.Option(
        None,
        "--workspace",
        "-w",
        help=(
            "Full handle of workspace, in format '<account_handle>/<workspace_handle>'"
        ),
    ),
):
    """Set current workspace. Shows a workspace picker if no workspace is specified."""
    # NOTE: the function name doubles as the CLI command name, so it shadows
    # the builtin `set` in this module after this definition.
    confirm_logged_in()
    async with get_cloud_client() as client:
        try:
            workspaces = await client.read_workspaces()
        except CloudUnauthorizedError:
            exit_with_error(
                "Unable to authenticate. Please ensure your credentials are correct."
            )

    if workspace_handle:
        # Search for the given workspace
        for workspace in workspaces:
            if workspace.handle == workspace_handle:
                break
        else:
            exit_with_error(f"Workspace {workspace_handle!r} not found.")
    else:
        if not workspaces:
            exit_with_error("No workspaces found in the selected account.")

        # Keep prompting while the user picks the "go back" entry. The prompt
        # helper only rebinds its own parameter, so the full list can be
        # passed again on every iteration.
        workspace = None
        go_back = True
        while go_back:
            workspace, go_back = await _prompt_for_account_and_workspace(workspaces)

        if workspace is None:
            exit_with_error("No workspace selected.")

    profile = update_current_profile({PREFECT_API_URL: workspace.api_url()})
    exit_with_success(
        f"Successfully set workspace to {workspace.handle!r} in profile"
        f" {profile.name!r}."
    )

700 f"Successfully set workspace to {workspace.handle!r} in profile" 

701 f" {profile.name!r}." 

702 )