Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/deploy/_core.py: 12%
185 statements
« prev ^ index » next — coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1from __future__ import annotations 1a
3import inspect 1a
4import os 1a
5from copy import deepcopy 1a
6from pathlib import Path 1a
7from typing import TYPE_CHECKING, Any, Optional 1a
9from rich.panel import Panel 1a
11import prefect.cli.root as root 1a
12from prefect.cli._prompts import ( 1a
13 confirm,
14 prompt,
15 prompt_build_custom_docker_image,
16 prompt_entrypoint,
17 prompt_push_custom_docker_image,
18 prompt_select_work_pool,
19)
20from prefect.cli.root import app 1a
21from prefect.client.orchestration import get_client 1a
22from prefect.client.schemas.filters import WorkerFilter 1a
23from prefect.deployments.base import _save_deployment_to_prefect_file 1a
24from prefect.deployments.runner import RunnerDeployment 1a
25from prefect.deployments.steps.core import run_steps 1a
26from prefect.exceptions import ObjectNotFound 1a
27from prefect.flows import load_flow_from_entrypoint 1a
28from prefect.settings import get_current_settings 1a
29from prefect.utilities.callables import parameter_schema 1a
30from prefect.utilities.collections import get_from_dict 1a
31from prefect.utilities.templating import ( 1a
32 apply_values,
33 resolve_block_document_references,
34 resolve_variables,
35)
37from ._actions import ( 1a
38 _generate_actions_for_remote_flow_storage,
39 _generate_default_pull_action,
40)
41from ._config import ( 1a
42 _apply_cli_options_to_deploy_config,
43 _handle_deprecated_schedule_fields,
44 _merge_with_default_deploy_config,
45)
46from ._schedules import _construct_schedules 1a
47from ._sla import ( 1a
48 _create_slas,
49 _gather_deployment_sla_definitions,
50 _initialize_deployment_slas,
51)
52from ._storage import _PullStepStorage 1a
53from ._triggers import ( 1a
54 _create_deployment_triggers,
55 _gather_deployment_trigger_definitions,
56 _initialize_deployment_triggers,
57)
59if TYPE_CHECKING: 59 ↛ 60line 59 didn't jump to line 60 because the condition on line 59 was never true1a
60 from prefect.client.orchestration import PrefectClient
async def _run_single_deploy(
    deploy_config: dict[str, Any],
    actions: dict[str, Any],
    options: dict[str, Any] | None = None,
    client: Optional["PrefectClient"] = None,
    prefect_file: Path = Path("prefect.yaml"),
):
    """Create (or update) a single deployment from a deployment configuration.

    Merges the given ``deploy_config`` with defaults and CLI ``options``,
    resolves block/variable/env-var placeholders, determines the target work
    pool (prompting interactively when allowed), optionally prompts to build
    and push a Docker image, runs the configured build/push steps, then
    registers the deployment (plus triggers and SLAs) with the API and prints
    follow-up instructions.

    Args:
        deploy_config: One deployment's configuration (e.g. from prefect.yaml).
        actions: Default build/push/pull actions to fall back on.
        options: CLI option overrides applied on top of ``deploy_config``.
        client: Optional Prefect client; a new one is obtained if not given.
        prefect_file: Path used when offering to save the configuration.

    Raises:
        ValueError: In non-interactive mode when required information
            (entrypoint, deployment name, work pool) is missing, or when the
            referenced work pool does not exist or is 'prefect-agent' typed.
    """
    client = client or get_client()
    # Deep-copy all inputs so callers' dicts are never mutated.
    deploy_config = deepcopy(deploy_config) if deploy_config else {}
    actions = deepcopy(actions) if actions else {}
    options = deepcopy(options) if options else {}

    deploy_config = _merge_with_default_deploy_config(deploy_config)
    deploy_config = _handle_deprecated_schedule_fields(deploy_config)
    (
        deploy_config,
        variable_overrides,
    ) = _apply_cli_options_to_deploy_config(deploy_config, options)

    # Capture steps BEFORE placeholder resolution so unresolved block
    # references can be preserved in the saved/stored deployment spec.
    build_steps = deploy_config.get("build", actions.get("build")) or []
    push_steps = deploy_config.get("push", actions.get("push")) or []
    pull_steps = deploy_config.get("pull", actions.get("pull")) or []

    deploy_config = await resolve_block_document_references(deploy_config)
    deploy_config = await resolve_variables(deploy_config)

    # check for env var placeholders early so users can pass work pool names, etc.
    deploy_config = apply_values(deploy_config, os.environ, remove_notset=False)

    # Entrypoint is mandatory; prompt for it only in interactive mode.
    if not deploy_config.get("entrypoint"):
        if not root.is_interactive():
            raise ValueError(
                "An entrypoint must be provided:\n\n"
                " \t[yellow]prefect deploy path/to/file.py:flow_function\n\n"
                "You can also provide an entrypoint in a prefect.yaml file."
            )
        deploy_config["entrypoint"] = await prompt_entrypoint(app.console)

    flow = load_flow_from_entrypoint(deploy_config["entrypoint"])

    deploy_config["flow_name"] = flow.name

    deployment_name = deploy_config.get("name")
    if not deployment_name:
        if not root.is_interactive():
            raise ValueError("A deployment name must be provided.")
        deploy_config["name"] = prompt("Deployment name", default="default")

    deploy_config["parameter_openapi_schema"] = parameter_schema(flow)

    work_pool_name = get_from_dict(deploy_config, "work_pool.name")

    # determine work pool
    if work_pool_name:
        try:
            work_pool = await client.read_work_pool(deploy_config["work_pool"]["name"])

            # dont allow submitting to prefect-agent typed work pools
            if work_pool.type == "prefect-agent":
                if not root.is_interactive():
                    raise ValueError(
                        "Cannot create a project-style deployment with work pool of"
                        " type 'prefect-agent'. If you wish to use an agent with"
                        " your deployment, please use the `prefect deployment"
                        " build` command."
                    )
                app.console.print(
                    "You've chosen a work pool with type 'prefect-agent' which"
                    " cannot be used for project-style deployments. Let's pick"
                    " another work pool to deploy to."
                )
                deploy_config["work_pool"]["name"] = await prompt_select_work_pool(
                    app.console
                )
        except ObjectNotFound:
            raise ValueError(
                "This deployment configuration references work pool"
                f" {deploy_config['work_pool']['name']!r} which does not exist. This"
                " means no worker will be able to pick up its runs. You can create a"
                " work pool in the Prefect UI."
            )
    else:
        # No pool configured: a pool is required, so prompt (interactive only).
        if not root.is_interactive():
            raise ValueError(
                "A work pool is required to deploy this flow. Please specify a work"
                " pool name via the '--pool' flag or in your prefect.yaml file."
            )
        if not isinstance(deploy_config.get("work_pool"), dict):
            deploy_config["work_pool"] = {}
        deploy_config["work_pool"]["name"] = await prompt_select_work_pool(
            console=app.console
        )

    # Fully-qualified step names used to detect pre-existing Docker steps.
    docker_build_steps = [
        "prefect_docker.deployments.steps.build_docker_image",
    ]

    docker_push_steps = [
        "prefect_docker.deployments.steps.push_docker_image",
    ]

    docker_build_step_exists = any(
        any(step in action for step in docker_build_steps)
        for action in deploy_config.get("build", actions.get("build")) or []
    )

    update_work_pool_image = False

    # An explicit null/empty `build:` key means the user opted out of builds.
    build_step_set_to_null = "build" in deploy_config and (
        deploy_config["build"] is None
        or deploy_config["build"] == {}
        or deploy_config["build"] == []
    )

    # Re-read the pool: the name may have changed via the prompts above.
    work_pool = await client.read_work_pool(deploy_config["work_pool"]["name"])

    # The pool accepts a custom image only if its job template exposes an
    # unconstrained string-typed `image` variable.
    image_properties = (
        work_pool.base_job_template.get("variables", {})
        .get("properties", {})
        .get("image", {})
    )
    image_is_configurable = (
        "image"
        in work_pool.base_job_template.get("variables", {}).get("properties", {})
        and image_properties.get("type") == "string"
        and not image_properties.get("enum")
    )

    # Offer to add Docker build/push steps when none exist and the pool
    # can actually use a custom image.
    if (
        root.is_interactive()
        and not docker_build_step_exists
        and not build_step_set_to_null
        and image_is_configurable
    ):
        build_docker_image_step = await prompt_build_custom_docker_image(
            app.console, deploy_config
        )
        if build_docker_image_step is not None:
            # Only point the pool's image at the built one if the user did
            # not already pin an image via job variables.
            if not get_from_dict(deploy_config, "work_pool.job_variables.image"):
                update_work_pool_image = True

            (
                push_docker_image_step,
                updated_build_docker_image_step,
            ) = await prompt_push_custom_docker_image(
                app.console, deploy_config, build_docker_image_step
            )

            if actions.get("build"):
                actions["build"].append(updated_build_docker_image_step)
            else:
                actions["build"] = [updated_build_docker_image_step]

            if push_docker_image_step is not None:
                if actions.get("push"):
                    actions["push"].append(push_docker_image_step)
                else:
                    actions["push"] = [push_docker_image_step]

            # Re-capture steps now that Docker steps may have been added.
            build_steps = deploy_config.get("build", actions.get("build")) or []
            push_steps = deploy_config.get("push", actions.get("push")) or []

    docker_push_step_exists = any(
        any(step in action for step in docker_push_steps)
        for action in deploy_config.get("push", actions.get("push")) or []
    )

    ## CONFIGURE PUSH and/or PULL STEPS FOR REMOTE FLOW STORAGE
    if (
        root.is_interactive()
        and not (deploy_config.get("pull") or actions.get("pull"))
        and not docker_push_step_exists
        and confirm(
            (
                "Your Prefect workers will need access to this flow's code in order to"
                " run it. Would you like your workers to pull your flow code from a"
                " remote storage location when running this flow?"
            ),
            default=True,
            console=app.console,
        )
    ):
        actions = await _generate_actions_for_remote_flow_storage(
            console=app.console, deploy_config=deploy_config, actions=actions
        )

    # Prefer the originally captured pull_steps (taken before resolution) to
    # preserve unresolved block placeholders in the deployment spec. Only fall
    # back to the config/actions/default if no pull steps were provided.
    pull_steps = (
        pull_steps
        or deploy_config.get("pull")
        or actions.get("pull")
        or await _generate_default_pull_action(
            app.console,
            deploy_config=deploy_config,
            actions=actions,
        )
    )

    ## RUN BUILD AND PUSH STEPS
    step_outputs: dict[str, Any] = {}
    if build_steps:
        app.console.print("Running deployment build steps...")
        step_outputs.update(
            await run_steps(build_steps, step_outputs, print_function=app.console.print)
        )

    if push_steps := push_steps or actions.get("push"):
        app.console.print("Running deployment push steps...")
        step_outputs.update(
            await run_steps(push_steps, step_outputs, print_function=app.console.print)
        )

    # CLI variable overrides win over any step outputs of the same name.
    step_outputs.update(variable_overrides)

    if update_work_pool_image:
        if "build-image" not in step_outputs:
            app.console.print(
                "Warning: no build-image step found in the deployment build steps."
                " The work pool image will not be updated."
            )
        # NOTE(review): the template below is assigned even when the warning
        # above fires (it is resolved via apply_values later) — confirm this
        # warn-then-assign ordering is intentional. Also assumes
        # work_pool.job_variables already exists in deploy_config.
        deploy_config["work_pool"]["job_variables"]["image"] = "{{ build-image.image }}"

    if not deploy_config.get("description"):
        deploy_config["description"] = flow.description

    deploy_config["schedules"] = _construct_schedules(deploy_config, step_outputs)

    # save deploy_config before templating
    deploy_config_before_templating = deepcopy(deploy_config)
    ## apply templating from build and push steps to the final deployment spec
    # Pop values that must NOT be templated, then restore them afterwards.
    _parameter_schema = deploy_config.pop("parameter_openapi_schema")

    _schedules = deploy_config.pop("schedules")

    # Save triggers before templating to preserve event template parameters
    _triggers = deploy_config.pop("triggers", None)

    deploy_config = apply_values(deploy_config, step_outputs, warn_on_notset=True)
    deploy_config["parameter_openapi_schema"] = _parameter_schema
    deploy_config["schedules"] = _schedules

    # This initialises triggers after templating to ensure that jinja variables are resolved
    # Use the pre-templated trigger specs to preserve event template parameters like {{ event.name }}
    # while still applying templating to trigger-level fields like enabled
    if trigger_specs := _gather_deployment_trigger_definitions(
        options.get("triggers"), _triggers
    ):
        # Apply templating only to non-parameter trigger fields to preserve event templates
        templated_trigger_specs = []
        for spec in trigger_specs:
            # Save parameters before templating
            parameters = spec.pop("parameters", None)
            # Apply templating to trigger fields (e.g., enabled)
            templated_spec = apply_values(spec, step_outputs, warn_on_notset=False)
            # Restore parameters without templating
            if parameters is not None:
                templated_spec["parameters"] = parameters
            templated_trigger_specs.append(templated_spec)
        triggers = _initialize_deployment_triggers(
            deployment_name, templated_trigger_specs
        )
    else:
        triggers = []

    # A dict-form concurrency_limit carries both a limit and a collision
    # strategy; flatten it into the two scalar fields the API expects.
    if isinstance(deploy_config.get("concurrency_limit"), dict):
        deploy_config["concurrency_options"] = {
            "collision_strategy": get_from_dict(
                deploy_config, "concurrency_limit.collision_strategy"
            )
        }
        deploy_config["concurrency_limit"] = get_from_dict(
            deploy_config, "concurrency_limit.limit"
        )

    pull_steps = apply_values(pull_steps, step_outputs, remove_notset=False)

    deployment = RunnerDeployment(
        name=deploy_config["name"],
        flow_name=deploy_config.get("flow_name"),
        entrypoint=deploy_config.get("entrypoint"),
        work_pool_name=get_from_dict(deploy_config, "work_pool.name"),
        work_queue_name=get_from_dict(deploy_config, "work_pool.work_queue_name"),
        parameters=deploy_config.get("parameters"),
        description=deploy_config.get("description"),
        version=deploy_config.get("version") or options.get("version"),
        version_type=deploy_config.get("version_type") or options.get("version_type"),
        tags=deploy_config.get("tags"),
        concurrency_limit=deploy_config.get("concurrency_limit"),
        concurrency_options=deploy_config.get("concurrency_options"),
        paused=deploy_config.get("paused"),
        storage=_PullStepStorage(pull_steps),
        job_variables=get_from_dict(deploy_config, "work_pool.job_variables"),
    )

    deployment._set_defaults_from_flow(flow)

    deployment._parameter_openapi_schema = deploy_config["parameter_openapi_schema"]

    if deploy_config.get("enforce_parameter_schema") is not None:
        deployment.enforce_parameter_schema = deploy_config.get(
            "enforce_parameter_schema"
        )

    apply_coro = deployment.apply(schedules=deploy_config.get("schedules"))
    if TYPE_CHECKING:
        assert inspect.isawaitable(apply_coro)

    deployment_id = await apply_coro

    await _create_deployment_triggers(client, deployment_id, triggers)

    # # We want to ensure that if a user passes an empty list of SLAs, we call the
    # # apply endpoint to remove existing SLAs for the deployment.
    # # If the argument is not provided, we will not call the endpoint.
    # Import SLA helpers from the package namespace to honor test monkeypatches
    sla_specs = _gather_deployment_sla_definitions(
        options.get("sla"), deploy_config.get("sla")
    )
    if sla_specs is not None:
        slas = _initialize_deployment_slas(deployment_id, sla_specs)
        await _create_slas(client, deployment_id, slas)

    app.console.print(
        Panel(
            f"Deployment '{deploy_config['flow_name']}/{deploy_config['name']}'"
            f" successfully created with id '{deployment_id}'."
        ),
        style="green",
    )

    if ui_url := get_current_settings().ui_url:
        message = (
            "\nView Deployment in UI:"
            f" {ui_url}/deployments/deployment/{deployment_id}\n"
        )
        app.console.print(message, soft_wrap=True)

    # Offer to persist this configuration when no prefect.yaml exists yet.
    if root.is_interactive() and not prefect_file.exists():
        if confirm(
            (
                "Would you like to save configuration for this deployment for faster"
                " deployments in the future?"
            ),
            console=app.console,
        ):
            deploy_config_before_templating.update({"schedules": _schedules})
            _save_deployment_to_prefect_file(
                deploy_config_before_templating,
                build_steps=build_steps or None,
                push_steps=push_steps or None,
                pull_steps=pull_steps or None,
                triggers=trigger_specs or None,
                sla=sla_specs or None,
                prefect_file=prefect_file,
            )
            app.console.print(
                (
                    f"\n[green]Deployment configuration saved to {prefect_file}![/]"
                    " You can now deploy using this deployment configuration"
                    " with:\n\n\t[blue]$ prefect deploy -n"
                    f" {deploy_config['name']}[/]\n\nYou can also make changes to"
                    " this deployment configuration by making changes to the"
                    " YAML file."
                ),
            )

    # Only suggest starting a worker when the pool needs one and none is online.
    active_workers = []
    if work_pool_name:
        active_workers = await client.read_workers_for_work_pool(
            work_pool_name, worker_filter=WorkerFilter(status={"any_": ["ONLINE"]})
        )

    if (
        not work_pool.is_push_pool
        and not work_pool.is_managed_pool
        and not active_workers
    ):
        app.console.print(
            "\nTo execute flow runs from these deployments, start a worker in a"
            " separate terminal that pulls work from the"
            f" {work_pool_name!r} work pool:"
        )
        app.console.print(
            f"\n\t$ prefect worker start --pool {work_pool_name!r}",
            style="blue",
        )
    app.console.print(
        "\nTo schedule a run for this deployment, use the following command:"
    )
    app.console.print(
        (
            "\n\t$ prefect deployment run"
            f" '{deploy_config['flow_name']}/{deploy_config['name']}'\n"
        ),
        style="blue",
    )
async def _run_multi_deploy(
    deploy_configs: list[dict[str, Any]],
    actions: dict[str, Any],
    names: Optional[list[str]] = None,
    deploy_all: bool = False,
    prefect_file: Path = Path("prefect.yaml"),
):
    """Deploy a sequence of deployment configurations one after another.

    Each configuration is delegated to ``_run_single_deploy``. Unnamed
    configurations are skipped in non-interactive mode; in interactive mode
    the user is offered the chance to name and deploy them on the spot.

    Args:
        deploy_configs: Deployment configurations to process.
        actions: Default build/push/pull actions shared by all deployments.
        names: Selected deployment names (currently only normalized here).
        deploy_all: Whether every discovered configuration is being deployed.
        prefect_file: Path to the prefect.yaml file passed through to each
            single deploy.
    """
    # Copy inputs so the caller's data is never mutated.
    deploy_configs = deepcopy(deploy_configs) if deploy_configs else []
    actions = deepcopy(actions) if actions else {}
    names = names or []

    banner = (
        "Deploying all flows with an existing deployment configuration..."
        if deploy_all
        else "Deploying flows with selected deployment configurations..."
    )
    app.console.print(banner)

    for config in deploy_configs:
        if config.get("name") is None:
            if not root.is_interactive():
                # Cannot prompt for a name without a TTY — skip this one.
                app.console.print(
                    "Discovered unnamed deployment. Skipping...", style="yellow"
                )
                continue
            app.console.print("Discovered unnamed deployment.", style="yellow")
            app.console.print_json(data=config)
            wants_to_name = confirm(
                "Would you like to give this deployment a name and deploy it?",
                default=True,
                console=app.console,
            )
            if not wants_to_name:
                app.console.print("Skipping unnamed deployment.", style="yellow")
                continue
            config["name"] = prompt("Deployment name", default="default")
        app.console.print(Panel(f"Deploying {config['name']}", style="blue"))
        await _run_single_deploy(config, actions, prefect_file=prefect_file)