Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/deployments.py: 39%

281 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1""" 

2Routes for interacting with Deployment objects. 

3""" 

4 

5import datetime 1b

6from typing import List, Optional 1b

7from uuid import UUID 1b

8 

9import jsonschema.exceptions 1b

10import sqlalchemy as sa 1b

11from fastapi import Body, Depends, HTTPException, Path, Response 1b

12 

13import prefect.server.api.dependencies as dependencies 1b

14import prefect.server.models as models 1b

15import prefect.server.schemas as schemas 1b

16from prefect._internal.compatibility.starlette import status 1b

17from prefect.server.api.validation import ( 1b

18 validate_job_variables_for_deployment, 

19 validate_job_variables_for_deployment_flow_run, 

20) 

21from prefect.server.api.workers import WorkerLookups 1b

22from prefect.server.database import PrefectDBInterface, provide_database_interface 1b

23from prefect.server.exceptions import MissingVariableError, ObjectNotFoundError 1b

24from prefect.server.models.deployments import mark_deployments_ready 1b

25from prefect.server.models.workers import DEFAULT_AGENT_WORK_POOL_NAME 1b

26from prefect.server.schemas.responses import DeploymentPaginationResponse 1b

27from prefect.server.utilities.server import PrefectRouter 1b

28from prefect.types import DateTime 1b

29from prefect.types._datetime import now 1b

30from prefect.utilities.schema_tools.hydration import ( 1b

31 HydrationContext, 

32 HydrationError, 

33 hydrate, 

34) 

35from prefect.utilities.schema_tools.validation import ( 1b

36 CircularSchemaRefError, 

37 ValidationError, 

38 validate, 

39) 

40 

41router: PrefectRouter = PrefectRouter(prefix="/deployments", tags=["Deployments"]) 1b

42 

43 

44def _multiple_schedules_error(deployment_id) -> HTTPException: 1b

45 return HTTPException( 

46 status.HTTP_422_UNPROCESSABLE_ENTITY, 

47 detail=( 

48 "Error updating deployment: " 

49 f"Deployment {deployment_id!r} has multiple schedules. " 

50 "Please use the UI or update your client to adjust this " 

51 "deployment's schedules.", 

52 ), 

53 ) 

54 

55 

56@router.post("/") 1b

57async def create_deployment( 1b

58 deployment: schemas.actions.DeploymentCreate, 

59 response: Response, 

60 worker_lookups: WorkerLookups = Depends(WorkerLookups), 

61 created_by: Optional[schemas.core.CreatedBy] = Depends(dependencies.get_created_by), 

62 updated_by: Optional[schemas.core.UpdatedBy] = Depends(dependencies.get_updated_by), 

63 db: PrefectDBInterface = Depends(provide_database_interface), 

64) -> schemas.responses.DeploymentResponse: 

65 """ 

66 Creates a new deployment from the provided schema. If a deployment with 

67 the same name and flow_id already exists, the deployment is updated. 

68 

69 If the deployment has an active schedule, flow runs will be scheduled. 

70 When upserting, any scheduled runs from the existing deployment will be deleted. 

71 

72 For more information, see https://docs.prefect.io/v3/concepts/deployments. 

73 """ 

74 

75 data = deployment.model_dump(exclude_unset=True) 1adc

76 data["created_by"] = created_by.model_dump() if created_by else None 1adc

77 data["updated_by"] = updated_by.model_dump() if created_by else None 1adc

78 

79 async with db.session_context(begin_transaction=True) as session: 1adc

80 if ( 

81 deployment.work_pool_name 

82 and deployment.work_pool_name != DEFAULT_AGENT_WORK_POOL_NAME 

83 ): 

84 # Make sure that deployment is valid before beginning creation process 

85 work_pool = await models.workers.read_work_pool_by_name( 1ac

86 session=session, work_pool_name=deployment.work_pool_name 

87 ) 

88 if work_pool is None: 

89 raise HTTPException( 

90 status_code=status.HTTP_404_NOT_FOUND, 

91 detail=f'Work pool "{deployment.work_pool_name}" not found.', 

92 ) 

93 

94 await validate_job_variables_for_deployment( 

95 session, 

96 work_pool, 

97 deployment, 

98 ) 

99 

100 # hydrate the input model into a full model 

101 deployment_dict: dict = deployment.model_dump( 1adc

102 exclude={"work_pool_name"}, 

103 exclude_unset=True, 

104 ) 

105 

106 requested_concurrency_limit = deployment_dict.pop( 1adc

107 "global_concurrency_limit_id", "unset" 

108 ) 

109 if requested_concurrency_limit != "unset": 1adc

110 if requested_concurrency_limit: 1adc

111 concurrency_limit = ( 

112 await models.concurrency_limits_v2.read_concurrency_limit( 

113 session=session, 

114 concurrency_limit_id=requested_concurrency_limit, 

115 ) 

116 ) 

117 

118 if not concurrency_limit: 

119 raise HTTPException( 

120 status_code=status.HTTP_404_NOT_FOUND, 

121 detail="Concurrency limit not found", 

122 ) 

123 

124 deployment_dict["concurrency_limit_id"] = requested_concurrency_limit 1adc

125 

126 if deployment.work_pool_name and deployment.work_queue_name: 126 ↛ 129line 126 didn't jump to line 129 because the condition on line 126 was never true1adc

127 # If a specific pool name/queue name combination was provided, get the 

128 # ID for that work pool queue. 

129 deployment_dict[ 

130 "work_queue_id" 

131 ] = await worker_lookups._get_work_queue_id_from_name( 

132 session=session, 

133 work_pool_name=deployment.work_pool_name, 

134 work_queue_name=deployment.work_queue_name, 

135 create_queue_if_not_found=True, 

136 ) 

137 elif deployment.work_pool_name: 137 ↛ 140line 137 didn't jump to line 140 because the condition on line 137 was never true1adc

138 # If just a pool name was provided, get the ID for its default 

139 # work pool queue. 

140 deployment_dict[ 

141 "work_queue_id" 

142 ] = await worker_lookups._get_default_work_queue_id_from_work_pool_name( 

143 session=session, 

144 work_pool_name=deployment.work_pool_name, 

145 ) 

146 elif deployment.work_queue_name: 1adc

147 # If just a queue name was provided, ensure that the queue exists and 

148 # get its ID. 

149 work_queue = await models.work_queues.ensure_work_queue_exists( 1adc

150 session=session, name=deployment.work_queue_name 

151 ) 

152 deployment_dict["work_queue_id"] = work_queue.id 

153 

154 deployment = schemas.core.Deployment(**deployment_dict) 1adc

155 # check to see if relevant blocks exist, allowing us throw a useful error message 

156 # for debugging 

157 if deployment.infrastructure_document_id is not None: 1adc

158 infrastructure_block = ( 

159 await models.block_documents.read_block_document_by_id( 

160 session=session, 

161 block_document_id=deployment.infrastructure_document_id, 

162 ) 

163 ) 

164 if not infrastructure_block: 

165 raise HTTPException( 

166 status_code=status.HTTP_409_CONFLICT, 

167 detail=( 

168 "Error creating deployment. Could not find infrastructure" 

169 f" block with id: {deployment.infrastructure_document_id}. This" 

170 " usually occurs when applying a deployment specification that" 

171 " was built against a different Prefect database / workspace." 

172 ), 

173 ) 

174 

175 if deployment.storage_document_id is not None: 1adc

176 storage_block = await models.block_documents.read_block_document_by_id( 1adc

177 session=session, 

178 block_document_id=deployment.storage_document_id, 

179 ) 

180 if not storage_block: 

181 raise HTTPException( 

182 status_code=status.HTTP_409_CONFLICT, 

183 detail=( 

184 "Error creating deployment. Could not find storage block with" 

185 f" id: {deployment.storage_document_id}. This usually occurs" 

186 " when applying a deployment specification that was built" 

187 " against a different Prefect database / workspace." 

188 ), 

189 ) 

190 

191 right_now = now("UTC") 1adc

192 model = await models.deployments.create_deployment( 1adc

193 session=session, deployment=deployment 

194 ) 

195 

196 if model.created >= right_now: 

197 response.status_code = status.HTTP_201_CREATED 

198 

199 return schemas.responses.DeploymentResponse.model_validate( 

200 model, from_attributes=True 

201 ) 

202 

203 

204@router.patch("/{id:uuid}", status_code=status.HTTP_204_NO_CONTENT) 1b

205async def update_deployment( 1b

206 deployment: schemas.actions.DeploymentUpdate, 

207 deployment_id: UUID = Path(..., description="The deployment id", alias="id"), 

208 db: PrefectDBInterface = Depends(provide_database_interface), 

209) -> None: 

210 async with db.session_context(begin_transaction=True) as session: 1adc

211 existing_deployment = await models.deployments.read_deployment( 1adc

212 session=session, deployment_id=deployment_id 

213 ) 

214 if not existing_deployment: 

215 raise HTTPException( 

216 status.HTTP_404_NOT_FOUND, detail="Deployment not found." 

217 ) 

218 

219 # Checking how we should handle schedule updates 

220 # If not all existing schedules have slugs then we'll fall back to the existing logic where are schedules are recreated to match the request. 

221 # If the existing schedules have slugs, but not all provided schedules have slugs, then we'll return a 422 to avoid accidentally blowing away schedules. 

222 # Otherwise, we'll use the existing slugs and the provided slugs to make targeted updates to the deployment's schedules. 

223 schedules_to_patch: list[schemas.actions.DeploymentScheduleUpdate] = [] 

224 schedules_to_create: list[schemas.actions.DeploymentScheduleUpdate] = [] 

225 all_provided_have_slugs = all( 1dc

226 schedule.slug is not None for schedule in deployment.schedules or [] 

227 ) 

228 all_existing_have_slugs = existing_deployment.schedules and all( 

229 schedule.slug is not None for schedule in existing_deployment.schedules 

230 ) 

231 if all_provided_have_slugs and all_existing_have_slugs: 

232 current_slugs = [ 

233 schedule.slug for schedule in existing_deployment.schedules 

234 ] 

235 

236 for schedule in deployment.schedules: 

237 if schedule.slug in current_slugs: 

238 schedules_to_patch.append(schedule) 

239 elif schedule.schedule: 

240 schedules_to_create.append(schedule) 

241 else: 

242 raise HTTPException( 

243 status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, 

244 detail="Unable to create new deployment schedules without a schedule configuration.", 

245 ) 

246 # Clear schedules to handle their update/creation separately 

247 deployment.schedules = None 

248 elif not all_provided_have_slugs and all_existing_have_slugs: 

249 raise HTTPException( 

250 status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, 

251 detail="Please provide a slug for each schedule in your request to ensure schedules are updated correctly.", 

252 ) 

253 

254 if deployment.work_pool_name: 

255 # Make sure that deployment is valid before beginning creation process 

256 work_pool = await models.workers.read_work_pool_by_name( 

257 session=session, work_pool_name=deployment.work_pool_name 

258 ) 

259 try: 

260 deployment.check_valid_configuration(work_pool.base_job_template) 

261 except (MissingVariableError, jsonschema.exceptions.ValidationError) as exc: 

262 raise HTTPException( 

263 status_code=status.HTTP_409_CONFLICT, 

264 detail=f"Error creating deployment: {exc!r}", 

265 ) 

266 

267 if deployment.parameters is not None: 

268 try: 

269 dehydrated_params = deployment.parameters 

270 ctx = await HydrationContext.build( 

271 session=session, 

272 raise_on_error=True, 

273 render_jinja=True, 

274 render_workspace_variables=True, 

275 ) 

276 parameters = hydrate(dehydrated_params, ctx) 

277 deployment.parameters = parameters 

278 except HydrationError as exc: 

279 raise HTTPException( 

280 status.HTTP_400_BAD_REQUEST, 

281 detail=f"Error hydrating deployment parameters: {exc}", 

282 ) 

283 else: 

284 parameters = existing_deployment.parameters 

285 

286 enforce_parameter_schema = ( 

287 deployment.enforce_parameter_schema 

288 if deployment.enforce_parameter_schema is not None 

289 else existing_deployment.enforce_parameter_schema 

290 ) 

291 if enforce_parameter_schema: 

292 # ensure that the new parameters conform to the proposed schema 

293 if deployment.parameter_openapi_schema: 

294 openapi_schema = deployment.parameter_openapi_schema 

295 else: 

296 openapi_schema = existing_deployment.parameter_openapi_schema 

297 

298 if not isinstance(openapi_schema, dict): 

299 raise HTTPException( 

300 status.HTTP_409_CONFLICT, 

301 detail=( 

302 "Error updating deployment: Cannot update parameters because" 

303 " parameter schema enforcement is enabled and the deployment" 

304 " does not have a valid parameter schema." 

305 ), 

306 ) 

307 try: 

308 validate( 

309 parameters, 

310 openapi_schema, 

311 raise_on_error=True, 

312 ignore_required=True, 

313 ) 

314 except ValidationError as exc: 

315 raise HTTPException( 

316 status.HTTP_409_CONFLICT, 

317 detail=f"Error updating deployment: {exc}", 

318 ) 

319 except CircularSchemaRefError: 

320 raise HTTPException( 

321 status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, 

322 detail="Invalid schema: Unable to validate schema with circular references.", 

323 ) 

324 

325 if deployment.global_concurrency_limit_id: 

326 concurrency_limit = ( 

327 await models.concurrency_limits_v2.read_concurrency_limit( 

328 session=session, 

329 concurrency_limit_id=deployment.global_concurrency_limit_id, 

330 ) 

331 ) 

332 

333 if not concurrency_limit: 

334 raise HTTPException( 

335 status_code=status.HTTP_404_NOT_FOUND, 

336 detail="Concurrency limit not found", 

337 ) 

338 

339 result = await models.deployments.update_deployment( 

340 session=session, 

341 deployment_id=deployment_id, 

342 deployment=deployment, 

343 ) 

344 

345 for schedule in schedules_to_patch: 

346 await models.deployments.update_deployment_schedule( 

347 session=session, 

348 deployment_id=deployment_id, 

349 schedule=schedule, 

350 deployment_schedule_slug=schedule.slug, 

351 ) 

352 if schedules_to_create: 

353 await models.deployments.create_deployment_schedules( 

354 session=session, 

355 deployment_id=deployment_id, 

356 schedules=[ 

357 schemas.actions.DeploymentScheduleCreate( 

358 schedule=schedule.schedule, # type: ignore We will raise above if schedule is not provided 

359 active=schedule.active if schedule.active is not None else True, 

360 slug=schedule.slug, 

361 parameters=schedule.parameters, 

362 ) 

363 for schedule in schedules_to_create 

364 ], 

365 ) 

366 if not result: 

367 raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Deployment not found.") 

368 

369 

370@router.get("/name/{flow_name}/{deployment_name}") 1b

371async def read_deployment_by_name( 1b

372 flow_name: str = Path(..., description="The name of the flow"), 

373 deployment_name: str = Path(..., description="The name of the deployment"), 

374 db: PrefectDBInterface = Depends(provide_database_interface), 

375) -> schemas.responses.DeploymentResponse: 

376 """ 

377 Get a deployment using the name of the flow and the deployment. 

378 """ 

379 async with db.session_context() as session: 1ac

380 deployment = await models.deployments.read_deployment_by_name( 1ac

381 session=session, name=deployment_name, flow_name=flow_name 

382 ) 

383 if not deployment: 

384 raise HTTPException( 

385 status.HTTP_404_NOT_FOUND, detail="Deployment not found" 

386 ) 

387 return schemas.responses.DeploymentResponse.model_validate( 

388 deployment, from_attributes=True 

389 ) 

390 

391 

392@router.get("/{id:uuid}") 1b

393async def read_deployment( 1b

394 deployment_id: UUID = Path(..., description="The deployment id", alias="id"), 

395 db: PrefectDBInterface = Depends(provide_database_interface), 

396) -> schemas.responses.DeploymentResponse: 

397 """ 

398 Get a deployment by id. 

399 """ 

400 async with db.session_context() as session: 1adc

401 deployment = await models.deployments.read_deployment( 1adc

402 session=session, deployment_id=deployment_id 

403 ) 

404 if not deployment: 

405 raise HTTPException( 

406 status_code=status.HTTP_404_NOT_FOUND, detail="Deployment not found" 

407 ) 

408 return schemas.responses.DeploymentResponse.model_validate( 

409 deployment, from_attributes=True 

410 ) 

411 

412 

413@router.post("/filter") 1b

414async def read_deployments( 1b

415 limit: int = dependencies.LimitBody(), 

416 offset: int = Body(0, ge=0), 

417 flows: Optional[schemas.filters.FlowFilter] = None, 

418 flow_runs: Optional[schemas.filters.FlowRunFilter] = None, 

419 task_runs: Optional[schemas.filters.TaskRunFilter] = None, 

420 deployments: Optional[schemas.filters.DeploymentFilter] = None, 

421 work_pools: Optional[schemas.filters.WorkPoolFilter] = None, 

422 work_pool_queues: Optional[schemas.filters.WorkQueueFilter] = None, 

423 sort: schemas.sorting.DeploymentSort = Body( 

424 schemas.sorting.DeploymentSort.NAME_ASC 

425 ), 

426 db: PrefectDBInterface = Depends(provide_database_interface), 

427) -> List[schemas.responses.DeploymentResponse]: 

428 """ 

429 Query for deployments. 

430 """ 

431 async with db.session_context() as session: 1adgc

432 response = await models.deployments.read_deployments( 1adgc

433 session=session, 

434 offset=offset, 

435 sort=sort, 

436 limit=limit, 

437 flow_filter=flows, 

438 flow_run_filter=flow_runs, 

439 task_run_filter=task_runs, 

440 deployment_filter=deployments, 

441 work_pool_filter=work_pools, 

442 work_queue_filter=work_pool_queues, 

443 ) 

444 return [ 

445 schemas.responses.DeploymentResponse.model_validate( 

446 deployment, from_attributes=True 

447 ) 

448 for deployment in response 

449 ] 

450 

451 

452@router.post("/paginate") 1b

453async def paginate_deployments( 1b

454 limit: int = dependencies.LimitBody(), 

455 page: int = Body(1, ge=1), 

456 flows: Optional[schemas.filters.FlowFilter] = None, 

457 flow_runs: Optional[schemas.filters.FlowRunFilter] = None, 

458 task_runs: Optional[schemas.filters.TaskRunFilter] = None, 

459 deployments: Optional[schemas.filters.DeploymentFilter] = None, 

460 work_pools: Optional[schemas.filters.WorkPoolFilter] = None, 

461 work_pool_queues: Optional[schemas.filters.WorkQueueFilter] = None, 

462 sort: schemas.sorting.DeploymentSort = Body( 

463 schemas.sorting.DeploymentSort.NAME_ASC 

464 ), 

465 db: PrefectDBInterface = Depends(provide_database_interface), 

466) -> DeploymentPaginationResponse: 

467 """ 

468 Pagination query for flow runs. 

469 """ 

470 offset = (page - 1) * limit 1adec

471 

472 async with db.session_context() as session: 1adec

473 response = await models.deployments.read_deployments( 1adec

474 session=session, 

475 offset=offset, 

476 sort=sort, 

477 limit=limit, 

478 flow_filter=flows, 

479 flow_run_filter=flow_runs, 

480 task_run_filter=task_runs, 

481 deployment_filter=deployments, 

482 work_pool_filter=work_pools, 

483 work_queue_filter=work_pool_queues, 

484 ) 

485 

486 count = await models.deployments.count_deployments( 1adec

487 session=session, 

488 flow_filter=flows, 

489 flow_run_filter=flow_runs, 

490 task_run_filter=task_runs, 

491 deployment_filter=deployments, 

492 work_pool_filter=work_pools, 

493 work_queue_filter=work_pool_queues, 

494 ) 

495 

496 results = [ 1adec

497 schemas.responses.DeploymentResponse.model_validate( 

498 deployment, from_attributes=True 

499 ) 

500 for deployment in response 

501 ] 

502 

503 return DeploymentPaginationResponse( 1adec

504 results=results, 

505 count=count, 

506 limit=limit, 

507 pages=(count + limit - 1) // limit, 

508 page=page, 

509 ) 

510 

511 

512@router.post("/get_scheduled_flow_runs") 1b

513async def get_scheduled_flow_runs_for_deployments( 1b

514 docket: dependencies.Docket, 

515 deployment_ids: list[UUID] = Body( 

516 default=..., description="The deployment IDs to get scheduled runs for" 

517 ), 

518 scheduled_before: DateTime = Body( 

519 None, description="The maximum time to look for scheduled flow runs" 

520 ), 

521 limit: int = dependencies.LimitBody(), 

522 db: PrefectDBInterface = Depends(provide_database_interface), 

523) -> list[schemas.responses.FlowRunResponse]: 

524 """ 

525 Get scheduled runs for a set of deployments. Used by a runner to poll for work. 

526 """ 

527 async with db.session_context() as session: 1afdc

528 orm_flow_runs = await models.flow_runs.read_flow_runs( 1afdc

529 session=session, 

530 limit=limit, 

531 deployment_filter=schemas.filters.DeploymentFilter( 

532 id=schemas.filters.DeploymentFilterId(any_=deployment_ids), 

533 ), 

534 flow_run_filter=schemas.filters.FlowRunFilter( 

535 next_scheduled_start_time=schemas.filters.FlowRunFilterNextScheduledStartTime( 

536 before_=scheduled_before 

537 ), 

538 state=schemas.filters.FlowRunFilterState( 

539 type=schemas.filters.FlowRunFilterStateType( 

540 any_=[schemas.states.StateType.SCHEDULED] 

541 ) 

542 ), 

543 ), 

544 sort=schemas.sorting.FlowRunSort.NEXT_SCHEDULED_START_TIME_ASC, 

545 ) 

546 

547 flow_run_responses = [ 

548 schemas.responses.FlowRunResponse.model_validate( 

549 orm_flow_run, from_attributes=True 

550 ) 

551 for orm_flow_run in orm_flow_runs 

552 ] 

553 

554 await docket.add(mark_deployments_ready)( 1afdc

555 deployment_ids=deployment_ids, 

556 ) 

557 

558 return flow_run_responses 1afdc

559 

560 

561@router.post("/count") 1b

562async def count_deployments( 1b

563 flows: Optional[schemas.filters.FlowFilter] = None, 

564 flow_runs: Optional[schemas.filters.FlowRunFilter] = None, 

565 task_runs: Optional[schemas.filters.TaskRunFilter] = None, 

566 deployments: Optional[schemas.filters.DeploymentFilter] = None, 

567 work_pools: Optional[schemas.filters.WorkPoolFilter] = None, 

568 work_pool_queues: Optional[schemas.filters.WorkQueueFilter] = None, 

569 db: PrefectDBInterface = Depends(provide_database_interface), 

570) -> int: 

571 """ 

572 Count deployments. 

573 """ 

574 async with db.session_context() as session: 1ac

575 return await models.deployments.count_deployments( 1ac

576 session=session, 

577 flow_filter=flows, 

578 flow_run_filter=flow_runs, 

579 task_run_filter=task_runs, 

580 deployment_filter=deployments, 

581 work_pool_filter=work_pools, 

582 work_queue_filter=work_pool_queues, 

583 ) 

584 

585 

586@router.delete("/{id:uuid}", status_code=status.HTTP_204_NO_CONTENT) 1b

587async def delete_deployment( 1b

588 deployment_id: UUID = Path(..., description="The deployment id", alias="id"), 

589 db: PrefectDBInterface = Depends(provide_database_interface), 

590) -> None: 

591 """ 

592 Delete a deployment by id. 

593 """ 

594 async with db.session_context(begin_transaction=True) as session: 1adc

595 result = await models.deployments.delete_deployment( 1adc

596 session=session, deployment_id=deployment_id 

597 ) 

598 if not result: 1adc

599 raise HTTPException( 1adc

600 status_code=status.HTTP_404_NOT_FOUND, detail="Deployment not found" 

601 ) 

602 

603 

604@router.post("/{id:uuid}/schedule") 1b

605async def schedule_deployment( 1b

606 deployment_id: UUID = Path(..., description="The deployment id", alias="id"), 

607 start_time: datetime.datetime = Body( 

608 None, description="The earliest date to schedule" 

609 ), 

610 end_time: datetime.datetime = Body(None, description="The latest date to schedule"), 

611 # Workaround for the fact that FastAPI does not let us configure ser_json_timedelta 

612 # to represent timedeltas as floats in JSON. 

613 min_time: float = Body( 

614 None, 

615 description=( 

616 "Runs will be scheduled until at least this long after the `start_time`" 

617 ), 

618 json_schema_extra={"format": "time-delta"}, 

619 ), 

620 min_runs: int = Body(None, description="The minimum number of runs to schedule"), 

621 max_runs: int = Body(None, description="The maximum number of runs to schedule"), 

622 db: PrefectDBInterface = Depends(provide_database_interface), 

623) -> None: 

624 """ 

625 Schedule runs for a deployment. For backfills, provide start/end times in the past. 

626 

627 This function will generate the minimum number of runs that satisfy the min 

628 and max times, and the min and max counts. Specifically, the following order 

629 will be respected. 

630 

631 - Runs will be generated starting on or after the `start_time` 

632 - No more than `max_runs` runs will be generated 

633 - No runs will be generated after `end_time` is reached 

634 - At least `min_runs` runs will be generated 

635 - Runs will be generated until at least `start_time + min_time` is reached 

636 """ 

637 if isinstance(min_time, float): 637 ↛ 638line 637 didn't jump to line 638 because the condition on line 637 was never true1a

638 min_time = datetime.timedelta(seconds=min_time) 

639 

640 async with db.session_context(begin_transaction=True) as session: 1a

641 await models.deployments.schedule_runs( 1a

642 session=session, 

643 deployment_id=deployment_id, 

644 start_time=start_time, 

645 min_time=min_time, 

646 end_time=end_time, 

647 min_runs=min_runs, 

648 max_runs=max_runs, 

649 ) 

650 

651 

652@router.post("/{id:uuid}/resume_deployment") 1b

653async def resume_deployment( 1b

654 deployment_id: UUID = Path(..., description="The deployment id", alias="id"), 

655 db: PrefectDBInterface = Depends(provide_database_interface), 

656) -> None: 

657 """ 

658 Set a deployment schedule to active. Runs will be scheduled immediately. 

659 """ 

660 async with db.session_context(begin_transaction=True) as session: 1a

661 deployment = await models.deployments.read_deployment( 1a

662 session=session, deployment_id=deployment_id 

663 ) 

664 if not deployment: 

665 raise HTTPException( 

666 status_code=status.HTTP_404_NOT_FOUND, detail="Deployment not found" 

667 ) 

668 deployment.paused = False 

669 

670 

671@router.post("/{id:uuid}/pause_deployment") 1b

672async def pause_deployment( 1b

673 deployment_id: UUID = Path(..., description="The deployment id", alias="id"), 

674 db: PrefectDBInterface = Depends(provide_database_interface), 

675) -> None: 

676 """ 

677 Set a deployment schedule to inactive. Any auto-scheduled runs still in a Scheduled 

678 state will be deleted. 

679 """ 

680 async with db.session_context(begin_transaction=False) as session: 1a

681 deployment = await models.deployments.read_deployment( 1a

682 session=session, deployment_id=deployment_id 

683 ) 

684 if not deployment: 

685 raise HTTPException( 

686 status_code=status.HTTP_404_NOT_FOUND, detail="Deployment not found" 

687 ) 

688 deployment.paused = True 

689 

690 # commit here to make the inactive schedule "visible" to the scheduler service 

691 await session.commit() 

692 

693 # delete any auto scheduled runs 

694 await models.deployments._delete_scheduled_runs( 

695 session=session, 

696 deployment_id=deployment_id, 

697 auto_scheduled_only=True, 

698 ) 

699 

700 await session.commit() 

701 

702 

703@router.post("/{id:uuid}/create_flow_run") 1b

704async def create_flow_run_from_deployment( 1b

705 flow_run: schemas.actions.DeploymentFlowRunCreate, 

706 deployment_id: UUID = Path(..., description="The deployment id", alias="id"), 

707 created_by: Optional[schemas.core.CreatedBy] = Depends(dependencies.get_created_by), 

708 db: PrefectDBInterface = Depends(provide_database_interface), 

709 worker_lookups: WorkerLookups = Depends(WorkerLookups), 

710 response: Response = None, 

711) -> schemas.responses.FlowRunResponse: 

712 """ 

713 Create a flow run from a deployment. 

714 

715 Any parameters not provided will be inferred from the deployment's parameters. 

716 If tags are not provided, the deployment's tags will be used. 

717 

718 If no state is provided, the flow run will be created in a SCHEDULED state. 

719 """ 

720 async with db.session_context(begin_transaction=True) as session: 1afdgc

721 # get relevant info from the deployment 

722 deployment = await models.deployments.read_deployment( 1afdgc

723 session=session, deployment_id=deployment_id 

724 ) 

725 

726 if not deployment: 

727 raise HTTPException( 

728 status_code=status.HTTP_404_NOT_FOUND, detail="Deployment not found" 

729 ) 

730 

731 try: 

732 dehydrated_params = deployment.parameters 

733 dehydrated_params.update(flow_run.parameters or {}) 

734 ctx = await HydrationContext.build( 

735 session=session, 

736 raise_on_error=True, 

737 render_jinja=True, 

738 render_workspace_variables=True, 

739 ) 

740 parameters = hydrate(dehydrated_params, ctx) 

741 except HydrationError as exc: 

742 raise HTTPException( 

743 status.HTTP_400_BAD_REQUEST, 

744 detail=f"Error hydrating flow run parameters: {exc}", 

745 ) 

746 

747 # default 

748 enforce_parameter_schema = deployment.enforce_parameter_schema 

749 

750 # run override 

751 if flow_run.enforce_parameter_schema is not None: 

752 enforce_parameter_schema = flow_run.enforce_parameter_schema 

753 

754 if enforce_parameter_schema: 

755 if not isinstance(deployment.parameter_openapi_schema, dict): 

756 raise HTTPException( 

757 status.HTTP_409_CONFLICT, 

758 detail=( 

759 "Error updating deployment: Cannot update parameters because" 

760 " parameter schema enforcement is enabled and the deployment" 

761 " does not have a valid parameter schema." 

762 ), 

763 ) 

764 try: 

765 validate( 

766 parameters, deployment.parameter_openapi_schema, raise_on_error=True 

767 ) 

768 except ValidationError as exc: 

769 raise HTTPException( 

770 status.HTTP_409_CONFLICT, 

771 detail=f"Error creating flow run: {exc}", 

772 ) 

773 except CircularSchemaRefError: 

774 raise HTTPException( 

775 status_code=status.HTTP_422_UNPROCESSABLE_ENTITY, 

776 detail="Invalid schema: Unable to validate schema with circular references.", 

777 ) 

778 

779 await validate_job_variables_for_deployment_flow_run( 

780 session, deployment, flow_run 

781 ) 

782 

783 work_queue_name = deployment.work_queue_name 

784 work_queue_id = deployment.work_queue_id 

785 

786 if flow_run.work_queue_name: 

787 # can't mutate the ORM model or else it will commit the changes back 

788 if deployment.work_queue is None or deployment.work_queue.work_pool is None: 

789 raise HTTPException( 

790 status_code=status.HTTP_400_BAD_REQUEST, 

791 detail=f"Cannot create flow run in work queue {flow_run.work_queue_name} because deployment {deployment_id} is not associated with a work pool. Please remove work_pool_name and try again.", 

792 ) 

793 

794 work_queue_id = await worker_lookups._get_work_queue_id_from_name( 

795 session=session, 

796 work_pool_name=deployment.work_queue.work_pool.name, 

797 work_queue_name=flow_run.work_queue_name, 

798 create_queue_if_not_found=True, 

799 ) 

800 work_queue_name = flow_run.work_queue_name 

801 

802 # hydrate the input model into a full flow run / state model 

803 flow_run = schemas.core.FlowRun( 

804 **flow_run.model_dump( 

805 exclude={ 

806 "parameters", 

807 "tags", 

808 "infrastructure_document_id", 

809 "work_queue_name", 

810 "enforce_parameter_schema", 

811 } 

812 ), 

813 flow_id=deployment.flow_id, 

814 deployment_id=deployment.id, 

815 deployment_version=deployment.version, 

816 parameters=parameters, 

817 tags=set(deployment.tags).union(flow_run.tags), 

818 infrastructure_document_id=( 

819 flow_run.infrastructure_document_id 

820 or deployment.infrastructure_document_id 

821 ), 

822 work_queue_name=work_queue_name, 

823 work_queue_id=work_queue_id, 

824 created_by=created_by, 

825 ) 

826 

827 if not flow_run.state: 

828 flow_run.state = schemas.states.Scheduled() 

829 

830 right_now = now("UTC") 

831 model = await models.flow_runs.create_flow_run( 

832 session=session, flow_run=flow_run 

833 ) 

834 if model.created >= right_now: 

835 response.status_code = status.HTTP_201_CREATED 

836 return schemas.responses.FlowRunResponse.model_validate( 

837 model, from_attributes=True 

838 ) 

839 

840 

841# DEPRECATED 

842@router.get("/{id:uuid}/work_queue_check", deprecated=True) 1b

843async def work_queue_check_for_deployment( 1b

844 deployment_id: UUID = Path(..., description="The deployment id", alias="id"), 

845 db: PrefectDBInterface = Depends(provide_database_interface), 

846) -> List[schemas.core.WorkQueue]: 

847 """ 

848 Get list of work-queues that are able to pick up the specified deployment. 

849 

850 This endpoint is intended to be used by the UI to provide users warnings 

851 about deployments that are unable to be executed because there are no work 

852 queues that will pick up their runs, based on existing filter criteria. It 

853 may be deprecated in the future because there is not a strict relationship 

854 between work queues and deployments. 

855 """ 

856 try: 1a

857 async with db.session_context() as session: 1a

858 work_queues = await models.deployments.check_work_queues_for_deployment( 1a

859 session=session, deployment_id=deployment_id 

860 ) 

861 except ObjectNotFoundError: 1a

862 raise HTTPException( 1a

863 status_code=status.HTTP_404_NOT_FOUND, detail="Deployment not found" 

864 ) 

865 return work_queues 

866 

867 

868@router.get("/{id:uuid}/schedules") 1b

869async def read_deployment_schedules( 1b

870 deployment_id: UUID = Path(..., description="The deployment id", alias="id"), 

871 db: PrefectDBInterface = Depends(provide_database_interface), 

872) -> List[schemas.core.DeploymentSchedule]: 

873 async with db.session_context() as session: 1a

874 deployment = await models.deployments.read_deployment( 1a

875 session=session, deployment_id=deployment_id 

876 ) 

877 

878 if not deployment: 

879 raise HTTPException( 

880 status.HTTP_404_NOT_FOUND, detail="Deployment not found." 

881 ) 

882 

883 return await models.deployments.read_deployment_schedules( 

884 session=session, 

885 deployment_id=deployment.id, 

886 ) 

887 

888 

889@router.post("/{id:uuid}/schedules", status_code=status.HTTP_201_CREATED) 1b

890async def create_deployment_schedules( 1b

891 deployment_id: UUID = Path(..., description="The deployment id", alias="id"), 

892 schedules: List[schemas.actions.DeploymentScheduleCreate] = Body( 

893 default=..., description="The schedules to create" 

894 ), 

895 db: PrefectDBInterface = Depends(provide_database_interface), 

896) -> List[schemas.core.DeploymentSchedule]: 

897 async with db.session_context(begin_transaction=True) as session: 1ac

898 deployment = await models.deployments.read_deployment( 1ac

899 session=session, deployment_id=deployment_id 

900 ) 

901 

902 if not deployment: 

903 raise HTTPException( 

904 status.HTTP_404_NOT_FOUND, detail="Deployment not found." 

905 ) 

906 

907 try: 

908 created = await models.deployments.create_deployment_schedules( 

909 session=session, 

910 deployment_id=deployment.id, 

911 schedules=schedules, 

912 ) 

913 except sa.exc.IntegrityError as e: 

914 if "duplicate key value violates unique constraint" in str(e): 

915 raise HTTPException( 

916 status.HTTP_409_CONFLICT, 

917 detail="Schedule slugs must be unique within a deployment.", 

918 ) 

919 raise 

920 return created 

921 

922 

923@router.patch( 1b

924 "/{id:uuid}/schedules/{schedule_id:uuid}", status_code=status.HTTP_204_NO_CONTENT 

925) 

926async def update_deployment_schedule( 1b

927 deployment_id: UUID = Path(..., description="The deployment id", alias="id"), 

928 schedule_id: UUID = Path(..., description="The schedule id", alias="schedule_id"), 

929 schedule: schemas.actions.DeploymentScheduleUpdate = Body( 

930 default=..., description="The updated schedule" 

931 ), 

932 db: PrefectDBInterface = Depends(provide_database_interface), 

933) -> None: 

934 async with db.session_context(begin_transaction=True) as session: 1ac

935 deployment = await models.deployments.read_deployment( 1ac

936 session=session, deployment_id=deployment_id 

937 ) 

938 

939 if not deployment: 

940 raise HTTPException( 

941 status.HTTP_404_NOT_FOUND, detail="Deployment not found." 

942 ) 

943 

944 updated = await models.deployments.update_deployment_schedule( 

945 session=session, 

946 deployment_id=deployment_id, 

947 deployment_schedule_id=schedule_id, 

948 schedule=schedule, 

949 ) 

950 

951 if not updated: 

952 raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Schedule not found.") 

953 

954 await models.deployments._delete_scheduled_runs( 

955 session=session, 

956 deployment_id=deployment_id, 

957 auto_scheduled_only=True, 

958 future_only=True, 

959 ) 

960 

961 

962@router.delete( 1b

963 "/{id:uuid}/schedules/{schedule_id:uuid}", status_code=status.HTTP_204_NO_CONTENT 

964) 

965async def delete_deployment_schedule( 1b

966 deployment_id: UUID = Path(..., description="The deployment id", alias="id"), 

967 schedule_id: UUID = Path(..., description="The schedule id", alias="schedule_id"), 

968 db: PrefectDBInterface = Depends(provide_database_interface), 

969) -> None: 

970 async with db.session_context(begin_transaction=True) as session: 1a

971 deployment = await models.deployments.read_deployment( 1a

972 session=session, deployment_id=deployment_id 

973 ) 

974 

975 if not deployment: 

976 raise HTTPException( 

977 status.HTTP_404_NOT_FOUND, detail="Deployment not found." 

978 ) 

979 

980 deleted = await models.deployments.delete_deployment_schedule( 

981 session=session, 

982 deployment_id=deployment_id, 

983 deployment_schedule_id=schedule_id, 

984 ) 

985 

986 if not deleted: 

987 raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Schedule not found.") 

988 

989 await models.deployments._delete_scheduled_runs( 

990 session=session, 

991 deployment_id=deployment_id, 

992 auto_scheduled_only=True, 

993 )