Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/profile.py: 16%

213 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1""" 

2Command line interface for working with profiles. 

3""" 

4 

5import os 1a

6import shutil 1a

7import textwrap 1a

8from typing import Optional 1a

9 

10import httpx 1a

11import orjson 1a

12import typer 1a

13from rich.progress import Progress, SpinnerColumn, TextColumn 1a

14from rich.table import Table 1a

15 

16import prefect.context 1a

17import prefect.settings 1a

18import prefect.settings.profiles 1a

19from prefect.cli._types import PrefectTyper 1a

20from prefect.cli._utilities import exit_with_error, exit_with_success 1a

21from prefect.cli.cloud import CloudUnauthorizedError, get_cloud_client 1a

22from prefect.cli.root import app, is_interactive 1a

23from prefect.client.base import determine_server_type 1a

24from prefect.client.orchestration import ServerType, get_client 1a

25from prefect.context import use_profile 1a

26from prefect.settings import ProfilesCollection 1a

27from prefect.utilities.collections import AutoEnum 1a

28 

29profile_app: PrefectTyper = PrefectTyper( 1a

30 name="profile", help="Select and manage Prefect profiles." 

31) 

32app.add_typer(profile_app, aliases=["profiles"]) 1a

33 

34_OLD_MINIMAL_DEFAULT_PROFILE_CONTENT: str = """active = "default" 1a

35 

36[profiles.default]""" 

37 

38 

39@profile_app.command() 1a

40def ls(): 1a

41 """ 

42 List profile names. 

43 """ 

44 profiles = prefect.settings.load_profiles(include_defaults=False) 

45 current_profile = prefect.context.get_settings_context().profile 

46 current_name = current_profile.name if current_profile is not None else None 

47 

48 table = Table(caption="* active profile") 

49 table.add_column( 

50 "[#024dfd]Available Profiles:", justify="right", style="#8ea0ae", no_wrap=True 

51 ) 

52 

53 for name in profiles: 

54 if name == current_name: 

55 table.add_row(f"[green] * {name}[/green]") 

56 else: 

57 table.add_row(f" {name}") 

58 app.console.print(table) 

59 

60 

61@profile_app.command() 1a

62def create( 1a

63 name: str, 

64 from_name: str = typer.Option(None, "--from", help="Copy an existing profile."), 

65): 

66 """ 

67 Create a new profile. 

68 """ 

69 

70 profiles = prefect.settings.load_profiles(include_defaults=False) 

71 if name in profiles: 

72 app.console.print( 

73 textwrap.dedent( 

74 f""" 

75 [red]Profile {name!r} already exists.[/red] 

76 To create a new profile, remove the existing profile first: 

77 

78 prefect profile delete {name!r} 

79 """ 

80 ).strip() 

81 ) 

82 raise typer.Exit(1) 

83 

84 if from_name: 

85 if from_name not in profiles: 

86 exit_with_error(f"Profile {from_name!r} not found.") 

87 

88 # Create a copy of the profile with a new name and add to the collection 

89 profiles.add_profile(profiles[from_name].model_copy(update={"name": name})) 

90 else: 

91 profiles.add_profile(prefect.settings.Profile(name=name, settings={})) 

92 

93 prefect.settings.save_profiles(profiles) 

94 

95 app.console.print( 

96 textwrap.dedent( 

97 f""" 

98 Created profile with properties: 

99 name - {name!r} 

100 from name - {from_name or None} 

101 

102 Use created profile for future, subsequent commands: 

103 prefect profile use {name!r} 

104 

105 Use created profile temporarily for a single command: 

106 prefect -p {name!r} config view 

107 """ 

108 ) 

109 ) 

110 

111 

112@profile_app.command() 1a

113async def use(name: str): 1a

114 """ 

115 Set the given profile to active. 

116 """ 

117 status_messages = { 

118 ConnectionStatus.CLOUD_CONNECTED: ( 

119 exit_with_success, 

120 f"Connected to Prefect Cloud using profile {name!r}", 

121 ), 

122 ConnectionStatus.CLOUD_ERROR: ( 

123 exit_with_error, 

124 f"Error connecting to Prefect Cloud using profile {name!r}", 

125 ), 

126 ConnectionStatus.CLOUD_UNAUTHORIZED: ( 

127 exit_with_error, 

128 f"Error authenticating with Prefect Cloud using profile {name!r}", 

129 ), 

130 ConnectionStatus.SERVER_CONNECTED: ( 

131 exit_with_success, 

132 f"Connected to Prefect server using profile {name!r}", 

133 ), 

134 ConnectionStatus.SERVER_ERROR: ( 

135 exit_with_error, 

136 f"Error connecting to Prefect server using profile {name!r}", 

137 ), 

138 ConnectionStatus.EPHEMERAL: ( 

139 exit_with_success, 

140 ( 

141 f"No Prefect server specified using profile {name!r} - the API will run" 

142 " in ephemeral mode." 

143 ), 

144 ), 

145 ConnectionStatus.UNCONFIGURED: ( 

146 exit_with_error, 

147 ( 

148 f"Prefect server URL not configured using profile {name!r} - please" 

149 " configure the server URL or enable ephemeral mode." 

150 ), 

151 ), 

152 ConnectionStatus.INVALID_API: ( 

153 exit_with_error, 

154 "Error connecting to Prefect API URL", 

155 ), 

156 } 

157 

158 profiles = prefect.settings.load_profiles() 

159 if name not in profiles.names: 

160 exit_with_error(f"Profile {name!r} not found.") 

161 

162 profiles.set_active(name) 

163 prefect.settings.save_profiles(profiles) 

164 

165 with Progress( 

166 SpinnerColumn(), 

167 TextColumn("[progress.description]{task.description}"), 

168 transient=False, 

169 ) as progress: 

170 progress.add_task( 

171 description="Checking API connectivity...", 

172 total=None, 

173 ) 

174 

175 with use_profile(name, include_current_context=False): 

176 connection_status = await check_server_connection() 

177 

178 exit_method, msg = status_messages[connection_status] 

179 

180 exit_method(msg) 

181 

182 

183@profile_app.command() 1a

184def delete(name: str): 1a

185 """ 

186 Delete the given profile. 

187 """ 

188 profiles = prefect.settings.load_profiles() 

189 if name not in profiles: 

190 exit_with_error(f"Profile {name!r} not found.") 

191 

192 current_profile = prefect.context.get_settings_context().profile 

193 if current_profile.name == name: 

194 exit_with_error( 

195 f"Profile {name!r} is the active profile. You must switch profiles before" 

196 " it can be deleted." 

197 ) 

198 if is_interactive() and not typer.confirm( 

199 (f"Are you sure you want to delete profile with name {name!r}?"), 

200 default=False, 

201 ): 

202 exit_with_error("Deletion aborted.") 

203 profiles.remove_profile(name) 

204 

205 prefect.settings.save_profiles(profiles) 

206 exit_with_success(f"Removed profile {name!r}.") 

207 

208 

209@profile_app.command() 1a

210def rename(name: str, new_name: str): 1a

211 """ 

212 Change the name of a profile. 

213 """ 

214 profiles = prefect.settings.load_profiles(include_defaults=False) 

215 if name not in profiles: 

216 exit_with_error(f"Profile {name!r} not found.") 

217 

218 if new_name in profiles: 

219 exit_with_error(f"Profile {new_name!r} already exists.") 

220 

221 profiles.add_profile(profiles[name].model_copy(update={"name": new_name})) 

222 profiles.remove_profile(name) 

223 

224 # If the active profile was renamed switch the active profile to the new name. 

225 prefect.context.get_settings_context().profile 

226 if profiles.active_name == name: 

227 profiles.set_active(new_name) 

228 if os.environ.get("PREFECT_PROFILE") == name: 

229 app.console.print( 

230 f"You have set your current profile to {name!r} with the " 

231 "PREFECT_PROFILE environment variable. You must update this variable to " 

232 f"{new_name!r} to continue using the profile." 

233 ) 

234 

235 prefect.settings.save_profiles(profiles) 

236 exit_with_success(f"Renamed profile {name!r} to {new_name!r}.") 

237 

238 

239@profile_app.command() 1a

240def inspect( 1a

241 name: Optional[str] = typer.Argument( 

242 None, help="Name of profile to inspect; defaults to active profile." 

243 ), 

244 output: Optional[str] = typer.Option( 

245 None, 

246 "--output", 

247 "-o", 

248 help="Specify an output format. Currently supports: json", 

249 ), 

250): 

251 """ 

252 Display settings from a given profile; defaults to active. 

253 """ 

254 if output and output.lower() != "json": 

255 exit_with_error("Only 'json' output format is supported.") 

256 

257 profiles = prefect.settings.load_profiles() 

258 if name is None: 

259 current_profile = prefect.context.get_settings_context().profile 

260 if not current_profile: 

261 exit_with_error("No active profile set - please provide a name to inspect.") 

262 name = current_profile.name 

263 print(f"No name provided, defaulting to {name!r}") 

264 if name not in profiles: 

265 exit_with_error(f"Profile {name!r} not found.") 

266 

267 if not profiles[name].settings: 

268 # TODO: Consider instructing on how to add settings. 

269 if output and output.lower() == "json": 

270 app.console.print("{}") 

271 else: 

272 print(f"Profile {name!r} is empty.") 

273 return 

274 

275 if output and output.lower() == "json": 

276 profile_data = { 

277 setting.name: value for setting, value in profiles[name].settings.items() 

278 } 

279 json_output = orjson.dumps(profile_data, option=orjson.OPT_INDENT_2).decode() 

280 app.console.print(json_output) 

281 else: 

282 for setting, value in profiles[name].settings.items(): 

283 app.console.print(f"{setting.name}='{value}'") 

284 

285 

286def show_profile_changes( 1a

287 user_profiles: ProfilesCollection, default_profiles: ProfilesCollection 

288) -> bool: 

289 changes: list[tuple[str, str]] = [] 

290 

291 for name in default_profiles.names: 

292 if name not in user_profiles: 

293 changes.append(("add", name)) 

294 

295 if not changes: 

296 app.console.print( 

297 "[green]No changes needed. All profiles are up to date.[/green]" 

298 ) 

299 return False 

300 

301 app.console.print("\n[bold cyan]Proposed Changes:[/bold cyan]") 

302 for change in changes: 

303 if change[0] == "migrate": 

304 app.console.print( 

305 f" [yellow]•[/yellow] Migrate '{change[1]}' to '{change[2]}'" 

306 ) 

307 elif change[0] == "add": 

308 app.console.print(f" [blue]•[/blue] Add '{change[1]}'") 

309 

310 return True 

311 

312 

313@profile_app.command() 1a

314def populate_defaults(): 1a

315 """Populate the profiles configuration with default base profiles, preserving existing user profiles.""" 

316 from prefect.settings.profiles import ( 

317 _read_profiles_from, # type: ignore[reportPrivateUsage] 

318 _write_profiles_to, # type: ignore[reportPrivateUsage] 

319 ) 

320 

321 user_path = prefect.settings.PREFECT_PROFILES_PATH.value() 

322 default_profiles = _read_profiles_from(prefect.settings.DEFAULT_PROFILES_PATH) 

323 

324 if user_path.exists(): 

325 user_profiles = _read_profiles_from(user_path) 

326 

327 if not show_profile_changes(user_profiles, default_profiles): 

328 return 

329 

330 # Backup prompt 

331 if typer.confirm(f"\nBack up existing profiles to {user_path}.bak?"): 

332 shutil.copy(user_path, f"{user_path}.bak") 

333 app.console.print(f"Profiles backed up to {user_path}.bak") 

334 else: 

335 user_profiles = ProfilesCollection([]) 

336 app.console.print( 

337 "\n[bold]Creating new profiles file with default profiles.[/bold]" 

338 ) 

339 show_profile_changes(user_profiles, default_profiles) 

340 

341 # Update prompt 

342 if not typer.confirm(f"\nUpdate profiles at {user_path}?"): 

343 app.console.print("Operation cancelled.") 

344 return 

345 

346 for name, profile in default_profiles.items(): 

347 if name not in user_profiles: 

348 user_profiles.add_profile(profile) 

349 

350 _write_profiles_to(user_path, user_profiles) 

351 app.console.print(f"\nProfiles updated in [green]{user_path}[/green]") 

352 app.console.print( 

353 "\nUse with [green]prefect profile use[/green] [blue][PROFILE-NAME][/blue]" 

354 ) 

355 app.console.print("\nAvailable profiles:") 

356 for name in user_profiles.names: 

357 app.console.print(f" - {name}") 

358 

359 

360class ConnectionStatus(AutoEnum): 1a

361 CLOUD_CONNECTED = AutoEnum.auto() 1a

362 CLOUD_ERROR = AutoEnum.auto() 1a

363 CLOUD_UNAUTHORIZED = AutoEnum.auto() 1a

364 SERVER_CONNECTED = AutoEnum.auto() 1a

365 SERVER_ERROR = AutoEnum.auto() 1a

366 UNCONFIGURED = AutoEnum.auto() 1a

367 EPHEMERAL = AutoEnum.auto() 1a

368 INVALID_API = AutoEnum.auto() 1a

369 

370 

371async def check_server_connection() -> ConnectionStatus: 1a

372 httpx_settings = dict(timeout=3) 

373 try: 

374 # First determine the server type based on the URL 

375 server_type = determine_server_type() 

376 

377 # Only try to connect to Cloud if the URL looks like a Cloud URL 

378 if server_type == ServerType.CLOUD: 

379 try: 

380 cloud_client = get_cloud_client( 

381 httpx_settings=httpx_settings, infer_cloud_url=True 

382 ) 

383 async with cloud_client: 

384 await cloud_client.api_healthcheck() 

385 return ConnectionStatus.CLOUD_CONNECTED 

386 except CloudUnauthorizedError: 

387 # if the Cloud API exists and fails to authenticate, notify the user 

388 return ConnectionStatus.CLOUD_UNAUTHORIZED 

389 except (httpx.HTTPStatusError, Exception): 

390 return ConnectionStatus.CLOUD_ERROR 

391 

392 # For non-Cloud URLs, try to connect as a hosted Prefect instance 

393 if server_type == ServerType.EPHEMERAL: 

394 return ConnectionStatus.EPHEMERAL 

395 elif server_type == ServerType.UNCONFIGURED: 

396 return ConnectionStatus.UNCONFIGURED 

397 

398 # Try to connect to the server 

399 try: 

400 client = get_client(httpx_settings=httpx_settings) 

401 async with client: 

402 connect_error = await client.api_healthcheck() 

403 if connect_error is not None: 

404 return ConnectionStatus.SERVER_ERROR 

405 else: 

406 return ConnectionStatus.SERVER_CONNECTED 

407 except Exception: 

408 return ConnectionStatus.SERVER_ERROR 

409 except TypeError: 

410 # if no Prefect API URL has been set, httpx will throw a TypeError 

411 try: 

412 # try to connect with the client anyway, it will likely use an 

413 # ephemeral Prefect instance 

414 server_type = determine_server_type() 

415 if server_type == ServerType.EPHEMERAL: 

416 return ConnectionStatus.EPHEMERAL 

417 elif server_type == ServerType.UNCONFIGURED: 

418 return ConnectionStatus.UNCONFIGURED 

419 client = get_client(httpx_settings=httpx_settings) 

420 if client.server_type == ServerType.EPHEMERAL: 

421 return ConnectionStatus.EPHEMERAL 

422 async with client: 

423 connect_error = await client.api_healthcheck() 

424 if connect_error is not None: 

425 return ConnectionStatus.SERVER_ERROR 

426 else: 

427 return ConnectionStatus.SERVER_CONNECTED 

428 except Exception: 

429 return ConnectionStatus.SERVER_ERROR 

430 except (httpx.ConnectError, httpx.UnsupportedProtocol): 

431 return ConnectionStatus.INVALID_API