Coverage for /usr/local/lib/python3.12/site-packages/prefect/client/orchestration/_deployments/client.py: 10%

369 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1from __future__ import annotations 1a

2 

3from collections.abc import Iterable 1a

4from typing import TYPE_CHECKING, Any, Union 1a

5from uuid import UUID 1a

6 

7from httpx import HTTPStatusError, RequestError 1a

8 

9from prefect._internal.compatibility.deprecated import deprecated_callable 1a

10from prefect.client.orchestration.base import BaseAsyncClient, BaseClient 1a

11from prefect.exceptions import ObjectAlreadyExists, ObjectLimitReached, ObjectNotFound 1a

12 

13if TYPE_CHECKING: 13 ↛ 14line 13 didn't jump to line 14 because the condition on line 13 was never true1a

14 import datetime 

15 

16 from prefect.client.schemas import FlowRun 

17 from prefect.client.schemas.actions import ( 

18 DeploymentCreate, 

19 DeploymentScheduleCreate, 

20 DeploymentUpdate, 

21 ) 

22 from prefect.client.schemas.filters import ( 

23 DeploymentFilter, 

24 FlowFilter, 

25 FlowRunFilter, 

26 TaskRunFilter, 

27 WorkPoolFilter, 

28 WorkQueueFilter, 

29 ) 

30 from prefect.client.schemas.objects import ( 

31 ConcurrencyOptions, 

32 DeploymentBranchingOptions, 

33 DeploymentSchedule, 

34 VersionInfo, 

35 ) 

36 from prefect.client.schemas.responses import ( 

37 DeploymentResponse, 

38 FlowRunResponse, 

39 ) 

40 from prefect.client.schemas.schedules import SCHEDULE_TYPES 

41 from prefect.client.schemas.sorting import ( 

42 DeploymentSort, 

43 ) 

44 from prefect.states import State 

45 from prefect.types import KeyValueLabelsField 

46 

47 

48class DeploymentClient(BaseClient): 1a

def create_deployment(
    self,
    flow_id: UUID,
    name: str,
    version: str | None = None,
    version_info: "VersionInfo | None" = None,
    schedules: list["DeploymentScheduleCreate"] | None = None,
    concurrency_limit: int | None = None,
    concurrency_options: "ConcurrencyOptions | None" = None,
    parameters: dict[str, Any] | None = None,
    description: str | None = None,
    work_queue_name: str | None = None,
    work_pool_name: str | None = None,
    tags: list[str] | None = None,
    storage_document_id: UUID | None = None,
    path: str | None = None,
    entrypoint: str | None = None,
    infrastructure_document_id: UUID | None = None,
    parameter_openapi_schema: dict[str, Any] | None = None,
    paused: bool | None = None,
    pull_steps: list[dict[str, Any]] | None = None,
    enforce_parameter_schema: bool | None = None,
    job_variables: dict[str, Any] | None = None,
    branch: str | None = None,
    base: UUID | None = None,
    root: UUID | None = None,
) -> UUID:
    """
    Create a deployment by POSTing a `DeploymentCreate` payload to `/deployments/`.

    Args:
        flow_id: the flow ID to create a deployment for
        name: the name of the deployment
        version: an optional version string for the deployment
        version_info: optional version details; left out of the payload when None
        schedules: optional schedules to create; None is sent as an empty list
        concurrency_limit: optional value forwarded to `DeploymentCreate`
        concurrency_options: optional value forwarded to `DeploymentCreate`
        parameters: optional parameters; copied into a new dict before sending
        description: optional value forwarded to `DeploymentCreate`
        work_queue_name: optional value forwarded to `DeploymentCreate`
        work_pool_name: optional work pool name; only assigned when not None
        tags: an optional list of tags to apply to the deployment
        storage_document_id: a reference to the storage block document
            used for the deployed flow
        path: optional value forwarded to `DeploymentCreate`
        entrypoint: optional value forwarded to `DeploymentCreate`
        infrastructure_document_id: a reference to the infrastructure block
            document to use for this deployment
        parameter_openapi_schema: optional schema; None is sent as `{}`
        paused: optional; omitted from the payload when None
        pull_steps: optional; omitted from the payload when None
        enforce_parameter_schema: optional; omitted from the payload when None
        job_variables: A dictionary of dot delimited infrastructure overrides that
            will be applied at runtime; for example `env.CONFIG_KEY=config_value` or
            `namespace='prefect'`. This argument was previously named `infra_overrides`.
        branch: optional; omitted from the payload when None
        base: optional; omitted from the payload when None
        root: optional; omitted from the payload when None

    Raises:
        ObjectLimitReached: on a 403 whose error text contains "maximum number"
        ObjectAlreadyExists: on a 409 response
        HTTPStatusError: any other status error is re-raised unchanged
        RequestError: if the response body carries no `id`

    Returns:
        the ID of the deployment in the backend
    """

    from prefect.client.schemas.actions import DeploymentCreate

    model_kwargs: dict[str, Any] = dict(
        flow_id=flow_id,
        name=name,
        version=version,
        version_info=version_info,
        parameters=dict(parameters or {}),
        tags=list(tags or []),
        work_queue_name=work_queue_name,
        description=description,
        storage_document_id=storage_document_id,
        path=path,
        entrypoint=entrypoint,
        infrastructure_document_id=infrastructure_document_id,
        job_variables=dict(job_variables or {}),
        parameter_openapi_schema=parameter_openapi_schema or {},
        paused=paused,
        schedules=schedules or [],
        concurrency_limit=concurrency_limit,
        concurrency_options=concurrency_options,
        pull_steps=pull_steps,
        enforce_parameter_schema=enforce_parameter_schema,
        branch=branch,
        base=base,
        root=root,
    )
    deployment_create = DeploymentCreate(**model_kwargs)

    if work_pool_name is not None:
        deployment_create.work_pool_name = work_pool_name

    # Exclude newer fields that are not set to avoid compatibility issues
    omitted = {"work_pool_name", "work_queue_name"} - set(
        deployment_create.model_fields_set
    )
    # These fields are only sent when they carry a value.
    omitted |= {
        optional
        for optional in (
            "paused",
            "pull_steps",
            "enforce_parameter_schema",
            "version_info",
            "branch",
            "base",
            "root",
        )
        if getattr(deployment_create, optional) is None
    }

    payload = deployment_create.model_dump(mode="json", exclude=omitted)
    info = deployment_create.version_info
    if info:
        # version_info is dumped on its own and overrides the nested dump above
        payload["version_info"] = info.model_dump(mode="json")

    try:
        response = self.request("POST", "/deployments/", json=payload)
    except HTTPStatusError as exc:
        status = exc.response.status_code
        # NOTE(review): the async client in this module matches the longer
        # substring "maximum number of deployments" - confirm which is intended.
        if status == 403 and "maximum number" in str(exc):
            raise ObjectLimitReached(http_exc=exc) from exc
        if status == 409:
            raise ObjectAlreadyExists(http_exc=exc) from exc
        raise

    created_id = response.json().get("id")
    if created_id:
        return UUID(created_id)
    raise RequestError(f"Malformed response: {response}")

176 

def _set_deployment_paused_state(self, deployment_id: UUID, paused: bool) -> None:
    """
    Send a PATCH to `/deployments/{id}` that sets only the `paused` flag.

    Args:
        deployment_id: the deployment ID to update
        paused: the value to send for `paused`
    """
    patch_body = {"paused": paused}
    self.request(
        "PATCH",
        "/deployments/{id}",
        path_params={"id": deployment_id},
        json=patch_body,
    )

184 

@deprecated_callable(
    start_date="Jun 2025",
    help="Use pause_deployment or resume_deployment instead.",
)
def set_deployment_paused_state(self, deployment_id: UUID, paused: bool) -> None:
    """
    DEPRECATED: Use pause_deployment or resume_deployment instead.

    Thin wrapper that forwards to `_set_deployment_paused_state`.

    Args:
        deployment_id: the deployment ID to update
        paused: whether the deployment should be paused
    """
    self._set_deployment_paused_state(deployment_id, paused=paused)

200 

def pause_deployment(self, deployment_id: Union[UUID, str]) -> None:
    """
    Pause a deployment by ID.

    Args:
        deployment_id: The deployment ID of interest (can be a UUID or a string).

    Raises:
        ValueError: If `deployment_id` cannot be converted to a UUID
        ObjectNotFound: If request returns 404
        HTTPStatusError: Any other status error is re-raised unchanged
    """
    if not isinstance(deployment_id, UUID):
        try:
            deployment_id = UUID(deployment_id)
        except (ValueError, TypeError, AttributeError) as exc:
            # UUID() raises AttributeError/TypeError rather than ValueError for
            # non-string input (e.g. an int or None); report all of them uniformly.
            raise ValueError(f"Invalid deployment ID: {deployment_id}") from exc

    try:
        self._set_deployment_paused_state(deployment_id, paused=True)
    except HTTPStatusError as e:
        if e.response.status_code == 404:
            raise ObjectNotFound(http_exc=e) from e
        raise

225 

def resume_deployment(self, deployment_id: Union[UUID, str]) -> None:
    """
    Resume (unpause) a deployment by ID.

    Args:
        deployment_id: The deployment ID of interest (can be a UUID or a string).

    Raises:
        ValueError: If `deployment_id` cannot be converted to a UUID
        ObjectNotFound: If request returns 404
        HTTPStatusError: Any other status error is re-raised unchanged
    """
    if not isinstance(deployment_id, UUID):
        try:
            deployment_id = UUID(deployment_id)
        except (ValueError, TypeError, AttributeError) as exc:
            # UUID() raises AttributeError/TypeError rather than ValueError for
            # non-string input (e.g. an int or None); report all of them uniformly.
            raise ValueError(f"Invalid deployment ID: {deployment_id}") from exc

    try:
        self._set_deployment_paused_state(deployment_id, paused=False)
    except HTTPStatusError as e:
        if e.response.status_code == 404:
            raise ObjectNotFound(http_exc=e) from e
        raise

250 

def update_deployment(
    self,
    deployment_id: UUID,
    deployment: "DeploymentUpdate",
) -> None:
    """
    PATCH `/deployments/{id}` with the explicitly-set fields of `deployment`.

    `name`, `flow_name` and `triggers` are never sent; `version_info` is sent
    only when it has a value, and is then dumped separately in JSON mode.

    Args:
        deployment_id: the deployment ID to update
        deployment: the update model whose set fields are sent
    """
    skipped = {"name", "flow_name", "triggers"}
    if deployment.version_info is None:
        skipped.add("version_info")

    body = deployment.model_dump(
        mode="json",
        exclude_unset=True,
        exclude=skipped,
    )
    info = deployment.version_info
    if info:
        body["version_info"] = info.model_dump(mode="json")

    self.request(
        "PATCH",
        "/deployments/{id}",
        path_params={"id": deployment_id},
        json=body,
    )

279 

def _create_deployment_from_schema(self, schema: "DeploymentCreate") -> UUID:
    """
    POST an already-built `DeploymentCreate` schema to `/deployments/`.

    Returns the UUID parsed from the response's `id`; raises `RequestError`
    when the response carries no `id`.
    """
    # TODO: We are likely to remove this method once we have considered the
    # packaging interface for deployments further.
    body = schema.model_dump(mode="json")
    response = self.request("POST", "/deployments/", json=body)

    created_id = response.json().get("id")
    if created_id:
        return UUID(created_id)
    raise RequestError(f"Malformed response: {response}")

294 

def read_deployment(
    self,
    deployment_id: Union[UUID, str],
) -> "DeploymentResponse":
    """
    Query the Prefect API for a deployment by id.

    Args:
        deployment_id: the deployment ID of interest (a UUID or a string)

    Raises:
        ValueError: If `deployment_id` cannot be converted to a UUID
        ObjectNotFound: If request returns 404
        HTTPStatusError: Any other status error is re-raised unchanged

    Returns:
        a Deployment model representation of the deployment
    """
    if not isinstance(deployment_id, UUID):
        try:
            deployment_id = UUID(deployment_id)
        except (ValueError, TypeError, AttributeError) as exc:
            # UUID() raises AttributeError/TypeError rather than ValueError for
            # non-string input (e.g. an int or None); report all of them uniformly.
            raise ValueError(f"Invalid deployment ID: {deployment_id}") from exc

    try:
        response = self.request(
            "GET",
            "/deployments/{id}",
            path_params={"id": deployment_id},
        )
    except HTTPStatusError as e:
        if e.response.status_code == 404:
            raise ObjectNotFound(http_exc=e) from e
        raise

    # Imported lazily, right where it is needed.
    from prefect.client.schemas.responses import DeploymentResponse

    return DeploymentResponse.model_validate(response.json())

329 

def read_deployment_by_name(
    self,
    name: str,
) -> "DeploymentResponse":
    """
    Query the Prefect API for a deployment by name.

    Args:
        name: A deployed flow's name: <FLOW_NAME>/<DEPLOYMENT_NAME>

    Raises:
        ValueError: If `name` does not split into exactly two parts on "/"
        ObjectNotFound: If request returns 404
        HTTPStatusError: Any other status error is re-raised unchanged

    Returns:
        a Deployment model representation of the deployment
    """
    # Validate the name on its own so that a ValueError raised while performing
    # the request is not misreported as a name-format problem.
    try:
        flow_name, deployment_name = name.split("/")
    except ValueError as exc:
        raise ValueError(
            f"Invalid deployment name format: {name}. Expected format: <FLOW_NAME>/<DEPLOYMENT_NAME>"
        ) from exc

    try:
        response = self.request(
            "GET",
            "/deployments/name/{flow_name}/{deployment_name}",
            path_params={
                "flow_name": flow_name,
                "deployment_name": deployment_name,
            },
        )
    except HTTPStatusError as e:
        if e.response.status_code == 404:
            raise ObjectNotFound(http_exc=e) from e
        raise

    # Imported lazily, right where it is needed.
    from prefect.client.schemas.responses import DeploymentResponse

    return DeploymentResponse.model_validate(response.json())

370 

def read_deployments(
    self,
    *,
    flow_filter: "FlowFilter | None" = None,
    flow_run_filter: "FlowRunFilter | None" = None,
    task_run_filter: "TaskRunFilter | None" = None,
    deployment_filter: "DeploymentFilter | None" = None,
    work_pool_filter: "WorkPoolFilter | None" = None,
    work_queue_filter: "WorkQueueFilter | None" = None,
    limit: int | None = None,
    sort: "DeploymentSort | None" = None,
    offset: int = 0,
) -> list["DeploymentResponse"]:
    """
    Query the Prefect API for deployments. Only deployments matching all
    the provided criteria will be returned.

    Args:
        flow_filter: filter criteria for flows
        flow_run_filter: filter criteria for flow runs (only explicitly-set
            fields are sent)
        task_run_filter: filter criteria for task runs
        deployment_filter: filter criteria for deployments
        work_pool_filter: filter criteria for work pools
        work_queue_filter: filter criteria for work pool queues
        limit: a limit for the deployment query
        sort: sort value sent as `sort` in the request body
        offset: an offset for the deployment query

    Returns:
        a list of Deployment model representations
            of the deployments
    """
    from prefect.client.schemas.responses import DeploymentResponse

    def _dumped(criteria: Any, **options: Any) -> Any:
        # A missing filter is sent as an explicit null.
        return criteria.model_dump(mode="json", **options) if criteria else None

    body: dict[str, Any] = {
        "flows": _dumped(flow_filter),
        "flow_runs": _dumped(flow_run_filter, exclude_unset=True),
        "task_runs": _dumped(task_run_filter),
        "deployments": _dumped(deployment_filter),
        "work_pools": _dumped(work_pool_filter),
        "work_pool_queues": _dumped(work_queue_filter),
        "limit": limit,
        "offset": offset,
        "sort": sort,
    }

    found = self.request("POST", "/deployments/filter", json=body)
    return DeploymentResponse.model_validate_list(found.json())

430 

def delete_deployment(
    self,
    deployment_id: UUID,
) -> None:
    """
    Delete deployment by id.

    Args:
        deployment_id: The deployment id of interest.
    Raises:
        ObjectNotFound: If request returns 404
        HTTPStatusError: Any other status error is re-raised unchanged
    """
    try:
        self.request(
            "DELETE",
            "/deployments/{id}",
            path_params={"id": deployment_id},
        )
    except HTTPStatusError as exc:
        if exc.response.status_code != 404:
            raise
        raise ObjectNotFound(http_exc=exc) from exc

455 

def create_deployment_schedules(
    self,
    deployment_id: UUID,
    schedules: list[tuple["SCHEDULE_TYPES", bool]],
) -> list["DeploymentSchedule"]:
    """
    Create deployment schedules.

    Args:
        deployment_id: the deployment ID
        schedules: a list of tuples containing the schedule to create
                   and whether or not it should be active.

    Raises:
        RequestError: if the schedules were not created for any reason

    Returns:
        the list of schedules created in the backend
    """
    from prefect.client.schemas.actions import DeploymentScheduleCreate
    from prefect.client.schemas.objects import DeploymentSchedule

    # Build every create-model first, then serialise them all.
    creates = [
        DeploymentScheduleCreate(schedule=entry[0], active=entry[1])
        for entry in schedules
    ]
    body = [create.model_dump(mode="json") for create in creates]

    response = self.request(
        "POST",
        "/deployments/{id}/schedules",
        path_params={"id": deployment_id},
        json=body,
    )
    return DeploymentSchedule.model_validate_list(response.json())

494 

def read_deployment_schedules(
    self,
    deployment_id: UUID,
) -> list["DeploymentSchedule"]:
    """
    Query the Prefect API for a deployment's schedules.

    Args:
        deployment_id: the deployment ID

    Raises:
        ObjectNotFound: If request returns 404
        HTTPStatusError: Any other status error is re-raised unchanged

    Returns:
        a list of DeploymentSchedule model representations of the deployment schedules
    """
    from prefect.client.schemas.objects import DeploymentSchedule

    try:
        response = self.request(
            "GET",
            "/deployments/{id}/schedules",
            path_params={"id": deployment_id},
        )
    except HTTPStatusError as exc:
        if exc.response.status_code != 404:
            raise
        raise ObjectNotFound(http_exc=exc) from exc

    return DeploymentSchedule.model_validate_list(response.json())

522 

def update_deployment_schedule(
    self,
    deployment_id: UUID,
    schedule_id: UUID,
    active: bool | None = None,
    schedule: "SCHEDULE_TYPES | None" = None,
) -> None:
    """
    Update a deployment schedule by ID.

    Only the arguments that are not None are included in the update.

    Args:
        deployment_id: the deployment ID
        schedule_id: the deployment schedule ID of interest
        active: whether or not the schedule should be active
        schedule: the cron, rrule, or interval schedule this deployment schedule should use

    Raises:
        ObjectNotFound: If request returns 404
        HTTPStatusError: Any other status error is re-raised unchanged
    """
    from prefect.client.schemas.actions import DeploymentScheduleUpdate

    changes: dict[str, Any] = {
        key: value
        for key, value in (("active", active), ("schedule", schedule))
        if value is not None
    }
    body = DeploymentScheduleUpdate(**changes).model_dump(
        mode="json", exclude_unset=True
    )

    try:
        self.request(
            "PATCH",
            "/deployments/{id}/schedules/{schedule_id}",
            path_params={"id": deployment_id, "schedule_id": schedule_id},
            json=body,
        )
    except HTTPStatusError as exc:
        if exc.response.status_code != 404:
            raise
        raise ObjectNotFound(http_exc=exc) from exc

562 

def delete_deployment_schedule(
    self,
    deployment_id: UUID,
    schedule_id: UUID,
) -> None:
    """
    Delete a deployment schedule.

    Args:
        deployment_id: the deployment ID
        schedule_id: the ID of the deployment schedule to delete.

    Raises:
        ObjectNotFound: If request returns 404
        HTTPStatusError: Any other status error is re-raised unchanged
    """
    target = {"id": deployment_id, "schedule_id": schedule_id}
    try:
        self.request(
            "DELETE",
            "/deployments/{id}/schedules/{schedule_id}",
            path_params=target,
        )
    except HTTPStatusError as exc:
        if exc.response.status_code != 404:
            raise
        raise ObjectNotFound(http_exc=exc) from exc

589 

def get_scheduled_flow_runs_for_deployments(
    self,
    deployment_ids: list[UUID],
    scheduled_before: "datetime.datetime | None" = None,
    limit: int | None = None,
) -> list["FlowRunResponse"]:
    """
    Fetch scheduled flow runs for the given deployments.

    POSTs to `/deployments/get_scheduled_flow_runs`.

    Args:
        deployment_ids: deployment IDs, sent as strings
        scheduled_before: optional cutoff, sent as `str(scheduled_before)`;
            omitted from the request when None
        limit: optional limit; omitted from the request when None
            (an explicit `0` is sent as-is)

    Returns:
        the flow runs returned by the API, validated as `FlowRunResponse`
    """
    body: dict[str, Any] = {
        "deployment_ids": [str(deployment_id) for deployment_id in deployment_ids]
    }
    if scheduled_before is not None:
        body["scheduled_before"] = str(scheduled_before)
    # Compare against None so that an explicit limit of 0 is not dropped.
    if limit is not None:
        body["limit"] = limit

    response = self.request(
        "POST",
        "/deployments/get_scheduled_flow_runs",
        json=body,
    )

    # Imported lazily, right where it is needed.
    from prefect.client.schemas.responses import FlowRunResponse

    return FlowRunResponse.model_validate_list(response.json())

611 

def create_flow_run_from_deployment(
    self,
    deployment_id: UUID,
    *,
    parameters: dict[str, Any] | None = None,
    context: dict[str, Any] | None = None,
    state: State[Any] | None = None,
    name: str | None = None,
    tags: Iterable[str] | None = None,
    idempotency_key: str | None = None,
    parent_task_run_id: UUID | None = None,
    work_queue_name: str | None = None,
    job_variables: dict[str, Any] | None = None,
    labels: "KeyValueLabelsField | None" = None,
) -> "FlowRun":
    """
    Create a flow run for a deployment.

    Args:
        deployment_id: The deployment ID to create the flow run from
        parameters: Parameter overrides for this flow run. Merged with the
            deployment defaults
        context: Optional run context data
        state: The initial state for the run. If not provided, defaults to
            `Scheduled` for now. Should always be a `Scheduled` type.
        name: An optional name for the flow run. If not provided, the server will
            generate a name.
        tags: An optional iterable of tags to apply to the flow run; these tags
            are merged with the deployment's tags.
        idempotency_key: Optional idempotency key for creation of the flow run.
            If the key matches the key of an existing flow run, the existing run will
            be returned instead of creating a new one.
        parent_task_run_id: if a subflow run is being created, the placeholder task
            run identifier in the parent flow
        work_queue_name: An optional work queue name to add this run to. If not provided,
            will default to the deployment's set work queue.  If one is provided that does not
            exist, a new work queue will be created within the deployment's work pool.
        job_variables: Optional variables that will be supplied to the flow run job.
        labels: Optional labels for the flow run; a falsy value is sent as `{}`.

    Raises:
        RequestError: if the Prefect API does not successfully create a run for any reason

    Returns:
        The flow run model
    """
    from prefect.client.schemas.actions import DeploymentFlowRunCreate
    from prefect.client.schemas.objects import FlowRun
    from prefect.states import Scheduled, to_state_create

    initial_state = state or Scheduled()

    flow_run_create = DeploymentFlowRunCreate(
        parameters=parameters or {},
        context=context or {},
        state=to_state_create(initial_state),
        tags=list(tags or []),
        name=name,
        idempotency_key=idempotency_key,
        parent_task_run_id=parent_task_run_id,
        job_variables=job_variables,
        labels=labels or {},
    )

    # done separately to avoid including this field in payloads sent to older API versions
    if work_queue_name:
        flow_run_create.work_queue_name = work_queue_name

    body = flow_run_create.model_dump(mode="json", exclude_unset=True)
    response = self.request(
        "POST",
        "/deployments/{id}/create_flow_run",
        path_params={"id": deployment_id},
        json=body,
    )
    return FlowRun.model_validate(response.json())

690 

def create_deployment_branch(
    self,
    deployment_id: UUID,
    branch: str,
    options: "DeploymentBranchingOptions | None" = None,
    overrides: "DeploymentUpdate | None" = None,
) -> UUID:
    """
    Create a branch of a deployment via POST `/deployments/{id}/branch`.

    Args:
        deployment_id: the ID of the deployment to branch from
        branch: the branch name sent in the request
        options: optional branching options; a default
            `DeploymentBranchingOptions()` is used when not provided
        overrides: optional `DeploymentUpdate` sent as `overrides`

    Raises:
        RequestError: if the response body carries no `id`

    Returns:
        the UUID parsed from the `id` field of the response
    """
    from prefect.client.schemas.actions import DeploymentBranch
    from prefect.client.schemas.objects import DeploymentBranchingOptions

    branch_request = DeploymentBranch(
        branch=branch,
        options=options or DeploymentBranchingOptions(),
        overrides=overrides,
    )
    response = self.request(
        "POST",
        "/deployments/{id}/branch",
        path_params={"id": deployment_id},
        json=branch_request.model_dump(mode="json", exclude_unset=True),
    )

    # Match the other create methods: a response without an id is reported as
    # a malformed response rather than surfacing a TypeError from UUID(None).
    branch_id = response.json().get("id")
    if not branch_id:
        raise RequestError(f"Malformed response: {response}")

    return UUID(branch_id)

712 

713 

714class DeploymentAsyncClient(BaseAsyncClient): 1a

async def create_deployment(
    self,
    flow_id: UUID,
    name: str,
    version: str | None = None,
    version_info: "VersionInfo | None" = None,
    schedules: list["DeploymentScheduleCreate"] | None = None,
    concurrency_limit: int | None = None,
    concurrency_options: "ConcurrencyOptions | None" = None,
    parameters: dict[str, Any] | None = None,
    description: str | None = None,
    work_queue_name: str | None = None,
    work_pool_name: str | None = None,
    tags: list[str] | None = None,
    storage_document_id: UUID | None = None,
    path: str | None = None,
    entrypoint: str | None = None,
    infrastructure_document_id: UUID | None = None,
    parameter_openapi_schema: dict[str, Any] | None = None,
    paused: bool | None = None,
    pull_steps: list[dict[str, Any]] | None = None,
    enforce_parameter_schema: bool | None = None,
    job_variables: dict[str, Any] | None = None,
    branch: str | None = None,
    base: UUID | None = None,
    root: UUID | None = None,
) -> UUID:
    """
    Create a deployment by POSTing a `DeploymentCreate` payload to `/deployments/`.

    Args:
        flow_id: the flow ID to create a deployment for
        name: the name of the deployment
        version: an optional version string for the deployment
        version_info: optional version details; left out of the payload when None
        schedules: optional schedules to create; None is sent as an empty list
        concurrency_limit: optional value forwarded to `DeploymentCreate`
        concurrency_options: optional value forwarded to `DeploymentCreate`
        parameters: optional parameters; copied into a new dict before sending
        description: optional value forwarded to `DeploymentCreate`
        work_queue_name: optional value forwarded to `DeploymentCreate`
        work_pool_name: optional work pool name; only assigned when not None
        tags: an optional list of tags to apply to the deployment
        storage_document_id: a reference to the storage block document
            used for the deployed flow
        path: optional value forwarded to `DeploymentCreate`
        entrypoint: optional value forwarded to `DeploymentCreate`
        infrastructure_document_id: a reference to the infrastructure block
            document to use for this deployment
        parameter_openapi_schema: optional schema; None is sent as `{}`
        paused: optional; omitted from the payload when None
        pull_steps: optional; omitted from the payload when None
        enforce_parameter_schema: optional; omitted from the payload when None
        job_variables: A dictionary of dot delimited infrastructure overrides that
            will be applied at runtime; for example `env.CONFIG_KEY=config_value` or
            `namespace='prefect'`. This argument was previously named `infra_overrides`.
        branch: optional; omitted from the payload when None
        base: optional; omitted from the payload when None
        root: optional; omitted from the payload when None

    Raises:
        ObjectLimitReached: on a 403 whose error text contains
            "maximum number of deployments"
        ObjectAlreadyExists: on a 409 response
        HTTPStatusError: any other status error is re-raised unchanged
        RequestError: if the response body carries no `id`

    Returns:
        the ID of the deployment in the backend
    """

    from prefect.client.schemas.actions import DeploymentCreate

    model_kwargs: dict[str, Any] = dict(
        flow_id=flow_id,
        name=name,
        version=version,
        version_info=version_info,
        parameters=dict(parameters or {}),
        tags=list(tags or []),
        work_queue_name=work_queue_name,
        description=description,
        storage_document_id=storage_document_id,
        path=path,
        entrypoint=entrypoint,
        infrastructure_document_id=infrastructure_document_id,
        job_variables=dict(job_variables or {}),
        parameter_openapi_schema=parameter_openapi_schema or {},
        paused=paused,
        schedules=schedules or [],
        concurrency_limit=concurrency_limit,
        concurrency_options=concurrency_options,
        pull_steps=pull_steps,
        enforce_parameter_schema=enforce_parameter_schema,
        branch=branch,
        base=base,
        root=root,
    )
    deployment_create = DeploymentCreate(**model_kwargs)

    if work_pool_name is not None:
        deployment_create.work_pool_name = work_pool_name

    # Exclude newer fields that are not set to avoid compatibility issues
    omitted = {"work_pool_name", "work_queue_name"} - set(
        deployment_create.model_fields_set
    )
    # These fields are only sent when they carry a value.
    omitted |= {
        optional
        for optional in (
            "paused",
            "pull_steps",
            "enforce_parameter_schema",
            "version_info",
            "branch",
            "base",
            "root",
        )
        if getattr(deployment_create, optional) is None
    }

    payload = deployment_create.model_dump(mode="json", exclude=omitted)
    info = deployment_create.version_info
    if info:
        # version_info is dumped on its own and overrides the nested dump above
        payload["version_info"] = info.model_dump(mode="json")

    try:
        response = await self.request("POST", "/deployments/", json=payload)
    except HTTPStatusError as exc:
        status = exc.response.status_code
        if status == 403 and "maximum number of deployments" in str(exc):
            raise ObjectLimitReached(http_exc=exc) from exc
        if status == 409:
            raise ObjectAlreadyExists(http_exc=exc) from exc
        raise

    created_id = response.json().get("id")
    if created_id:
        return UUID(created_id)
    raise RequestError(f"Malformed response: {response}")

844 

async def _set_deployment_paused_state(
    self, deployment_id: UUID, paused: bool
) -> None:
    """
    Send a PATCH to `/deployments/{id}` that sets only the `paused` flag.

    Args:
        deployment_id: the deployment ID to update
        paused: the value to send for `paused`
    """
    patch_body = {"paused": paused}
    await self.request(
        "PATCH",
        "/deployments/{id}",
        path_params={"id": deployment_id},
        json=patch_body,
    )

854 

@deprecated_callable(
    start_date="Jun 2025",
    help="Use pause_deployment or resume_deployment instead.",
)
async def set_deployment_paused_state(
    self, deployment_id: UUID, paused: bool
) -> None:
    """
    DEPRECATED: Use pause_deployment or resume_deployment instead.

    Thin wrapper that awaits `_set_deployment_paused_state`.

    Args:
        deployment_id: the deployment ID to update
        paused: whether the deployment should be paused
    """
    await self._set_deployment_paused_state(deployment_id, paused=paused)

872 

async def pause_deployment(self, deployment_id: Union[UUID, str]) -> None:
    """
    Pause a deployment by ID.

    Args:
        deployment_id: The deployment ID of interest (can be a UUID or a string).

    Raises:
        ValueError: If `deployment_id` cannot be converted to a UUID
        ObjectNotFound: If request returns 404
        HTTPStatusError: Any other status error is re-raised unchanged
    """
    if not isinstance(deployment_id, UUID):
        try:
            deployment_id = UUID(deployment_id)
        except (ValueError, TypeError, AttributeError) as exc:
            # UUID() raises AttributeError/TypeError rather than ValueError for
            # non-string input (e.g. an int or None); report all of them uniformly.
            raise ValueError(f"Invalid deployment ID: {deployment_id}") from exc

    try:
        await self._set_deployment_paused_state(deployment_id, paused=True)
    except HTTPStatusError as e:
        if e.response.status_code == 404:
            raise ObjectNotFound(http_exc=e) from e
        raise

897 

async def resume_deployment(self, deployment_id: Union[UUID, str]) -> None:
    """
    Resume (unpause) a deployment by ID.

    Args:
        deployment_id: The deployment ID of interest (can be a UUID or a string).

    Raises:
        ValueError: If `deployment_id` cannot be converted to a UUID
        ObjectNotFound: If request returns 404
        HTTPStatusError: Any other status error is re-raised unchanged
    """
    if not isinstance(deployment_id, UUID):
        try:
            deployment_id = UUID(deployment_id)
        except (ValueError, TypeError, AttributeError) as exc:
            # UUID() raises AttributeError/TypeError rather than ValueError for
            # non-string input (e.g. an int or None); report all of them uniformly.
            raise ValueError(f"Invalid deployment ID: {deployment_id}") from exc

    try:
        await self._set_deployment_paused_state(deployment_id, paused=False)
    except HTTPStatusError as e:
        if e.response.status_code == 404:
            raise ObjectNotFound(http_exc=e) from e
        raise

922 

async def update_deployment(
    self,
    deployment_id: UUID,
    deployment: "DeploymentUpdate",
) -> None:
    """
    PATCH `/deployments/{id}` with the explicitly-set fields of `deployment`.

    `name`, `flow_name` and `triggers` are never sent; `version_info` is sent
    only when it has a value, and is then dumped separately in JSON mode.

    Args:
        deployment_id: the deployment ID to update
        deployment: the update model whose set fields are sent
    """
    skipped = {"name", "flow_name", "triggers"}
    if deployment.version_info is None:
        skipped.add("version_info")

    body = deployment.model_dump(
        mode="json",
        exclude_unset=True,
        exclude=skipped,
    )
    info = deployment.version_info
    if info:
        body["version_info"] = info.model_dump(mode="json")

    await self.request(
        "PATCH",
        "/deployments/{id}",
        path_params={"id": deployment_id},
        json=body,
    )

951 

async def _create_deployment_from_schema(self, schema: "DeploymentCreate") -> UUID:
    """
    POST an already-built `DeploymentCreate` schema to `/deployments/`.

    Returns the UUID parsed from the response's `id`; raises `RequestError`
    when the response carries no `id`.
    """

    # TODO: We are likely to remove this method once we have considered the
    # packaging interface for deployments further.
    body = schema.model_dump(mode="json")
    response = await self.request("POST", "/deployments/", json=body)

    created_id = response.json().get("id")
    if created_id:
        return UUID(created_id)
    raise RequestError(f"Malformed response: {response}")

967 

async def read_deployment(
    self,
    deployment_id: Union[UUID, str],
) -> "DeploymentResponse":
    """
    Query the Prefect API for a deployment by id.

    Args:
        deployment_id: the deployment ID of interest (a UUID or a string)

    Raises:
        ValueError: If `deployment_id` cannot be converted to a UUID
        ObjectNotFound: If request returns 404
        HTTPStatusError: Any other status error is re-raised unchanged

    Returns:
        a Deployment model representation of the deployment
    """
    if not isinstance(deployment_id, UUID):
        try:
            deployment_id = UUID(deployment_id)
        except (ValueError, TypeError, AttributeError) as exc:
            # UUID() raises AttributeError/TypeError rather than ValueError for
            # non-string input (e.g. an int or None); report all of them uniformly.
            raise ValueError(f"Invalid deployment ID: {deployment_id}") from exc

    try:
        response = await self.request(
            "GET",
            "/deployments/{id}",
            path_params={"id": deployment_id},
        )
    except HTTPStatusError as e:
        if e.response.status_code == 404:
            raise ObjectNotFound(http_exc=e) from e
        raise

    # Imported lazily, right where it is needed.
    from prefect.client.schemas.responses import DeploymentResponse

    return DeploymentResponse.model_validate(response.json())

1002 

async def read_deployment_by_name(
    self,
    name: str,
) -> "DeploymentResponse":
    """
    Query the Prefect API for a deployment by name.

    Args:
        name: A deployed flow's name: <FLOW_NAME>/<DEPLOYMENT_NAME>

    Raises:
        ValueError: If `name` does not split into exactly two parts on "/"
        ObjectNotFound: If request returns 404
        RequestError: If request fails

    Returns:
        a Deployment model representation of the deployment
    """
    # Validate the name in its own `try` so that a ValueError raised while
    # performing the request is never reported as a malformed name.
    try:
        flow_name, deployment_name = name.split("/")
    except ValueError as e:
        raise ValueError(
            f"Invalid deployment name format: {name}. Expected format: <FLOW_NAME>/<DEPLOYMENT_NAME>"
        ) from e

    from prefect.client.schemas.responses import DeploymentResponse

    try:
        response = await self.request(
            "GET",
            "/deployments/name/{flow_name}/{deployment_name}",
            path_params={
                "flow_name": flow_name,
                "deployment_name": deployment_name,
            },
        )
    except HTTPStatusError as e:
        if e.response.status_code == 404:
            raise ObjectNotFound(http_exc=e) from e
        raise

    return DeploymentResponse.model_validate(response.json())

1043 

async def read_deployments(
    self,
    *,
    flow_filter: "FlowFilter | None" = None,
    flow_run_filter: "FlowRunFilter | None" = None,
    task_run_filter: "TaskRunFilter | None" = None,
    deployment_filter: "DeploymentFilter | None" = None,
    work_pool_filter: "WorkPoolFilter | None" = None,
    work_queue_filter: "WorkQueueFilter | None" = None,
    limit: int | None = None,
    sort: "DeploymentSort | None" = None,
    offset: int = 0,
) -> list["DeploymentResponse"]:
    """
    Query the Prefect API for deployments. Only deployments matching all
    the provided criteria will be returned.

    Args:
        flow_filter: filter criteria for flows
        flow_run_filter: filter criteria for flow runs
        task_run_filter: filter criteria for task runs
        deployment_filter: filter criteria for deployments
        work_pool_filter: filter criteria for work pools
        work_queue_filter: filter criteria for work pool queues
        limit: a limit for the deployment query
        sort: the sort order, passed through as the `sort` key of the request body
        offset: an offset for the deployment query

    Returns:
        a list of Deployment model representations
            of the deployments
    """
    from prefect.client.schemas.responses import DeploymentResponse

    def _serialize(filter_model: Any, **dump_options: Any) -> Any:
        # A missing (falsy) filter is sent as an explicit null.
        if not filter_model:
            return None
        return filter_model.model_dump(mode="json", **dump_options)

    body: dict[str, Any] = {
        "flows": _serialize(flow_filter),
        # Only the flow run filter is dumped with `exclude_unset`.
        "flow_runs": _serialize(flow_run_filter, exclude_unset=True),
        "task_runs": _serialize(task_run_filter),
        "deployments": _serialize(deployment_filter),
        "work_pools": _serialize(work_pool_filter),
        "work_pool_queues": _serialize(work_queue_filter),
        "limit": limit,
        "offset": offset,
        "sort": sort,
    }

    response = await self.request("POST", "/deployments/filter", json=body)
    return DeploymentResponse.model_validate_list(response.json())

1103 

async def delete_deployment(
    self,
    deployment_id: UUID,
) -> None:
    """
    Delete a deployment by ID.

    Args:
        deployment_id: the ID of the deployment to delete

    Raises:
        ObjectNotFound: if the API responds with 404
        HTTPStatusError: any other HTTP status error is re-raised unchanged
    """
    try:
        await self.request(
            "DELETE",
            "/deployments/{id}",
            path_params={"id": deployment_id},
        )
    except HTTPStatusError as http_error:
        if http_error.response.status_code != 404:
            raise
        raise ObjectNotFound(http_exc=http_error) from http_error

1128 

async def create_deployment_schedules(
    self,
    deployment_id: UUID,
    schedules: list[tuple["SCHEDULE_TYPES", bool]],
) -> list["DeploymentSchedule"]:
    """
    Create schedules for a deployment.

    Args:
        deployment_id: the deployment ID
        schedules: a list of tuples; element 0 of each tuple is the schedule
            to create and element 1 is whether it should be active

    Raises:
        RequestError: if the schedules were not created for any reason

    Returns:
        the list of schedules created in the backend
    """
    from prefect.client.schemas.actions import DeploymentScheduleCreate
    from prefect.client.schemas.objects import DeploymentSchedule

    # Build each create-schema and serialize it straight into the request body.
    request_body = [
        DeploymentScheduleCreate(schedule=entry[0], active=entry[1]).model_dump(
            mode="json"
        )
        for entry in schedules
    ]

    response = await self.request(
        "POST",
        "/deployments/{id}/schedules",
        path_params={"id": deployment_id},
        json=request_body,
    )
    return DeploymentSchedule.model_validate_list(response.json())

1167 

async def read_deployment_schedules(
    self,
    deployment_id: UUID,
) -> list["DeploymentSchedule"]:
    """
    Fetch the schedules of a deployment from the Prefect API.

    Args:
        deployment_id: the deployment ID

    Raises:
        ObjectNotFound: if the API responds with 404

    Returns:
        the response body validated into a list of `DeploymentSchedule` models
    """
    from prefect.client.schemas.objects import DeploymentSchedule

    try:
        response = await self.request(
            "GET",
            "/deployments/{id}/schedules",
            path_params={"id": deployment_id},
        )
    except HTTPStatusError as http_error:
        # Only a 404 is translated; every other status error propagates.
        if http_error.response.status_code != 404:
            raise
        raise ObjectNotFound(http_exc=http_error) from http_error

    return DeploymentSchedule.model_validate_list(response.json())

1195 

async def update_deployment_schedule(
    self,
    deployment_id: UUID,
    schedule_id: UUID,
    active: bool | None = None,
    schedule: "SCHEDULE_TYPES | None" = None,
) -> None:
    """
    Update a deployment schedule by ID.

    Args:
        deployment_id: the deployment ID
        schedule_id: the deployment schedule ID of interest
        active: whether or not the schedule should be active
        schedule: the cron, rrule, or interval schedule this deployment schedule should use

    Raises:
        ObjectNotFound: if the API responds with 404
    """
    from prefect.client.schemas.actions import DeploymentScheduleUpdate

    # Only arguments the caller actually supplied are handed to the schema,
    # so `exclude_unset` below keeps omitted ones out of the request body.
    supplied: dict[str, Any] = {
        key: value
        for key, value in (("active", active), ("schedule", schedule))
        if value is not None
    }
    request_body = DeploymentScheduleUpdate(**supplied).model_dump(
        mode="json", exclude_unset=True
    )

    try:
        await self.request(
            "PATCH",
            "/deployments/{id}/schedules/{schedule_id}",
            path_params={"id": deployment_id, "schedule_id": schedule_id},
            json=request_body,
        )
    except HTTPStatusError as http_error:
        if http_error.response.status_code != 404:
            raise
        raise ObjectNotFound(http_exc=http_error) from http_error

1235 

async def delete_deployment_schedule(
    self,
    deployment_id: UUID,
    schedule_id: UUID,
) -> None:
    """
    Delete a single schedule from a deployment.

    Args:
        deployment_id: the deployment ID
        schedule_id: the ID of the deployment schedule to delete.

    Raises:
        ObjectNotFound: if the API responds with 404
        RequestError: if the schedules were not deleted for any reason
    """
    try:
        await self.request(
            "DELETE",
            "/deployments/{id}/schedules/{schedule_id}",
            path_params={"id": deployment_id, "schedule_id": schedule_id},
        )
    except HTTPStatusError as http_error:
        if http_error.response.status_code != 404:
            raise
        raise ObjectNotFound(http_exc=http_error) from http_error

1262 

async def get_scheduled_flow_runs_for_deployments(
    self,
    deployment_ids: list[UUID],
    scheduled_before: "datetime.datetime | None" = None,
    limit: int | None = None,
) -> list["FlowRun"]:
    """
    Request scheduled flow runs for a set of deployments.

    Args:
        deployment_ids: the deployment IDs, sent as strings
        scheduled_before: optional cutoff, sent as `str(scheduled_before)`;
            omitted from the request when falsy
        limit: optional limit; omitted from the request when falsy
            (this includes 0)

    Returns:
        the response body validated into a list of `FlowRun` models
    """
    from prefect.client.schemas.objects import FlowRun

    body: dict[str, Any] = {"deployment_ids": list(map(str, deployment_ids))}
    if scheduled_before:
        body["scheduled_before"] = str(scheduled_before)
    if limit:
        body["limit"] = limit

    response = await self.request(
        "POST",
        "/deployments/get_scheduled_flow_runs",
        json=body,
    )
    return FlowRun.model_validate_list(response.json())

1284 

async def create_flow_run_from_deployment(
    self,
    deployment_id: UUID,
    *,
    parameters: dict[str, Any] | None = None,
    context: dict[str, Any] | None = None,
    state: State[Any] | None = None,
    name: str | None = None,
    tags: Iterable[str] | None = None,
    idempotency_key: str | None = None,
    parent_task_run_id: UUID | None = None,
    work_queue_name: str | None = None,
    job_variables: dict[str, Any] | None = None,
    labels: "KeyValueLabelsField | None" = None,
) -> "FlowRun":
    """
    Create a flow run for a deployment.

    Args:
        deployment_id: The deployment ID to create the flow run from
        parameters: Parameter overrides for this flow run. Merged with the
            deployment defaults
        context: Optional run context data
        state: The initial state for the run. If not provided, defaults to
            `Scheduled` for now. Should always be a `Scheduled` type.
        name: An optional name for the flow run. If not provided, the server will
            generate a name.
        tags: An optional iterable of tags to apply to the flow run; these tags
            are merged with the deployment's tags.
        idempotency_key: Optional idempotency key for creation of the flow run.
            If the key matches the key of an existing flow run, the existing run will
            be returned instead of creating a new one.
        parent_task_run_id: if a subflow run is being created, the placeholder task
            run identifier in the parent flow
        work_queue_name: An optional work queue name to add this run to. If not provided,
            will default to the deployment's set work queue.  If one is provided that does not
            exist, a new work queue will be created within the deployment's work pool.
        job_variables: Optional variables that will be supplied to the flow run job.
        labels: Optional labels passed to the flow run create schema; an
            empty mapping is used when not provided.

    Raises:
        RequestError: if the Prefect API does not successfully create a run for any reason

    Returns:
        The flow run model
    """
    from prefect.client.schemas.actions import DeploymentFlowRunCreate
    from prefect.client.schemas.objects import FlowRun
    from prefect.states import Scheduled, to_state_create

    # Falsy optional arguments fall back to empty containers / a Scheduled state.
    initial_state = state or Scheduled()
    flow_run_create = DeploymentFlowRunCreate(
        parameters=parameters or {},
        context=context or {},
        state=to_state_create(initial_state),
        tags=list(tags or []),
        name=name,
        idempotency_key=idempotency_key,
        parent_task_run_id=parent_task_run_id,
        job_variables=job_variables,
        labels=labels or {},
    )

    # done separately to avoid including this field in payloads sent to older API versions
    if work_queue_name:
        flow_run_create.work_queue_name = work_queue_name

    request_body = flow_run_create.model_dump(mode="json", exclude_unset=True)
    response = await self.request(
        "POST",
        "/deployments/{id}/create_flow_run",
        path_params={"id": deployment_id},
        json=request_body,
    )
    return FlowRun.model_validate(response.json())

1363 

async def create_deployment_branch(
    self,
    deployment_id: UUID,
    branch: str,
    options: "DeploymentBranchingOptions | None" = None,
    overrides: "DeploymentUpdate | None" = None,
) -> UUID:
    """
    Request a branch of an existing deployment from the Prefect API.

    Args:
        deployment_id: the ID of the deployment to branch from
        branch: the branch name sent in the request body
        options: branching options; a default `DeploymentBranchingOptions()`
            is used when not provided
        overrides: optional deployment update schema sent as `overrides`

    Raises:
        RequestError: if the response body has no (or an empty) `id`

    Returns:
        the `id` field of the API response, parsed as a UUID
    """
    from prefect.client.schemas.actions import DeploymentBranch
    from prefect.client.schemas.objects import DeploymentBranchingOptions

    request_body = DeploymentBranch(
        branch=branch,
        options=options or DeploymentBranchingOptions(),
        overrides=overrides,
    ).model_dump(mode="json", exclude_unset=True)

    response = await self.request(
        "POST",
        "/deployments/{id}/branch",
        path_params={"id": deployment_id},
        json=request_body,
    )

    # Mirror `_create_deployment_from_schema`: a response without an ID is
    # reported as a malformed response instead of failing inside `UUID(None)`.
    branch_deployment_id = response.json().get("id")
    if not branch_deployment_id:
        raise RequestError(f"Malformed response: {response}")

    return UUID(branch_deployment_id)

1384 return UUID(response.json().get("id"))