Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/deploy/_config.py: 7%
236 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1from __future__ import annotations 1a
3import json 1a
4import re 1a
5from copy import deepcopy 1a
6from pathlib import Path 1a
7from typing import Any, Optional 1a
9import yaml 1a
10from pydantic import ValidationError 1a
11from yaml.error import YAMLError 1a
13import prefect.cli.root as root 1a
14from prefect.cli.root import app 1a
15from prefect.utilities.annotations import NotSet 1a
17from ._models import PrefectYamlModel 1a
20def _format_validation_error(exc: ValidationError, raw_data: dict[str, Any]) -> str: 1a
21 """Format Pydantic validation errors into user-friendly messages."""
22 deployment_errors: dict[str, set[str]] = {}
23 top_level_errors: list[tuple[str, str]] = []
25 for error in exc.errors():
26 loc = error.get("loc", ())
27 msg = error.get("msg", "Invalid value")
29 # Handle deployment-level errors
30 if len(loc) >= 2 and loc[0] == "deployments" and isinstance(loc[1], int):
31 idx = loc[1]
32 deployments = raw_data.get("deployments", [])
33 name = (
34 deployments[idx].get("name", f"#{idx}")
35 if idx < len(deployments)
36 else f"#{idx}"
37 )
39 # Get field path (only include string field names, not indices or type names)
40 field_parts = []
41 for part in loc[2:]:
42 if isinstance(part, str) and not part.startswith("function-"):
43 # Assume lowercase names are field names, not type names
44 if part[0].islower():
45 field_parts.append(part)
47 if field_parts:
48 field = field_parts[0] # Just use the top-level field
49 if name not in deployment_errors:
50 deployment_errors[name] = set()
51 deployment_errors[name].add(field)
52 # Handle top-level field errors (prefect-version, name, build, push, pull, etc.)
53 elif len(loc) >= 1 and isinstance(loc[0], str):
54 field_name = loc[0]
55 top_level_errors.append((field_name, msg))
57 if not deployment_errors and not top_level_errors:
58 return "Validation error in config file"
60 lines = []
62 # Format top-level errors
63 if top_level_errors:
64 lines.append("Invalid top-level fields in config file:\n")
65 for field_name, msg in top_level_errors:
66 lines.append(f" • {field_name}: {msg}")
67 lines.append(
68 "\nFor valid prefect.yaml fields, see: https://docs.prefect.io/v3/how-to-guides/deployments/prefect-yaml"
69 )
71 # Format deployment-level errors
72 if deployment_errors:
73 if top_level_errors:
74 lines.append("") # blank line separator
75 lines.append("Invalid fields in deployments:\n")
76 for name, fields in sorted(deployment_errors.items()):
77 lines.append(f" • {name}: {', '.join(sorted(fields))}")
78 lines.append(
79 "\nFor valid deployment fields and examples, go to: https://docs.prefect.io/v3/concepts/deployments#deployment-schema"
80 )
82 return "\n".join(lines)
85def _merge_with_default_deploy_config(deploy_config: dict[str, Any]) -> dict[str, Any]: 1a
86 deploy_config = deepcopy(deploy_config)
87 DEFAULT_DEPLOY_CONFIG: dict[str, Any] = {
88 "name": None,
89 "version": None,
90 "tags": [],
91 "concurrency_limit": None,
92 "description": None,
93 "flow_name": None,
94 "entrypoint": None,
95 "parameters": {},
96 "work_pool": {
97 "name": None,
98 "work_queue_name": None,
99 "job_variables": {},
100 },
101 }
103 for key, value in DEFAULT_DEPLOY_CONFIG.items():
104 if key not in deploy_config:
105 deploy_config[key] = value
106 if isinstance(value, dict):
107 for k, v in value.items():
108 if k not in deploy_config[key]:
109 deploy_config[key][k] = v
111 return deploy_config
114def _load_deploy_configs_and_actions( 1a
115 prefect_file: Path,
116) -> tuple[list[dict[str, Any]], dict[str, Any]]:
117 """
118 Load and validate a prefect.yaml using Pydantic models.
120 Returns a tuple of (deployment_configs, actions_dict) where deployments are
121 dictionaries compatible with existing CLI code and actions contains
122 top-level build/push/pull lists from the file.
123 """
124 raw: dict[str, Any] = {}
125 try:
126 with prefect_file.open("r") as f:
127 loaded = yaml.safe_load(f)
128 except (FileNotFoundError, IsADirectoryError, YAMLError) as exc:
129 app.console.print(
130 f"Unable to read the specified config file. Reason: {exc}. Skipping.",
131 style="yellow",
132 )
133 loaded = {}
135 if isinstance(loaded, dict):
136 raw = loaded
137 else:
138 app.console.print(
139 "Unable to parse the specified config file. Skipping.",
140 style="yellow",
141 )
143 try:
144 model = PrefectYamlModel.model_validate(raw)
145 except ValidationError as exc:
146 # Format and display validation errors
147 error_message = _format_validation_error(exc, raw)
148 app.console.print(error_message, style="yellow")
149 app.console.print(
150 "\nSkipping deployment configuration due to validation errors.",
151 style="yellow",
152 )
153 model = PrefectYamlModel()
155 actions: dict[str, Any] = {
156 "build": model.build or [],
157 "push": model.push or [],
158 "pull": model.pull or [],
159 }
160 # Convert Pydantic models to plain dicts for downstream consumption,
161 # excluding keys that were not provided by users to preserve legacy semantics
162 deploy_configs: list[dict[str, Any]] = [
163 d.model_dump(exclude_unset=True, mode="json") for d in model.deployments
164 ]
165 return deploy_configs, actions
168def _extract_variable(variable: str) -> dict[str, Any]: 1a
169 """
170 Extracts a variable from a string. Variables can be in the format
171 key=value or a JSON object.
172 """
173 try:
174 key, value = variable.split("=", 1)
175 except ValueError:
176 pass
177 else:
178 return {key: value}
180 try:
181 # Only key=value strings and JSON objexcts are valid inputs for
182 # variables, not arrays or strings, so we attempt to convert the parsed
183 # object to a dict.
184 return dict(json.loads(variable))
185 except (ValueError, TypeError) as e:
186 raise ValueError(
187 f'Could not parse variable: "{variable}". Please ensure variables are'
188 " either in the format `key=value` or are strings containing a valid JSON"
189 " object."
190 ) from e
193def _apply_cli_options_to_deploy_config( 1a
194 deploy_config: dict[str, Any], cli_options: dict[str, Any]
195) -> dict[str, Any]:
196 """
197 Applies CLI options to a deploy config. CLI options take
198 precedence over values in the deploy config.
200 Args:
201 deploy_config: A deploy config
202 cli_options: A dictionary of CLI options
204 Returns:
205 Dict: a deploy config with CLI options applied
206 """
207 deploy_config = deepcopy(deploy_config)
209 # verification
210 if cli_options.get("param") and (cli_options.get("params") is not None):
211 raise ValueError("Can only pass one of `param` or `params` options")
213 # If there's more than one name, we can't set the name of the deploy config.
214 # The user will be prompted if running in interactive mode.
215 if len(cli_options.get("names", [])) == 1:
216 deploy_config["name"] = cli_options["names"][0]
218 variable_overrides: dict[str, Any] = {}
219 for cli_option, cli_value in cli_options.items():
220 if (
221 cli_option
222 in [
223 "description",
224 "entrypoint",
225 "version",
226 "tags",
227 "concurrency_limit",
228 "flow_name",
229 "enforce_parameter_schema",
230 ]
231 and cli_value is not None
232 ):
233 deploy_config[cli_option] = cli_value
235 elif (
236 cli_option in ["work_pool_name", "work_queue_name", "variables"]
237 and cli_value
238 ):
239 if not isinstance(deploy_config.get("work_pool"), dict):
240 deploy_config["work_pool"] = {}
241 if cli_option == "work_pool_name":
242 deploy_config["work_pool"]["name"] = cli_value
243 elif cli_option == "variables":
244 for variable in cli_value or []:
245 variable_overrides.update(**_extract_variable(variable))
246 if not isinstance(deploy_config["work_pool"].get("variables"), dict):
247 deploy_config["work_pool"]["job_variables"] = {}
248 deploy_config["work_pool"]["job_variables"].update(variable_overrides)
249 else:
250 deploy_config["work_pool"][cli_option] = cli_value
252 elif cli_option in ["cron", "interval", "rrule"] and cli_value:
253 if not isinstance(deploy_config.get("schedules"), list):
254 deploy_config["schedules"] = []
256 for value in cli_value:
257 deploy_config["schedules"].append({cli_option: value})
259 elif cli_option in ["param", "params"] and cli_value:
260 parameters: dict[str, Any] = {}
261 if cli_option == "param":
262 for p in cli_value or []:
263 k, unparsed_value = p.split("=", 1)
264 try:
265 v = json.loads(unparsed_value)
266 app.console.print(
267 f"The parameter value {unparsed_value} is parsed as a JSON"
268 " string"
269 )
270 except json.JSONDecodeError:
271 v = unparsed_value
272 parameters[k] = v
274 if cli_option == "params" and cli_value is not None:
275 parameters = json.loads(cli_value)
277 if not isinstance(deploy_config.get("parameters"), dict):
278 deploy_config["parameters"] = {}
279 deploy_config["parameters"].update(parameters)
281 anchor_date = cli_options.get("anchor_date")
282 timezone = cli_options.get("timezone")
284 # Apply anchor_date and timezone to new and existing schedules
285 for schedule_config in deploy_config.get("schedules") or []:
286 if anchor_date and schedule_config.get("interval"):
287 schedule_config["anchor_date"] = anchor_date
288 if timezone:
289 schedule_config["timezone"] = timezone
291 return deploy_config, variable_overrides
294def _handle_pick_deploy_without_name( 1a
295 deploy_configs: list[dict[str, Any]],
296) -> list[dict[str, Any]]:
297 from prefect.cli._prompts import prompt_select_from_table
299 selectable_deploy_configs = [
300 deploy_config for deploy_config in deploy_configs if deploy_config.get("name")
301 ]
302 if not selectable_deploy_configs:
303 return []
304 selected_deploy_config = prompt_select_from_table(
305 app.console,
306 "Would you like to use an existing deployment configuration?",
307 [
308 {"header": "Name", "key": "name"},
309 {"header": "Entrypoint", "key": "entrypoint"},
310 {"header": "Description", "key": "description"},
311 ],
312 selectable_deploy_configs,
313 opt_out_message="No, configure a new deployment",
314 opt_out_response=None,
315 )
316 return [selected_deploy_config] if selected_deploy_config else []
319def _log_missing_deployment_names(missing_names, matched_deploy_configs, names): 1a
320 if missing_names:
321 app.console.print(
322 (
323 "The following deployment(s) could not be found and will not be"
324 f" deployed: {', '.join(list(sorted(missing_names)))}"
325 ),
326 style="yellow",
327 )
328 if not matched_deploy_configs:
329 app.console.print(
330 (
331 "Could not find any deployment configurations with the given"
332 f" name(s): {', '.join(names)}. Your flow will be deployed with a"
333 " new deployment configuration."
334 ),
335 style="yellow",
336 )
339def _filter_matching_deploy_config( 1a
340 name: str, deploy_configs: list[dict[str, Any]]
341) -> list[dict[str, Any]]:
342 matching_deployments: list[dict[str, Any]] = []
343 if "/" in name:
344 flow_name, deployment_name = name.split("/")
345 flow_name = flow_name.replace("-", "_")
346 matching_deployments = [
347 deploy_config
348 for deploy_config in deploy_configs
349 if deploy_config.get("name") == deployment_name
350 and deploy_config.get("entrypoint", "").split(":")[-1] == flow_name
351 ]
352 else:
353 matching_deployments = [
354 deploy_config
355 for deploy_config in deploy_configs
356 if deploy_config.get("name") == name
357 ]
358 return matching_deployments
361def _parse_name_from_pattern( 1a
362 deploy_configs: list[dict[str, Any]], name_pattern: str
363) -> list[str]:
364 parsed_names: list[str] = []
365 name_pattern = re.escape(name_pattern).replace(r"\*", ".*")
367 if "/" in name_pattern:
368 flow_name, deploy_name = name_pattern.split("/", 1)
369 flow_name = (
370 re.compile(flow_name.replace("*", ".*"))
371 if "*" in flow_name
372 else re.compile(flow_name)
373 )
374 deploy_name = (
375 re.compile(deploy_name.replace("*", ".*"))
376 if "*" in deploy_name
377 else re.compile(deploy_name)
378 )
379 else:
380 flow_name = None
381 deploy_name = re.compile(name_pattern.replace("*", ".*"))
383 for deploy_config in deploy_configs:
384 if not deploy_config.get("entrypoint"):
385 continue
386 entrypoint = deploy_config.get("entrypoint").split(":")[-1].replace("_", "-")
387 deployment_name = deploy_config.get("name")
388 flow_match = flow_name.fullmatch(entrypoint) if flow_name else True
389 deploy_match = deploy_name.fullmatch(deployment_name)
390 if flow_match and deploy_match:
391 parsed_names.append(deployment_name)
393 return parsed_names
396def _handle_pick_deploy_with_name( 1a
397 deploy_configs: list[dict[str, Any]],
398 names: list[str],
399) -> list[dict[str, Any]]:
400 from prefect.cli._prompts import prompt_select_from_table
402 matched_deploy_configs: list[dict[str, Any]] = []
403 deployment_names: list[str] = []
404 for name in names:
405 matching_deployments = _filter_matching_deploy_config(name, deploy_configs)
407 if len(matching_deployments) > 1 and root.is_interactive():
408 user_selected_matching_deployment = prompt_select_from_table(
409 app.console,
410 (
411 "Found multiple deployment configurations with the name"
412 f" [yellow]{name}[/yellow]. Please select the one you would"
413 " like to deploy:"
414 ),
415 [
416 {"header": "Name", "key": "name"},
417 {"header": "Entrypoint", "key": "entrypoint"},
418 {"header": "Description", "key": "description"},
419 ],
420 matching_deployments,
421 )
422 matched_deploy_configs.append(user_selected_matching_deployment)
423 elif matching_deployments:
424 matched_deploy_configs.extend(matching_deployments)
426 deployment_names.append(name.split("/")[-1])
428 unfound_names = set(deployment_names) - {
429 deploy_config.get("name") for deploy_config in matched_deploy_configs
430 }
431 _log_missing_deployment_names(unfound_names, matched_deploy_configs, names)
433 return matched_deploy_configs
436def _pick_deploy_configs( 1a
437 deploy_configs: list[dict[str, Any]],
438 names: Optional[list[str]] = None,
439 deploy_all: bool = False,
440) -> list[dict[str, Any]]:
441 names = names or []
443 if deploy_all and names:
444 raise ValueError(
445 "Cannot use both `--all` and `--name` at the same time. Use only one."
446 )
448 if not deploy_configs:
449 if not root.is_interactive():
450 return [
451 _merge_with_default_deploy_config({}),
452 ]
453 selected_deploy_config = _handle_pick_deploy_without_name(deploy_configs)
454 if not selected_deploy_config:
455 return [
456 _merge_with_default_deploy_config({}),
457 ]
458 return selected_deploy_config
460 # Original behavior (pre-refactor): in non-interactive mode, if there is
461 # exactly one deploy config and at most one name provided, proceed with the
462 # single deploy config even if the provided name does not match. This allows
463 # users/tests to override the name via CLI while still inheriting templated
464 # fields (e.g., version, tags, description) from the config.
465 if (not root.is_interactive()) and len(deploy_configs) == 1 and len(names) <= 1:
466 return [
467 _merge_with_default_deploy_config(deploy_configs[0]),
468 ]
470 if not names and not deploy_all:
471 if not root.is_interactive():
472 if len(deploy_configs) == 1:
473 return [
474 _merge_with_default_deploy_config(deploy_configs[0]),
475 ]
476 # Mirror original behavior: error when multiple configs present and no
477 # explicit name provided in non-interactive mode.
478 raise ValueError(
479 "Discovered one or more deployment configurations, but no name was"
480 " given. Please specify the name of at least one deployment to"
481 " create or update."
482 )
483 selected_deploy_config = _handle_pick_deploy_without_name(deploy_configs)
484 if not selected_deploy_config:
485 return [
486 _merge_with_default_deploy_config({}),
487 ]
488 return selected_deploy_config
490 if names:
491 matched_deploy_configs = _handle_pick_deploy_with_name(deploy_configs, names)
492 return matched_deploy_configs
494 if deploy_all:
495 return [
496 _merge_with_default_deploy_config(deploy_config)
497 for deploy_config in deploy_configs
498 ]
500 raise ValueError("Invalid selection. Please try again.")
503def _handle_deprecated_schedule_fields(deploy_config: dict[str, Any]): 1a
504 deploy_config = deepcopy(deploy_config)
506 legacy_schedule = deploy_config.get("schedule", NotSet)
507 schedule_configs = deploy_config.get("schedules", NotSet)
509 if (
510 legacy_schedule
511 and legacy_schedule is not NotSet
512 and schedule_configs is not NotSet
513 ):
514 raise ValueError(
515 "Both 'schedule' and 'schedules' keys are present in the deployment"
516 " configuration. Please use only use `schedules`."
517 )
519 if legacy_schedule and isinstance(legacy_schedule, dict):
520 deploy_config["schedules"] = [deploy_config["schedule"]]
522 return deploy_config