Coverage for /usr/local/lib/python3.12/site-packages/prefect/deployments/base.py: 9%
144 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1"""
2Core primitives for managing Prefect deployments via `prefect deploy`, providing a minimally opinionated
3build system for managing flows and deployments.
5To get started, follow along with [the deployments tutorial](https://docs.prefect.io/v3/how-to-guides/deployments/create-deployments).
6"""
8from __future__ import annotations 1a
10import os 1a
11from copy import deepcopy 1a
12from pathlib import Path 1a
13from typing import Any, Dict, List, Optional 1a
15import yaml 1a
16from ruamel.yaml import YAML 1a
18from prefect.client.schemas.objects import ConcurrencyLimitStrategy 1a
19from prefect.client.schemas.schedules import IntervalSchedule 1a
20from prefect.utilities._git import get_git_branch, get_git_remote_origin_url 1a
21from prefect.utilities.annotations import NotSet 1a
22from prefect.utilities.filesystem import create_default_ignore_file 1a
23from prefect.utilities.templating import apply_values 1a
26def create_default_prefect_yaml( 1a
27 path: str, name: Optional[str] = None, contents: Optional[Dict[str, Any]] = None
28) -> bool:
29 """
30 Creates default `prefect.yaml` file in the provided path if one does not already exist;
31 returns boolean specifying whether a file was created.
33 Args:
34 name (str, optional): the name of the project; if not provided, the current directory name
35 will be used
36 contents (dict, optional): a dictionary of contents to write to the file; if not provided,
37 defaults will be used
38 """
39 path = Path(path)
40 prefect_file = path / "prefect.yaml"
41 if prefect_file.exists():
42 return False
43 default_file = Path(__file__).parent / "templates" / "prefect.yaml"
45 with default_file.open(mode="r") as df:
46 default_contents = yaml.safe_load(df)
48 import prefect
50 contents["prefect-version"] = prefect.__version__
51 contents["name"] = name
53 with prefect_file.open(mode="w") as f:
54 # write header
55 f.write(
56 "# Welcome to your prefect.yaml file! You can use this file for storing and"
57 " managing\n# configuration for deploying your flows. We recommend"
58 " committing this file to source\n# control along with your flow code.\n\n"
59 )
61 f.write("# Generic metadata about this project\n")
62 yaml.dump({"name": contents["name"]}, f, sort_keys=False)
63 yaml.dump({"prefect-version": contents["prefect-version"]}, f, sort_keys=False)
64 f.write("\n")
66 # build
67 f.write("# build section allows you to manage and build docker images\n")
68 yaml.dump(
69 {"build": contents.get("build", default_contents.get("build"))},
70 f,
71 sort_keys=False,
72 )
73 f.write("\n")
75 # push
76 f.write(
77 "# push section allows you to manage if and how this project is uploaded to"
78 " remote locations\n"
79 )
80 yaml.dump(
81 {"push": contents.get("push", default_contents.get("push"))},
82 f,
83 sort_keys=False,
84 )
85 f.write("\n")
87 # pull
88 f.write(
89 "# pull section allows you to provide instructions for cloning this project"
90 " in remote locations\n"
91 )
92 yaml.dump(
93 {"pull": contents.get("pull", default_contents.get("pull"))},
94 f,
95 sort_keys=False,
96 )
97 f.write("\n")
99 # deployments
100 f.write(
101 "# the deployments section allows you to provide configuration for"
102 " deploying flows\n"
103 )
104 yaml.dump(
105 {
106 "deployments": contents.get(
107 "deployments", default_contents.get("deployments")
108 )
109 },
110 f,
111 sort_keys=False,
112 )
114 return True
117def configure_project_by_recipe( 1a
118 recipe: str, **formatting_kwargs: Any
119) -> dict[str, Any] | type[NotSet]:
120 """
121 Given a recipe name, returns a dictionary representing base configuration options.
123 Args:
124 recipe (str): the name of the recipe to use
125 formatting_kwargs (dict, optional): additional keyword arguments to format the recipe
127 Raises:
128 ValueError: if provided recipe name does not exist.
129 """
130 # load the recipe
131 recipe_path = Path(__file__).parent / "recipes" / recipe / "prefect.yaml"
133 if not recipe_path.exists():
134 raise ValueError(f"Unknown recipe {recipe!r} provided.")
136 with recipe_path.open(mode="r") as f:
137 config: dict[str, Any] = yaml.safe_load(f)
139 templated_config = apply_values(
140 template=config, values=formatting_kwargs, remove_notset=False
141 )
143 return templated_config
146def initialize_project( 1a
147 name: Optional[str] = None,
148 recipe: Optional[str] = None,
149 inputs: Optional[Dict[str, Any]] = None,
150) -> List[str]:
151 """
152 Initializes a basic project structure with base files. If no name is provided, the name
153 of the current directory is used. If no recipe is provided, one is inferred.
155 Args:
156 name (str, optional): the name of the project; if not provided, the current directory name
157 recipe (str, optional): the name of the recipe to use; if not provided, one is inferred
158 inputs (dict, optional): a dictionary of inputs to use when formatting the recipe
160 Returns:
161 List[str]: a list of files / directories that were created
162 """
163 # determine if in git repo or use directory name as a default
164 is_git_based = False
165 formatting_kwargs = {"directory": str(Path(".").absolute().resolve())}
166 dir_name = os.path.basename(os.getcwd())
168 remote_url = get_git_remote_origin_url()
169 if remote_url:
170 formatting_kwargs["repository"] = remote_url
171 is_git_based = True
172 branch = get_git_branch()
173 formatting_kwargs["branch"] = branch or "main"
175 formatting_kwargs["name"] = dir_name
177 has_dockerfile = Path("Dockerfile").exists()
179 if has_dockerfile:
180 formatting_kwargs["dockerfile"] = "Dockerfile"
181 elif recipe is not None and "docker" in recipe:
182 formatting_kwargs["dockerfile"] = "auto"
184 # hand craft a pull step
185 if is_git_based and recipe is None:
186 if has_dockerfile:
187 recipe = "docker-git"
188 else:
189 recipe = "git"
190 elif recipe is None and has_dockerfile:
191 recipe = "docker"
192 elif recipe is None:
193 recipe = "local"
195 formatting_kwargs.update(inputs or {})
196 configuration = configure_project_by_recipe(recipe=recipe, **formatting_kwargs)
198 project_name = name or dir_name
200 files = []
201 if create_default_ignore_file("."):
202 files.append(".prefectignore")
203 if create_default_prefect_yaml(".", name=project_name, contents=configuration):
204 files.append("prefect.yaml")
206 return files
209def _format_deployment_for_saving_to_prefect_file( 1a
210 deployment: dict[str, Any],
211) -> dict[str, Any]:
212 """
213 Formats a deployment into a templated deploy config for saving to prefect.yaml.
215 Args:
216 - deployment (Dict): a dictionary containing an untemplated deployment configuration
218 Returns:
219 - deployment (Dict): a dictionary containing a templated deployment configuration
220 """
221 if not deployment:
222 raise ValueError("Deployment must be a non-empty dictionary.")
223 deployment = deepcopy(deployment)
224 # Parameter schema is not stored in prefect.yaml
225 deployment.pop("parameter_openapi_schema")
226 # Only want entrypoint to avoid errors
227 deployment.pop("flow_name", None)
229 if deployment.get("schedules"):
230 schedules: list[dict[str, Any]] = []
231 for deployment_schedule in deployment["schedules"]:
232 if isinstance(deployment_schedule["schedule"], IntervalSchedule):
233 schedule_config = _interval_schedule_to_dict(
234 deployment_schedule["schedule"]
235 )
236 else: # all valid SCHEDULE_TYPES are subclasses of BaseModel
237 schedule_config = deployment_schedule["schedule"].model_dump()
239 if "active" in deployment_schedule:
240 schedule_config["active"] = deployment_schedule["active"]
241 schedules.append(schedule_config)
243 deployment["schedules"] = schedules
245 if deployment.get("concurrency_limit"):
246 concurrency_limit = deployment["concurrency_limit"]
247 if isinstance(concurrency_limit, dict):
248 if isinstance(
249 concurrency_limit["collision_strategy"], ConcurrencyLimitStrategy
250 ):
251 concurrency_limit["collision_strategy"] = str(
252 concurrency_limit["collision_strategy"].value
253 )
254 deployment["concurrency_limit"] = concurrency_limit
256 return deployment
259def _interval_schedule_to_dict(schedule: IntervalSchedule) -> dict[str, Any]: 1a
260 """
261 Converts an IntervalSchedule to a dictionary.
263 Args:
264 - schedule (IntervalSchedule): the schedule to convert
266 Returns:
267 - dict[str, Any]: the schedule as a dictionary
268 """
269 schedule_config = schedule.model_dump()
270 schedule_config["interval"] = schedule_config["interval"].total_seconds()
271 schedule_config["anchor_date"] = schedule_config["anchor_date"].isoformat()
273 return schedule_config
276def _save_deployment_to_prefect_file( 1a
277 deployment: dict[str, Any],
278 build_steps: list[dict[str, Any]] | None = None,
279 push_steps: list[dict[str, Any]] | None = None,
280 pull_steps: list[dict[str, Any]] | None = None,
281 triggers: list[dict[str, Any]] | None = None,
282 sla: list[dict[str, Any]] | None = None,
283 prefect_file: Path = Path("prefect.yaml"),
284):
285 """
286 Save a deployment configuration to the `prefect.yaml` file in the
287 current directory.
289 Will create a prefect.yaml file if one does not already exist.
291 Args:
292 - deployment: a dictionary containing a deployment configuration
293 """
294 deployment = _format_deployment_for_saving_to_prefect_file(deployment)
296 current_directory_name = os.path.basename(os.getcwd())
297 if not prefect_file.exists():
298 if triggers:
299 deployment["triggers"] = triggers
300 if sla:
301 deployment["sla"] = sla
302 create_default_prefect_yaml(
303 ".",
304 current_directory_name,
305 contents={
306 "deployments": [deployment],
307 "build": build_steps,
308 "push": push_steps,
309 "pull": pull_steps,
310 },
311 )
312 create_default_ignore_file(".")
313 else:
314 # use ruamel.yaml to preserve comments
315 ryaml = YAML()
316 with prefect_file.open(mode="r") as f:
317 parsed_prefect_file_contents = ryaml.load(f)
319 if build_steps != parsed_prefect_file_contents.get("build"):
320 deployment["build"] = build_steps
322 if push_steps != parsed_prefect_file_contents.get("push"):
323 deployment["push"] = push_steps
325 if pull_steps != parsed_prefect_file_contents.get("pull"):
326 deployment["pull"] = pull_steps
328 if triggers and triggers != parsed_prefect_file_contents.get("triggers"):
329 deployment["triggers"] = triggers
331 if sla and sla != parsed_prefect_file_contents.get("sla"):
332 deployment["sla"] = sla
334 deployments = parsed_prefect_file_contents.get("deployments")
335 if deployments is None:
336 parsed_prefect_file_contents["deployments"] = [deployment]
337 else:
338 for i, existing_deployment in enumerate(deployments):
339 if existing_deployment.get("name") == deployment.get("name") and (
340 existing_deployment.get("entrypoint")
341 == deployment.get("entrypoint")
342 ):
343 deployments[i] = deployment
344 break
345 else:
346 deployments.append(deployment)
348 with prefect_file.open(mode="w") as f:
349 ryaml.dump(parsed_prefect_file_contents, f)