Coverage for /usr/local/lib/python3.12/site-packages/prefect/cli/deploy/_core.py: 12%

185 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1from __future__ import annotations 1a

2 

3import inspect 1a

4import os 1a

5from copy import deepcopy 1a

6from pathlib import Path 1a

7from typing import TYPE_CHECKING, Any, Optional 1a

8 

9from rich.panel import Panel 1a

10 

11import prefect.cli.root as root 1a

12from prefect.cli._prompts import ( 1a

13 confirm, 

14 prompt, 

15 prompt_build_custom_docker_image, 

16 prompt_entrypoint, 

17 prompt_push_custom_docker_image, 

18 prompt_select_work_pool, 

19) 

20from prefect.cli.root import app 1a

21from prefect.client.orchestration import get_client 1a

22from prefect.client.schemas.filters import WorkerFilter 1a

23from prefect.deployments.base import _save_deployment_to_prefect_file 1a

24from prefect.deployments.runner import RunnerDeployment 1a

25from prefect.deployments.steps.core import run_steps 1a

26from prefect.exceptions import ObjectNotFound 1a

27from prefect.flows import load_flow_from_entrypoint 1a

28from prefect.settings import get_current_settings 1a

29from prefect.utilities.callables import parameter_schema 1a

30from prefect.utilities.collections import get_from_dict 1a

31from prefect.utilities.templating import ( 1a

32 apply_values, 

33 resolve_block_document_references, 

34 resolve_variables, 

35) 

36 

37from ._actions import ( 1a

38 _generate_actions_for_remote_flow_storage, 

39 _generate_default_pull_action, 

40) 

41from ._config import ( 1a

42 _apply_cli_options_to_deploy_config, 

43 _handle_deprecated_schedule_fields, 

44 _merge_with_default_deploy_config, 

45) 

46from ._schedules import _construct_schedules 1a

47from ._sla import ( 1a

48 _create_slas, 

49 _gather_deployment_sla_definitions, 

50 _initialize_deployment_slas, 

51) 

52from ._storage import _PullStepStorage 1a

53from ._triggers import ( 1a

54 _create_deployment_triggers, 

55 _gather_deployment_trigger_definitions, 

56 _initialize_deployment_triggers, 

57) 

58 

59if TYPE_CHECKING: 59 ↛ 60line 59 didn't jump to line 60 because the condition on line 59 was never true1a

60 from prefect.client.orchestration import PrefectClient 

61 

62 

async def _run_single_deploy(
    deploy_config: dict[str, Any],
    actions: dict[str, Any],
    options: dict[str, Any] | None = None,
    client: Optional["PrefectClient"] = None,
    prefect_file: Path = Path("prefect.yaml"),
):
    """Create a single deployment from one deployment configuration.

    The function works on deep copies of ``deploy_config``, ``actions`` and
    ``options``. It merges defaults and CLI options into the configuration,
    resolves block references, variables and environment placeholders,
    determines the entrypoint, deployment name and work pool (prompting when
    the session is interactive), optionally adds docker build/push steps and
    remote-storage actions, runs the build and push steps, templates the
    configuration with the step outputs, applies a ``RunnerDeployment``,
    creates triggers and SLAs, and finally prints follow-up instructions.

    Args:
        deploy_config: Deployment configuration mapping; a falsy value is
            treated as an empty configuration.
        actions: Mapping that may contain ``"build"``, ``"push"`` and
            ``"pull"`` step lists, used when ``deploy_config`` does not
            define the corresponding key.
        options: CLI option values. They are applied to the configuration via
            ``_apply_cli_options_to_deploy_config`` and additionally read for
            ``"triggers"``, ``"sla"``, ``"version"`` and ``"version_type"``.
        client: Client used for work pool, worker, trigger and SLA calls.
            Defaults to ``get_client()``.
        prefect_file: Path of the configuration file. If it does not exist
            and the session is interactive, the user is offered to save the
            deployment configuration to it.

    Raises:
        ValueError: If the session is not interactive and the entrypoint,
            deployment name or work pool is missing; if the work pool has
            type ``'prefect-agent'`` in a non-interactive session; or if the
            referenced work pool does not exist.
    """
    client = client or get_client()
    deploy_config = deepcopy(deploy_config) if deploy_config else {}
    actions = deepcopy(actions) if actions else {}
    options = deepcopy(options) if options else {}

    deploy_config = _merge_with_default_deploy_config(deploy_config)
    deploy_config = _handle_deprecated_schedule_fields(deploy_config)
    (
        deploy_config,
        variable_overrides,
    ) = _apply_cli_options_to_deploy_config(deploy_config, options)

    # Captured before block/variable resolution below; `pull_steps` in
    # particular is reused later so unresolved placeholders are preserved.
    build_steps = deploy_config.get("build", actions.get("build")) or []
    push_steps = deploy_config.get("push", actions.get("push")) or []
    pull_steps = deploy_config.get("pull", actions.get("pull")) or []

    deploy_config = await resolve_block_document_references(deploy_config)
    deploy_config = await resolve_variables(deploy_config)

    # check for env var placeholders early so users can pass work pool names, etc.
    deploy_config = apply_values(deploy_config, os.environ, remove_notset=False)

    if not deploy_config.get("entrypoint"):
        if not root.is_interactive():
            raise ValueError(
                "An entrypoint must be provided:\n\n"
                " \t[yellow]prefect deploy path/to/file.py:flow_function\n\n"
                "You can also provide an entrypoint in a prefect.yaml file."
            )
        deploy_config["entrypoint"] = await prompt_entrypoint(app.console)

    flow = load_flow_from_entrypoint(deploy_config["entrypoint"])

    deploy_config["flow_name"] = flow.name

    deployment_name = deploy_config.get("name")
    if not deployment_name:
        if not root.is_interactive():
            raise ValueError("A deployment name must be provided.")
        # Keep the local in sync with the config: `deployment_name` is passed
        # to `_initialize_deployment_triggers` further down.
        deployment_name = prompt("Deployment name", default="default")
        deploy_config["name"] = deployment_name

    deploy_config["parameter_openapi_schema"] = parameter_schema(flow)

    work_pool_name = get_from_dict(deploy_config, "work_pool.name")

    # determine work pool
    if work_pool_name:
        try:
            work_pool = await client.read_work_pool(deploy_config["work_pool"]["name"])

            # dont allow submitting to prefect-agent typed work pools
            if work_pool.type == "prefect-agent":
                if not root.is_interactive():
                    raise ValueError(
                        "Cannot create a project-style deployment with work pool of"
                        " type 'prefect-agent'. If you wish to use an agent with"
                        " your deployment, please use the `prefect deployment"
                        " build` command."
                    )
                app.console.print(
                    "You've chosen a work pool with type 'prefect-agent' which"
                    " cannot be used for project-style deployments. Let's pick"
                    " another work pool to deploy to."
                )
                deploy_config["work_pool"]["name"] = await prompt_select_work_pool(
                    app.console
                )
        except ObjectNotFound as exc:
            raise ValueError(
                "This deployment configuration references work pool"
                f" {deploy_config['work_pool']['name']!r} which does not exist. This"
                " means no worker will be able to pick up its runs. You can create a"
                " work pool in the Prefect UI."
            ) from exc
    else:
        if not root.is_interactive():
            raise ValueError(
                "A work pool is required to deploy this flow. Please specify a work"
                " pool name via the '--pool' flag or in your prefect.yaml file."
            )
        if not isinstance(deploy_config.get("work_pool"), dict):
            deploy_config["work_pool"] = {}
        deploy_config["work_pool"]["name"] = await prompt_select_work_pool(
            console=app.console
        )

    # Either prompt above may have supplied or replaced the pool name. Refresh
    # the local so the worker lookup and the printed hints at the end of this
    # function refer to the pool that is actually used.
    work_pool_name = deploy_config["work_pool"]["name"]

    docker_build_steps = [
        "prefect_docker.deployments.steps.build_docker_image",
    ]

    docker_push_steps = [
        "prefect_docker.deployments.steps.push_docker_image",
    ]

    docker_build_step_exists = any(
        any(step in action for step in docker_build_steps)
        for action in deploy_config.get("build", actions.get("build")) or []
    )

    update_work_pool_image = False

    # True when the config explicitly carries an empty/None "build" key.
    build_step_set_to_null = "build" in deploy_config and (
        deploy_config["build"] is None
        or deploy_config["build"] == {}
        or deploy_config["build"] == []
    )

    work_pool = await client.read_work_pool(work_pool_name)

    image_properties = (
        work_pool.base_job_template.get("variables", {})
        .get("properties", {})
        .get("image", {})
    )
    # The pool's job template must expose a free-form string `image` variable.
    image_is_configurable = (
        "image"
        in work_pool.base_job_template.get("variables", {}).get("properties", {})
        and image_properties.get("type") == "string"
        and not image_properties.get("enum")
    )

    if (
        root.is_interactive()
        and not docker_build_step_exists
        and not build_step_set_to_null
        and image_is_configurable
    ):
        build_docker_image_step = await prompt_build_custom_docker_image(
            app.console, deploy_config
        )
        if build_docker_image_step is not None:
            if not get_from_dict(deploy_config, "work_pool.job_variables.image"):
                update_work_pool_image = True

            (
                push_docker_image_step,
                updated_build_docker_image_step,
            ) = await prompt_push_custom_docker_image(
                app.console, deploy_config, build_docker_image_step
            )

            if actions.get("build"):
                actions["build"].append(updated_build_docker_image_step)
            else:
                actions["build"] = [updated_build_docker_image_step]

            if push_docker_image_step is not None:
                if actions.get("push"):
                    actions["push"].append(push_docker_image_step)
                else:
                    actions["push"] = [push_docker_image_step]

        # Re-read the steps so any actions added by the prompts are included.
        build_steps = deploy_config.get("build", actions.get("build")) or []
        push_steps = deploy_config.get("push", actions.get("push")) or []

    docker_push_step_exists = any(
        any(step in action for step in docker_push_steps)
        for action in deploy_config.get("push", actions.get("push")) or []
    )

    ## CONFIGURE PUSH and/or PULL STEPS FOR REMOTE FLOW STORAGE
    if (
        root.is_interactive()
        and not (deploy_config.get("pull") or actions.get("pull"))
        and not docker_push_step_exists
        and confirm(
            (
                "Your Prefect workers will need access to this flow's code in order to"
                " run it. Would you like your workers to pull your flow code from a"
                " remote storage location when running this flow?"
            ),
            default=True,
            console=app.console,
        )
    ):
        actions = await _generate_actions_for_remote_flow_storage(
            console=app.console, deploy_config=deploy_config, actions=actions
        )

    # Prefer the originally captured pull_steps (taken before resolution) to
    # preserve unresolved block placeholders in the deployment spec. Only fall
    # back to the config/actions/default if no pull steps were provided.
    pull_steps = (
        pull_steps
        or deploy_config.get("pull")
        or actions.get("pull")
        or await _generate_default_pull_action(
            app.console,
            deploy_config=deploy_config,
            actions=actions,
        )
    )

    ## RUN BUILD AND PUSH STEPS
    step_outputs: dict[str, Any] = {}
    if build_steps:
        app.console.print("Running deployment build steps...")
        step_outputs.update(
            await run_steps(build_steps, step_outputs, print_function=app.console.print)
        )

    if push_steps := push_steps or actions.get("push"):
        app.console.print("Running deployment push steps...")
        step_outputs.update(
            await run_steps(push_steps, step_outputs, print_function=app.console.print)
        )

    step_outputs.update(variable_overrides)

    if update_work_pool_image:
        if "build-image" not in step_outputs:
            app.console.print(
                "Warning: no build-image step found in the deployment build steps."
                " The work pool image will not be updated."
            )
        else:
            # Only reference the build output when it exists, matching the
            # warning above. `job_variables` may be absent or not a mapping
            # (e.g. when `work_pool` was reset to `{}` earlier), so make sure
            # there is a dict to write into.
            work_pool_config = deploy_config["work_pool"]
            if not isinstance(work_pool_config.get("job_variables"), dict):
                work_pool_config["job_variables"] = {}
            work_pool_config["job_variables"]["image"] = "{{ build-image.image }}"

    if not deploy_config.get("description"):
        deploy_config["description"] = flow.description

    deploy_config["schedules"] = _construct_schedules(deploy_config, step_outputs)

    # save deploy_config before templating
    deploy_config_before_templating = deepcopy(deploy_config)
    ## apply templating from build and push steps to the final deployment spec
    # The schema, schedules and triggers are held aside so `apply_values`
    # below does not touch them.
    _parameter_schema = deploy_config.pop("parameter_openapi_schema")

    _schedules = deploy_config.pop("schedules")

    # Save triggers before templating to preserve event template parameters
    _triggers = deploy_config.pop("triggers", None)

    deploy_config = apply_values(deploy_config, step_outputs, warn_on_notset=True)
    deploy_config["parameter_openapi_schema"] = _parameter_schema
    deploy_config["schedules"] = _schedules

    # This initialises triggers after templating to ensure that jinja variables are resolved
    # Use the pre-templated trigger specs to preserve event template parameters like {{ event.name }}
    # while still applying templating to trigger-level fields like enabled
    if trigger_specs := _gather_deployment_trigger_definitions(
        options.get("triggers"), _triggers
    ):
        # Apply templating only to non-parameter trigger fields to preserve event templates
        templated_trigger_specs = []
        for spec in trigger_specs:
            # `trigger_specs` is handed to `_save_deployment_to_prefect_file`
            # below, so the specs must not be mutated here: template a copy
            # that leaves out `parameters` instead of popping the key.
            parameters = spec.get("parameters")
            templatable_fields = {
                key: value for key, value in spec.items() if key != "parameters"
            }
            # Apply templating to trigger fields (e.g., enabled)
            templated_spec = apply_values(
                templatable_fields, step_outputs, warn_on_notset=False
            )
            # Restore parameters without templating
            if parameters is not None:
                templated_spec["parameters"] = parameters
            templated_trigger_specs.append(templated_spec)
        triggers = _initialize_deployment_triggers(
            deployment_name, templated_trigger_specs
        )
    else:
        triggers = []

    # A dict-valued concurrency limit is split into the numeric limit and
    # the collision strategy option.
    if isinstance(deploy_config.get("concurrency_limit"), dict):
        deploy_config["concurrency_options"] = {
            "collision_strategy": get_from_dict(
                deploy_config, "concurrency_limit.collision_strategy"
            )
        }
        deploy_config["concurrency_limit"] = get_from_dict(
            deploy_config, "concurrency_limit.limit"
        )

    pull_steps = apply_values(pull_steps, step_outputs, remove_notset=False)

    deployment = RunnerDeployment(
        name=deploy_config["name"],
        flow_name=deploy_config.get("flow_name"),
        entrypoint=deploy_config.get("entrypoint"),
        work_pool_name=get_from_dict(deploy_config, "work_pool.name"),
        work_queue_name=get_from_dict(deploy_config, "work_pool.work_queue_name"),
        parameters=deploy_config.get("parameters"),
        description=deploy_config.get("description"),
        version=deploy_config.get("version") or options.get("version"),
        version_type=deploy_config.get("version_type") or options.get("version_type"),
        tags=deploy_config.get("tags"),
        concurrency_limit=deploy_config.get("concurrency_limit"),
        concurrency_options=deploy_config.get("concurrency_options"),
        paused=deploy_config.get("paused"),
        storage=_PullStepStorage(pull_steps),
        job_variables=get_from_dict(deploy_config, "work_pool.job_variables"),
    )

    deployment._set_defaults_from_flow(flow)

    deployment._parameter_openapi_schema = deploy_config["parameter_openapi_schema"]

    if deploy_config.get("enforce_parameter_schema") is not None:
        deployment.enforce_parameter_schema = deploy_config.get(
            "enforce_parameter_schema"
        )

    apply_coro = deployment.apply(schedules=deploy_config.get("schedules"))
    if TYPE_CHECKING:
        assert inspect.isawaitable(apply_coro)

    deployment_id = await apply_coro

    await _create_deployment_triggers(client, deployment_id, triggers)

    # We want to ensure that if a user passes an empty list of SLAs, we call the
    # apply endpoint to remove existing SLAs for the deployment.
    # If the argument is not provided, we will not call the endpoint.
    sla_specs = _gather_deployment_sla_definitions(
        options.get("sla"), deploy_config.get("sla")
    )
    if sla_specs is not None:
        slas = _initialize_deployment_slas(deployment_id, sla_specs)
        await _create_slas(client, deployment_id, slas)

    app.console.print(
        Panel(
            f"Deployment '{deploy_config['flow_name']}/{deploy_config['name']}'"
            f" successfully created with id '{deployment_id}'."
        ),
        style="green",
    )

    if ui_url := get_current_settings().ui_url:
        message = (
            "\nView Deployment in UI:"
            f" {ui_url}/deployments/deployment/{deployment_id}\n"
        )
        app.console.print(message, soft_wrap=True)

    if root.is_interactive() and not prefect_file.exists():
        if confirm(
            (
                "Would you like to save configuration for this deployment for faster"
                " deployments in the future?"
            ),
            console=app.console,
        ):
            deploy_config_before_templating.update({"schedules": _schedules})
            _save_deployment_to_prefect_file(
                deploy_config_before_templating,
                build_steps=build_steps or None,
                push_steps=push_steps or None,
                pull_steps=pull_steps or None,
                triggers=trigger_specs or None,
                sla=sla_specs or None,
                prefect_file=prefect_file,
            )
            app.console.print(
                (
                    f"\n[green]Deployment configuration saved to {prefect_file}![/]"
                    " You can now deploy using this deployment configuration"
                    " with:\n\n\t[blue]$ prefect deploy -n"
                    f" {deploy_config['name']}[/]\n\nYou can also make changes to"
                    " this deployment configuration by making changes to the"
                    " YAML file."
                ),
            )

    active_workers = await client.read_workers_for_work_pool(
        work_pool_name, worker_filter=WorkerFilter(status={"any_": ["ONLINE"]})
    )

    # Only suggest starting a worker when the pool is neither push nor
    # managed and no ONLINE worker was returned for it.
    if (
        not work_pool.is_push_pool
        and not work_pool.is_managed_pool
        and not active_workers
    ):
        app.console.print(
            "\nTo execute flow runs from these deployments, start a worker in a"
            " separate terminal that pulls work from the"
            f" {work_pool_name!r} work pool:"
        )
        app.console.print(
            f"\n\t$ prefect worker start --pool {work_pool_name!r}",
            style="blue",
        )
    app.console.print(
        "\nTo schedule a run for this deployment, use the following command:"
    )
    app.console.print(
        (
            "\n\t$ prefect deployment run"
            f" '{deploy_config['flow_name']}/{deploy_config['name']}'\n"
        ),
        style="blue",
    )

460 

461 

async def _run_multi_deploy(
    deploy_configs: list[dict[str, Any]],
    actions: dict[str, Any],
    names: Optional[list[str]] = None,
    deploy_all: bool = False,
    prefect_file: Path = Path("prefect.yaml"),
):
    """Deploy each configuration in ``deploy_configs`` one after another.

    Works on deep copies of ``deploy_configs`` and ``actions``. A
    configuration without a name is skipped in a non-interactive session;
    in an interactive session the user is asked whether to name and deploy
    it. Every remaining configuration is handed to ``_run_single_deploy``.

    Args:
        deploy_configs: Deployment configurations to process; a falsy value
            is treated as an empty list.
        actions: Shared actions mapping forwarded to every single deploy.
        names: Accepted and normalised to a list, but not otherwise used in
            this function.
        deploy_all: Only selects which introductory message is printed.
        prefect_file: Forwarded to ``_run_single_deploy``.
    """
    pending_configs: list[dict[str, Any]] = []
    if deploy_configs:
        pending_configs = deepcopy(deploy_configs)
    actions = deepcopy(actions) if actions else {}
    names = names or []

    banner = (
        "Deploying all flows with an existing deployment configuration..."
        if deploy_all
        else "Deploying flows with selected deployment configurations..."
    )
    app.console.print(banner)

    for config in pending_configs:
        if config.get("name") is None:
            # Unnamed deployments can only proceed with the user's help.
            if not root.is_interactive():
                app.console.print(
                    "Discovered unnamed deployment. Skipping...", style="yellow"
                )
                continue

            app.console.print("Discovered unnamed deployment.", style="yellow")
            app.console.print_json(data=config)
            wants_to_name_it = confirm(
                "Would you like to give this deployment a name and deploy it?",
                default=True,
                console=app.console,
            )
            if not wants_to_name_it:
                app.console.print("Skipping unnamed deployment.", style="yellow")
                continue
            config["name"] = prompt("Deployment name", default="default")

        app.console.print(Panel(f"Deploying {config['name']}", style="blue"))
        await _run_single_deploy(config, actions, prefect_file=prefect_file)