Coverage for /usr/local/lib/python3.12/site-packages/prefect/deployments/runner.py: 20%

380 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1""" 

2Objects for creating and configuring deployments for flows using `serve` functionality. 

3 

4Example: 

5 ```python 

6 import time 

7 from prefect import flow, serve 

8 

9 

10 @flow 

11 def slow_flow(sleep: int = 60): 

12 "Sleepy flow - sleeps the provided amount of time (in seconds)." 

13 time.sleep(sleep) 

14 

15 

16 @flow 

17 def fast_flow(): 

18 "Fastest flow this side of the Mississippi." 

19 return 

20 

21 

22 if __name__ == "__main__": 

23 # to_deployment creates RunnerDeployment instances 

24 slow_deploy = slow_flow.to_deployment(name="sleeper", interval=45) 

25 fast_deploy = fast_flow.to_deployment(name="fast") 

26 

27 serve(slow_deploy, fast_deploy) 

28 ``` 

29 

30""" 

31 

32from __future__ import annotations 1a

33 

34import importlib 1a

35import tempfile 1a

36from datetime import datetime, timedelta 1a

37from pathlib import Path 1a

38from typing import ( 1a

39 TYPE_CHECKING, 

40 Any, 

41 ClassVar, 

42 Iterable, 

43 List, 

44 Optional, 

45 Union, 

46) 

47from uuid import UUID 1a

48 

49from pydantic import ( 1a

50 BaseModel, 

51 ConfigDict, 

52 Field, 

53 PrivateAttr, 

54 field_validator, 

55 model_validator, 

56) 

57from rich.console import Console 1a

58from rich.progress import Progress, SpinnerColumn, TextColumn, track 1a

59from rich.table import Table 1a

60from typing_extensions import Self 1a

61 

62from prefect._experimental.sla.objects import SlaTypes 1a

63from prefect._internal.compatibility.async_dispatch import async_dispatch 1a

64from prefect._internal.concurrency.api import create_call, from_async 1a

65from prefect._internal.schemas.validators import ( 1a

66 reconcile_paused_deployment, 

67 reconcile_schedules_runner, 

68) 

69from prefect._versioning import VersionType, get_inferred_version_info 1a

70from prefect.client.base import ServerType 1a

71from prefect.client.orchestration import PrefectClient, get_client 1a

72from prefect.client.schemas.actions import ( 1a

73 DeploymentScheduleCreate, 

74 DeploymentScheduleUpdate, 

75 DeploymentUpdate, 

76) 

77from prefect.client.schemas.filters import WorkerFilter, WorkerFilterStatus 1a

78from prefect.client.schemas.objects import ( 1a

79 ConcurrencyLimitConfig, 

80 ConcurrencyOptions, 

81 VersionInfo, 

82) 

83from prefect.client.schemas.schedules import ( 1a

84 SCHEDULE_TYPES, 

85 construct_schedule, 

86) 

87from prefect.deployments.schedules import ( 1a

88 create_deployment_schedule_create, 

89) 

90from prefect.docker.docker_image import DockerImage 1a

91from prefect.events import DeploymentTriggerTypes, TriggerTypes 1a

92from prefect.exceptions import ( 1a

93 ObjectNotFound, 

94 PrefectHTTPStatusError, 

95) 

96from prefect.runner.storage import RunnerStorage 1a

97from prefect.schedules import Schedule 1a

98from prefect.settings import ( 1a

99 PREFECT_DEFAULT_WORK_POOL_NAME, 

100 PREFECT_UI_URL, 

101) 

102from prefect.types import ListOfNonEmptyStrings 1a

103from prefect.types.entrypoint import EntrypointType 1a

104from prefect.utilities.annotations import freeze 1a

105from prefect.utilities.asyncutils import run_coro_as_sync, sync_compatible 1a

106from prefect.utilities.callables import ParameterSchema, parameter_schema 1a

107from prefect.utilities.collections import get_from_dict, isiterable 1a

108from prefect.utilities.dockerutils import ( 1a

109 parse_image_tag, 

110) 

111 

112if TYPE_CHECKING: 112 ↛ 113line 112 didn't jump to line 113 because the condition on line 112 was never true1a

113 from prefect.client.types.flexible_schedule_list import FlexibleScheduleList 

114 from prefect.flows import Flow 

115 

116__all__ = ["RunnerDeployment"] 1a

117 

118 

119class DeploymentApplyError(RuntimeError): 1a

120 """ 

121 Raised when an error occurs while applying a deployment. 

122 """ 

123 

124 

125class RunnerDeployment(BaseModel): 1a

126 """ 

127 A Prefect RunnerDeployment definition, used for specifying and building deployments. 

128 

129 Attributes: 

130 name: A name for the deployment (required). 

131 version: An optional version for the deployment; defaults to the flow's version 

132 description: An optional description of the deployment; defaults to the flow's 

133 description 

134 tags: An optional list of tags to associate with this deployment; note that tags 

135 are used only for organizational purposes. For delegating work to workers, 

136 see `work_queue_name`. 

137 schedule: A schedule to run this deployment on, once registered 

138 parameters: A dictionary of parameter values to pass to runs created from this 

139 deployment 

140 path: The path to the working directory for the workflow, relative to remote 

141 storage or, if stored on a local filesystem, an absolute path 

142 entrypoint: The path to the entrypoint for the workflow, always relative to the 

143 `path` 

144 parameter_openapi_schema: The parameter schema of the flow, including defaults. 

145 enforce_parameter_schema: Whether or not the Prefect API should enforce the 

146 parameter schema for this deployment. 

147 work_pool_name: The name of the work pool to use for this deployment. 

148 work_queue_name: The name of the work queue to use for this deployment's scheduled runs. 

149 If not provided the default work queue for the work pool will be used. 

150 job_variables: Settings used to override the values specified default base job template 

151 of the chosen work pool. Refer to the base job template of the chosen work pool for 

152 available settings. 

153 _sla: (Experimental) SLA configuration for the deployment. May be removed or modified at any time. Currently only supported on Prefect Cloud. 

154 """ 

155 

156 model_config: ClassVar[ConfigDict] = ConfigDict( 1a

157 arbitrary_types_allowed=True, validate_assignment=True 

158 ) 

159 

160 name: str = Field(..., description="The name of the deployment.") 1a

161 flow_name: Optional[str] = Field( 1a

162 None, description="The name of the underlying flow; typically inferred." 

163 ) 

164 description: Optional[str] = Field( 1a

165 default=None, description="An optional description of the deployment." 

166 ) 

167 version: Optional[str] = Field( 1a

168 default=None, description="An optional version for the deployment." 

169 ) 

170 version_type: Optional[VersionType] = Field( 1a

171 default=None, 

172 description=( 

173 "The type of version information to use for the deployment. The version type" 

174 " will be inferred if not provided." 

175 ), 

176 ) 

177 tags: ListOfNonEmptyStrings = Field( 1a

178 default_factory=list, 

179 description="One of more tags to apply to this deployment.", 

180 ) 

181 schedules: Optional[ 1a

182 List[Union[DeploymentScheduleCreate, DeploymentScheduleUpdate]] 

183 ] = Field( 

184 default=None, 

185 description="The schedules that should cause this deployment to run.", 

186 ) 

187 concurrency_limit: Optional[int] = Field( 1a

188 default=None, 

189 description="The maximum number of concurrent runs of this deployment.", 

190 ) 

191 concurrency_options: Optional[ConcurrencyOptions] = Field( 1a

192 default=None, 

193 description="The concurrency limit config for the deployment.", 

194 ) 

195 paused: Optional[bool] = Field( 1a

196 default=None, description="Whether or not the deployment is paused." 

197 ) 

198 parameters: dict[str, Any] = Field(default_factory=dict) 1a

199 entrypoint: Optional[str] = Field( 1a

200 default=None, 

201 description=( 

202 "The path to the entrypoint for the workflow, relative to the `path`." 

203 ), 

204 ) 

205 triggers: List[Union[DeploymentTriggerTypes, TriggerTypes]] = Field( 1a

206 default_factory=list, 

207 description="The triggers that should cause this deployment to run.", 

208 ) 

209 enforce_parameter_schema: bool = Field( 1a

210 default=True, 

211 description=( 

212 "Whether or not the Prefect API should enforce the parameter schema for" 

213 " this deployment." 

214 ), 

215 ) 

216 storage: Optional[RunnerStorage] = Field( 1a

217 default=None, 

218 description=( 

219 "The storage object used to retrieve flow code for this deployment." 

220 ), 

221 ) 

222 work_pool_name: Optional[str] = Field( 1a

223 default=None, 

224 description=( 

225 "The name of the work pool to use for this deployment. Only used when" 

226 " the deployment is registered with a built runner." 

227 ), 

228 ) 

229 work_queue_name: Optional[str] = Field( 1a

230 default=None, 

231 description=( 

232 "The name of the work queue to use for this deployment. Only used when" 

233 " the deployment is registered with a built runner." 

234 ), 

235 ) 

236 job_variables: dict[str, Any] = Field( 1a

237 default_factory=dict, 

238 description=( 

239 "Job variables used to override the default values of a work pool" 

240 " base job template. Only used when the deployment is registered with" 

241 " a built runner." 

242 ), 

243 ) 

244 

245 # (Experimental) SLA configuration for the deployment. May be removed or modified at any time. Currently only supported on Prefect Cloud. 

246 _sla: Optional[Union[SlaTypes, list[SlaTypes]]] = PrivateAttr( 1a

247 default=None, 

248 ) 

249 _entrypoint_type: EntrypointType = PrivateAttr( 1a

250 default=EntrypointType.FILE_PATH, 

251 ) 

252 _path: Optional[str] = PrivateAttr( 1a

253 default=None, 

254 ) 

255 _parameter_openapi_schema: ParameterSchema = PrivateAttr( 1a

256 default_factory=ParameterSchema, 

257 ) 

258 _version_from_flow: bool = PrivateAttr( 1a

259 default=False, 

260 ) 

261 

262 @property 1a

263 def entrypoint_type(self) -> EntrypointType: 1a

264 return self._entrypoint_type 

265 

266 @property 1a

267 def full_name(self) -> str: 1a

268 return f"{self.flow_name}/{self.name}" 

269 

270 def _get_deployment_version_info( 1a

271 self, version_type: Optional[VersionType] = None 

272 ) -> VersionInfo: 

273 if inferred_version := run_coro_as_sync( 

274 get_inferred_version_info(version_type) 

275 ): 

276 if not self.version or self._version_from_flow: 

277 self.version = inferred_version.version # TODO: maybe reconsider 

278 

279 inferred_version.version = self.version 

280 return inferred_version 

281 

282 return VersionInfo(version=self.version or "", type="prefect:simple") 

283 

284 @field_validator("name", mode="before") 1a

285 @classmethod 1a

286 def validate_name(cls, value: str) -> str: 1a

287 if value.endswith(".py"): 

288 return Path(value).stem 

289 return value 

290 

291 @model_validator(mode="after") 1a

292 def validate_automation_names(self): 1a

293 """Ensure that each trigger has a name for its automation if none is provided.""" 

294 trigger: Union[DeploymentTriggerTypes, TriggerTypes] 

295 for i, trigger in enumerate(self.triggers, start=1): 

296 if trigger.name is None: 

297 trigger.name = f"{self.name}__automation_{i}" 

298 return self 

299 

300 @model_validator(mode="after") 1a

301 def validate_deployment_parameters(self) -> Self: 1a

302 """Update the parameter schema to mark frozen parameters as readonly.""" 

303 if not self.parameters: 

304 return self 

305 

306 for key, value in self.parameters.items(): 

307 if isinstance(value, freeze): 

308 raw_value = value.unfreeze() 

309 if key in self._parameter_openapi_schema.properties: 

310 self._parameter_openapi_schema.properties[key]["readOnly"] = True 

311 self._parameter_openapi_schema.properties[key]["enum"] = [raw_value] 

312 self.parameters[key] = raw_value 

313 

314 return self 

315 

316 @model_validator(mode="before") 1a

317 @classmethod 1a

318 def reconcile_paused(cls, values: dict[str, Any]) -> dict[str, Any]: 1a

319 return reconcile_paused_deployment(values) 

320 

321 @model_validator(mode="before") 1a

322 @classmethod 1a

323 def reconcile_schedules(cls, values: dict[str, Any]) -> dict[str, Any]: 1a

324 return reconcile_schedules_runner(values) 

325 

326 async def _create( 1a

327 self, 

328 work_pool_name: Optional[str] = None, 

329 image: Optional[str] = None, 

330 version_info: VersionInfo | None = None, 

331 ) -> UUID: 

332 work_pool_name = work_pool_name or self.work_pool_name 

333 

334 if image and not work_pool_name: 

335 raise ValueError( 

336 "An image can only be provided when registering a deployment with a" 

337 " work pool." 

338 ) 

339 

340 if self.work_queue_name and not work_pool_name: 

341 raise ValueError( 

342 "A work queue can only be provided when registering a deployment with" 

343 " a work pool." 

344 ) 

345 

346 if self.job_variables and not work_pool_name: 

347 raise ValueError( 

348 "Job variables can only be provided when registering a deployment" 

349 " with a work pool." 

350 ) 

351 

352 async with get_client() as client: 

353 flow_id = await client.create_flow_from_name(self.flow_name) 

354 

355 create_payload: dict[str, Any] = dict( 

356 flow_id=flow_id, 

357 name=self.name, 

358 work_queue_name=self.work_queue_name, 

359 work_pool_name=work_pool_name, 

360 version=self.version, 

361 version_info=version_info, 

362 paused=self.paused, 

363 schedules=self.schedules, 

364 concurrency_limit=self.concurrency_limit, 

365 concurrency_options=self.concurrency_options, 

366 parameters=self.parameters, 

367 description=self.description, 

368 tags=self.tags, 

369 path=self._path, 

370 entrypoint=self.entrypoint, 

371 storage_document_id=None, 

372 infrastructure_document_id=None, 

373 parameter_openapi_schema=self._parameter_openapi_schema.model_dump( 

374 exclude_unset=True 

375 ), 

376 enforce_parameter_schema=self.enforce_parameter_schema, 

377 ) 

378 

379 if work_pool_name: 

380 create_payload["job_variables"] = self.job_variables 

381 if image: 

382 create_payload["job_variables"]["image"] = image 

383 create_payload["path"] = None if self.storage else self._path 

384 if self.storage: 

385 pull_steps = self.storage.to_pull_step() 

386 if isinstance(pull_steps, list): 

387 create_payload["pull_steps"] = pull_steps 

388 elif pull_steps: 

389 create_payload["pull_steps"] = [pull_steps] 

390 else: 

391 create_payload["pull_steps"] = [] 

392 else: 

393 create_payload["pull_steps"] = [] 

394 

395 try: 

396 deployment_id = await client.create_deployment(**create_payload) 

397 except Exception as exc: 

398 if isinstance(exc, PrefectHTTPStatusError): 

399 detail = exc.response.json().get("detail") 

400 if detail: 

401 raise DeploymentApplyError(detail) from exc 

402 raise DeploymentApplyError( 

403 f"Error while applying deployment: {str(exc)}" 

404 ) from exc 

405 

406 await self._create_triggers(deployment_id, client) 

407 

408 # We plan to support SLA configuration on the Prefect Server in the future. 

409 # For now, we only support it on Prefect Cloud. 

410 

411 # If we're provided with an empty list, we will call the apply endpoint 

412 # to remove existing SLAs for the deployment. If the argument is not provided, 

413 # we will not call the endpoint. 

414 if self._sla or self._sla == []: 

415 await self._create_slas(deployment_id, client) 

416 

417 return deployment_id 

418 

419 async def _update( 1a

420 self, 

421 deployment_id: UUID, 

422 client: PrefectClient, 

423 version_info: VersionInfo | None, 

424 ): 

425 parameter_openapi_schema = self._parameter_openapi_schema.model_dump( 

426 exclude_unset=True 

427 ) 

428 

429 update_payload = self.model_dump( 

430 mode="json", 

431 exclude_unset=True, 

432 exclude={"storage", "name", "flow_name", "triggers", "version_type"}, 

433 ) 

434 

435 if self.storage: 

436 pull_steps = self.storage.to_pull_step() 

437 if pull_steps and not isinstance(pull_steps, list): 

438 pull_steps = [pull_steps] 

439 update_payload["pull_steps"] = pull_steps 

440 else: 

441 update_payload["pull_steps"] = None 

442 

443 if self.schedules: 

444 update_payload["schedules"] = [ 

445 schedule.model_dump(mode="json", exclude_unset=True) 

446 for schedule in self.schedules 

447 ] 

448 

449 await client.update_deployment( 

450 deployment_id, 

451 deployment=DeploymentUpdate( 

452 **update_payload, 

453 version_info=version_info, 

454 parameter_openapi_schema=parameter_openapi_schema, 

455 ), 

456 ) 

457 

458 await self._create_triggers(deployment_id, client) 

459 

460 # We plan to support SLA configuration on the Prefect Server in the future. 

461 # For now, we only support it on Prefect Cloud. 

462 

463 # If we're provided with an empty list, we will call the apply endpoint 

464 # to remove existing SLAs for the deployment. If the argument is not provided, 

465 # we will not call the endpoint. 

466 if self._sla or self._sla == []: 

467 await self._create_slas(deployment_id, client) 

468 

469 return deployment_id 

470 

471 async def _create_triggers(self, deployment_id: UUID, client: PrefectClient): 1a

472 try: 

473 # The triggers defined in the deployment spec are, essentially, 

474 # anonymous and attempting truly sync them with cloud is not 

475 # feasible. Instead, we remove all automations that are owned 

476 # by the deployment, meaning that they were created via this 

477 # mechanism below, and then recreate them. 

478 await client.delete_resource_owned_automations( 

479 f"prefect.deployment.{deployment_id}" 

480 ) 

481 except PrefectHTTPStatusError as e: 

482 if e.response.status_code == 404: 

483 # This Prefect server does not support automations, so we can safely 

484 # ignore this 404 and move on. 

485 return deployment_id 

486 raise e 

487 

488 for trigger in self.triggers: 

489 trigger.set_deployment_id(deployment_id) 

490 await client.create_automation(trigger.as_automation()) 

491 

492 @sync_compatible 1a

493 async def apply( 1a

494 self, 

495 schedules: Optional[List[dict[str, Any]]] = None, 

496 work_pool_name: Optional[str] = None, 

497 image: Optional[str] = None, 

498 version_info: Optional[VersionInfo] = None, 

499 ) -> UUID: 

500 """ 

501 Registers this deployment with the API and returns the deployment's ID. 

502 

503 Args: 

504 work_pool_name: The name of the work pool to use for this 

505 deployment. 

506 image: The registry, name, and tag of the Docker image to 

507 use for this deployment. Only used when the deployment is 

508 deployed to a work pool. 

509 version_info: The version information to use for the deployment. 

510 Returns: 

511 The ID of the created deployment. 

512 """ 

513 

514 version_info = version_info or self._get_deployment_version_info( 

515 self.version_type 

516 ) 

517 

518 async with get_client() as client: 

519 try: 

520 deployment = await client.read_deployment_by_name(self.full_name) 

521 except ObjectNotFound: 

522 if schedules: 

523 self.schedules = [ 

524 DeploymentScheduleCreate(**schedule) for schedule in schedules 

525 ] 

526 return await self._create(work_pool_name, image, version_info) 

527 else: 

528 if image: 

529 self.job_variables["image"] = image 

530 if work_pool_name: 

531 self.work_pool_name = work_pool_name 

532 if schedules: 

533 self.schedules = [ 

534 DeploymentScheduleUpdate(**schedule) for schedule in schedules 

535 ] 

536 return await self._update(deployment.id, client, version_info) 

537 

538 async def _create_slas(self, deployment_id: UUID, client: PrefectClient): 1a

539 if not isinstance(self._sla, list): 

540 self._sla = [self._sla] 

541 

542 if client.server_type == ServerType.CLOUD: 

543 await client.apply_slas_for_deployment(deployment_id, self._sla) 

544 else: 

545 raise ValueError( 

546 "SLA configuration is currently only supported on Prefect Cloud." 

547 ) 

548 

549 @staticmethod 1a

550 def _construct_deployment_schedules( 1a

551 interval: Optional[ 

552 Union[Iterable[Union[int, float, timedelta]], int, float, timedelta] 

553 ] = None, 

554 anchor_date: Optional[Union[datetime, str]] = None, 

555 cron: Optional[Union[Iterable[str], str]] = None, 

556 rrule: Optional[Union[Iterable[str], str]] = None, 

557 timezone: Optional[str] = None, 

558 schedule: Union[SCHEDULE_TYPES, Schedule, None] = None, 

559 schedules: Optional["FlexibleScheduleList"] = None, 

560 ) -> Union[List[DeploymentScheduleCreate], "FlexibleScheduleList"]: 

561 """ 

562 Construct a schedule or schedules from the provided arguments. 

563 

564 This method serves as a unified interface for creating deployment 

565 schedules. If `schedules` is provided, it is directly returned. If 

566 `schedule` is provided, it is encapsulated in a list and returned. If 

567 `interval`, `cron`, or `rrule` are provided, they are used to construct 

568 schedule objects. 

569 

570 Args: 

571 interval: An interval on which to schedule runs, either as a single 

572 value or as a list of values. Accepts numbers (interpreted as 

573 seconds) or `timedelta` objects. Each value defines a separate 

574 scheduling interval. 

575 anchor_date: The anchor date from which interval schedules should 

576 start. This applies to all intervals if a list is provided. 

577 cron: A cron expression or a list of cron expressions defining cron 

578 schedules. Each expression defines a separate cron schedule. 

579 rrule: An rrule string or a list of rrule strings for scheduling. 

580 Each string defines a separate recurrence rule. 

581 timezone: The timezone to apply to the cron or rrule schedules. 

582 This is a single value applied uniformly to all schedules. 

583 schedule: A singular schedule object, used for advanced scheduling 

584 options like specifying a timezone. This is returned as a list 

585 containing this single schedule. 

586 schedules: A pre-defined list of schedule objects. If provided, 

587 this list is returned as-is, bypassing other schedule construction 

588 logic. 

589 """ 

590 num_schedules = sum( 

591 1 

592 for entry in (interval, cron, rrule, schedule, schedules) 

593 if entry is not None 

594 ) 

595 if num_schedules > 1: 

596 raise ValueError( 

597 "Only one of interval, cron, rrule, schedule, or schedules can be provided." 

598 ) 

599 elif num_schedules == 0: 

600 return [] 

601 

602 if schedules is not None: 

603 return schedules 

604 elif interval or cron or rrule: 

605 # `interval`, `cron`, and `rrule` can be lists of values. This 

606 # block figures out which one is not None and uses that to 

607 # construct the list of schedules via `construct_schedule`. 

608 parameters = [("interval", interval), ("cron", cron), ("rrule", rrule)] 

609 schedule_type, value = [ 

610 param for param in parameters if param[1] is not None 

611 ][0] 

612 

613 if not isiterable(value): 

614 value = [value] 

615 

616 return [ 

617 create_deployment_schedule_create( 

618 construct_schedule( 

619 **{ 

620 schedule_type: v, 

621 "timezone": timezone, 

622 "anchor_date": anchor_date, 

623 } 

624 ) 

625 ) 

626 for v in value 

627 ] 

628 else: 

629 return [create_deployment_schedule_create(schedule)] 

630 

631 def _set_defaults_from_flow(self, flow: "Flow[..., Any]"): 1a

632 self._parameter_openapi_schema = parameter_schema(flow) 

633 

634 if not self.version: 

635 self.version = flow.version 

636 self._version_from_flow = True 

637 if not self.description: 

638 self.description = flow.description 

639 

640 @classmethod 1a

641 def from_flow( 1a

642 cls, 

643 flow: "Flow[..., Any]", 

644 name: str, 

645 interval: Optional[ 

646 Union[Iterable[Union[int, float, timedelta]], int, float, timedelta] 

647 ] = None, 

648 cron: Optional[Union[Iterable[str], str]] = None, 

649 rrule: Optional[Union[Iterable[str], str]] = None, 

650 paused: Optional[bool] = None, 

651 schedule: Optional[Schedule] = None, 

652 schedules: Optional["FlexibleScheduleList"] = None, 

653 concurrency_limit: Optional[Union[int, ConcurrencyLimitConfig, None]] = None, 

654 parameters: Optional[dict[str, Any]] = None, 

655 triggers: Optional[List[Union[DeploymentTriggerTypes, TriggerTypes]]] = None, 

656 description: Optional[str] = None, 

657 tags: Optional[List[str]] = None, 

658 version: Optional[str] = None, 

659 version_type: Optional[VersionType] = None, 

660 enforce_parameter_schema: bool = True, 

661 work_pool_name: Optional[str] = None, 

662 work_queue_name: Optional[str] = None, 

663 job_variables: Optional[dict[str, Any]] = None, 

664 entrypoint_type: EntrypointType = EntrypointType.FILE_PATH, 

665 _sla: Optional[Union[SlaTypes, list[SlaTypes]]] = None, # experimental 

666 ) -> "RunnerDeployment": 

667 """ 

668 Configure a deployment for a given flow. 

669 

670 Args: 

671 flow: A flow function to deploy 

672 name: A name for the deployment 

673 interval: An interval on which to execute the current flow. Accepts either a number 

674 or a timedelta object. If a number is given, it will be interpreted as seconds. 

675 cron: A cron schedule of when to execute runs of this flow. 

676 rrule: An rrule schedule of when to execute runs of this flow. 

677 paused: Whether or not to set this deployment as paused. 

678 schedule: A schedule object defining when to execute runs of this deployment. 

679 Used to provide additional scheduling options like `timezone` or `parameters`. 

680 schedules: A list of schedule objects defining when to execute runs of this deployment. 

681 Used to define multiple schedules or additional scheduling options like `timezone`. 

682 concurrency_limit: The maximum number of concurrent runs this deployment will allow. 

683 triggers: A list of triggers that should kick of a run of this flow. 

684 parameters: A dictionary of default parameter values to pass to runs of this flow. 

685 description: A description for the created deployment. Defaults to the flow's 

686 description if not provided. 

687 tags: A list of tags to associate with the created deployment for organizational 

688 purposes. 

689 version: A version for the created deployment. Defaults to the flow's version. 

690 version_type: The type of version information to use for the deployment. 

691 enforce_parameter_schema: Whether or not the Prefect API should enforce the 

692 parameter schema for this deployment. 

693 work_pool_name: The name of the work pool to use for this deployment. 

694 work_queue_name: The name of the work queue to use for this deployment's scheduled runs. 

695 If not provided the default work queue for the work pool will be used. 

696 job_variables: Settings used to override the values specified default base job template 

697 of the chosen work pool. Refer to the base job template of the chosen work pool for 

698 available settings. 

699 _sla: (Experimental) SLA configuration for the deployment. May be removed or modified at any time. Currently only supported on Prefect Cloud. 

700 """ 

701 constructed_schedules = cls._construct_deployment_schedules( 

702 interval=interval, 

703 cron=cron, 

704 rrule=rrule, 

705 schedules=schedules, 

706 schedule=schedule, 

707 ) 

708 

709 job_variables = job_variables or {} 

710 

711 if isinstance(concurrency_limit, ConcurrencyLimitConfig): 

712 concurrency_options = { 

713 "collision_strategy": concurrency_limit.collision_strategy 

714 } 

715 concurrency_limit = concurrency_limit.limit 

716 else: 

717 concurrency_options = None 

718 

719 deployment = cls( 

720 name=name, 

721 flow_name=flow.name, 

722 schedules=constructed_schedules, 

723 concurrency_limit=concurrency_limit, 

724 concurrency_options=concurrency_options, 

725 paused=paused, 

726 tags=tags or [], 

727 triggers=triggers or [], 

728 parameters=parameters or {}, 

729 description=description, 

730 version=version, 

731 version_type=version_type, 

732 enforce_parameter_schema=enforce_parameter_schema, 

733 work_pool_name=work_pool_name, 

734 work_queue_name=work_queue_name, 

735 job_variables=job_variables, 

736 ) 

737 deployment._sla = _sla 

738 

739 if not deployment.entrypoint: 

740 no_file_location_error = ( 

741 "Flows defined interactively cannot be deployed. Check out the" 

742 " quickstart guide for help getting started:" 

743 " https://docs.prefect.io/latest/get-started/quickstart" 

744 ) 

745 ## first see if an entrypoint can be determined 

746 flow_file = getattr(flow, "__globals__", {}).get("__file__") 

747 mod_name = getattr(flow, "__module__", None) 

748 if entrypoint_type == EntrypointType.MODULE_PATH: 

749 if mod_name: 

750 deployment.entrypoint = f"{mod_name}.{flow.__name__}" 

751 else: 

752 raise ValueError( 

753 "Unable to determine module path for provided flow." 

754 ) 

755 else: 

756 if not flow_file: 

757 if not mod_name: 

758 raise ValueError(no_file_location_error) 

759 try: 

760 module = importlib.import_module(mod_name) 

761 flow_file = getattr(module, "__file__", None) 

762 except ModuleNotFoundError: 

763 raise ValueError(no_file_location_error) 

764 if not flow_file: 

765 raise ValueError(no_file_location_error) 

766 

767 # set entrypoint 

768 entry_path = ( 

769 Path(flow_file).absolute().relative_to(Path.cwd().absolute()) 

770 ) 

771 deployment.entrypoint = ( 

772 f"{entry_path}:{getattr(flow.fn, '__qualname__', flow.fn.__name__)}" 

773 ) 

774 

775 if entrypoint_type == EntrypointType.FILE_PATH and not deployment._path: 

776 deployment._path = "." 

777 

778 deployment._entrypoint_type = entrypoint_type 

779 

780 cls._set_defaults_from_flow(deployment, flow) 

781 

782 return deployment 

783 

784 @classmethod 1a

785 def from_entrypoint( 1a

786 cls, 

787 entrypoint: str, 

788 name: str, 

789 flow_name: Optional[str] = None, 

790 interval: Optional[ 

791 Union[Iterable[Union[int, float, timedelta]], int, float, timedelta] 

792 ] = None, 

793 cron: Optional[Union[Iterable[str], str]] = None, 

794 rrule: Optional[Union[Iterable[str], str]] = None, 

795 paused: Optional[bool] = None, 

796 schedule: Optional[Schedule] = None, 

797 schedules: Optional["FlexibleScheduleList"] = None, 

798 concurrency_limit: Optional[Union[int, ConcurrencyLimitConfig, None]] = None, 

799 parameters: Optional[dict[str, Any]] = None, 

800 triggers: Optional[List[Union[DeploymentTriggerTypes, TriggerTypes]]] = None, 

801 description: Optional[str] = None, 

802 tags: Optional[List[str]] = None, 

803 version: Optional[str] = None, 

804 enforce_parameter_schema: bool = True, 

805 work_pool_name: Optional[str] = None, 

806 work_queue_name: Optional[str] = None, 

807 job_variables: Optional[dict[str, Any]] = None, 

808 _sla: Optional[Union[SlaTypes, list[SlaTypes]]] = None, # experimental 

809 ) -> "RunnerDeployment": 

810 """ 

811 Configure a deployment for a given flow located at a given entrypoint. 

812 

813 Args: 

814 entrypoint: The path to a file containing a flow and the name of the flow function in 

815 the format `./path/to/file.py:flow_func_name`. 

816 name: A name for the deployment 

817 flow_name: The name of the flow to deploy 

818 interval: An interval on which to execute the current flow. Accepts either a number 

819 or a timedelta object. If a number is given, it will be interpreted as seconds. 

820 cron: A cron schedule of when to execute runs of this flow. 

821 rrule: An rrule schedule of when to execute runs of this flow. 

822 paused: Whether or not to set this deployment as paused. 

823 schedules: A list of schedule objects defining when to execute runs of this deployment. 

824 Used to define multiple schedules or additional scheduling options like `timezone`. 

825 triggers: A list of triggers that should kick of a run of this flow. 

826 parameters: A dictionary of default parameter values to pass to runs of this flow. 

827 description: A description for the created deployment. Defaults to the flow's 

828 description if not provided. 

829 tags: A list of tags to associate with the created deployment for organizational 

830 purposes. 

831 version: A version for the created deployment. Defaults to the flow's version. 

832 enforce_parameter_schema: Whether or not the Prefect API should enforce the 

833 parameter schema for this deployment. 

834 work_pool_name: The name of the work pool to use for this deployment. 

835 work_queue_name: The name of the work queue to use for this deployment's scheduled runs. 

836 If not provided the default work queue for the work pool will be used. 

837 job_variables: Settings used to override the values specified default base job template 

838 of the chosen work pool. Refer to the base job template of the chosen work pool for 

839 available settings. 

840 _sla: (Experimental) SLA configuration for the deployment. May be removed or modified at any time. Currently only supported on Prefect Cloud. 

841 """ 

842 from prefect.flows import load_flow_from_entrypoint 

843 

844 job_variables = job_variables or {} 

845 flow = load_flow_from_entrypoint(entrypoint) 

846 

847 constructed_schedules = cls._construct_deployment_schedules( 

848 interval=interval, 

849 cron=cron, 

850 rrule=rrule, 

851 schedules=schedules, 

852 schedule=schedule, 

853 ) 

854 

855 if isinstance(concurrency_limit, ConcurrencyLimitConfig): 

856 concurrency_options = { 

857 "collision_strategy": concurrency_limit.collision_strategy 

858 } 

859 concurrency_limit = concurrency_limit.limit 

860 else: 

861 concurrency_options = None 

862 

863 deployment = cls( 

864 name=name, 

865 flow_name=flow_name or flow.name, 

866 schedules=constructed_schedules, 

867 concurrency_limit=concurrency_limit, 

868 concurrency_options=concurrency_options, 

869 paused=paused, 

870 tags=tags or [], 

871 triggers=triggers or [], 

872 parameters=parameters or {}, 

873 description=description, 

874 version=version, 

875 entrypoint=entrypoint, 

876 enforce_parameter_schema=enforce_parameter_schema, 

877 work_pool_name=work_pool_name, 

878 work_queue_name=work_queue_name, 

879 job_variables=job_variables, 

880 ) 

881 deployment._sla = _sla 

882 deployment._path = str(Path.cwd()) 

883 

884 cls._set_defaults_from_flow(deployment, flow) 

885 

886 return deployment 

887 

888 @classmethod 1a

889 async def afrom_storage( 1a

890 cls, 

891 storage: RunnerStorage, 

892 entrypoint: str, 

893 name: str, 

894 flow_name: Optional[str] = None, 

895 interval: Optional[ 

896 Union[Iterable[Union[int, float, timedelta]], int, float, timedelta] 

897 ] = None, 

898 cron: Optional[Union[Iterable[str], str]] = None, 

899 rrule: Optional[Union[Iterable[str], str]] = None, 

900 paused: Optional[bool] = None, 

901 schedule: Optional[Schedule] = None, 

902 schedules: Optional["FlexibleScheduleList"] = None, 

903 concurrency_limit: Optional[Union[int, ConcurrencyLimitConfig, None]] = None, 

904 parameters: Optional[dict[str, Any]] = None, 

905 triggers: Optional[List[Union[DeploymentTriggerTypes, TriggerTypes]]] = None, 

906 description: Optional[str] = None, 

907 tags: Optional[List[str]] = None, 

908 version: Optional[str] = None, 

909 version_type: Optional[VersionType] = None, 

910 enforce_parameter_schema: bool = True, 

911 work_pool_name: Optional[str] = None, 

912 work_queue_name: Optional[str] = None, 

913 job_variables: Optional[dict[str, Any]] = None, 

914 _sla: Optional[Union[SlaTypes, list[SlaTypes]]] = None, # experimental 

915 ) -> "RunnerDeployment": 

916 """ 

917 Create a RunnerDeployment from a flow located at a given entrypoint and stored in a 

918 local storage location. 

919 

920 Args: 

921 entrypoint: The path to a file containing a flow and the name of the flow function in 

922 the format `./path/to/file.py:flow_func_name`. 

923 name: A name for the deployment 

924 flow_name: The name of the flow to deploy 

925 storage: A storage object to use for retrieving flow code. If not provided, a 

926 URL must be provided. 

927 interval: An interval on which to execute the current flow. Accepts either a number 

928 or a timedelta object. If a number is given, it will be interpreted as seconds. 

929 cron: A cron schedule of when to execute runs of this flow. 

930 rrule: An rrule schedule of when to execute runs of this flow. 

931 paused: Whether or not the deployment is paused. 

932 schedule: A schedule object defining when to execute runs of this deployment. 

933 Used to provide additional scheduling options like `timezone` or `parameters`. 

934 schedules: A list of schedule objects defining when to execute runs of this deployment. 

935 Used to provide additional scheduling options like `timezone` or `parameters`. 

936 triggers: A list of triggers that should kick of a run of this flow. 

937 parameters: A dictionary of default parameter values to pass to runs of this flow. 

938 description: A description for the created deployment. Defaults to the flow's 

939 description if not provided. 

940 tags: A list of tags to associate with the created deployment for organizational 

941 purposes. 

942 version: A version for the created deployment. Defaults to the flow's version. 

943 version_type: The type of version information to use for the deployment. The version type 

944 will be inferred if not provided. 

945 enforce_parameter_schema: Whether or not the Prefect API should enforce the 

946 parameter schema for this deployment. 

947 work_pool_name: The name of the work pool to use for this deployment. 

948 work_queue_name: The name of the work queue to use for this deployment's scheduled runs. 

949 If not provided the default work queue for the work pool will be used. 

950 job_variables: Settings used to override the values specified default base job template 

951 of the chosen work pool. Refer to the base job template of the chosen work pool for 

952 available settings. 

953 _sla: (Experimental) SLA configuration for the deployment. May be removed or modified at any time. Currently only supported on Prefect Cloud. 

954 """ 

955 from prefect.flows import load_flow_from_entrypoint 

956 

957 constructed_schedules = cls._construct_deployment_schedules( 

958 interval=interval, 

959 cron=cron, 

960 rrule=rrule, 

961 schedules=schedules, 

962 schedule=schedule, 

963 ) 

964 

965 if isinstance(concurrency_limit, ConcurrencyLimitConfig): 

966 concurrency_options = { 

967 "collision_strategy": concurrency_limit.collision_strategy 

968 } 

969 concurrency_limit = concurrency_limit.limit 

970 else: 

971 concurrency_options = None 

972 

973 job_variables = job_variables or {} 

974 

975 with tempfile.TemporaryDirectory() as tmpdir: 

976 storage.set_base_path(Path(tmpdir)) 

977 await storage.pull_code() 

978 

979 full_entrypoint = str(storage.destination / entrypoint) 

980 flow = await from_async.wait_for_call_in_new_thread( 

981 create_call(load_flow_from_entrypoint, full_entrypoint) 

982 ) 

983 

984 deployment = cls( 

985 name=name, 

986 flow_name=flow_name or flow.name, 

987 schedules=constructed_schedules, 

988 concurrency_limit=concurrency_limit, 

989 concurrency_options=concurrency_options, 

990 paused=paused, 

991 tags=tags or [], 

992 triggers=triggers or [], 

993 parameters=parameters or {}, 

994 description=description, 

995 version=version, 

996 version_type=version_type, 

997 entrypoint=entrypoint, 

998 enforce_parameter_schema=enforce_parameter_schema, 

999 storage=storage, 

1000 work_pool_name=work_pool_name, 

1001 work_queue_name=work_queue_name, 

1002 job_variables=job_variables, 

1003 ) 

1004 deployment._sla = _sla 

1005 deployment._path = str(storage.destination).replace( 

1006 tmpdir, "$STORAGE_BASE_PATH" 

1007 ) 

1008 

1009 cls._set_defaults_from_flow(deployment, flow) 

1010 

1011 return deployment 

1012 

1013 @classmethod 1a

1014 @async_dispatch(afrom_storage) 1a

1015 def from_storage( 1a

1016 cls, 

1017 storage: RunnerStorage, 

1018 entrypoint: str, 

1019 name: str, 

1020 flow_name: Optional[str] = None, 

1021 interval: Optional[ 

1022 Union[Iterable[Union[int, float, timedelta]], int, float, timedelta] 

1023 ] = None, 

1024 cron: Optional[Union[Iterable[str], str]] = None, 

1025 rrule: Optional[Union[Iterable[str], str]] = None, 

1026 paused: Optional[bool] = None, 

1027 schedule: Optional[Schedule] = None, 

1028 schedules: Optional["FlexibleScheduleList"] = None, 

1029 concurrency_limit: Optional[Union[int, ConcurrencyLimitConfig, None]] = None, 

1030 parameters: Optional[dict[str, Any]] = None, 

1031 triggers: Optional[List[Union[DeploymentTriggerTypes, TriggerTypes]]] = None, 

1032 description: Optional[str] = None, 

1033 tags: Optional[List[str]] = None, 

1034 version: Optional[str] = None, 

1035 version_type: Optional[VersionType] = None, 

1036 enforce_parameter_schema: bool = True, 

1037 work_pool_name: Optional[str] = None, 

1038 work_queue_name: Optional[str] = None, 

1039 job_variables: Optional[dict[str, Any]] = None, 

1040 _sla: Optional[Union[SlaTypes, list[SlaTypes]]] = None, # experimental 

1041 ) -> "RunnerDeployment": 

1042 """ 

1043 Create a RunnerDeployment from a flow located at a given entrypoint and stored in a 

1044 local storage location. 

1045 

1046 Args: 

1047 entrypoint: The path to a file containing a flow and the name of the flow function in 

1048 the format `./path/to/file.py:flow_func_name`. 

1049 name: A name for the deployment 

1050 flow_name: The name of the flow to deploy 

1051 storage: A storage object to use for retrieving flow code. If not provided, a 

1052 URL must be provided. 

1053 interval: An interval on which to execute the current flow. Accepts either a number 

1054 or a timedelta object. If a number is given, it will be interpreted as seconds. 

1055 cron: A cron schedule of when to execute runs of this flow. 

1056 rrule: An rrule schedule of when to execute runs of this flow. 

1057 paused: Whether or not the deployment is paused. 

1058 schedule: A schedule object defining when to execute runs of this deployment. 

1059 Used to provide additional scheduling options like `timezone` or `parameters`. 

1060 schedules: A list of schedule objects defining when to execute runs of this deployment. 

1061 Used to provide additional scheduling options like `timezone` or `parameters`. 

1062 triggers: A list of triggers that should kick of a run of this flow. 

1063 parameters: A dictionary of default parameter values to pass to runs of this flow. 

1064 description: A description for the created deployment. Defaults to the flow's 

1065 description if not provided. 

1066 tags: A list of tags to associate with the created deployment for organizational 

1067 purposes. 

1068 version: A version for the created deployment. Defaults to the flow's version. 

1069 version_type: The type of version information to use for the deployment. The version type 

1070 will be inferred if not provided. 

1071 enforce_parameter_schema: Whether or not the Prefect API should enforce the 

1072 parameter schema for this deployment. 

1073 work_pool_name: The name of the work pool to use for this deployment. 

1074 work_queue_name: The name of the work queue to use for this deployment's scheduled runs. 

1075 If not provided the default work queue for the work pool will be used. 

1076 job_variables: Settings used to override the values specified default base job template 

1077 of the chosen work pool. Refer to the base job template of the chosen work pool for 

1078 available settings. 

1079 _sla: (Experimental) SLA configuration for the deployment. May be removed or modified at any time. Currently only supported on Prefect Cloud. 

1080 """ 

1081 from prefect.flows import load_flow_from_entrypoint 

1082 

1083 constructed_schedules = cls._construct_deployment_schedules( 

1084 interval=interval, 

1085 cron=cron, 

1086 rrule=rrule, 

1087 schedules=schedules, 

1088 schedule=schedule, 

1089 ) 

1090 

1091 if isinstance(concurrency_limit, ConcurrencyLimitConfig): 

1092 concurrency_options = { 

1093 "collision_strategy": concurrency_limit.collision_strategy 

1094 } 

1095 concurrency_limit = concurrency_limit.limit 

1096 else: 

1097 concurrency_options = None 

1098 

1099 job_variables = job_variables or {} 

1100 

1101 with tempfile.TemporaryDirectory() as tmpdir: 

1102 storage.set_base_path(Path(tmpdir)) 

1103 run_coro_as_sync(storage.pull_code()) 

1104 

1105 full_entrypoint = str(storage.destination / entrypoint) 

1106 flow = load_flow_from_entrypoint(full_entrypoint) 

1107 

1108 deployment = cls( 

1109 name=name, 

1110 flow_name=flow_name or flow.name, 

1111 schedules=constructed_schedules, 

1112 concurrency_limit=concurrency_limit, 

1113 concurrency_options=concurrency_options, 

1114 paused=paused, 

1115 tags=tags or [], 

1116 triggers=triggers or [], 

1117 parameters=parameters or {}, 

1118 description=description, 

1119 version=version, 

1120 version_type=version_type, 

1121 entrypoint=entrypoint, 

1122 enforce_parameter_schema=enforce_parameter_schema, 

1123 storage=storage, 

1124 work_pool_name=work_pool_name, 

1125 work_queue_name=work_queue_name, 

1126 job_variables=job_variables, 

1127 ) 

1128 deployment._sla = _sla 

1129 deployment._path = str(storage.destination).replace( 

1130 tmpdir, "$STORAGE_BASE_PATH" 

1131 ) 

1132 

1133 cls._set_defaults_from_flow(deployment, flow) 

1134 

1135 return deployment 

1136 

1137 

1138@sync_compatible 1a

1139async def deploy( 1a

1140 *deployments: RunnerDeployment, 

1141 work_pool_name: Optional[str] = None, 

1142 image: Optional[Union[str, DockerImage]] = None, 

1143 build: bool = True, 

1144 push: bool = True, 

1145 print_next_steps_message: bool = True, 

1146 ignore_warnings: bool = False, 

1147) -> List[UUID]: 

1148 """ 

1149 Deploy the provided list of deployments to dynamic infrastructure via a 

1150 work pool. 

1151 

1152 By default, calling this function will build a Docker image for the deployments, push it to a 

1153 registry, and create each deployment via the Prefect API that will run the corresponding 

1154 flow on the given schedule. 

1155 

1156 If you want to use an existing image, you can pass `build=False` to skip building and pushing 

1157 an image. 

1158 

1159 Args: 

1160 *deployments: A list of deployments to deploy. 

1161 work_pool_name: The name of the work pool to use for these deployments. Defaults to 

1162 the value of `PREFECT_DEFAULT_WORK_POOL_NAME`. 

1163 image: The name of the Docker image to build, including the registry and 

1164 repository. Pass a DockerImage instance to customize the Dockerfile used 

1165 and build arguments. 

1166 build: Whether or not to build a new image for the flow. If False, the provided 

1167 image will be used as-is and pulled at runtime. 

1168 push: Whether or not to skip pushing the built image to a registry. 

1169 print_next_steps_message: Whether or not to print a message with next steps 

1170 after deploying the deployments. 

1171 

1172 Returns: 

1173 A list of deployment IDs for the created/updated deployments. 

1174 

1175 Examples: 

1176 Deploy a group of flows to a work pool: 

1177 

1178 ```python 

1179 from prefect import deploy, flow 

1180 

1181 @flow(log_prints=True) 

1182 def local_flow(): 

1183 print("I'm a locally defined flow!") 

1184 

1185 if __name__ == "__main__": 

1186 deploy( 

1187 local_flow.to_deployment(name="example-deploy-local-flow"), 

1188 flow.from_source( 

1189 source="https://github.com/org/repo.git", 

1190 entrypoint="flows.py:my_flow", 

1191 ).to_deployment( 

1192 name="example-deploy-remote-flow", 

1193 ), 

1194 work_pool_name="my-work-pool", 

1195 image="my-registry/my-image:dev", 

1196 ) 

1197 ``` 

1198 """ 

1199 work_pool_name = work_pool_name or PREFECT_DEFAULT_WORK_POOL_NAME.value() 

1200 

1201 if not image and not all( 

1202 d.storage or d.entrypoint_type == EntrypointType.MODULE_PATH 

1203 for d in deployments 

1204 ): 

1205 raise ValueError( 

1206 "Either an image or remote storage location must be provided when deploying" 

1207 " a deployment." 

1208 ) 

1209 

1210 if not work_pool_name: 

1211 raise ValueError( 

1212 "A work pool name must be provided when deploying a deployment. Either" 

1213 " provide a work pool name when calling `deploy` or set" 

1214 " `PREFECT_DEFAULT_WORK_POOL_NAME` in your profile." 

1215 ) 

1216 

1217 if image and isinstance(image, str): 

1218 image_name, image_tag = parse_image_tag(image) 

1219 image = DockerImage(name=image_name, tag=image_tag) 

1220 

1221 try: 

1222 async with get_client() as client: 

1223 work_pool = await client.read_work_pool(work_pool_name) 

1224 active_workers = await client.read_workers_for_work_pool( 

1225 work_pool_name, 

1226 worker_filter=WorkerFilter(status=WorkerFilterStatus(any_=["ONLINE"])), 

1227 ) 

1228 except ObjectNotFound as exc: 

1229 raise ValueError( 

1230 f"Could not find work pool {work_pool_name!r}. Please create it before" 

1231 " deploying this flow." 

1232 ) from exc 

1233 

1234 is_docker_based_work_pool = get_from_dict( 

1235 work_pool.base_job_template, "variables.properties.image", False 

1236 ) 

1237 is_block_based_work_pool = get_from_dict( 

1238 work_pool.base_job_template, "variables.properties.block", False 

1239 ) 

1240 # carve out an exception for block based work pools that only have a block in their base job template 

1241 console = Console() 

1242 if not is_docker_based_work_pool and not is_block_based_work_pool: 

1243 if image: 

1244 raise ValueError( 

1245 f"Work pool {work_pool_name!r} does not support custom Docker images." 

1246 " Please use a work pool with an `image` variable in its base job template" 

1247 " or specify a remote storage location for the flow with `.from_source`." 

1248 " If you are attempting to deploy a flow to a local process work pool," 

1249 " consider using `flow.serve` instead. See the documentation for more" 

1250 " information: https://docs.prefect.io/latest/how-to-guides/deployments/run-flows-in-local-processes" 

1251 ) 

1252 elif work_pool.type == "process" and not ignore_warnings: 

1253 console.print( 

1254 "Looks like you're deploying to a process work pool. If you're creating a" 

1255 " deployment for local development, calling `.serve` on your flow is a great" 

1256 " way to get started. See the documentation for more information:" 

1257 " https://docs.prefect.io/latest/how-to-guides/deployments/run-flows-in-local-processes " 

1258 " Set `ignore_warnings=True` to suppress this message.", 

1259 style="yellow", 

1260 ) 

1261 

1262 is_managed_pool = work_pool.is_managed_pool 

1263 if is_managed_pool: 

1264 build = False 

1265 push = False 

1266 

1267 if image and build: 

1268 with Progress( 

1269 SpinnerColumn(), 

1270 TextColumn(f"Building image {image.reference}..."), 

1271 transient=True, 

1272 console=console, 

1273 ) as progress: 

1274 docker_build_task = progress.add_task("docker_build", total=1) 

1275 image.build() 

1276 

1277 progress.update(docker_build_task, completed=1) 

1278 console.print( 

1279 f"Successfully built image {image.reference!r}", style="green" 

1280 ) 

1281 

1282 if image and build and push: 

1283 with Progress( 

1284 SpinnerColumn(), 

1285 TextColumn("Pushing image..."), 

1286 transient=True, 

1287 console=console, 

1288 ) as progress: 

1289 docker_push_task = progress.add_task("docker_push", total=1) 

1290 

1291 image.push() 

1292 

1293 progress.update(docker_push_task, completed=1) 

1294 

1295 console.print(f"Successfully pushed image {image.reference!r}", style="green") 

1296 

1297 deployment_exceptions: list[dict[str, Any]] = [] 

1298 deployment_ids: list[UUID] = [] 

1299 image_ref = image.reference if image else None 

1300 for deployment in track( 

1301 deployments, 

1302 description="Creating/updating deployments...", 

1303 console=console, 

1304 transient=True, 

1305 ): 

1306 try: 

1307 deployment_ids.append( 

1308 await deployment.apply(image=image_ref, work_pool_name=work_pool_name) 

1309 ) 

1310 except Exception as exc: 

1311 if len(deployments) == 1: 

1312 raise 

1313 deployment_exceptions.append({"deployment": deployment, "exc": exc}) 

1314 

1315 if deployment_exceptions: 

1316 console.print( 

1317 "Encountered errors while creating/updating deployments:\n", 

1318 style="orange_red1", 

1319 ) 

1320 else: 

1321 console.print("Successfully created/updated all deployments!\n", style="green") 

1322 

1323 complete_failure = len(deployment_exceptions) == len(deployments) 

1324 

1325 table = Table( 

1326 title="Deployments", 

1327 show_lines=True, 

1328 ) 

1329 

1330 table.add_column(header="Name", style="blue", no_wrap=True) 

1331 table.add_column(header="Status", style="blue", no_wrap=True) 

1332 table.add_column(header="Details", style="blue") 

1333 

1334 for deployment in deployments: 

1335 errored_deployment = next( 

1336 (d for d in deployment_exceptions if d["deployment"] == deployment), 

1337 None, 

1338 ) 

1339 if errored_deployment: 

1340 table.add_row( 

1341 f"{deployment.flow_name}/{deployment.name}", 

1342 "failed", 

1343 str(errored_deployment["exc"]), 

1344 style="red", 

1345 ) 

1346 else: 

1347 table.add_row(f"{deployment.flow_name}/{deployment.name}", "applied") 

1348 console.print(table) 

1349 

1350 if print_next_steps_message and not complete_failure: 

1351 if ( 

1352 not work_pool.is_push_pool 

1353 and not work_pool.is_managed_pool 

1354 and not active_workers 

1355 ): 

1356 console.print( 

1357 "\nTo execute flow runs from these deployments, start a worker in a" 

1358 " separate terminal that pulls work from the" 

1359 f" {work_pool_name!r} work pool:" 

1360 f"\n\t[blue]$ prefect worker start --pool {work_pool_name!r}[/]", 

1361 ) 

1362 console.print( 

1363 "\nTo trigger any of these deployments, use the" 

1364 " following command:\n[blue]\n\t$ prefect deployment run" 

1365 " [DEPLOYMENT_NAME]\n[/]" 

1366 ) 

1367 

1368 if PREFECT_UI_URL: 

1369 console.print( 

1370 "\nYou can also trigger your deployments via the Prefect UI:" 

1371 f" [blue]{PREFECT_UI_URL.value()}/deployments[/]\n" 

1372 ) 

1373 

1374 return deployment_ids