Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/cloud/__init__.py: 17%
329 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1"""
2Command line interface for interacting with Prefect Cloud
3"""
5from __future__ import annotations 1a
7import os 1a
8import traceback 1a
9import uuid 1a
10import urllib.parse 1a
11import warnings 1a
12import webbrowser 1a
13from contextlib import asynccontextmanager 1a
14from typing import ( 1a
15 TYPE_CHECKING,
16 Iterable,
17 Optional,
18 TypeVar,
19 Union,
20 overload,
21)
23import anyio 1a
24import anyio.abc 1a
25import httpx 1a
26import readchar 1a
27import typer 1a
28import uvicorn 1a
29from fastapi import FastAPI 1a
30from fastapi.middleware.cors import CORSMiddleware 1a
32from rich.console import Console 1a
33from rich.live import Live 1a
34from rich.table import Table 1a
35from typing_extensions import Literal 1a
37import prefect.context 1a
38from prefect.cli._types import PrefectTyper 1a
39from prefect.cli._utilities import exit_with_error, exit_with_success 1a
40from prefect.cli.root import app, is_interactive 1a
41from prefect.client.cloud import CloudUnauthorizedError, get_cloud_client 1a
42from prefect.client.schemas import Workspace 1a
43from prefect.context import get_settings_context 1a
44from prefect.settings import ( 1a
45 PREFECT_API_KEY,
46 PREFECT_API_URL,
47 PREFECT_CLOUD_UI_URL,
48 load_profiles,
49 save_profiles,
50 update_current_profile,
51)
52from prefect.utilities.asyncutils import run_sync_in_worker_thread 1a
53from prefect.utilities.collections import listrepr 1a
55from pydantic import BaseModel 1a
57T = TypeVar("T") 1a
59# Set up the `prefect cloud` and `prefect cloud workspaces` CLI applications
60cloud_app: PrefectTyper = PrefectTyper( 1a
61 name="cloud", help="Authenticate and interact with Prefect Cloud"
62)
63workspace_app: PrefectTyper = PrefectTyper( 1a
64 name="workspace", help="View and set Prefect Cloud Workspaces"
65)
66cloud_app.add_typer(workspace_app, aliases=["workspaces"]) 1a
67app.add_typer(cloud_app) 1a
70def set_login_api_ready_event() -> None: 1a
71 login_api.extra["ready-event"].set()
74@asynccontextmanager 1a
75async def lifespan(app: FastAPI): 1a
76 try:
77 set_login_api_ready_event()
78 yield
79 finally:
80 pass
83login_api: FastAPI = FastAPI(lifespan=lifespan) 1a
84""" 1a
85This small API server is used for data transmission for browser-based log in.
86"""
88login_api.add_middleware( 1a
89 CORSMiddleware,
90 allow_origins=["*"],
91 allow_methods=["*"],
92 allow_headers=["*"],
93)
96class LoginSuccess(BaseModel): 1a
97 api_key: str 1a
100class LoginFailed(BaseModel): 1a
101 reason: str 1a
104class LoginResult(BaseModel): 1a
105 type: Literal["success", "failure"] 1a
106 content: Union[LoginSuccess, LoginFailed] 1a
109class ServerExit(Exception): 1a
110 pass 1a
113@login_api.post("/success") 1a
114def receive_login(payload: LoginSuccess) -> None: 1a
115 login_api.extra["result"] = LoginResult(type="success", content=payload)
116 login_api.extra["result-event"].set()
119@login_api.post("/failure") 1a
120def receive_failure(payload: LoginFailed) -> None: 1a
121 login_api.extra["result"] = LoginResult(type="failure", content=payload)
122 login_api.extra["result-event"].set()
125async def serve_login_api( 1a
126 cancel_scope: anyio.CancelScope, task_status: anyio.abc.TaskStatus[uvicorn.Server]
127) -> None:
128 config = uvicorn.Config(login_api, port=0, log_level="critical")
129 server = uvicorn.Server(config)
131 try:
132 # Yield the server object
133 task_status.started(server)
134 with warnings.catch_warnings():
135 # Uvicorn uses the deprecated pieces of websockets, filter out
136 # the warnings until uvicorn has its dependencies updated
137 warnings.filterwarnings(
138 "ignore", category=DeprecationWarning, module="websockets"
139 )
140 warnings.filterwarnings(
141 "ignore",
142 category=DeprecationWarning,
143 module="uvicorn.protocols.websockets",
144 )
145 await server.serve()
146 except anyio.get_cancelled_exc_class():
147 pass # Already cancelled, do not cancel again
148 except SystemExit as exc:
149 # If uvicorn is misconfigured, it will throw a system exit and hide the exc
150 app.console.print("[red][bold]X Error starting login service!")
151 cause = exc.__context__ # Hide the system exit
152 if TYPE_CHECKING:
153 assert isinstance(cause, BaseException)
154 traceback.print_exception(type(cause), value=cause, tb=cause.__traceback__)
155 cancel_scope.cancel()
156 else:
157 # Exit if we are done serving the API
158 # Uvicorn overrides signal handlers so without this Ctrl-C is broken
159 cancel_scope.cancel()
162def confirm_logged_in() -> None: 1a
163 if not PREFECT_API_KEY:
164 profile = prefect.context.get_settings_context().profile
165 exit_with_error(
166 f"Currently not authenticated in profile {profile.name!r}. "
167 "Please log in with `prefect cloud login`."
168 )
171def get_current_workspace(workspaces: Iterable[Workspace]) -> Workspace | None: 1a
172 current_api_url = PREFECT_API_URL.value()
174 if not current_api_url:
175 return None
177 for workspace in workspaces:
178 if workspace.api_url() == current_api_url:
179 return workspace
181 return None
184@overload 1a
185def prompt_select_from_list( 185 ↛ exitline 185 didn't return from function 'prompt_select_from_list' because 1a
186 console: Console, prompt: str, options: list[str]
187) -> str: ...
190@overload 1a
191def prompt_select_from_list( 191 ↛ exitline 191 didn't return from function 'prompt_select_from_list' because 1a
192 console: Console, prompt: str, options: list[tuple[T, str]]
193) -> T: ...
196def prompt_select_from_list( 1a
197 console: Console, prompt: str, options: list[str] | list[tuple[T, str]]
198) -> str | T:
199 """
200 Given a list of options, display the values to user in a table and prompt them
201 to select one.
203 Args:
204 options: A list of options to present to the user.
205 A list of tuples can be passed as key value pairs. If a value is chosen, the
206 key will be returned.
208 Returns:
209 str: the selected option
210 """
211 current_idx = 0
212 selected_option = None
214 def build_table() -> Table:
215 """
216 Generate a table of options. The `current_idx` will be highlighted.
217 """
219 table = Table(box=None, header_style=None, padding=(0, 0))
220 table.add_column(
221 f"? [bold]{prompt}[/] [bright_blue][Use arrows to move; enter to select]",
222 justify="left",
223 no_wrap=True,
224 )
226 for i, option in enumerate(options):
227 if isinstance(option, tuple):
228 option = option[1]
230 if i == current_idx:
231 # Use blue for selected options
232 table.add_row("[bold][blue]> " + option)
233 else:
234 table.add_row(" " + option)
235 return table
237 with Live(build_table(), auto_refresh=False, console=console) as live:
238 while selected_option is None:
239 key = readchar.readkey()
241 if key == readchar.key.UP:
242 current_idx = current_idx - 1
243 # wrap to bottom if at the top
244 if current_idx < 0:
245 current_idx = len(options) - 1
246 elif key == readchar.key.DOWN:
247 current_idx = current_idx + 1
248 # wrap to top if at the bottom
249 if current_idx >= len(options):
250 current_idx = 0
251 elif key == readchar.key.CTRL_C:
252 # gracefully exit with no message
253 exit_with_error("")
254 elif key == readchar.key.ENTER or key == readchar.key.CR:
255 selected_option = options[current_idx]
256 # Break out of the loop immediately after setting selected_option
257 break
259 live.update(build_table(), refresh=True)
261 # Convert tuple to its first element if needed
262 if isinstance(selected_option, tuple):
263 selected_option = selected_option[0]
265 return selected_option
268async def login_with_browser() -> str: 1a
269 """
270 Perform login using the browser.
272 On failure, this function will exit the process.
273 On success, it will return an API key.
274 """
276 # Set up an event that the login API will toggle on startup
277 ready_event = login_api.extra["ready-event"] = anyio.Event()
279 # Set up an event that the login API will set when a response comes from the UI
280 result_event = login_api.extra["result-event"] = anyio.Event()
282 timeout_scope = None
283 async with anyio.create_task_group() as tg:
284 # Run a server in the background to get payload from the browser
285 server = await tg.start(serve_login_api, tg.cancel_scope)
287 # Wait for the login server to be ready
288 with anyio.fail_after(10):
289 await ready_event.wait()
291 # The server may not actually be serving as the lifespan is started first
292 while not server.started:
293 await anyio.sleep(0)
295 # Get the port the server is using
296 server_port = server.servers[0].sockets[0].getsockname()[1]
297 callback = urllib.parse.quote(f"http://localhost:{server_port}")
298 ui_login_url = (
299 PREFECT_CLOUD_UI_URL.value() + f"/auth/client?callback={callback}"
300 )
302 # Then open the authorization page in a new browser tab
303 app.console.print("Opening browser...")
304 await run_sync_in_worker_thread(webbrowser.open_new_tab, ui_login_url)
306 # Wait for the response from the browser,
307 with anyio.move_on_after(120) as timeout_scope:
308 app.console.print("Waiting for response...")
309 await result_event.wait()
311 # Shut down the background uvicorn server
312 tg.cancel_scope.cancel()
314 result = login_api.extra.get("result")
315 if not result:
316 if timeout_scope and timeout_scope.cancel_called:
317 exit_with_error("Timed out while waiting for authorization.")
318 else:
319 exit_with_error("Aborted.")
321 if result.type == "success":
322 return result.content.api_key
323 else:
324 exit_with_error(f"Failed to log in. {result.content.reason}")
327async def check_key_is_valid_for_login(key: str) -> bool: 1a
328 """
329 Attempt to use a key to see if it is valid
330 """
331 async with get_cloud_client(api_key=key) as client:
332 try:
333 await client.read_workspaces()
334 return True
335 except CloudUnauthorizedError:
336 return False
339async def _prompt_for_account_and_workspace( 1a
340 workspaces: list[Workspace],
341) -> tuple[Workspace | None, bool]:
342 if len(workspaces) > 10:
343 # Group workspaces by account_id
344 workspace_by_account: dict[uuid.UUID, list[Workspace]] = {}
345 for workspace in workspaces:
346 workspace_by_account.setdefault(workspace.account_id, []).append(workspace)
348 if len(workspace_by_account) == 1:
349 account_id = next(iter(workspace_by_account.keys()))
350 workspaces = workspace_by_account[account_id]
351 else:
352 accounts = [
353 {
354 "account_id": account_id,
355 "account_handle": workspace_by_account[account_id][
356 0
357 ].account_handle,
358 }
359 for account_id in workspace_by_account.keys()
360 ]
361 account_options = [
362 (account, str(account["account_handle"])) for account in accounts
363 ]
364 account = prompt_select_from_list(
365 app.console,
366 "Which account would you like to use?",
367 options=account_options,
368 )
369 account_id = account["account_id"]
370 if TYPE_CHECKING:
371 assert isinstance(account_id, uuid.UUID)
372 workspaces = workspace_by_account[account_id]
374 workspace_options: list[tuple[Workspace | None, str]] = [
375 (workspace, workspace.handle) for workspace in workspaces
376 ]
377 go_back_option = (
378 None,
379 "[bold]Go back to account selection[/bold]",
380 )
382 result = prompt_select_from_list(
383 app.console,
384 "Which workspace would you like to use?",
385 options=workspace_options + [go_back_option],
386 )
388 if not result:
389 return None, True
390 else:
391 return result, False
394@cloud_app.command() 1a
395async def login( 1a
396 key: Optional[str] = typer.Option(
397 None, "--key", "-k", help="API Key to authenticate with Prefect"
398 ),
399 workspace_handle: Optional[str] = typer.Option(
400 None,
401 "--workspace",
402 "-w",
403 help=(
404 "Full handle of workspace, in format '<account_handle>/<workspace_handle>'"
405 ),
406 ),
407):
408 """
409 Log in to Prefect Cloud.
410 Creates a new profile configured to use the specified PREFECT_API_KEY.
411 Uses a previously configured profile if it exists.
412 """
413 if not is_interactive() and (not key or not workspace_handle):
414 exit_with_error(
415 "When not using an interactive terminal, you must supply a `--key` and"
416 " `--workspace`."
417 )
419 profiles = load_profiles()
420 current_profile = get_settings_context().profile
421 env_var_api_key = os.getenv("PREFECT_API_KEY")
422 selected_workspace = None
424 if env_var_api_key and key and env_var_api_key != key:
425 exit_with_error(
426 "Cannot log in with a key when a different PREFECT_API_KEY is present as an"
427 " environment variable that will override it."
428 )
430 if key and env_var_api_key and env_var_api_key == key:
431 is_valid_key = await check_key_is_valid_for_login(key)
432 is_correct_key_format = key.startswith("pnu_") or key.startswith("pnb_")
433 if not is_valid_key:
434 help_message = "Please ensure your credentials are correct and unexpired."
435 if not is_correct_key_format:
436 help_message = "Your key is not in our expected format."
437 exit_with_error(
438 f"Unable to authenticate with Prefect Cloud. {help_message}"
439 )
441 already_logged_in_profiles: list[str] = []
442 for name, profile in profiles.items():
443 profile_key = profile.settings.get(PREFECT_API_KEY)
444 if (
445 # If a key is provided, only show profiles with the same key
446 (key and profile_key == key)
447 # Otherwise, show all profiles with a key set
448 or (not key and profile_key is not None)
449 # Check that the key is usable to avoid suggesting unauthenticated profiles
450 and await check_key_is_valid_for_login(profile_key)
451 ):
452 already_logged_in_profiles.append(name)
454 current_profile_is_logged_in = current_profile.name in already_logged_in_profiles
456 if current_profile_is_logged_in:
457 app.console.print("It looks like you're already authenticated on this profile.")
458 if is_interactive():
459 should_reauth = typer.confirm(
460 "? Would you like to reauthenticate?", default=False
461 )
462 else:
463 should_reauth = True
465 if not should_reauth:
466 app.console.print("Using the existing authentication on this profile.")
467 key = PREFECT_API_KEY.value()
469 elif already_logged_in_profiles:
470 app.console.print(
471 "It looks like you're already authenticated with another profile."
472 )
473 if typer.confirm(
474 "? Would you like to switch profiles?",
475 default=True,
476 ):
477 profile_name = prompt_select_from_list(
478 app.console,
479 "Which authenticated profile would you like to switch to?",
480 already_logged_in_profiles,
481 )
483 profiles.set_active(profile_name)
484 save_profiles(profiles)
485 exit_with_success(f"Switched to authenticated profile {profile_name!r}.")
487 if not key:
488 choice = prompt_select_from_list(
489 app.console,
490 "How would you like to authenticate?",
491 [
492 ("browser", "Log in with a web browser"),
493 ("key", "Paste an API key"),
494 ],
495 )
497 if choice == "key":
498 key = typer.prompt("Paste your API key", hide_input=True)
499 elif choice == "browser":
500 key = await login_with_browser()
502 if TYPE_CHECKING:
503 assert isinstance(key, str)
505 async with get_cloud_client(api_key=key) as client:
506 try:
507 workspaces = await client.read_workspaces()
508 current_workspace = get_current_workspace(workspaces)
509 prompt_switch_workspace = False
510 except CloudUnauthorizedError:
511 if key.startswith("pcu"):
512 help_message = (
513 "It looks like you're using API key from Cloud 1"
514 " (https://cloud.prefect.io). Make sure that you generate API key"
515 " using Cloud 2 (https://app.prefect.cloud)"
516 )
517 elif not key.startswith("pnu_") and not key.startswith("pnb_"):
518 help_message = (
519 "Your key is not in our expected format: 'pnu_' or 'pnb_'."
520 )
521 else:
522 help_message = (
523 "Please ensure your credentials are correct and unexpired."
524 )
525 exit_with_error(
526 f"Unable to authenticate with Prefect Cloud. {help_message}"
527 )
528 except httpx.HTTPStatusError as exc:
529 exit_with_error(f"Error connecting to Prefect Cloud: {exc!r}")
531 if workspace_handle:
532 # Search for the given workspace
533 for workspace in workspaces:
534 if workspace.handle == workspace_handle:
535 selected_workspace = workspace
536 break
537 else:
538 if workspaces:
539 hint = (
540 " Available workspaces:"
541 f" {listrepr((w.handle for w in workspaces), ', ')}"
542 )
543 else:
544 hint = ""
546 exit_with_error(f"Workspace {workspace_handle!r} not found." + hint)
547 else:
548 # Prompt a switch if the number of workspaces is greater than one
549 prompt_switch_workspace = len(workspaces) > 1
551 # Confirm that we want to switch if the current profile is already logged in
552 if (
553 current_profile_is_logged_in and current_workspace is not None
554 ) and prompt_switch_workspace:
555 app.console.print(
556 f"You are currently using workspace {current_workspace.handle!r}."
557 )
558 prompt_switch_workspace = typer.confirm(
559 "? Would you like to switch workspaces?", default=False
560 )
561 if prompt_switch_workspace:
562 go_back = True
563 while go_back:
564 selected_workspace, go_back = await _prompt_for_account_and_workspace(
565 workspaces
566 )
567 if selected_workspace is None:
568 exit_with_error("No workspace selected.")
570 elif not selected_workspace and not workspace_handle:
571 if current_workspace:
572 selected_workspace = current_workspace
573 elif len(workspaces) > 0:
574 selected_workspace = workspaces[0]
575 else:
576 exit_with_error(
577 "No workspaces found! Create a workspace at"
578 f" {PREFECT_CLOUD_UI_URL.value()} and try again."
579 )
581 if TYPE_CHECKING:
582 assert isinstance(selected_workspace, Workspace)
584 update_current_profile(
585 {
586 PREFECT_API_KEY: key,
587 PREFECT_API_URL: selected_workspace.api_url(),
588 }
589 )
591 exit_with_success(
592 f"Authenticated with Prefect Cloud! Using workspace {selected_workspace.handle!r}."
593 )
596@cloud_app.command() 1a
597async def logout(): 1a
598 """
599 Logout the current workspace.
600 Reset PREFECT_API_KEY and PREFECT_API_URL to default.
601 """
602 current_profile = prefect.context.get_settings_context().profile
604 if current_profile.settings.get(PREFECT_API_KEY) is None:
605 exit_with_error("Current profile is not logged into Prefect Cloud.")
607 update_current_profile(
608 {
609 PREFECT_API_URL: None,
610 PREFECT_API_KEY: None,
611 },
612 )
614 exit_with_success("Logged out from Prefect Cloud.")
617@workspace_app.command() 1a
618async def ls(): 1a
619 """List available workspaces."""
621 confirm_logged_in()
623 async with get_cloud_client() as client:
624 try:
625 workspaces = await client.read_workspaces()
626 except CloudUnauthorizedError:
627 exit_with_error(
628 "Unable to authenticate. Please ensure your credentials are correct."
629 )
631 current_workspace = get_current_workspace(workspaces)
633 table = Table(caption="* active workspace")
634 table.add_column(
635 "[#024dfd]Workspaces:", justify="left", style="#8ea0ae", no_wrap=True
636 )
638 for workspace_handle in sorted(workspace.handle for workspace in workspaces):
639 if current_workspace and workspace_handle == current_workspace.handle:
640 table.add_row(f"[green]* {workspace_handle}[/green]")
641 else:
642 table.add_row(f" {workspace_handle}")
644 app.console.print(table)
647@workspace_app.command() 1a
648async def set( 1a
649 workspace_handle: str = typer.Option(
650 None,
651 "--workspace",
652 "-w",
653 help=(
654 "Full handle of workspace, in format '<account_handle>/<workspace_handle>'"
655 ),
656 ),
657):
658 """Set current workspace. Shows a workspace picker if no workspace is specified."""
659 confirm_logged_in()
660 async with get_cloud_client() as client:
661 try:
662 workspaces = await client.read_workspaces()
663 except CloudUnauthorizedError:
664 exit_with_error(
665 "Unable to authenticate. Please ensure your credentials are correct."
666 )
668 if workspace_handle:
669 # Search for the given workspace
670 for workspace in workspaces:
671 if workspace.handle == workspace_handle:
672 break
673 else:
674 exit_with_error(f"Workspace {workspace_handle!r} not found.")
675 else:
676 if not workspaces:
677 exit_with_error("No workspaces found in the selected account.")
679 # Store the original list of workspaces
680 original_workspaces = workspaces.copy()
682 go_back = True
683 workspace = None
684 loop_count = 0
686 while go_back:
687 loop_count += 1
689 # If we're going back, use the original list of workspaces
690 if loop_count > 1:
691 workspaces = original_workspaces.copy()
693 workspace, go_back = await _prompt_for_account_and_workspace(workspaces)
695 if workspace is None:
696 exit_with_error("No workspace selected.")
698 profile = update_current_profile({PREFECT_API_URL: workspace.api_url()})
699 exit_with_success(
700 f"Successfully set workspace to {workspace.handle!r} in profile"
701 f" {profile.name!r}."
702 )