Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/models/deployments.py: 18%
300 statements
coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1""" 

2Functions for interacting with deployment ORM objects. 

3Intended for internal use by the Prefect REST API. 

4""" 

5 

6from __future__ import annotations 1a

7 

8import datetime 1a

9import logging 1a

10from collections.abc import Iterable, Sequence 1a

11from typing import TYPE_CHECKING, Any, Optional, TypeVar, cast 1a

12from uuid import UUID 1a

13 

14import sqlalchemy as sa 1a

15from docket import Depends, Retry 1a

16from sqlalchemy import delete, or_, select 1a

17from sqlalchemy.dialects.postgresql import JSONB 1a

18from sqlalchemy.ext.asyncio import AsyncSession 1a

19from sqlalchemy.sql import Select 1a

20 

21from prefect._internal.uuid7 import uuid7 1a

22from prefect.logging import get_logger 1a

23from prefect.server import models, schemas 1a

24from prefect.server.database import ( 1a

25 PrefectDBInterface, 

26 db_injector, 

27 orm_models, 

28 provide_database_interface, 

29) 

30from prefect.server.events.clients import PrefectServerEventsClient 1a

31from prefect.server.exceptions import ObjectNotFoundError 1a

32from prefect.server.models.events import deployment_status_event 1a

33from prefect.server.schemas.statuses import DeploymentStatus 1a

34from prefect.settings import ( 1a

35 PREFECT_API_SERVICES_SCHEDULER_MAX_RUNS, 

36 PREFECT_API_SERVICES_SCHEDULER_MAX_SCHEDULED_TIME, 

37 PREFECT_API_SERVICES_SCHEDULER_MIN_RUNS, 

38 PREFECT_API_SERVICES_SCHEDULER_MIN_SCHEDULED_TIME, 

39) 

40from prefect.types._datetime import DateTime, now 1a

41 

42T = TypeVar("T", bound=tuple[Any, ...]) 1a

43 

44logger: logging.Logger = get_logger("prefect.server.models.deployments") 1a

45 

46 

47@db_injector 1a

48async def _delete_scheduled_runs( 1a

49 db: PrefectDBInterface, 

50 session: AsyncSession, 

51 deployment_id: UUID, 

52 auto_scheduled_only: bool = False, 

53 future_only: bool = False, 

54) -> None: 

55 """ 

56 This utility function deletes all of a deployment's runs that are in a Scheduled state 

57 and haven't run yet. It should be run any time a deployment is created or 

58 modified in order to ensure that future runs comply with the deployment's latest values. 

59 

60 Args: 

61 deployment_id: the deployment for which we should delete runs. 

62 auto_scheduled_only: if True, only delete auto scheduled runs. Defaults to `False`. 

63 future_only: if True, only delete runs that are scheduled to run in the future. 

64 Defaults to `False`. 

65 """ 

66 delete_query = sa.delete(db.FlowRun).where( 

67 db.FlowRun.deployment_id == deployment_id, 

68 db.FlowRun.state_type == schemas.states.StateType.SCHEDULED.value, 

69 db.FlowRun.state_name != schemas.states.AwaitingConcurrencySlot().name, 

70 db.FlowRun.run_count == 0, 

71 ) 

72 

73 if auto_scheduled_only: 

74 delete_query = delete_query.where( 

75 db.FlowRun.auto_scheduled.is_(True), 

76 ) 

77 

78 if future_only: 

79 delete_query = delete_query.where( 

80 db.FlowRun.next_scheduled_start_time > now("UTC"), 

81 ) 

82 

83 await session.execute(delete_query) 

84 

85 
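

# Illustrative usage sketch (not part of the original module): clearing only
# auto-scheduled future runs after a deployment change, so manually created
# runs survive. Hypothetical helper; assumes an open database session.
async def _example_clear_future_auto_runs(
    session: AsyncSession, example_deployment_id: UUID
) -> None:
    await _delete_scheduled_runs(
        session=session,
        deployment_id=example_deployment_id,
        auto_scheduled_only=True,
        future_only=True,
    )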



@db_injector
async def create_deployment(
    db: PrefectDBInterface,
    session: AsyncSession,
    deployment: schemas.core.Deployment | schemas.actions.DeploymentCreate,
) -> Optional[orm_models.Deployment]:
    """Upserts a deployment.

    Args:
        session: a database session
        deployment: a deployment model

    Returns:
        orm_models.Deployment: the newly-created or updated deployment

    """

    # set `updated` manually
    # known limitation of `on_conflict_do_update`, will not use `Column.onupdate`
    # https://docs.sqlalchemy.org/en/14/dialects/sqlite.html#the-set-clause
    deployment.updated = now("UTC")  # type: ignore[assignment]

    deployment.labels = await with_system_labels_for_deployment(session, deployment)

    schedules = deployment.schedules
    insert_values = deployment.model_dump_for_orm(
        exclude_unset=True, exclude={"schedules", "version_info"}
    )

    requested_concurrency_limit = insert_values.pop("concurrency_limit", "unset")

    # The job_variables field in client and server schemas is named
    # infra_overrides in the database.
    job_variables = insert_values.pop("job_variables", None)
    if job_variables:
        insert_values["infra_overrides"] = job_variables

    conflict_update_fields = deployment.model_dump_for_orm(
        exclude_unset=True,
        exclude={
            "id",
            "created",
            "created_by",
            "schedules",
            "job_variables",
            "concurrency_limit",
            "version_info",
        },
    )
    if job_variables:
        conflict_update_fields["infra_overrides"] = job_variables

    insert_stmt = (
        db.queries.insert(db.Deployment)
        .values(**insert_values)
        .on_conflict_do_update(
            index_elements=db.orm.deployment_unique_upsert_columns,
            set_={**conflict_update_fields},
        )
    )

    await session.execute(insert_stmt)

    # Get the id of the deployment we just created or updated
    result = await session.execute(
        sa.select(db.Deployment.id).where(
            sa.and_(
                db.Deployment.flow_id == deployment.flow_id,
                db.Deployment.name == deployment.name,
            )
        )
    )
    deployment_id = result.scalar_one_or_none()

    if not deployment_id:
        return None

    # Because this was possibly an upsert, we need to delete any existing
    # schedules and any runs from the old deployment.

    await _delete_scheduled_runs(
        session=session,
        deployment_id=deployment_id,
        auto_scheduled_only=True,
        future_only=True,
    )

    await delete_schedules_for_deployment(session=session, deployment_id=deployment_id)

    if schedules:
        await create_deployment_schedules(
            session=session,
            deployment_id=deployment_id,
            schedules=[
                schemas.actions.DeploymentScheduleCreate(
                    schedule=schedule.schedule,
                    active=schedule.active,
                    parameters=schedule.parameters,
                    slug=schedule.slug,
                )
                for schedule in schedules
            ],
        )

    if requested_concurrency_limit != "unset":
        await _create_or_update_deployment_concurrency_limit(
            db, session, deployment_id, deployment.concurrency_limit
        )

    query = (
        sa.select(db.Deployment)
        .where(
            sa.and_(
                db.Deployment.flow_id == deployment.flow_id,
                db.Deployment.name == deployment.name,
            )
        )
        .execution_options(populate_existing=True)
    )
    refreshed_result = await session.execute(query)
    return refreshed_result.scalar()
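

# Illustrative usage sketch (not part of the original module): because the
# upsert above is keyed on (flow_id, name), calling it twice with the same
# name updates the existing row rather than failing. Hypothetical helper;
# assumes `name` and `flow_id` suffice to construct the schema.
async def _example_create_deployment(
    session: AsyncSession, example_flow_id: UUID
) -> Optional[orm_models.Deployment]:
    deployment = schemas.core.Deployment(
        name="example-deployment",
        flow_id=example_flow_id,
    )
    return await create_deployment(session=session, deployment=deployment)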



@db_injector
async def update_deployment(
    db: PrefectDBInterface,
    session: AsyncSession,
    deployment_id: UUID,
    deployment: schemas.actions.DeploymentUpdate,
) -> bool:
    """Updates a deployment.

    Args:
        session: a database session
        deployment_id: the ID of the deployment to modify
        deployment: changes to a deployment model

    Returns:
        bool: whether the deployment was updated

    """

    from prefect.server.api.workers import WorkerLookups

    schedules = deployment.schedules

    # exclude_unset=True allows us to only update values provided by
    # the user, ignoring any defaults on the model
    update_data = deployment.model_dump_for_orm(
        exclude_unset=True,
        exclude={"work_pool_name", "version_info"},
    )

    requested_global_concurrency_limit_update = update_data.pop(
        "global_concurrency_limit_id", "unset"
    )
    requested_concurrency_limit_update = update_data.pop("concurrency_limit", "unset")

    if requested_global_concurrency_limit_update != "unset":
        update_data["concurrency_limit_id"] = requested_global_concurrency_limit_update

    # The job_variables field in client and server schemas is named
    # infra_overrides in the database.
    job_variables = update_data.pop("job_variables", None)
    if job_variables:
        update_data["infra_overrides"] = job_variables

    should_update_schedules = update_data.pop("schedules", None) is not None

    if deployment.work_pool_name and deployment.work_queue_name:
        # If a specific pool name/queue name combination was provided, get the
        # ID for that work pool queue.
        update_data[
            "work_queue_id"
        ] = await WorkerLookups()._get_work_queue_id_from_name(
            session=session,
            work_pool_name=deployment.work_pool_name,
            work_queue_name=deployment.work_queue_name,
            create_queue_if_not_found=True,
        )
    elif deployment.work_pool_name:
        # If just a pool name was provided, get the ID for its default
        # work pool queue.
        update_data[
            "work_queue_id"
        ] = await WorkerLookups()._get_default_work_queue_id_from_work_pool_name(
            session=session,
            work_pool_name=deployment.work_pool_name,
        )
    elif deployment.work_queue_name:
        # If just a queue name was provided, ensure the queue exists and
        # get its ID.
        work_queue = await models.work_queues.ensure_work_queue_exists(
            session=session, name=update_data["work_queue_name"]
        )
        update_data["work_queue_id"] = work_queue.id

    update_stmt = (
        sa.update(db.Deployment)
        .where(db.Deployment.id == deployment_id)
        .values(**update_data)
    )
    result = await session.execute(update_stmt)

    # delete any auto scheduled runs that would have reflected the old deployment config
    await _delete_scheduled_runs(
        session=session,
        deployment_id=deployment_id,
        auto_scheduled_only=True,
        future_only=True,
    )

    if should_update_schedules:
        # If schedules were provided, remove the existing schedules and
        # replace them with the new ones.
        await delete_schedules_for_deployment(
            session=session, deployment_id=deployment_id
        )
        await create_deployment_schedules(
            session=session,
            deployment_id=deployment_id,
            schedules=[
                schemas.actions.DeploymentScheduleCreate(
                    schedule=schedule.schedule,
                    active=schedule.active if schedule.active is not None else True,
                    parameters=schedule.parameters,
                    slug=schedule.slug,
                )
                for schedule in schedules
                if schedule.schedule is not None
            ],
        )

    if requested_concurrency_limit_update != "unset":
        await _create_or_update_deployment_concurrency_limit(
            db, session, deployment_id, deployment.concurrency_limit
        )

    return result.rowcount > 0
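

# Illustrative usage sketch (not part of the original module): a partial
# update that only sets `description`. With exclude_unset=True above, unset
# fields are dropped from the dump, so every other column is left untouched.
# Hypothetical helper; assumes `DeploymentUpdate` accepts `description`.
async def _example_partial_update(
    session: AsyncSession, example_deployment_id: UUID
) -> bool:
    update = schemas.actions.DeploymentUpdate(description="new description")
    return await update_deployment(
        session=session, deployment_id=example_deployment_id, deployment=update
    )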



async def _create_or_update_deployment_concurrency_limit(
    db: PrefectDBInterface,
    session: AsyncSession,
    deployment_id: UUID,
    limit: Optional[int],
):
    deployment = await session.get(db.Deployment, deployment_id)
    assert deployment is not None

    if (
        deployment.global_concurrency_limit
        and deployment.global_concurrency_limit.limit == limit
    ) or (deployment.global_concurrency_limit is None and limit is None):
        return

    deployment._concurrency_limit = limit
    if limit is None:
        await _delete_related_concurrency_limit(
            db, session=session, deployment_id=deployment_id
        )
        await session.refresh(deployment)
    elif deployment.global_concurrency_limit:
        deployment.global_concurrency_limit.limit = limit
    else:
        limit_name = f"deployment:{deployment_id}"
        new_limit = db.ConcurrencyLimitV2(name=limit_name, limit=limit)
        deployment.global_concurrency_limit = new_limit

    session.add(deployment)



@db_injector
async def read_deployment(
    db: PrefectDBInterface, session: AsyncSession, deployment_id: UUID
) -> Optional[orm_models.Deployment]:
    """Reads a deployment by id.

    Args:
        session: A database session
        deployment_id: a deployment id

    Returns:
        orm_models.Deployment: the deployment
    """

    return await session.get(db.Deployment, deployment_id)


@db_injector
async def read_deployment_by_name(
    db: PrefectDBInterface, session: AsyncSession, name: str, flow_name: str
) -> Optional[orm_models.Deployment]:
    """Reads a deployment by name.

    Args:
        session: A database session
        name: a deployment name
        flow_name: the name of the flow the deployment belongs to

    Returns:
        orm_models.Deployment: the deployment
    """

    result = await session.execute(
        select(db.Deployment)
        .join(db.Flow, db.Deployment.flow_id == db.Flow.id)
        .where(
            sa.and_(
                db.Flow.name == flow_name,
                db.Deployment.name == name,
            )
        )
        .limit(1)
    )
    return result.scalar()



async def _apply_deployment_filters(
    db: PrefectDBInterface,
    query: Select[T],
    flow_filter: Optional[schemas.filters.FlowFilter] = None,
    flow_run_filter: Optional[schemas.filters.FlowRunFilter] = None,
    task_run_filter: Optional[schemas.filters.TaskRunFilter] = None,
    deployment_filter: Optional[schemas.filters.DeploymentFilter] = None,
    work_pool_filter: Optional[schemas.filters.WorkPoolFilter] = None,
    work_queue_filter: Optional[schemas.filters.WorkQueueFilter] = None,
) -> Select[T]:
    """
    Applies filters to a deployment query as a combination of EXISTS subqueries.
    """

    if deployment_filter:
        query = query.where(deployment_filter.as_sql_filter())

    if flow_filter:
        flow_exists_clause = select(db.Deployment.id).where(
            db.Deployment.flow_id == db.Flow.id,
            flow_filter.as_sql_filter(),
        )

        query = query.where(flow_exists_clause.exists())

    if flow_run_filter or task_run_filter:
        flow_run_exists_clause = select(db.FlowRun).where(
            db.Deployment.id == db.FlowRun.deployment_id
        )

        if flow_run_filter:
            flow_run_exists_clause = flow_run_exists_clause.where(
                flow_run_filter.as_sql_filter()
            )
        if task_run_filter:
            flow_run_exists_clause = flow_run_exists_clause.join(
                db.TaskRun,
                db.TaskRun.flow_run_id == db.FlowRun.id,
            ).where(task_run_filter.as_sql_filter())

        query = query.where(flow_run_exists_clause.exists())

    if work_pool_filter or work_queue_filter:
        work_pool_exists_clause = select(db.WorkQueue).where(
            db.Deployment.work_queue_id == db.WorkQueue.id
        )

        if work_queue_filter:
            work_pool_exists_clause = work_pool_exists_clause.where(
                work_queue_filter.as_sql_filter()
            )

        if work_pool_filter:
            work_pool_exists_clause = work_pool_exists_clause.join(
                db.WorkPool,
                db.WorkPool.id == db.WorkQueue.work_pool_id,
            ).where(work_pool_filter.as_sql_filter())

        query = query.where(work_pool_exists_clause.exists())

    return query
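

# Illustrative note (not part of the original module): for a flow run filter,
# the helper above produces SQL roughly shaped like the following simplified,
# hypothetical statement:
#
#   SELECT deployment.* FROM deployment
#   WHERE EXISTS (
#       SELECT flow_run.id FROM flow_run
#       WHERE deployment.id = flow_run.deployment_id AND <flow run filter>
#   )
#
# A correlated EXISTS avoids JOIN fan-out: each deployment row appears once
# no matter how many related rows match the filter.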



@db_injector
async def read_deployments(
    db: PrefectDBInterface,
    session: AsyncSession,
    offset: Optional[int] = None,
    limit: Optional[int] = None,
    flow_filter: Optional[schemas.filters.FlowFilter] = None,
    flow_run_filter: Optional[schemas.filters.FlowRunFilter] = None,
    task_run_filter: Optional[schemas.filters.TaskRunFilter] = None,
    deployment_filter: Optional[schemas.filters.DeploymentFilter] = None,
    work_pool_filter: Optional[schemas.filters.WorkPoolFilter] = None,
    work_queue_filter: Optional[schemas.filters.WorkQueueFilter] = None,
    sort: schemas.sorting.DeploymentSort = schemas.sorting.DeploymentSort.NAME_ASC,
) -> Sequence[orm_models.Deployment]:
    """
    Read deployments.

    Args:
        session: A database session
        offset: Query offset
        limit: Query limit
        flow_filter: only select deployments whose flows match these criteria
        flow_run_filter: only select deployments whose flow runs match these criteria
        task_run_filter: only select deployments whose task runs match these criteria
        deployment_filter: only select deployments that match these filters
        work_pool_filter: only select deployments whose work pools match these criteria
        work_queue_filter: only select deployments whose work pool queues match these criteria
        sort: the sort criteria for selected deployments. Defaults to `name` ASC.

    Returns:
        list[orm_models.Deployment]: deployments
    """

    query = select(db.Deployment).order_by(*sort.as_sql_sort())

    query = await _apply_deployment_filters(
        db,
        query=query,
        flow_filter=flow_filter,
        flow_run_filter=flow_run_filter,
        task_run_filter=task_run_filter,
        deployment_filter=deployment_filter,
        work_pool_filter=work_pool_filter,
        work_queue_filter=work_queue_filter,
    )

    if offset is not None:
        query = query.offset(offset)
    if limit is not None:
        query = query.limit(limit)

    result = await session.execute(query)
    return result.scalars().unique().all()
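

# Illustrative usage sketch (not part of the original module): reading the
# first page of deployments for flows named "etl", sorted by name. The filter
# value is hypothetical; assumes the standard Prefect filter schemas.
async def _example_read_deployments(
    session: AsyncSession,
) -> Sequence[orm_models.Deployment]:
    flow_filter = schemas.filters.FlowFilter(
        name=schemas.filters.FlowFilterName(any_=["etl"])
    )
    return await read_deployments(
        session=session, flow_filter=flow_filter, offset=0, limit=50
    )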



@db_injector
async def count_deployments(
    db: PrefectDBInterface,
    session: AsyncSession,
    flow_filter: Optional[schemas.filters.FlowFilter] = None,
    flow_run_filter: Optional[schemas.filters.FlowRunFilter] = None,
    task_run_filter: Optional[schemas.filters.TaskRunFilter] = None,
    deployment_filter: Optional[schemas.filters.DeploymentFilter] = None,
    work_pool_filter: Optional[schemas.filters.WorkPoolFilter] = None,
    work_queue_filter: Optional[schemas.filters.WorkQueueFilter] = None,
) -> int:
    """
    Count deployments.

    Args:
        session: A database session
        flow_filter: only count deployments whose flows match these criteria
        flow_run_filter: only count deployments whose flow runs match these criteria
        task_run_filter: only count deployments whose task runs match these criteria
        deployment_filter: only count deployments that match these filters
        work_pool_filter: only count deployments that match these work pool filters
        work_queue_filter: only count deployments that match these work pool queue filters

    Returns:
        int: the number of deployments matching filters
    """

    query = select(sa.func.count(None)).select_from(db.Deployment)

    query = await _apply_deployment_filters(
        db,
        query=query,
        flow_filter=flow_filter,
        flow_run_filter=flow_run_filter,
        task_run_filter=task_run_filter,
        deployment_filter=deployment_filter,
        work_pool_filter=work_pool_filter,
        work_queue_filter=work_queue_filter,
    )

    result = await session.execute(query)
    return result.scalar_one()



@db_injector
async def delete_deployment(
    db: PrefectDBInterface, session: AsyncSession, deployment_id: UUID
) -> bool:
    """
    Delete a deployment by id.

    Args:
        session: A database session
        deployment_id: a deployment id

    Returns:
        bool: whether or not the deployment was deleted
    """

    # delete scheduled runs, both auto- and user-created.
    await _delete_scheduled_runs(
        session=session, deployment_id=deployment_id, auto_scheduled_only=False
    )

    await _delete_related_concurrency_limit(
        db, session=session, deployment_id=deployment_id
    )

    result = await session.execute(
        delete(db.Deployment).where(db.Deployment.id == deployment_id)
    )
    return result.rowcount > 0



async def _delete_related_concurrency_limit(
    db: PrefectDBInterface, session: AsyncSession, deployment_id: UUID
):
    return await session.execute(
        delete(db.ConcurrencyLimitV2).where(
            db.ConcurrencyLimitV2.id
            == sa.select(db.Deployment.concurrency_limit_id)
            .where(db.Deployment.id == deployment_id)
            .scalar_subquery()
        )
    )



@db_injector
async def schedule_runs(
    db: PrefectDBInterface,
    session: AsyncSession,
    deployment_id: UUID,
    start_time: Optional[datetime.datetime] = None,
    end_time: Optional[datetime.datetime] = None,
    min_time: Optional[datetime.timedelta] = None,
    min_runs: Optional[int] = None,
    max_runs: Optional[int] = None,
    auto_scheduled: bool = True,
) -> Sequence[UUID]:
    """
    Schedule flow runs for a deployment.

    Args:
        session: a database session
        deployment_id: the id of the deployment to schedule
        start_time: the time from which to start scheduling runs
        end_time: runs will be scheduled until at most this time
        min_time: runs will be scheduled until at least this far in the future
        min_runs: a minimum number of runs to schedule
        max_runs: a maximum number of runs to schedule

    This function will generate the minimum number of runs that satisfy the min
    and max times, and the min and max counts. Specifically, the following order
    will be respected.

        - Runs will be generated starting on or after the `start_time`
        - No more than `max_runs` runs will be generated
        - No runs will be generated after `end_time` is reached
        - At least `min_runs` runs will be generated
        - Runs will be generated until at least `start_time` + `min_time` is reached

    Returns:
        a list of flow run ids scheduled for the deployment
    """
    if min_runs is None:
        min_runs = PREFECT_API_SERVICES_SCHEDULER_MIN_RUNS.value()
    assert min_runs is not None
    if max_runs is None:
        max_runs = PREFECT_API_SERVICES_SCHEDULER_MAX_RUNS.value()
    assert max_runs is not None
    if start_time is None:
        start_time = now("UTC")
    if end_time is None:
        end_time = start_time + (
            PREFECT_API_SERVICES_SCHEDULER_MAX_SCHEDULED_TIME.value()
        )
    if min_time is None:
        min_time = PREFECT_API_SERVICES_SCHEDULER_MIN_SCHEDULED_TIME.value()
    assert min_time is not None

    actual_start_time = start_time
    if TYPE_CHECKING:
        assert end_time is not None
    actual_end_time = end_time

    runs = await _generate_scheduled_flow_runs(
        db,
        session=session,
        deployment_id=deployment_id,
        start_time=actual_start_time,
        end_time=actual_end_time,
        min_time=min_time,
        min_runs=min_runs,
        max_runs=max_runs,
        auto_scheduled=auto_scheduled,
    )
    return await _insert_scheduled_flow_runs(session=session, runs=runs)
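

# Illustrative usage sketch (not part of the original module): how the bounds
# above interact. For an hourly schedule with min_runs=3, max_runs=100, and a
# min_time of one day, dates are generated until at least 3 exist AND the
# latest one is >= start_time + 1 day (so roughly 24 hourly runs), while
# max_runs and end_time cap the total. Hypothetical call.
async def _example_schedule_runs(
    session: AsyncSession, example_deployment_id: UUID
) -> Sequence[UUID]:
    return await schedule_runs(
        session=session,
        deployment_id=example_deployment_id,
        min_runs=3,
        max_runs=100,
        min_time=datetime.timedelta(days=1),
    )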



async def _generate_scheduled_flow_runs(
    db: PrefectDBInterface,
    session: AsyncSession,
    deployment_id: UUID,
    start_time: datetime.datetime,
    end_time: datetime.datetime,
    min_time: datetime.timedelta,
    min_runs: int,
    max_runs: int,
    auto_scheduled: bool = True,
) -> list[dict[str, Any]]:
    """
    Given a `deployment_id` and schedule, generates a list of flow run objects and
    associated scheduled states that represent scheduled flow runs. This method
    does NOT insert generated runs into the database, in order to facilitate
    batch operations. Call `_insert_scheduled_flow_runs()` to insert these runs.

    Runs include an idempotency key which prevents duplicate runs from being inserted
    if the output from this function is used more than once.

    Args:
        session: a database session
        deployment_id: the id of the deployment to schedule
        start_time: the time from which to start scheduling runs
        end_time: runs will be scheduled until at most this time
        min_time: runs will be scheduled until at least this far in the future
        min_runs: a minimum number of runs to schedule
        max_runs: a maximum number of runs to schedule

    This function will generate the minimum number of runs that satisfy the min
    and max times, and the min and max counts. Specifically, the following order
    will be respected.

        - Runs will be generated starting on or after the `start_time`
        - No more than `max_runs` runs will be generated
        - No runs will be generated after `end_time` is reached
        - At least `min_runs` runs will be generated
        - Runs will be generated until at least `start_time + min_time` is reached

    Returns:
        a list of dictionary representations of the `FlowRun` objects to schedule
    """
    runs: list[dict[str, Any]] = []

    deployment = await session.get(db.Deployment, deployment_id)

    if not deployment:
        return []

    active_deployment_schedules = await read_deployment_schedules(
        session=session,
        deployment_id=deployment.id,
        deployment_schedule_filter=schemas.filters.DeploymentScheduleFilter(
            active=schemas.filters.DeploymentScheduleFilterActive(eq_=True)
        ),
    )

    for deployment_schedule in active_deployment_schedules:
        dates: list[DateTime] = []

        # generate up to `n` dates satisfying the min of `max_runs` and `end_time`
        for dt in deployment_schedule.schedule._get_dates_generator(
            n=max_runs, start=start_time, end=end_time
        ):
            dates.append(dt)

            # at any point, if we satisfy both of the minimums, we can stop
            if len(dates) >= min_runs and dt >= (start_time + min_time):
                break

        tags = deployment.tags
        if auto_scheduled:
            tags = ["auto-scheduled"] + tags

        parameters = {
            **deployment.parameters,
            **deployment_schedule.parameters,
        }

        # Generate system labels for flow runs from this deployment
        labels = await with_system_labels_for_deployment_flow_run(
            session=session,
            deployment=deployment,
        )

        for date in dates:
            runs.append(
                {
                    "id": uuid7(),
                    "flow_id": deployment.flow_id,
                    "deployment_id": deployment_id,
                    "deployment_version": deployment.version,
                    "work_queue_name": deployment.work_queue_name,
                    "work_queue_id": deployment.work_queue_id,
                    "parameters": parameters,
                    "infrastructure_document_id": deployment.infrastructure_document_id,
                    "idempotency_key": f"scheduled {deployment.id} {deployment_schedule.id} {date}",
                    "tags": tags,
                    "labels": labels,
                    "auto_scheduled": auto_scheduled,
                    "state": schemas.states.Scheduled(
                        scheduled_time=date,
                        message="Flow run scheduled",
                    ).model_dump(),
                    "state_type": schemas.states.StateType.SCHEDULED,
                    "state_name": "Scheduled",
                    "next_scheduled_start_time": date,
                    "expected_start_time": date,
                    "created_by": {
                        "id": deployment_schedule.id,
                        "display_value": deployment_schedule.slug
                        or deployment_schedule.schedule.__class__.__name__,
                        "type": "SCHEDULE",
                    },
                }
            )

    return runs
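

# Illustrative note (not part of the original module): the idempotency key
# above has the shape "scheduled {deployment_id} {schedule_id} {date}", so
# regenerating the same window yields identical keys and the batch insert
# below can skip rows that already exist.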



@db_injector
async def _insert_scheduled_flow_runs(
    db: PrefectDBInterface, session: AsyncSession, runs: list[dict[str, Any]]
) -> Sequence[UUID]:
    """
    Given a list of flow runs to schedule, as generated by `_generate_scheduled_flow_runs`,
    inserts them into the database. Note this is a separate method to facilitate batch
    operations on many scheduled runs.

    Args:
        session: a database session
        runs: a list of dicts representing flow runs to insert

    Returns:
        a list of flow run ids that were created
    """

    if not runs:
        return []

    # gracefully insert the flow runs against the idempotency key
    # this syntax (insert statement, values to insert) is most efficient
    # because it uses a single bind parameter
    await session.execute(
        db.queries.insert(db.FlowRun).on_conflict_do_nothing(
            index_elements=db.orm.flow_run_unique_upsert_columns
        ),
        runs,
    )

    # query for the rows that were newly inserted (by checking for any flow runs with
    # no corresponding flow run states)
    inserted_rows = sa.select(db.FlowRun.id).where(
        db.FlowRun.id.in_([r["id"] for r in runs]),
        ~select(db.FlowRunState.id)
        .where(db.FlowRunState.flow_run_id == db.FlowRun.id)
        .exists(),
    )
    inserted_flow_run_ids = (await session.execute(inserted_rows)).scalars().all()

    # insert flow run states that correspond to the newly-inserted rows
    insert_flow_run_states: list[dict[str, Any]] = [
        {"id": uuid7(), "flow_run_id": r["id"], **r["state"]}
        for r in runs
        if r["id"] in inserted_flow_run_ids
    ]
    if insert_flow_run_states:
        # this syntax (insert statement, values to insert) is most efficient
        # because it uses a single bind parameter
        await session.execute(
            db.FlowRunState.__table__.insert(),  # type: ignore[attr-defined]
            insert_flow_run_states,
        )

        # set the `state_id` on the newly inserted runs
        stmt = db.queries.set_state_id_on_inserted_flow_runs_statement(
            inserted_flow_run_ids=inserted_flow_run_ids,
            insert_flow_run_states=insert_flow_run_states,
        )

        await session.execute(stmt)

    return inserted_flow_run_ids
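

# Illustrative note (not part of the original module): the helper above is a
# three-step idempotent batch pattern:
#   1) INSERT ... ON CONFLICT DO NOTHING against the idempotency key,
#   2) detect truly-new rows as those with no FlowRunState yet,
#   3) bulk-insert their Scheduled states and point state_id at them,
# which keeps the whole batch safe to replay.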



@db_injector
async def check_work_queues_for_deployment(
    db: PrefectDBInterface, session: AsyncSession, deployment_id: UUID
) -> Sequence[orm_models.WorkQueue]:
    """
    Get work queues that can pick up the specified deployment.

    Work queues will pick up a deployment when all of the following are met.

        - The deployment has ALL tags that the work queue has (i.e. the work
          queue's tags must be a subset of the deployment's tags).
        - The work queue's specified deployment IDs match the deployment's ID,
          or the work queue does NOT have specified deployment IDs.
        - The work queue's specified flow runners match the deployment's flow
          runner or the work queue does NOT have a specified flow runner.

    Notes on the query:

        - Our database currently allows either "null" or empty lists as
          null values in filters, so we need to catch both cases with "or".
        - `A.contains(B)` should be interpreted as "True if A
          contains B".

    Returns:
        List[orm_models.WorkQueue]: WorkQueues
    """
    deployment = await session.get(db.Deployment, deployment_id)
    if not deployment:
        raise ObjectNotFoundError(f"Deployment with id {deployment_id} not found")

    def json_contains(a: Any, b: Any) -> sa.ColumnElement[bool]:
        return sa.type_coerce(a, type_=JSONB).contains(sa.type_coerce(b, type_=JSONB))

    query = (
        select(db.WorkQueue)
        # work queue tags are a subset of deployment tags
        .filter(
            or_(
                json_contains(deployment.tags, db.WorkQueue.filter["tags"]),
                json_contains([], db.WorkQueue.filter["tags"]),
                json_contains(None, db.WorkQueue.filter["tags"]),
            )
        )
        # deployment_ids is null or contains the deployment's ID
        .filter(
            or_(
                json_contains(
                    db.WorkQueue.filter["deployment_ids"],
                    str(deployment.id),
                ),
                json_contains(None, db.WorkQueue.filter["deployment_ids"]),
                json_contains([], db.WorkQueue.filter["deployment_ids"]),
            )
        )
    )

    result = await session.execute(query)
    return result.scalars().unique().all()
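

# Illustrative note (not part of the original module): `json_contains` above
# follows JSONB containment semantics, e.g. for hypothetical values:
#   ["a", "b", "c"] contains ["a", "c"]  -> True   (subset matches)
#   ["a"]           contains ["a", "b"]  -> False
# so json_contains(deployment.tags, queue_tags) is True exactly when the work
# queue's tags are a subset of the deployment's tags.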



@db_injector
async def create_deployment_schedules(
    db: PrefectDBInterface,
    session: AsyncSession,
    deployment_id: UUID,
    schedules: list[schemas.actions.DeploymentScheduleCreate],
) -> list[schemas.core.DeploymentSchedule]:
    """
    Creates a deployment's schedules.

    Args:
        session: A database session
        deployment_id: a deployment id
        schedules: a list of deployment schedule create actions
    """

    schedules_with_deployment_id: list[dict[str, Any]] = []
    for schedule in schedules:
        data = schedule.model_dump()
        data["deployment_id"] = deployment_id
        schedules_with_deployment_id.append(data)

    models = [
        db.DeploymentSchedule(**schedule) for schedule in schedules_with_deployment_id
    ]
    session.add_all(models)
    await session.flush()

    return [
        schemas.core.DeploymentSchedule.model_validate(m, from_attributes=True)
        for m in models
    ]



@db_injector
async def read_deployment_schedules(
    db: PrefectDBInterface,
    session: AsyncSession,
    deployment_id: UUID,
    deployment_schedule_filter: Optional[
        schemas.filters.DeploymentScheduleFilter
    ] = None,
) -> list[schemas.core.DeploymentSchedule]:
    """
    Reads a deployment's schedules.

    Args:
        session: A database session
        deployment_id: a deployment id

    Returns:
        list[schemas.core.DeploymentSchedule]: the deployment's schedules
    """

    query = (
        sa.select(db.DeploymentSchedule)
        .where(db.DeploymentSchedule.deployment_id == deployment_id)
        .order_by(db.DeploymentSchedule.updated.desc())
    )

    if deployment_schedule_filter:
        query = query.where(deployment_schedule_filter.as_sql_filter())

    result = await session.execute(query)

    return [
        schemas.core.DeploymentSchedule.model_validate(s, from_attributes=True)
        for s in result.scalars().all()
    ]



@db_injector
async def update_deployment_schedule(
    db: PrefectDBInterface,
    session: AsyncSession,
    deployment_id: UUID,
    schedule: schemas.actions.DeploymentScheduleUpdate,
    deployment_schedule_id: UUID | None = None,
    deployment_schedule_slug: str | None = None,
) -> bool:
    """
    Updates a deployment schedule.

    Args:
        session: A database session
        deployment_id: a deployment id
        schedule: a deployment schedule update action
        deployment_schedule_id: a deployment schedule id
        deployment_schedule_slug: a deployment schedule slug
    """
    if deployment_schedule_id:
        result = await session.execute(
            sa.update(db.DeploymentSchedule)
            .where(
                sa.and_(
                    db.DeploymentSchedule.id == deployment_schedule_id,
                    db.DeploymentSchedule.deployment_id == deployment_id,
                )
            )
            .values(**schedule.model_dump(exclude_none=True))
        )
    elif deployment_schedule_slug:
        result = await session.execute(
            sa.update(db.DeploymentSchedule)
            .where(
                sa.and_(
                    db.DeploymentSchedule.slug == deployment_schedule_slug,
                    db.DeploymentSchedule.deployment_id == deployment_id,
                )
            )
            .values(**schedule.model_dump(exclude_none=True))
        )
    else:
        raise ValueError(
            "Either deployment_schedule_id or deployment_schedule_slug must be provided"
        )

    return result.rowcount > 0
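

# Illustrative usage sketch (not part of the original module): deactivating a
# schedule by its slug rather than its id. The slug value is hypothetical;
# assumes `DeploymentScheduleUpdate` accepts `active`.
async def _example_deactivate_schedule(
    session: AsyncSession, example_deployment_id: UUID
) -> bool:
    return await update_deployment_schedule(
        session=session,
        deployment_id=example_deployment_id,
        schedule=schemas.actions.DeploymentScheduleUpdate(active=False),
        deployment_schedule_slug="example-nightly",
    )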



@db_injector
async def delete_schedules_for_deployment(
    db: PrefectDBInterface, session: AsyncSession, deployment_id: UUID
) -> bool:
    """
    Deletes all schedules for a deployment.

    Args:
        session: A database session
        deployment_id: a deployment id
    """

    deployment = await session.get(db.Deployment, deployment_id)
    assert deployment is not None

    result = await session.execute(
        sa.delete(db.DeploymentSchedule).where(
            db.DeploymentSchedule.deployment_id == deployment_id
        )
    )

    await session.refresh(deployment)
    return result.rowcount > 0



@db_injector
async def delete_deployment_schedule(
    db: PrefectDBInterface,
    session: AsyncSession,
    deployment_id: UUID,
    deployment_schedule_id: UUID,
) -> bool:
    """
    Deletes a deployment schedule.

    Args:
        session: A database session
        deployment_schedule_id: a deployment schedule id
    """

    result = await session.execute(
        sa.delete(db.DeploymentSchedule).where(
            sa.and_(
                db.DeploymentSchedule.id == deployment_schedule_id,
                db.DeploymentSchedule.deployment_id == deployment_id,
            )
        )
    )

    return result.rowcount > 0



async def mark_deployments_ready(
    *,
    db: PrefectDBInterface = Depends(provide_database_interface),
    deployment_ids: Optional[Iterable[UUID]] = None,
    work_queue_ids: Optional[Iterable[UUID]] = None,
    retry: Retry = Retry(attempts=5, delay=datetime.timedelta(seconds=0.5)),
) -> None:
    deployment_ids = deployment_ids or []
    work_queue_ids = work_queue_ids or []

    if not deployment_ids and not work_queue_ids:
        return

    async with db.session_context(
        begin_transaction=True,
    ) as session:
        result = await session.execute(
            select(db.Deployment.id).where(
                sa.or_(
                    db.Deployment.id.in_(deployment_ids),
                    db.Deployment.work_queue_id.in_(work_queue_ids),
                ),
                db.Deployment.status == DeploymentStatus.NOT_READY,
            )
        )
        unready_deployments = list(result.scalars().unique().all())

        last_polled = now("UTC")

        await session.execute(
            sa.update(db.Deployment)
            .where(
                sa.or_(
                    db.Deployment.id.in_(deployment_ids),
                    db.Deployment.work_queue_id.in_(work_queue_ids),
                )
            )
            .values(status=DeploymentStatus.READY, last_polled=last_polled)
        )

        if not unready_deployments:
            return

        async with PrefectServerEventsClient() as events:
            for deployment_id in unready_deployments:
                await events.emit(
                    await deployment_status_event(
                        session=session,
                        deployment_id=deployment_id,
                        status=DeploymentStatus.READY,
                        occurred=last_polled,
                    )
                )



@db_injector
async def mark_deployments_not_ready(
    db: PrefectDBInterface,
    deployment_ids: Optional[Iterable[UUID]] = None,
    work_queue_ids: Optional[Iterable[UUID]] = None,
) -> None:
    try:
        deployment_ids = deployment_ids or []
        work_queue_ids = work_queue_ids or []

        if not deployment_ids and not work_queue_ids:
            return

        async with db.session_context(
            begin_transaction=True,
        ) as session:
            result = await session.execute(
                select(db.Deployment.id).where(
                    sa.or_(
                        db.Deployment.id.in_(deployment_ids),
                        db.Deployment.work_queue_id.in_(work_queue_ids),
                    ),
                    db.Deployment.status == DeploymentStatus.READY,
                )
            )
            ready_deployments = list(result.scalars().unique().all())

            await session.execute(
                sa.update(db.Deployment)
                .where(
                    sa.or_(
                        db.Deployment.id.in_(deployment_ids),
                        db.Deployment.work_queue_id.in_(work_queue_ids),
                    )
                )
                .values(status=DeploymentStatus.NOT_READY)
            )

            if not ready_deployments:
                return

            async with PrefectServerEventsClient() as events:
                for deployment_id in ready_deployments:
                    await events.emit(
                        await deployment_status_event(
                            session=session,
                            deployment_id=deployment_id,
                            status=DeploymentStatus.NOT_READY,
                            occurred=now("UTC"),
                        )
                    )
    except Exception as exc:
        logger.error(f"Error marking deployments as not ready: {exc}", exc_info=True)



async def with_system_labels_for_deployment(
    session: AsyncSession,
    deployment: schemas.core.Deployment,
) -> schemas.core.KeyValueLabels:
    """Augment user supplied labels with system default labels for a deployment."""
    default_labels = cast(
        schemas.core.KeyValueLabels,
        {
            "prefect.flow.id": str(deployment.flow_id),
        },
    )

    user_supplied_labels = deployment.labels or {}

    parent_labels = (
        await models.flows.read_flow_labels(session, deployment.flow_id)
    ) or {}

    return parent_labels | default_labels | user_supplied_labels
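

# Illustrative sketch (not part of the original module): label precedence in
# the merges above and below. Later operands of the `|` dict union win, so
# user labels override system defaults, which override parent flow labels.
# Purely hypothetical values.
def _example_label_precedence() -> dict[str, str]:
    parent = {"env": "prod", "team": "data"}
    system = {"env": "staging", "prefect.flow.id": "abc"}
    user = {"team": "ml"}
    merged = parent | system | user
    # merged == {"env": "staging", "prefect.flow.id": "abc", "team": "ml"}
    return merged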



async def with_system_labels_for_deployment_flow_run(
    session: AsyncSession,
    deployment: orm_models.Deployment,
    user_supplied_labels: Optional[schemas.core.KeyValueLabels] = None,
) -> schemas.core.KeyValueLabels:
    """Generate system labels for a flow run created from a deployment.

    Args:
        session: Database session
        deployment: The deployment the flow run is created from
        user_supplied_labels: Optional user-supplied labels to include

    Returns:
        Complete set of labels for the flow run
    """
    system_labels = cast(
        schemas.core.KeyValueLabels,
        {
            "prefect.flow.id": str(deployment.flow_id),
            "prefect.deployment.id": str(deployment.id),
        },
    )

    # Use deployment labels as parent labels for flow runs
    parent_labels = deployment.labels or {}
    user_labels = user_supplied_labels or {}

    return parent_labels | system_labels | user_labels