Coverage for /usr/local/lib/python3.12/site-packages/prefect/deployments/base.py: 9%

144 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1""" 

2Core primitives for managing Prefect deployments via `prefect deploy`, providing a minimally opinionated 

3build system for managing flows and deployments. 

4 

5To get started, follow along with [the deployments tutorial](https://docs.prefect.io/v3/how-to-guides/deployments/create-deployments). 

6""" 

7 

8from __future__ import annotations 1a

9 

10import os 1a

11from copy import deepcopy 1a

12from pathlib import Path 1a

13from typing import Any, Dict, List, Optional 1a

14 

15import yaml 1a

16from ruamel.yaml import YAML 1a

17 

18from prefect.client.schemas.objects import ConcurrencyLimitStrategy 1a

19from prefect.client.schemas.schedules import IntervalSchedule 1a

20from prefect.utilities._git import get_git_branch, get_git_remote_origin_url 1a

21from prefect.utilities.annotations import NotSet 1a

22from prefect.utilities.filesystem import create_default_ignore_file 1a

23from prefect.utilities.templating import apply_values 1a

24 

25 

def create_default_prefect_yaml(
    path: str, name: Optional[str] = None, contents: Optional[Dict[str, Any]] = None
) -> bool:
    """
    Creates default `prefect.yaml` file in the provided path if one does not already exist;
    returns boolean specifying whether a file was created.

    Args:
        path (str): the directory in which the `prefect.yaml` file is created
        name (str, optional): the name of the project; if not provided, the current directory name
            will be used
        contents (dict, optional): a dictionary of contents to write to the file; if not provided,
            defaults will be used. The dictionary passed in is left unmodified.

    Returns:
        bool: `True` if a new file was written, `False` if `prefect.yaml` already existed
    """
    prefect_file = Path(path) / "prefect.yaml"
    if prefect_file.exists():
        return False
    default_file = Path(__file__).parent / "templates" / "prefect.yaml"

    with default_file.open(mode="r") as df:
        default_contents = yaml.safe_load(df)

    import prefect

    # `contents` is optional: fall back to an empty mapping so every section
    # resolves to the packaged template. Copy it so the caller's dictionary
    # does not silently gain the "name" / "prefect-version" keys.
    contents = dict(contents) if contents is not None else {}
    contents["prefect-version"] = prefect.__version__
    # Honour the documented fallback instead of writing `name: null`.
    contents["name"] = name if name is not None else os.path.basename(os.getcwd())

    # Each section is emitted as "<explanatory comment>\n<yaml for that key>".
    # A key that is present in `contents` wins even when its value is None;
    # the template value is only used when the key is absent.
    sections = (
        ("build", "# build section allows you to manage and build docker images\n"),
        (
            "push",
            "# push section allows you to manage if and how this project is uploaded to"
            " remote locations\n",
        ),
        (
            "pull",
            "# pull section allows you to provide instructions for cloning this project"
            " in remote locations\n",
        ),
        (
            "deployments",
            "# the deployments section allows you to provide configuration for"
            " deploying flows\n",
        ),
    )

    with prefect_file.open(mode="w") as f:
        # write header
        f.write(
            "# Welcome to your prefect.yaml file! You can use this file for storing and"
            " managing\n# configuration for deploying your flows. We recommend"
            " committing this file to source\n# control along with your flow code.\n\n"
        )

        f.write("# Generic metadata about this project\n")
        yaml.dump({"name": contents["name"]}, f, sort_keys=False)
        yaml.dump({"prefect-version": contents["prefect-version"]}, f, sort_keys=False)
        f.write("\n")

        for position, (key, header) in enumerate(sections):
            if position:
                # blank line between sections, none after the last one
                f.write("\n")
            f.write(header)
            yaml.dump(
                {key: contents.get(key, default_contents.get(key))},
                f,
                sort_keys=False,
            )

    return True

115 

116 

def configure_project_by_recipe(
    recipe: str, **formatting_kwargs: Any
) -> dict[str, Any] | type[NotSet]:
    """
    Load the named recipe and fill in its template placeholders.

    The recipe is read from `recipes/<recipe>/prefect.yaml` next to this module
    and passed through `apply_values` together with the supplied keyword
    arguments (with `remove_notset=False`).

    Args:
        recipe (str): the name of the recipe to use
        formatting_kwargs (dict, optional): values handed to `apply_values` for templating

    Returns:
        The result of `apply_values` for the loaded recipe.

    Raises:
        ValueError: if provided recipe name does not exist.
    """
    recipes_root = Path(__file__).parent / "recipes"
    recipe_file = recipes_root / recipe / "prefect.yaml"

    # Guard clause: an unknown recipe has no file on disk
    if not recipe_file.exists():
        raise ValueError(f"Unknown recipe {recipe!r} provided.")

    with recipe_file.open(mode="r") as stream:
        raw_config: dict[str, Any] = yaml.safe_load(stream)

    return apply_values(
        template=raw_config, values=formatting_kwargs, remove_notset=False
    )

144 

145 

def initialize_project(
    name: Optional[str] = None,
    recipe: Optional[str] = None,
    inputs: Optional[Dict[str, Any]] = None,
) -> List[str]:
    """
    Create the base project files (`.prefectignore`, `prefect.yaml`) in the
    current working directory.

    When `recipe` is omitted it is inferred from the environment: a git remote
    origin selects "git" (or "docker-git" if a `Dockerfile` is present), a
    `Dockerfile` alone selects "docker", and otherwise "local" is used.

    Args:
        name (str, optional): the name of the project; if not provided, the current directory name
        recipe (str, optional): the name of the recipe to use; if not provided, one is inferred
        inputs (dict, optional): a dictionary of inputs to use when formatting the recipe;
            these override the automatically collected values

    Returns:
        List[str]: a list of files / directories that were created
    """
    formatting_kwargs = {"directory": str(Path(".").absolute().resolve())}
    cwd_name = os.path.basename(os.getcwd())

    # A remote origin URL is what marks the project as git based
    origin_url = get_git_remote_origin_url()
    in_git_repo = bool(origin_url)
    if in_git_repo:
        formatting_kwargs["repository"] = origin_url
        formatting_kwargs["branch"] = get_git_branch() or "main"

    # The recipe is always templated with the directory name, not `name`
    formatting_kwargs["name"] = cwd_name

    dockerfile_present = Path("Dockerfile").exists()
    if dockerfile_present:
        formatting_kwargs["dockerfile"] = "Dockerfile"
    elif recipe is not None and "docker" in recipe:
        # explicitly requested docker recipe without a Dockerfile on disk
        formatting_kwargs["dockerfile"] = "auto"

    # Infer a recipe only when the caller did not choose one
    if recipe is None:
        if in_git_repo:
            recipe = "docker-git" if dockerfile_present else "git"
        else:
            recipe = "docker" if dockerfile_present else "local"

    formatting_kwargs.update(inputs or {})
    configuration = configure_project_by_recipe(recipe=recipe, **formatting_kwargs)

    created = []
    if create_default_ignore_file("."):
        created.append(".prefectignore")
    if create_default_prefect_yaml(
        ".", name=name or cwd_name, contents=configuration
    ):
        created.append("prefect.yaml")

    return created

207 

208 

def _format_deployment_for_saving_to_prefect_file(
    deployment: dict[str, Any],
) -> dict[str, Any]:
    """
    Formats a deployment into a templated deploy config for saving to prefect.yaml.

    The input is deep-copied first, so the caller's dictionary is never modified.

    Args:
        - deployment (Dict): a dictionary containing an untemplated deployment configuration

    Returns:
        - deployment (Dict): a dictionary containing a templated deployment configuration

    Raises:
        - ValueError: if `deployment` is empty or `None`
    """
    if not deployment:
        raise ValueError("Deployment must be a non-empty dictionary.")
    deployment = deepcopy(deployment)
    # Parameter schema is not stored in prefect.yaml; a config that never
    # carried one is fine, so do not fail when the key is missing
    deployment.pop("parameter_openapi_schema", None)
    # Only want entrypoint to avoid errors
    deployment.pop("flow_name", None)

    if deployment.get("schedules"):
        schedules: list[dict[str, Any]] = []
        for deployment_schedule in deployment["schedules"]:
            schedule = deployment_schedule["schedule"]
            if isinstance(schedule, IntervalSchedule):
                schedule_config = _interval_schedule_to_dict(schedule)
            else:  # all valid SCHEDULE_TYPES are subclasses of BaseModel
                schedule_config = schedule.model_dump()

            # "active" lives beside the schedule object, so carry it over
            if "active" in deployment_schedule:
                schedule_config["active"] = deployment_schedule["active"]
            schedules.append(schedule_config)

        deployment["schedules"] = schedules

    concurrency_limit = deployment.get("concurrency_limit")
    if isinstance(concurrency_limit, dict):
        # Replace the enum member with its plain string value; a dict without
        # a collision strategy is left untouched
        strategy = concurrency_limit.get("collision_strategy")
        if isinstance(strategy, ConcurrencyLimitStrategy):
            concurrency_limit["collision_strategy"] = str(strategy.value)

    return deployment

257 

258 

def _interval_schedule_to_dict(schedule: IntervalSchedule) -> dict[str, Any]:
    """
    Dump an IntervalSchedule to a dictionary of plain values.

    The result of `model_dump()` is returned with `interval` replaced by its
    `total_seconds()` value and `anchor_date` replaced by its `isoformat()`
    string; every other field is passed through as dumped.

    Args:
        - schedule (IntervalSchedule): the schedule to convert

    Returns:
        - dict[str, Any]: the schedule as a dictionary
    """
    dumped = schedule.model_dump()
    # Overriding existing keys in a dict display keeps their original position
    return {
        **dumped,
        "interval": dumped["interval"].total_seconds(),
        "anchor_date": dumped["anchor_date"].isoformat(),
    }

274 

275 

def _save_deployment_to_prefect_file(
    deployment: dict[str, Any],
    build_steps: list[dict[str, Any]] | None = None,
    push_steps: list[dict[str, Any]] | None = None,
    pull_steps: list[dict[str, Any]] | None = None,
    triggers: list[dict[str, Any]] | None = None,
    sla: list[dict[str, Any]] | None = None,
    prefect_file: Path = Path("prefect.yaml"),
):
    """
    Save a deployment configuration to the `prefect.yaml` file in the
    current directory.

    Will create a prefect.yaml file if one does not already exist.

    If the file exists, a deployment with the same `name` and `entrypoint` is
    replaced in place; otherwise the deployment is appended. Steps, triggers and
    SLAs are stored on the deployment itself only when they differ from the
    file's top-level values.

    Args:
        - deployment: a dictionary containing a deployment configuration
        - build_steps: build steps to record for the deployment
        - push_steps: push steps to record for the deployment
        - pull_steps: pull steps to record for the deployment
        - triggers: triggers to record for the deployment; ignored when empty
        - sla: SLAs to record for the deployment; ignored when empty
        - prefect_file: path of the file to check and update
    """
    deployment = _format_deployment_for_saving_to_prefect_file(deployment)

    if not prefect_file.exists():
        if triggers:
            deployment["triggers"] = triggers
        if sla:
            deployment["sla"] = sla
        # NOTE: the new file is always created as ./prefect.yaml, whatever
        # `prefect_file` points at, because the helper is called with ".".
        create_default_prefect_yaml(
            ".",
            os.path.basename(os.getcwd()),
            contents={
                "deployments": [deployment],
                "build": build_steps,
                "push": push_steps,
                "pull": pull_steps,
            },
        )
        create_default_ignore_file(".")
    else:
        # use ruamel.yaml to preserve comments
        ryaml = YAML()
        with prefect_file.open(mode="r") as f:
            parsed_prefect_file_contents = ryaml.load(f)

        # An existing but empty file loads as None; treat it as an empty
        # mapping instead of failing on the `.get` calls below.
        if parsed_prefect_file_contents is None:
            parsed_prefect_file_contents = {}

        # Steps are stored per deployment only when they differ from the
        # file-level sections.
        if build_steps != parsed_prefect_file_contents.get("build"):
            deployment["build"] = build_steps

        if push_steps != parsed_prefect_file_contents.get("push"):
            deployment["push"] = push_steps

        if pull_steps != parsed_prefect_file_contents.get("pull"):
            deployment["pull"] = pull_steps

        if triggers and triggers != parsed_prefect_file_contents.get("triggers"):
            deployment["triggers"] = triggers

        if sla and sla != parsed_prefect_file_contents.get("sla"):
            deployment["sla"] = sla

        deployments = parsed_prefect_file_contents.get("deployments")
        if deployments is None:
            parsed_prefect_file_contents["deployments"] = [deployment]
        else:
            for i, existing_deployment in enumerate(deployments):
                if existing_deployment.get("name") == deployment.get("name") and (
                    existing_deployment.get("entrypoint")
                    == deployment.get("entrypoint")
                ):
                    deployments[i] = deployment
                    break
            else:
                # no entry matched on both name and entrypoint
                deployments.append(deployment)

        with prefect_file.open(mode="w") as f:
            ryaml.dump(parsed_prefect_file_contents, f)