Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/profile.py: 16%
213 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1"""
2Command line interface for working with profiles.
3"""
5import os 1a
6import shutil 1a
7import textwrap 1a
8from typing import Optional 1a
10import httpx 1a
11import orjson 1a
12import typer 1a
13from rich.progress import Progress, SpinnerColumn, TextColumn 1a
14from rich.table import Table 1a
16import prefect.context 1a
17import prefect.settings 1a
18import prefect.settings.profiles 1a
19from prefect.cli._types import PrefectTyper 1a
20from prefect.cli._utilities import exit_with_error, exit_with_success 1a
21from prefect.cli.cloud import CloudUnauthorizedError, get_cloud_client 1a
22from prefect.cli.root import app, is_interactive 1a
23from prefect.client.base import determine_server_type 1a
24from prefect.client.orchestration import ServerType, get_client 1a
25from prefect.context import use_profile 1a
26from prefect.settings import ProfilesCollection 1a
27from prefect.utilities.collections import AutoEnum 1a
# Typer sub-application that every command in this module is registered on
# through `@profile_app.command()`.
profile_app: PrefectTyper = PrefectTyper(
    name="profile",
    help="Select and manage Prefect profiles.",
)
# Attach the group to the root CLI app; "profiles" is accepted as an alias.
app.add_typer(profile_app, aliases=["profiles"])
# Judging by the name, this is the text of the former minimal default profiles
# file: an active "default" profile followed by an empty [profiles.default]
# table. NOTE(review): not referenced elsewhere in the visible part of the file.
_OLD_MINIMAL_DEFAULT_PROFILE_CONTENT: str = (
    'active = "default"\n'
    "\n"
    "[profiles.default]"
)
@profile_app.command()
def ls():
    """
    List profile names.
    """
    # Profiles are loaded without the built-in defaults.
    available = prefect.settings.load_profiles(include_defaults=False)
    active = prefect.context.get_settings_context().profile
    # The settings context may carry no profile, in which case no row is marked.
    active_name = None if active is None else active.name

    listing = Table(caption="* active profile")
    listing.add_column(
        "[#024dfd]Available Profiles:",
        justify="right",
        style="#8ea0ae",
        no_wrap=True,
    )

    for profile_name in available:
        # The active profile is rendered in green with a leading asterisk.
        row = (
            f"[green] * {profile_name}[/green]"
            if profile_name == active_name
            else f" {profile_name}"
        )
        listing.add_row(row)
    app.console.print(listing)
@profile_app.command()
def create(
    name: str,
    from_name: str = typer.Option(None, "--from", help="Copy an existing profile."),
):
    """
    Create a new profile.
    """
    # Profiles are loaded without the built-in defaults.
    existing = prefect.settings.load_profiles(include_defaults=False)

    if name in existing:
        # Refuse to overwrite; point the user at the delete command instead.
        already_exists = textwrap.dedent(
            f"""
            [red]Profile {name!r} already exists.[/red]
            To create a new profile, remove the existing profile first:

                prefect profile delete {name!r}
            """
        ).strip()
        app.console.print(already_exists)
        raise typer.Exit(1)

    if not from_name:
        # No source given: start from a profile with no settings.
        new_profile = prefect.settings.Profile(name=name, settings={})
    else:
        if from_name not in existing:
            exit_with_error(f"Profile {from_name!r} not found.")
        # Clone the source profile, changing only its name.
        new_profile = existing[from_name].model_copy(update={"name": name})

    existing.add_profile(new_profile)
    prefect.settings.save_profiles(existing)

    summary = textwrap.dedent(
        f"""
        Created profile with properties:
            name - {name!r}
            from name - {from_name or None}

        Use created profile for future, subsequent commands:
            prefect profile use {name!r}

        Use created profile temporarily for a single command:
            prefect -p {name!r} config view
        """
    )
    app.console.print(summary)
@profile_app.command()
async def use(name: str):
    """
    Set the given profile to active.
    """
    # For every connectivity outcome: the exit helper to call and the message
    # to hand to it.
    outcomes = {
        ConnectionStatus.CLOUD_CONNECTED: (
            exit_with_success,
            f"Connected to Prefect Cloud using profile {name!r}",
        ),
        ConnectionStatus.CLOUD_ERROR: (
            exit_with_error,
            f"Error connecting to Prefect Cloud using profile {name!r}",
        ),
        ConnectionStatus.CLOUD_UNAUTHORIZED: (
            exit_with_error,
            f"Error authenticating with Prefect Cloud using profile {name!r}",
        ),
        ConnectionStatus.SERVER_CONNECTED: (
            exit_with_success,
            f"Connected to Prefect server using profile {name!r}",
        ),
        ConnectionStatus.SERVER_ERROR: (
            exit_with_error,
            f"Error connecting to Prefect server using profile {name!r}",
        ),
        ConnectionStatus.EPHEMERAL: (
            exit_with_success,
            (
                f"No Prefect server specified using profile {name!r} - the API will run"
                " in ephemeral mode."
            ),
        ),
        ConnectionStatus.UNCONFIGURED: (
            exit_with_error,
            (
                f"Prefect server URL not configured using profile {name!r} - please"
                " configure the server URL or enable ephemeral mode."
            ),
        ),
        ConnectionStatus.INVALID_API: (
            exit_with_error,
            "Error connecting to Prefect API URL",
        ),
    }

    all_profiles = prefect.settings.load_profiles()
    if name not in all_profiles.names:
        exit_with_error(f"Profile {name!r} not found.")

    # The switch is saved before the connectivity probe runs, so the profile
    # stays active even when the probe below reports an error.
    all_profiles.set_active(name)
    prefect.settings.save_profiles(all_profiles)

    with Progress(
        SpinnerColumn(),
        TextColumn("[progress.description]{task.description}"),
        transient=False,
    ) as spinner:
        # total=None: the task has no known length.
        spinner.add_task(description="Checking API connectivity...", total=None)

        # Probe with only the selected profile applied, not the current context.
        with use_profile(name, include_current_context=False):
            status = await check_server_connection()

        finish, message = outcomes[status]

    finish(message)
@profile_app.command()
def delete(name: str):
    """
    Delete the given profile.
    """
    profiles = prefect.settings.load_profiles()
    if name not in profiles:
        exit_with_error(f"Profile {name!r} not found.")

    current_profile = prefect.context.get_settings_context().profile
    # The settings context may carry no profile (other commands in this module
    # guard against None); without a profile there is nothing to protect.
    if current_profile is not None and current_profile.name == name:
        exit_with_error(
            f"Profile {name!r} is the active profile. You must switch profiles before"
            " it can be deleted."
        )

    # Only prompt when a user is present to answer; otherwise delete directly.
    if is_interactive() and not typer.confirm(
        (f"Are you sure you want to delete profile with name {name!r}?"),
        default=False,
    ):
        exit_with_error("Deletion aborted.")

    profiles.remove_profile(name)
    prefect.settings.save_profiles(profiles)
    exit_with_success(f"Removed profile {name!r}.")
@profile_app.command()
def rename(name: str, new_name: str):
    """
    Change the name of a profile.
    """
    # Profiles are loaded without the built-in defaults.
    profiles = prefect.settings.load_profiles(include_defaults=False)
    if name not in profiles:
        exit_with_error(f"Profile {name!r} not found.")

    if new_name in profiles:
        exit_with_error(f"Profile {new_name!r} already exists.")

    # Renaming is done as "add a copy under the new name, then drop the old one".
    profiles.add_profile(profiles[name].model_copy(update={"name": new_name}))
    profiles.remove_profile(name)

    # If the active profile was renamed switch the active profile to the new name.
    if profiles.active_name == name:
        profiles.set_active(new_name)

    # An environment override still points at the old name and cannot be
    # rewritten from here, so tell the user to update it.
    if os.environ.get("PREFECT_PROFILE") == name:
        app.console.print(
            f"You have set your current profile to {name!r} with the "
            "PREFECT_PROFILE environment variable. You must update this variable to "
            f"{new_name!r} to continue using the profile."
        )

    prefect.settings.save_profiles(profiles)
    exit_with_success(f"Renamed profile {name!r} to {new_name!r}.")
@profile_app.command()
def inspect(
    name: Optional[str] = typer.Argument(
        None, help="Name of profile to inspect; defaults to active profile."
    ),
    output: Optional[str] = typer.Option(
        None,
        "--output",
        "-o",
        help="Specify an output format. Currently supports: json",
    ),
):
    """
    Display settings from a given profile; defaults to active.
    """
    # Resolve the requested format once; the check is case-insensitive.
    as_json = bool(output) and output.lower() == "json"
    if output and not as_json:
        exit_with_error("Only 'json' output format is supported.")

    profiles = prefect.settings.load_profiles()
    if name is None:
        current_profile = prefect.context.get_settings_context().profile
        if not current_profile:
            exit_with_error("No active profile set - please provide a name to inspect.")
        name = current_profile.name
        # In JSON mode the notice is skipped so that the emitted document is the
        # only thing written and remains parseable.
        if not as_json:
            print(f"No name provided, defaulting to {name!r}")
    if name not in profiles:
        exit_with_error(f"Profile {name!r} not found.")

    settings = profiles[name].settings
    if not settings:
        # TODO: Consider instructing on how to add settings.
        if as_json:
            app.console.print("{}")
        else:
            print(f"Profile {name!r} is empty.")
        return

    if as_json:
        # Keys are the setting names; values are emitted as stored in the profile.
        profile_data = {setting.name: value for setting, value in settings.items()}
        json_output = orjson.dumps(profile_data, option=orjson.OPT_INDENT_2).decode()
        app.console.print(json_output)
    else:
        for setting, value in settings.items():
            app.console.print(f"{setting.name}='{value}'")
def show_profile_changes(
    user_profiles: ProfilesCollection, default_profiles: ProfilesCollection
) -> bool:
    """
    Print which default profiles would be added to the user's profiles.

    Args:
        user_profiles: The profiles the user currently has.
        default_profiles: The default profiles to compare against.

    Returns:
        True when at least one default profile is missing from
        `user_profiles` (the proposed additions are printed), False when
        nothing needs to change (an "up to date" message is printed).
    """
    # The only kind of change ever proposed is adding a missing default profile.
    missing = [name for name in default_profiles.names if name not in user_profiles]

    if not missing:
        app.console.print(
            "[green]No changes needed. All profiles are up to date.[/green]"
        )
        return False

    app.console.print("\n[bold cyan]Proposed Changes:[/bold cyan]")
    for name in missing:
        app.console.print(f" [blue]•[/blue] Add '{name}'")

    return True
@profile_app.command()
def populate_defaults():
    """Populate the profiles configuration with default base profiles, preserving existing user profiles."""
    from prefect.settings.profiles import (
        _read_profiles_from,  # type: ignore[reportPrivateUsage]
        _write_profiles_to,  # type: ignore[reportPrivateUsage]
    )

    user_path = prefect.settings.PREFECT_PROFILES_PATH.value()
    default_profiles = _read_profiles_from(prefect.settings.DEFAULT_PROFILES_PATH)

    if not user_path.exists():
        # No profiles file yet: start from an empty collection and preview
        # what will be written.
        user_profiles = ProfilesCollection([])
        app.console.print(
            "\n[bold]Creating new profiles file with default profiles.[/bold]"
        )
        show_profile_changes(user_profiles, default_profiles)
    else:
        user_profiles = _read_profiles_from(user_path)

        # Nothing to add: stop before asking any questions.
        if not show_profile_changes(user_profiles, default_profiles):
            return

        # Offer to keep a copy of the existing file next to it.
        backup_path = f"{user_path}.bak"
        if typer.confirm(f"\nBack up existing profiles to {backup_path}?"):
            shutil.copy(user_path, backup_path)
            app.console.print(f"Profiles backed up to {backup_path}")

    # Final confirmation before anything is written.
    if not typer.confirm(f"\nUpdate profiles at {user_path}?"):
        app.console.print("Operation cancelled.")
        return

    # Only add defaults whose names are absent; existing user profiles win.
    for default_name, default_profile in default_profiles.items():
        if default_name not in user_profiles:
            user_profiles.add_profile(default_profile)

    _write_profiles_to(user_path, user_profiles)
    app.console.print(f"\nProfiles updated in [green]{user_path}[/green]")
    app.console.print(
        "\nUse with [green]prefect profile use[/green] [blue][PROFILE-NAME][/blue]"
    )
    app.console.print("\nAvailable profiles:")
    for profile_name in user_profiles.names:
        app.console.print(f" - {profile_name}")
class ConnectionStatus(AutoEnum):
    """Outcomes reported by `check_server_connection`."""

    # Prefect Cloud: health check succeeded / failed / was rejected as
    # unauthorized.
    CLOUD_CONNECTED = AutoEnum.auto()
    CLOUD_ERROR = AutoEnum.auto()
    CLOUD_UNAUTHORIZED = AutoEnum.auto()

    # Hosted Prefect server: health check succeeded / failed.
    SERVER_CONNECTED = AutoEnum.auto()
    SERVER_ERROR = AutoEnum.auto()

    # No health check performed: the server type resolved to unconfigured or
    # ephemeral.
    UNCONFIGURED = AutoEnum.auto()
    EPHEMERAL = AutoEnum.auto()

    # A connection or unsupported-protocol error escaped the check itself.
    INVALID_API = AutoEnum.auto()
async def check_server_connection() -> ConnectionStatus:
    """
    Probe the currently configured Prefect API and classify the result.

    The server type is determined first. Cloud URLs are health-checked with
    the Cloud client; ephemeral and unconfigured setups are reported without
    any request; anything else is health-checked with the regular client.

    Returns:
        The `ConnectionStatus` member describing the outcome. Failures inside
        the health checks are reported as a status rather than raised.
    """
    # Passed through to the clients as httpx settings (presumably a timeout in
    # seconds - confirm against the client implementation).
    client_options = {"timeout": 3}
    try:
        # First determine the server type based on the URL
        kind = determine_server_type()

        # Only try to connect to Cloud if the URL looks like a Cloud URL
        if kind == ServerType.CLOUD:
            try:
                cloud = get_cloud_client(
                    httpx_settings=client_options, infer_cloud_url=True
                )
                async with cloud:
                    await cloud.api_healthcheck()
                return ConnectionStatus.CLOUD_CONNECTED
            except CloudUnauthorizedError:
                # if the Cloud API exists and fails to authenticate, notify the user
                return ConnectionStatus.CLOUD_UNAUTHORIZED
            except Exception:
                # Any other failure, HTTP status errors included.
                return ConnectionStatus.CLOUD_ERROR

        # For non-Cloud URLs, try to connect as a hosted Prefect instance
        if kind == ServerType.EPHEMERAL:
            return ConnectionStatus.EPHEMERAL
        if kind == ServerType.UNCONFIGURED:
            return ConnectionStatus.UNCONFIGURED

        # Try to connect to the server
        try:
            hosted = get_client(httpx_settings=client_options)
            async with hosted:
                failure = await hosted.api_healthcheck()
            # The health check hands back None on success and an error otherwise.
            return (
                ConnectionStatus.SERVER_CONNECTED
                if failure is None
                else ConnectionStatus.SERVER_ERROR
            )
        except Exception:
            return ConnectionStatus.SERVER_ERROR
    except TypeError:
        # if no Prefect API URL has been set, httpx will throw a TypeError
        # NOTE(review): the inner handlers above already catch Exception, so
        # only errors raised outside them can arrive here.
        try:
            # try to connect with the client anyway, it will likely use an
            # ephemeral Prefect instance
            kind = determine_server_type()
            if kind == ServerType.EPHEMERAL:
                return ConnectionStatus.EPHEMERAL
            if kind == ServerType.UNCONFIGURED:
                return ConnectionStatus.UNCONFIGURED

            fallback = get_client(httpx_settings=client_options)
            if fallback.server_type == ServerType.EPHEMERAL:
                return ConnectionStatus.EPHEMERAL
            async with fallback:
                failure = await fallback.api_healthcheck()
            return (
                ConnectionStatus.SERVER_CONNECTED
                if failure is None
                else ConnectionStatus.SERVER_ERROR
            )
        except Exception:
            return ConnectionStatus.SERVER_ERROR
    except (httpx.ConnectError, httpx.UnsupportedProtocol):
        return ConnectionStatus.INVALID_API