Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/deploy/_commands.py: 14%
90 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1from __future__ import annotations 1a
3from pathlib import Path 1a
4from typing import Any, List, Optional 1a
6import typer 1a
7import yaml 1a
8from rich.table import Table 1a
10import prefect 1a
11from prefect.cli._utilities import exit_with_error 1a
12from prefect.cli.root import app, is_interactive 1a
13from prefect.client.schemas.objects import ConcurrencyLimitConfig 1a
14from prefect.deployments import initialize_project 1a
15from prefect.settings import get_current_settings 1a
17from ._config import ( 1a
18 _load_deploy_configs_and_actions,
19 _parse_name_from_pattern,
20 _pick_deploy_configs,
21)
22from ._core import _run_multi_deploy, _run_single_deploy 1a
@app.command()
async def init(
    name: Optional[str] = None,
    recipe: Optional[str] = None,
    fields: Optional[list[str]] = typer.Option(
        None,
        "-f",
        "--field",
        help=(
            "One or more fields to pass to the recipe (e.g., image_name) in the format"
            " of key=value."
        ),
    ),
):
    """Initialize a project in the current directory, optionally from a recipe."""
    # Parameters:
    #   name   - forwarded unchanged to `initialize_project`.
    #   recipe - name of a directory under the bundled recipes folder; when it
    #            is omitted and the session is interactive the user is offered
    #            a table of recipes to pick from.
    #   fields - `key=value` strings that pre-fill the recipe's required inputs.
    # Exits with an error for a malformed field or an unknown recipe; any other
    # ValueError raised by `initialize_project` propagates unchanged.
    recipe_paths = prefect.__module_path__ / "deployments" / "recipes"

    inputs: dict[str, Any] = {}
    for field in fields or []:
        # Split on the first "=" only, so a value may itself contain "=".
        key, separator, value = field.partition("=")
        if not separator:
            exit_with_error(
                f"Invalid field {field!r}: expected the format key=value."
            )
        inputs[key] = value

    from prefect.cli._prompts import prompt_select_from_table

    if not recipe and is_interactive():
        recipes: list[dict[str, Any]] = []
        for recipe_dir in recipe_paths.iterdir():
            recipe_file = recipe_dir / "prefect.yaml"
            if not (recipe_dir.is_dir() and recipe_file.exists()):
                continue
            with open(recipe_file) as f:
                # An empty YAML document loads as None; treat it as "no metadata".
                recipe_data = yaml.safe_load(f) or {}
            recipes.append(
                {
                    "name": recipe_dir.name,
                    "description": recipe_data.get(
                        "description", "(no description available)"
                    ),
                }
            )

        selected_recipe = prompt_select_from_table(
            app.console,
            "Would you like to initialize your deployment configuration with a recipe?",
            columns=[
                {"header": "Name", "key": "name"},
                {"header": "Description", "key": "description"},
            ],
            data=recipes,
            opt_out_message="No, I'll use the default deployment configuration.",
            opt_out_response={},
        )
        # The opt-out response is an empty dict; anything else is a recipe row.
        if selected_recipe != {}:
            recipe = selected_recipe["name"]

    if recipe and (recipe_paths / recipe / "prefect.yaml").exists():
        with open(recipe_paths / recipe / "prefect.yaml") as f:
            recipe_inputs = (yaml.safe_load(f) or {}).get("required_inputs") or {}

        if recipe_inputs:
            # Work out extras and gaps independently: the user may supply an
            # unknown field while also omitting a required one, and both
            # conditions need to be reported/handled.
            extra_fields = [key for key in inputs if key not in recipe_inputs]
            missing_fields = [key for key in recipe_inputs if key not in inputs]

            if extra_fields:
                app.console.print(
                    (
                        f"Warning: extra fields provided for {recipe!r} recipe:"
                        f" '{', '.join(extra_fields)}'"
                    ),
                    style="red",
                )

            if missing_fields:
                table = Table(
                    title=f"[red]Required inputs for {recipe!r} recipe[/red]",
                )
                table.add_column("Field Name", style="green", no_wrap=True)
                table.add_column(
                    "Description", justify="left", style="white", no_wrap=False
                )
                for key in missing_fields:
                    table.add_row(key, recipe_inputs[key])
                app.console.print(table)

                for key in missing_fields:
                    inputs[key] = typer.prompt(key)

                app.console.print("-" * 15)

    try:
        files = [
            f"[green]{fname}[/green]"
            for fname in initialize_project(name=name, recipe=recipe, inputs=inputs)
        ]
    except ValueError as exc:
        if "Unknown recipe" in str(exc):
            exit_with_error(
                f"Unknown recipe {recipe!r} provided - run [yellow]`prefect init`[/yellow] to see all available recipes."
            )
        raise

    project_dir = Path(".").resolve()
    if files:
        file_list = "\n".join(files)
        app.console.print(
            f"Created project in [green]{project_dir}[/green] with the following new files:\n{file_list}"
        )
    else:
        app.console.print(
            f"Created project in [green]{project_dir}[/green]; no new files created."
        )
@app.command()
async def deploy(
    entrypoint: str = typer.Argument(
        None,
        help=(
            "The path to a flow entrypoint within a project, in the form of"
            " `./path/to/file.py:flow_func_name`"
        ),
    ),
    names: List[str] = typer.Option(
        None,
        "--name",
        "-n",
        help=(
            "The name to give the deployment. Can be a pattern. Examples:"
            " 'my-deployment', 'my-flow/my-deployment', 'my-deployment-*',"
            " '*-flow-name/deployment*'"
        ),
    ),
    description: str = typer.Option(
        None,
        "--description",
        "-d",
        help=(
            "The description to give the deployment. If not provided, the description will be populated from the flow's description."
        ),
    ),
    version_type: str = typer.Option(
        None, "--version-type", help="The type of version to use for this deployment."
    ),
    version: str = typer.Option(
        None, "--version", help="A version to give the deployment."
    ),
    tags: List[str] = typer.Option(
        None,
        "-t",
        "--tag",
        help=(
            "One or more optional tags to apply to the deployment. Note: tags are used only for organizational purposes. For delegating work to workers, use the --work-queue flag."
        ),
    ),
    concurrency_limit: int = typer.Option(
        None,
        "-cl",
        "--concurrency-limit",
        help=("The maximum number of concurrent runs for this deployment."),
    ),
    concurrency_limit_collision_strategy: str = typer.Option(
        None,
        "--collision-strategy",
        help="Configure the behavior for runs once the concurrency limit is reached. Falls back to `ENQUEUE` if unset.",
    ),
    work_pool_name: str = typer.Option(
        lambda: get_current_settings().deployments.default_work_pool_name,
        "-p",
        "--pool",
        help="The work pool that will handle this deployment's runs.",
        show_default="from PREFECT_DEFAULT_WORK_POOL_NAME",
    ),
    work_queue_name: str = typer.Option(
        None,
        "-q",
        "--work-queue",
        help=(
            "The work queue that will handle this deployment's runs. It will be created if it doesn't already exist. Defaults to `None`."
        ),
    ),
    job_variables: List[str] = typer.Option(
        None,
        "-jv",
        "--job-variable",
        help=(
            "One or more job variable overrides for the work pool provided in the format of key=value string or a JSON object"
        ),
    ),
    cron: List[str] = typer.Option(
        None,
        "--cron",
        help="A cron string that will be used to set a CronSchedule on the deployment.",
    ),
    interval: List[int] = typer.Option(
        None,
        "--interval",
        help=(
            "An integer specifying an interval (in seconds) that will be used to set an IntervalSchedule on the deployment."
        ),
    ),
    interval_anchor: Optional[str] = typer.Option(
        None, "--anchor-date", help="The anchor date for all interval schedules"
    ),
    rrule: List[str] = typer.Option(
        None,
        "--rrule",
        help="An RRule that will be used to set an RRuleSchedule on the deployment.",
    ),
    timezone: str = typer.Option(
        None,
        "--timezone",
        help="Deployment schedule timezone string e.g. 'America/New_York'",
    ),
    trigger: List[str] = typer.Option(
        None,
        "--trigger",
        help=(
            "Specifies a trigger for the deployment. The value can be a json string or path to `.yaml`/`.json` file. This flag can be used multiple times."
        ),
    ),
    param: List[str] = typer.Option(
        None,
        "--param",
        help=(
            "An optional parameter override, values are parsed as JSON strings e.g. --param question=ultimate --param answer=42"
        ),
    ),
    params: str = typer.Option(
        None,
        "--params",
        # The inner double quotes must live inside a single-quoted literal;
        # written as adjacent double-quoted literals they are swallowed and the
        # rendered example is no longer valid JSON.
        help=(
            "An optional parameter override in a JSON string format e.g."
            ' --params=\'{"question": "ultimate", "answer": 42}\''
        ),
    ),
    enforce_parameter_schema: bool = typer.Option(
        True,
        help=(
            "Whether to enforce the parameter schema on this deployment. If set to True, any parameters passed to this deployment must match the signature of the flow."
        ),
    ),
    deploy_all: bool = typer.Option(
        False,
        "--all",
        help=(
            "Deploy all flows in the project. If a flow name or entrypoint is also provided, this flag will be ignored."
        ),
    ),
    prefect_file: Path = typer.Option(
        Path("prefect.yaml"),
        "--prefect-file",
        help="Specify a custom path to a prefect.yaml file",
    ),
    sla: List[str] = typer.Option(
        None,
        "--sla",
        help="Experimental: One or more SLA configurations for the deployment. May be removed or modified at any time. Currently only supported on Prefect Cloud.",
    ),
):
    """Create or update deployments from a prefect.yaml file and command-line options."""
    # Flow: collect the CLI options into a dict, load the deployment configs
    # and actions from `prefect_file`, expand any `*` patterns in `--name`, and
    # then dispatch to the multi- or single-deployment runner. Any ValueError
    # raised along the way is reported through `exit_with_error`.
    if job_variables is None:
        job_variables = []

    # A bare integer is passed through when only the limit is given; a dumped
    # ConcurrencyLimitConfig is used once a collision strategy is supplied.
    if concurrency_limit is None:
        concurrency_limit_config = None
    elif concurrency_limit_collision_strategy is None:
        concurrency_limit_config = concurrency_limit
    else:
        concurrency_limit_config = ConcurrencyLimitConfig(
            limit=concurrency_limit,
            collision_strategy=concurrency_limit_collision_strategy,
        ).model_dump()

    options: dict[str, Any] = {
        "entrypoint": entrypoint,
        "description": description,
        "version_type": version_type,
        "version": version,
        "tags": tags,
        "concurrency_limit": concurrency_limit_config,
        "work_pool_name": work_pool_name,
        "work_queue_name": work_queue_name,
        "variables": job_variables,
        "cron": cron,
        "interval": interval,
        "anchor_date": interval_anchor,
        "rrule": rrule,
        "timezone": timezone,
        "triggers": trigger,
        "param": param,
        "params": params,
        "sla": sla,
    }

    try:
        all_deploy_configs, actions = _load_deploy_configs_and_actions(
            prefect_file=prefect_file
        )

        # Names containing "*" are patterns to be expanded against the loaded
        # configs; everything else is taken literally.
        parsed_names: list[str] = []
        for name in names or []:
            if "*" in name:
                parsed_names.extend(_parse_name_from_pattern(all_deploy_configs, name))
            else:
                parsed_names.append(name)

        deploy_configs = _pick_deploy_configs(
            all_deploy_configs, parsed_names, deploy_all
        )

        if len(deploy_configs) > 1:
            # NOTE(review): `work_pool_name` defaults from settings, so this
            # warning can presumably fire even when no flag was typed if a
            # default work pool is configured - confirm that is intended.
            if any(options.values()):
                app.console.print(
                    (
                        "You have passed options to the deploy command, but you are creating or updating multiple deployments. These options will be ignored."
                    ),
                    style="yellow",
                )
            await _run_multi_deploy(
                deploy_configs=deploy_configs,
                actions=actions,
                deploy_all=deploy_all,
                prefect_file=prefect_file,
            )
        else:
            deploy_config = deploy_configs[0] if deploy_configs else {}
            # Drop any "flow-name/" prefix; split() leaves names without a
            # slash untouched.
            options["names"] = [name.split("/", 1)[-1] for name in parsed_names]
            # The key is only set when enforcement is explicitly disabled.
            if not enforce_parameter_schema:
                options["enforce_parameter_schema"] = False
            await _run_single_deploy(
                deploy_config=deploy_config,
                actions=actions,
                options=options,
                prefect_file=prefect_file,
            )
    except ValueError as exc:
        exit_with_error(str(exc))