Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/deploy/_config.py: 7%

236 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1from __future__ import annotations 1a

2 

3import json 1a

4import re 1a

5from copy import deepcopy 1a

6from pathlib import Path 1a

7from typing import Any, Optional 1a

8 

9import yaml 1a

10from pydantic import ValidationError 1a

11from yaml.error import YAMLError 1a

12 

13import prefect.cli.root as root 1a

14from prefect.cli.root import app 1a

15from prefect.utilities.annotations import NotSet 1a

16 

17from ._models import PrefectYamlModel 1a

18 

19 

20def _format_validation_error(exc: ValidationError, raw_data: dict[str, Any]) -> str: 1a

21 """Format Pydantic validation errors into user-friendly messages.""" 

22 deployment_errors: dict[str, set[str]] = {} 

23 top_level_errors: list[tuple[str, str]] = [] 

24 

25 for error in exc.errors(): 

26 loc = error.get("loc", ()) 

27 msg = error.get("msg", "Invalid value") 

28 

29 # Handle deployment-level errors 

30 if len(loc) >= 2 and loc[0] == "deployments" and isinstance(loc[1], int): 

31 idx = loc[1] 

32 deployments = raw_data.get("deployments", []) 

33 name = ( 

34 deployments[idx].get("name", f"#{idx}") 

35 if idx < len(deployments) 

36 else f"#{idx}" 

37 ) 

38 

39 # Get field path (only include string field names, not indices or type names) 

40 field_parts = [] 

41 for part in loc[2:]: 

42 if isinstance(part, str) and not part.startswith("function-"): 

43 # Assume lowercase names are field names, not type names 

44 if part[0].islower(): 

45 field_parts.append(part) 

46 

47 if field_parts: 

48 field = field_parts[0] # Just use the top-level field 

49 if name not in deployment_errors: 

50 deployment_errors[name] = set() 

51 deployment_errors[name].add(field) 

52 # Handle top-level field errors (prefect-version, name, build, push, pull, etc.) 

53 elif len(loc) >= 1 and isinstance(loc[0], str): 

54 field_name = loc[0] 

55 top_level_errors.append((field_name, msg)) 

56 

57 if not deployment_errors and not top_level_errors: 

58 return "Validation error in config file" 

59 

60 lines = [] 

61 

62 # Format top-level errors 

63 if top_level_errors: 

64 lines.append("Invalid top-level fields in config file:\n") 

65 for field_name, msg in top_level_errors: 

66 lines.append(f"{field_name}: {msg}") 

67 lines.append( 

68 "\nFor valid prefect.yaml fields, see: https://docs.prefect.io/v3/how-to-guides/deployments/prefect-yaml" 

69 ) 

70 

71 # Format deployment-level errors 

72 if deployment_errors: 

73 if top_level_errors: 

74 lines.append("") # blank line separator 

75 lines.append("Invalid fields in deployments:\n") 

76 for name, fields in sorted(deployment_errors.items()): 

77 lines.append(f"{name}: {', '.join(sorted(fields))}") 

78 lines.append( 

79 "\nFor valid deployment fields and examples, go to: https://docs.prefect.io/v3/concepts/deployments#deployment-schema" 

80 ) 

81 

82 return "\n".join(lines) 

83 

84 

85def _merge_with_default_deploy_config(deploy_config: dict[str, Any]) -> dict[str, Any]: 1a

86 deploy_config = deepcopy(deploy_config) 

87 DEFAULT_DEPLOY_CONFIG: dict[str, Any] = { 

88 "name": None, 

89 "version": None, 

90 "tags": [], 

91 "concurrency_limit": None, 

92 "description": None, 

93 "flow_name": None, 

94 "entrypoint": None, 

95 "parameters": {}, 

96 "work_pool": { 

97 "name": None, 

98 "work_queue_name": None, 

99 "job_variables": {}, 

100 }, 

101 } 

102 

103 for key, value in DEFAULT_DEPLOY_CONFIG.items(): 

104 if key not in deploy_config: 

105 deploy_config[key] = value 

106 if isinstance(value, dict): 

107 for k, v in value.items(): 

108 if k not in deploy_config[key]: 

109 deploy_config[key][k] = v 

110 

111 return deploy_config 

112 

113 

114def _load_deploy_configs_and_actions( 1a

115 prefect_file: Path, 

116) -> tuple[list[dict[str, Any]], dict[str, Any]]: 

117 """ 

118 Load and validate a prefect.yaml using Pydantic models. 

119 

120 Returns a tuple of (deployment_configs, actions_dict) where deployments are 

121 dictionaries compatible with existing CLI code and actions contains 

122 top-level build/push/pull lists from the file. 

123 """ 

124 raw: dict[str, Any] = {} 

125 try: 

126 with prefect_file.open("r") as f: 

127 loaded = yaml.safe_load(f) 

128 except (FileNotFoundError, IsADirectoryError, YAMLError) as exc: 

129 app.console.print( 

130 f"Unable to read the specified config file. Reason: {exc}. Skipping.", 

131 style="yellow", 

132 ) 

133 loaded = {} 

134 

135 if isinstance(loaded, dict): 

136 raw = loaded 

137 else: 

138 app.console.print( 

139 "Unable to parse the specified config file. Skipping.", 

140 style="yellow", 

141 ) 

142 

143 try: 

144 model = PrefectYamlModel.model_validate(raw) 

145 except ValidationError as exc: 

146 # Format and display validation errors 

147 error_message = _format_validation_error(exc, raw) 

148 app.console.print(error_message, style="yellow") 

149 app.console.print( 

150 "\nSkipping deployment configuration due to validation errors.", 

151 style="yellow", 

152 ) 

153 model = PrefectYamlModel() 

154 

155 actions: dict[str, Any] = { 

156 "build": model.build or [], 

157 "push": model.push or [], 

158 "pull": model.pull or [], 

159 } 

160 # Convert Pydantic models to plain dicts for downstream consumption, 

161 # excluding keys that were not provided by users to preserve legacy semantics 

162 deploy_configs: list[dict[str, Any]] = [ 

163 d.model_dump(exclude_unset=True, mode="json") for d in model.deployments 

164 ] 

165 return deploy_configs, actions 

166 

167 

168def _extract_variable(variable: str) -> dict[str, Any]: 1a

169 """ 

170 Extracts a variable from a string. Variables can be in the format 

171 key=value or a JSON object. 

172 """ 

173 try: 

174 key, value = variable.split("=", 1) 

175 except ValueError: 

176 pass 

177 else: 

178 return {key: value} 

179 

180 try: 

181 # Only key=value strings and JSON objexcts are valid inputs for 

182 # variables, not arrays or strings, so we attempt to convert the parsed 

183 # object to a dict. 

184 return dict(json.loads(variable)) 

185 except (ValueError, TypeError) as e: 

186 raise ValueError( 

187 f'Could not parse variable: "{variable}". Please ensure variables are' 

188 " either in the format `key=value` or are strings containing a valid JSON" 

189 " object." 

190 ) from e 

191 

192 

193def _apply_cli_options_to_deploy_config( 1a

194 deploy_config: dict[str, Any], cli_options: dict[str, Any] 

195) -> dict[str, Any]: 

196 """ 

197 Applies CLI options to a deploy config. CLI options take 

198 precedence over values in the deploy config. 

199 

200 Args: 

201 deploy_config: A deploy config 

202 cli_options: A dictionary of CLI options 

203 

204 Returns: 

205 Dict: a deploy config with CLI options applied 

206 """ 

207 deploy_config = deepcopy(deploy_config) 

208 

209 # verification 

210 if cli_options.get("param") and (cli_options.get("params") is not None): 

211 raise ValueError("Can only pass one of `param` or `params` options") 

212 

213 # If there's more than one name, we can't set the name of the deploy config. 

214 # The user will be prompted if running in interactive mode. 

215 if len(cli_options.get("names", [])) == 1: 

216 deploy_config["name"] = cli_options["names"][0] 

217 

218 variable_overrides: dict[str, Any] = {} 

219 for cli_option, cli_value in cli_options.items(): 

220 if ( 

221 cli_option 

222 in [ 

223 "description", 

224 "entrypoint", 

225 "version", 

226 "tags", 

227 "concurrency_limit", 

228 "flow_name", 

229 "enforce_parameter_schema", 

230 ] 

231 and cli_value is not None 

232 ): 

233 deploy_config[cli_option] = cli_value 

234 

235 elif ( 

236 cli_option in ["work_pool_name", "work_queue_name", "variables"] 

237 and cli_value 

238 ): 

239 if not isinstance(deploy_config.get("work_pool"), dict): 

240 deploy_config["work_pool"] = {} 

241 if cli_option == "work_pool_name": 

242 deploy_config["work_pool"]["name"] = cli_value 

243 elif cli_option == "variables": 

244 for variable in cli_value or []: 

245 variable_overrides.update(**_extract_variable(variable)) 

246 if not isinstance(deploy_config["work_pool"].get("variables"), dict): 

247 deploy_config["work_pool"]["job_variables"] = {} 

248 deploy_config["work_pool"]["job_variables"].update(variable_overrides) 

249 else: 

250 deploy_config["work_pool"][cli_option] = cli_value 

251 

252 elif cli_option in ["cron", "interval", "rrule"] and cli_value: 

253 if not isinstance(deploy_config.get("schedules"), list): 

254 deploy_config["schedules"] = [] 

255 

256 for value in cli_value: 

257 deploy_config["schedules"].append({cli_option: value}) 

258 

259 elif cli_option in ["param", "params"] and cli_value: 

260 parameters: dict[str, Any] = {} 

261 if cli_option == "param": 

262 for p in cli_value or []: 

263 k, unparsed_value = p.split("=", 1) 

264 try: 

265 v = json.loads(unparsed_value) 

266 app.console.print( 

267 f"The parameter value {unparsed_value} is parsed as a JSON" 

268 " string" 

269 ) 

270 except json.JSONDecodeError: 

271 v = unparsed_value 

272 parameters[k] = v 

273 

274 if cli_option == "params" and cli_value is not None: 

275 parameters = json.loads(cli_value) 

276 

277 if not isinstance(deploy_config.get("parameters"), dict): 

278 deploy_config["parameters"] = {} 

279 deploy_config["parameters"].update(parameters) 

280 

281 anchor_date = cli_options.get("anchor_date") 

282 timezone = cli_options.get("timezone") 

283 

284 # Apply anchor_date and timezone to new and existing schedules 

285 for schedule_config in deploy_config.get("schedules") or []: 

286 if anchor_date and schedule_config.get("interval"): 

287 schedule_config["anchor_date"] = anchor_date 

288 if timezone: 

289 schedule_config["timezone"] = timezone 

290 

291 return deploy_config, variable_overrides 

292 

293 

294def _handle_pick_deploy_without_name( 1a

295 deploy_configs: list[dict[str, Any]], 

296) -> list[dict[str, Any]]: 

297 from prefect.cli._prompts import prompt_select_from_table 

298 

299 selectable_deploy_configs = [ 

300 deploy_config for deploy_config in deploy_configs if deploy_config.get("name") 

301 ] 

302 if not selectable_deploy_configs: 

303 return [] 

304 selected_deploy_config = prompt_select_from_table( 

305 app.console, 

306 "Would you like to use an existing deployment configuration?", 

307 [ 

308 {"header": "Name", "key": "name"}, 

309 {"header": "Entrypoint", "key": "entrypoint"}, 

310 {"header": "Description", "key": "description"}, 

311 ], 

312 selectable_deploy_configs, 

313 opt_out_message="No, configure a new deployment", 

314 opt_out_response=None, 

315 ) 

316 return [selected_deploy_config] if selected_deploy_config else [] 

317 

318 

319def _log_missing_deployment_names(missing_names, matched_deploy_configs, names): 1a

320 if missing_names: 

321 app.console.print( 

322 ( 

323 "The following deployment(s) could not be found and will not be" 

324 f" deployed: {', '.join(list(sorted(missing_names)))}" 

325 ), 

326 style="yellow", 

327 ) 

328 if not matched_deploy_configs: 

329 app.console.print( 

330 ( 

331 "Could not find any deployment configurations with the given" 

332 f" name(s): {', '.join(names)}. Your flow will be deployed with a" 

333 " new deployment configuration." 

334 ), 

335 style="yellow", 

336 ) 

337 

338 

339def _filter_matching_deploy_config( 1a

340 name: str, deploy_configs: list[dict[str, Any]] 

341) -> list[dict[str, Any]]: 

342 matching_deployments: list[dict[str, Any]] = [] 

343 if "/" in name: 

344 flow_name, deployment_name = name.split("/") 

345 flow_name = flow_name.replace("-", "_") 

346 matching_deployments = [ 

347 deploy_config 

348 for deploy_config in deploy_configs 

349 if deploy_config.get("name") == deployment_name 

350 and deploy_config.get("entrypoint", "").split(":")[-1] == flow_name 

351 ] 

352 else: 

353 matching_deployments = [ 

354 deploy_config 

355 for deploy_config in deploy_configs 

356 if deploy_config.get("name") == name 

357 ] 

358 return matching_deployments 

359 

360 

361def _parse_name_from_pattern( 1a

362 deploy_configs: list[dict[str, Any]], name_pattern: str 

363) -> list[str]: 

364 parsed_names: list[str] = [] 

365 name_pattern = re.escape(name_pattern).replace(r"\*", ".*") 

366 

367 if "/" in name_pattern: 

368 flow_name, deploy_name = name_pattern.split("/", 1) 

369 flow_name = ( 

370 re.compile(flow_name.replace("*", ".*")) 

371 if "*" in flow_name 

372 else re.compile(flow_name) 

373 ) 

374 deploy_name = ( 

375 re.compile(deploy_name.replace("*", ".*")) 

376 if "*" in deploy_name 

377 else re.compile(deploy_name) 

378 ) 

379 else: 

380 flow_name = None 

381 deploy_name = re.compile(name_pattern.replace("*", ".*")) 

382 

383 for deploy_config in deploy_configs: 

384 if not deploy_config.get("entrypoint"): 

385 continue 

386 entrypoint = deploy_config.get("entrypoint").split(":")[-1].replace("_", "-") 

387 deployment_name = deploy_config.get("name") 

388 flow_match = flow_name.fullmatch(entrypoint) if flow_name else True 

389 deploy_match = deploy_name.fullmatch(deployment_name) 

390 if flow_match and deploy_match: 

391 parsed_names.append(deployment_name) 

392 

393 return parsed_names 

394 

395 

396def _handle_pick_deploy_with_name( 1a

397 deploy_configs: list[dict[str, Any]], 

398 names: list[str], 

399) -> list[dict[str, Any]]: 

400 from prefect.cli._prompts import prompt_select_from_table 

401 

402 matched_deploy_configs: list[dict[str, Any]] = [] 

403 deployment_names: list[str] = [] 

404 for name in names: 

405 matching_deployments = _filter_matching_deploy_config(name, deploy_configs) 

406 

407 if len(matching_deployments) > 1 and root.is_interactive(): 

408 user_selected_matching_deployment = prompt_select_from_table( 

409 app.console, 

410 ( 

411 "Found multiple deployment configurations with the name" 

412 f" [yellow]{name}[/yellow]. Please select the one you would" 

413 " like to deploy:" 

414 ), 

415 [ 

416 {"header": "Name", "key": "name"}, 

417 {"header": "Entrypoint", "key": "entrypoint"}, 

418 {"header": "Description", "key": "description"}, 

419 ], 

420 matching_deployments, 

421 ) 

422 matched_deploy_configs.append(user_selected_matching_deployment) 

423 elif matching_deployments: 

424 matched_deploy_configs.extend(matching_deployments) 

425 

426 deployment_names.append(name.split("/")[-1]) 

427 

428 unfound_names = set(deployment_names) - { 

429 deploy_config.get("name") for deploy_config in matched_deploy_configs 

430 } 

431 _log_missing_deployment_names(unfound_names, matched_deploy_configs, names) 

432 

433 return matched_deploy_configs 

434 

435 

436def _pick_deploy_configs( 1a

437 deploy_configs: list[dict[str, Any]], 

438 names: Optional[list[str]] = None, 

439 deploy_all: bool = False, 

440) -> list[dict[str, Any]]: 

441 names = names or [] 

442 

443 if deploy_all and names: 

444 raise ValueError( 

445 "Cannot use both `--all` and `--name` at the same time. Use only one." 

446 ) 

447 

448 if not deploy_configs: 

449 if not root.is_interactive(): 

450 return [ 

451 _merge_with_default_deploy_config({}), 

452 ] 

453 selected_deploy_config = _handle_pick_deploy_without_name(deploy_configs) 

454 if not selected_deploy_config: 

455 return [ 

456 _merge_with_default_deploy_config({}), 

457 ] 

458 return selected_deploy_config 

459 

460 # Original behavior (pre-refactor): in non-interactive mode, if there is 

461 # exactly one deploy config and at most one name provided, proceed with the 

462 # single deploy config even if the provided name does not match. This allows 

463 # users/tests to override the name via CLI while still inheriting templated 

464 # fields (e.g., version, tags, description) from the config. 

465 if (not root.is_interactive()) and len(deploy_configs) == 1 and len(names) <= 1: 

466 return [ 

467 _merge_with_default_deploy_config(deploy_configs[0]), 

468 ] 

469 

470 if not names and not deploy_all: 

471 if not root.is_interactive(): 

472 if len(deploy_configs) == 1: 

473 return [ 

474 _merge_with_default_deploy_config(deploy_configs[0]), 

475 ] 

476 # Mirror original behavior: error when multiple configs present and no 

477 # explicit name provided in non-interactive mode. 

478 raise ValueError( 

479 "Discovered one or more deployment configurations, but no name was" 

480 " given. Please specify the name of at least one deployment to" 

481 " create or update." 

482 ) 

483 selected_deploy_config = _handle_pick_deploy_without_name(deploy_configs) 

484 if not selected_deploy_config: 

485 return [ 

486 _merge_with_default_deploy_config({}), 

487 ] 

488 return selected_deploy_config 

489 

490 if names: 

491 matched_deploy_configs = _handle_pick_deploy_with_name(deploy_configs, names) 

492 return matched_deploy_configs 

493 

494 if deploy_all: 

495 return [ 

496 _merge_with_default_deploy_config(deploy_config) 

497 for deploy_config in deploy_configs 

498 ] 

499 

500 raise ValueError("Invalid selection. Please try again.") 

501 

502 

503def _handle_deprecated_schedule_fields(deploy_config: dict[str, Any]): 1a

504 deploy_config = deepcopy(deploy_config) 

505 

506 legacy_schedule = deploy_config.get("schedule", NotSet) 

507 schedule_configs = deploy_config.get("schedules", NotSet) 

508 

509 if ( 

510 legacy_schedule 

511 and legacy_schedule is not NotSet 

512 and schedule_configs is not NotSet 

513 ): 

514 raise ValueError( 

515 "Both 'schedule' and 'schedules' keys are present in the deployment" 

516 " configuration. Please use only use `schedules`." 

517 ) 

518 

519 if legacy_schedule and isinstance(legacy_schedule, dict): 

520 deploy_config["schedules"] = [deploy_config["schedule"]] 

521 

522 return deploy_config