Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/models/deployments.py: 16%
300 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1"""
2Functions for interacting with deployment ORM objects.
3Intended for internal use by the Prefect REST API.
4"""
6from __future__ import annotations 1a
8import datetime 1a
9import logging 1a
10from collections.abc import Iterable, Sequence 1a
11from typing import TYPE_CHECKING, Any, Optional, TypeVar, cast 1a
12from uuid import UUID 1a
14import sqlalchemy as sa 1a
15from docket import Depends, Retry 1a
16from sqlalchemy import delete, or_, select 1a
17from sqlalchemy.dialects.postgresql import JSONB 1a
18from sqlalchemy.ext.asyncio import AsyncSession 1a
19from sqlalchemy.sql import Select 1a
21from prefect._internal.uuid7 import uuid7 1a
22from prefect.logging import get_logger 1a
23from prefect.server import models, schemas 1a
24from prefect.server.database import ( 1a
25 PrefectDBInterface,
26 db_injector,
27 orm_models,
28 provide_database_interface,
29)
30from prefect.server.events.clients import PrefectServerEventsClient 1a
31from prefect.server.exceptions import ObjectNotFoundError 1a
32from prefect.server.models.events import deployment_status_event 1a
33from prefect.server.schemas.statuses import DeploymentStatus 1a
34from prefect.settings import ( 1a
35 PREFECT_API_SERVICES_SCHEDULER_MAX_RUNS,
36 PREFECT_API_SERVICES_SCHEDULER_MAX_SCHEDULED_TIME,
37 PREFECT_API_SERVICES_SCHEDULER_MIN_RUNS,
38 PREFECT_API_SERVICES_SCHEDULER_MIN_SCHEDULED_TIME,
39)
40from prefect.types._datetime import DateTime, now 1a
# Type variable for the row-tuple type carried by a SQLAlchemy `Select`; it lets
# `_apply_deployment_filters` declare that it returns the same `Select[T]` it is given.
T = TypeVar("T", bound=tuple[Any, ...])

# Module-level logger for the deployment model functions.
logger: logging.Logger = get_logger("prefect.server.models.deployments")
@db_injector
async def _delete_scheduled_runs(
    db: PrefectDBInterface,
    session: AsyncSession,
    deployment_id: UUID,
    auto_scheduled_only: bool = False,
    future_only: bool = False,
) -> None:
    """
    Delete a deployment's flow runs that are still waiting in a Scheduled state.

    A run is deleted when it belongs to the deployment, has the SCHEDULED state
    type, is not named like the `AwaitingConcurrencySlot` state, and has a
    `run_count` of zero. Call this whenever a deployment is created or modified
    so that future runs reflect the deployment's latest values.

    Args:
        session: a database session
        deployment_id: the deployment for which we should delete runs.
        auto_scheduled_only: if True, only delete auto scheduled runs. Defaults to `False`.
        future_only: if True, only delete runs whose next scheduled start time is
            later than the current UTC time. Defaults to `False`.
    """
    flow_run = db.FlowRun

    # Criteria that always apply; all entries are AND-ed together by `where`.
    criteria = [
        flow_run.deployment_id == deployment_id,
        flow_run.state_type == schemas.states.StateType.SCHEDULED.value,
        flow_run.state_name != schemas.states.AwaitingConcurrencySlot().name,
        flow_run.run_count == 0,
    ]

    if auto_scheduled_only:
        criteria.append(flow_run.auto_scheduled.is_(True))

    if future_only:
        criteria.append(flow_run.next_scheduled_start_time > now("UTC"))

    await session.execute(sa.delete(flow_run).where(*criteria))
@db_injector
async def create_deployment(
    db: PrefectDBInterface,
    session: AsyncSession,
    deployment: schemas.core.Deployment | schemas.actions.DeploymentCreate,
) -> Optional[orm_models.Deployment]:
    """Create a deployment, or update the existing one it conflicts with.

    The row is written with an insert whose conflict clause (on
    `db.orm.deployment_unique_upsert_columns`) updates the existing row.
    Afterwards the deployment's future auto-scheduled runs and all of its
    schedules are deleted, the supplied schedules are recreated, and the
    concurrency limit is reconciled when one was explicitly provided.

    Args:
        session: a database session
        deployment: a deployment model

    Returns:
        orm_models.Deployment: the newly-created or updated deployment, or
            `None` if no deployment id is found after the upsert
    """
    # `on_conflict_do_update` will not use `Column.onupdate`, so `updated`
    # has to be stamped manually.
    # https://docs.sqlalchemy.org/en/14/dialects/sqlite.html#the-set-clause
    deployment.updated = now("UTC")  # type: ignore[assignment]
    deployment.labels = await with_system_labels_for_deployment(session, deployment)

    requested_schedules = deployment.schedules

    values_for_insert = deployment.model_dump_for_orm(
        exclude_unset=True, exclude={"schedules", "version_info"}
    )

    # "unset" means the caller did not provide a concurrency limit at all,
    # which is distinct from explicitly providing `None`.
    requested_concurrency_limit = values_for_insert.pop("concurrency_limit", "unset")

    # Client and server schemas call this field `job_variables`; the database
    # column is named `infra_overrides`.
    job_variables = values_for_insert.pop("job_variables", None)
    if job_variables:
        values_for_insert["infra_overrides"] = job_variables

    values_on_conflict = deployment.model_dump_for_orm(
        exclude_unset=True,
        exclude={
            "id",
            "created",
            "created_by",
            "schedules",
            "job_variables",
            "concurrency_limit",
            "version_info",
        },
    )
    if job_variables:
        values_on_conflict["infra_overrides"] = job_variables

    upsert = (
        db.queries.insert(db.Deployment)
        .values(**values_for_insert)
        .on_conflict_do_update(
            index_elements=db.orm.deployment_unique_upsert_columns,
            set_=dict(values_on_conflict),
        )
    )
    await session.execute(upsert)

    # Look up the id of the row that was just inserted or updated.
    id_lookup = sa.select(db.Deployment.id).where(
        db.Deployment.flow_id == deployment.flow_id,
        db.Deployment.name == deployment.name,
    )
    deployment_id = (await session.execute(id_lookup)).scalar_one_or_none()
    if not deployment_id:
        return None

    # This may have been an update of an existing deployment, so clear out
    # its previous schedules and the runs generated from them.
    await _delete_scheduled_runs(
        session=session,
        deployment_id=deployment_id,
        auto_scheduled_only=True,
        future_only=True,
    )
    await delete_schedules_for_deployment(session=session, deployment_id=deployment_id)

    if requested_schedules:
        new_schedules = [
            schemas.actions.DeploymentScheduleCreate(
                schedule=item.schedule,
                active=item.active,
                parameters=item.parameters,
                slug=item.slug,
            )
            for item in requested_schedules
        ]
        await create_deployment_schedules(
            session=session,
            deployment_id=deployment_id,
            schedules=new_schedules,
        )

    if requested_concurrency_limit != "unset":
        await _create_or_update_deployment_concurrency_limit(
            db, session, deployment_id, deployment.concurrency_limit
        )

    # Re-read the row, overwriting any state already loaded in the session.
    reload_query = (
        sa.select(db.Deployment)
        .where(
            db.Deployment.flow_id == deployment.flow_id,
            db.Deployment.name == deployment.name,
        )
        .execution_options(populate_existing=True)
    )
    return (await session.execute(reload_query)).scalar()
@db_injector
async def update_deployment(
    db: PrefectDBInterface,
    session: AsyncSession,
    deployment_id: UUID,
    deployment: schemas.actions.DeploymentUpdate,
) -> bool:
    """Apply a partial update to a deployment.

    Only fields explicitly set on `deployment` are written. Future
    auto-scheduled runs are always deleted; schedules are replaced only when
    schedules were provided; the concurrency limit is reconciled only when one
    was provided.

    Args:
        session: a database session
        deployment_id: the ID of the deployment to modify
        deployment: changes to a deployment model

    Returns:
        bool: whether the deployment was updated

    """
    from prefect.server.api.workers import WorkerLookups

    new_schedules = deployment.schedules

    # `exclude_unset=True` restricts the update to values the user actually
    # provided, ignoring model defaults.
    changes = deployment.model_dump_for_orm(
        exclude_unset=True,
        exclude={"work_pool_name", "version_info"},
    )

    # "unset" distinguishes "not provided" from an explicit `None`.
    global_limit_id_update = changes.pop("global_concurrency_limit_id", "unset")
    concurrency_limit_update = changes.pop("concurrency_limit", "unset")

    if global_limit_id_update != "unset":
        changes["concurrency_limit_id"] = global_limit_id_update

    # Client and server schemas call this field `job_variables`; the database
    # column is named `infra_overrides`.
    job_variables = changes.pop("job_variables", None)
    if job_variables:
        changes["infra_overrides"] = job_variables

    replace_schedules = changes.pop("schedules", None) is not None

    pool_name = deployment.work_pool_name
    queue_name = deployment.work_queue_name
    if pool_name and queue_name:
        # A specific pool/queue pair was given: resolve (creating the queue
        # if needed) the ID of that work pool queue.
        changes["work_queue_id"] = await WorkerLookups()._get_work_queue_id_from_name(
            session=session,
            work_pool_name=deployment.work_pool_name,
            work_queue_name=deployment.work_queue_name,
            create_queue_if_not_found=True,
        )
    elif pool_name:
        # Only a pool was given: use the ID of its default work pool queue.
        changes[
            "work_queue_id"
        ] = await WorkerLookups()._get_default_work_queue_id_from_work_pool_name(
            session=session,
            work_pool_name=deployment.work_pool_name,
        )
    elif queue_name:
        # Only a queue name was given: make sure that queue exists and use its ID.
        work_queue = await models.work_queues.ensure_work_queue_exists(
            session=session, name=changes["work_queue_name"]
        )
        changes["work_queue_id"] = work_queue.id

    update_result = await session.execute(
        sa.update(db.Deployment)
        .where(db.Deployment.id == deployment_id)
        .values(**changes)
    )

    # Auto-scheduled runs created under the old configuration are now stale.
    await _delete_scheduled_runs(
        session=session,
        deployment_id=deployment_id,
        auto_scheduled_only=True,
        future_only=True,
    )

    if replace_schedules:
        # Schedules were provided: drop the existing ones and recreate them
        # from the update payload.
        await delete_schedules_for_deployment(
            session=session, deployment_id=deployment_id
        )
        replacement_schedules = [
            schemas.actions.DeploymentScheduleCreate(
                schedule=item.schedule,
                active=item.active if item.active is not None else True,
                parameters=item.parameters,
                slug=item.slug,
            )
            for item in new_schedules
            if item.schedule is not None
        ]
        await create_deployment_schedules(
            session=session,
            deployment_id=deployment_id,
            schedules=replacement_schedules,
        )

    if concurrency_limit_update != "unset":
        await _create_or_update_deployment_concurrency_limit(
            db, session, deployment_id, deployment.concurrency_limit
        )

    return update_result.rowcount > 0
async def _create_or_update_deployment_concurrency_limit(
    db: PrefectDBInterface,
    session: AsyncSession,
    deployment_id: UUID,
    limit: Optional[int],
):
    """
    Bring a deployment's related concurrency limit in line with `limit`.

    Does nothing when the deployment's current global concurrency limit already
    matches `limit` (or when both are absent). Otherwise the related limit is
    deleted (`limit is None`), updated in place, or created with the name
    `deployment:<deployment_id>`.

    Args:
        session: a database session
        deployment_id: the id of the deployment to modify; it must exist
        limit: the desired concurrency limit, or `None` to remove it
    """
    deployment = await session.get(db.Deployment, deployment_id)
    assert deployment is not None

    current = deployment.global_concurrency_limit

    # Nothing to do when the stored limit already matches the request...
    if current and current.limit == limit:
        return
    # ...or when there is no stored limit and none was requested.
    if current is None and limit is None:
        return

    deployment._concurrency_limit = limit

    if limit is None:
        await _delete_related_concurrency_limit(
            db, session=session, deployment_id=deployment_id
        )
        # Reload the deployment now that its related limit row is gone.
        await session.refresh(deployment)
    elif current:
        current.limit = limit
    else:
        deployment.global_concurrency_limit = db.ConcurrencyLimitV2(
            name=f"deployment:{deployment_id}", limit=limit
        )

    session.add(deployment)
@db_injector
async def read_deployment(
    db: PrefectDBInterface, session: AsyncSession, deployment_id: UUID
) -> Optional[orm_models.Deployment]:
    """Look up a single deployment by its id.

    Args:
        session: A database session
        deployment_id: a deployment id

    Returns:
        orm_models.Deployment: the deployment, or `None` when the session
            finds no row for that id
    """
    deployment = await session.get(db.Deployment, deployment_id)
    return deployment
@db_injector
async def read_deployment_by_name(
    db: PrefectDBInterface, session: AsyncSession, name: str, flow_name: str
) -> Optional[orm_models.Deployment]:
    """Look up a deployment by its own name and the name of its flow.

    Args:
        session: A database session
        name: a deployment name
        flow_name: the name of the flow the deployment belongs to

    Returns:
        orm_models.Deployment: the first matching deployment, or `None` when
            there is no match
    """
    by_name = (
        select(db.Deployment)
        .join(db.Flow, db.Deployment.flow_id == db.Flow.id)
        .where(
            db.Flow.name == flow_name,
            db.Deployment.name == name,
        )
        .limit(1)
    )
    rows = await session.execute(by_name)
    return rows.scalar()
404async def _apply_deployment_filters( 1a
405 db: PrefectDBInterface,
406 query: Select[T],
407 flow_filter: Optional[schemas.filters.FlowFilter] = None,
408 flow_run_filter: Optional[schemas.filters.FlowRunFilter] = None,
409 task_run_filter: Optional[schemas.filters.TaskRunFilter] = None,
410 deployment_filter: Optional[schemas.filters.DeploymentFilter] = None,
411 work_pool_filter: Optional[schemas.filters.WorkPoolFilter] = None,
412 work_queue_filter: Optional[schemas.filters.WorkQueueFilter] = None,
413) -> Select[T]:
414 """
415 Applies filters to a deployment query as a combination of EXISTS subqueries.
416 """
418 if deployment_filter:
419 query = query.where(deployment_filter.as_sql_filter())
421 if flow_filter:
422 flow_exists_clause = select(db.Deployment.id).where(
423 db.Deployment.flow_id == db.Flow.id,
424 flow_filter.as_sql_filter(),
425 )
427 query = query.where(flow_exists_clause.exists())
429 if flow_run_filter or task_run_filter:
430 flow_run_exists_clause = select(db.FlowRun).where(
431 db.Deployment.id == db.FlowRun.deployment_id
432 )
434 if flow_run_filter:
435 flow_run_exists_clause = flow_run_exists_clause.where(
436 flow_run_filter.as_sql_filter()
437 )
438 if task_run_filter:
439 flow_run_exists_clause = flow_run_exists_clause.join(
440 db.TaskRun,
441 db.TaskRun.flow_run_id == db.FlowRun.id,
442 ).where(task_run_filter.as_sql_filter())
444 query = query.where(flow_run_exists_clause.exists())
446 if work_pool_filter or work_queue_filter:
447 work_pool_exists_clause = select(db.WorkQueue).where(
448 db.Deployment.work_queue_id == db.WorkQueue.id
449 )
451 if work_queue_filter:
452 work_pool_exists_clause = work_pool_exists_clause.where(
453 work_queue_filter.as_sql_filter()
454 )
456 if work_pool_filter:
457 work_pool_exists_clause = work_pool_exists_clause.join(
458 db.WorkPool,
459 db.WorkPool.id == db.WorkQueue.work_pool_id,
460 ).where(work_pool_filter.as_sql_filter())
462 query = query.where(work_pool_exists_clause.exists())
464 return query
@db_injector
async def read_deployments(
    db: PrefectDBInterface,
    session: AsyncSession,
    offset: Optional[int] = None,
    limit: Optional[int] = None,
    flow_filter: Optional[schemas.filters.FlowFilter] = None,
    flow_run_filter: Optional[schemas.filters.FlowRunFilter] = None,
    task_run_filter: Optional[schemas.filters.TaskRunFilter] = None,
    deployment_filter: Optional[schemas.filters.DeploymentFilter] = None,
    work_pool_filter: Optional[schemas.filters.WorkPoolFilter] = None,
    work_queue_filter: Optional[schemas.filters.WorkQueueFilter] = None,
    sort: schemas.sorting.DeploymentSort = schemas.sorting.DeploymentSort.NAME_ASC,
) -> Sequence[orm_models.Deployment]:
    """
    Fetch deployments, optionally filtered, sorted and paginated.

    Args:
        session: A database session
        offset: Query offset
        limit: Query limit
        flow_filter: only select deployments whose flows match these criteria
        flow_run_filter: only select deployments whose flow runs match these criteria
        task_run_filter: only select deployments whose task runs match these criteria
        deployment_filter: only select deployment that match these filters
        work_pool_filter: only select deployments whose work pools match these criteria
        work_queue_filter: only select deployments whose work pool queues match these criteria
        sort: the sort criteria for selected deployments. Defaults to `name` ASC.

    Returns:
        list[orm_models.Deployment]: deployments
    """
    statement = await _apply_deployment_filters(
        db,
        query=select(db.Deployment).order_by(*sort.as_sql_sort()),
        flow_filter=flow_filter,
        flow_run_filter=flow_run_filter,
        task_run_filter=task_run_filter,
        deployment_filter=deployment_filter,
        work_pool_filter=work_pool_filter,
        work_queue_filter=work_queue_filter,
    )

    # Pagination is only applied when the caller asks for it.
    if offset is not None:
        statement = statement.offset(offset)
    if limit is not None:
        statement = statement.limit(limit)

    rows = await session.execute(statement)
    return rows.scalars().unique().all()
@db_injector
async def count_deployments(
    db: PrefectDBInterface,
    session: AsyncSession,
    flow_filter: Optional[schemas.filters.FlowFilter] = None,
    flow_run_filter: Optional[schemas.filters.FlowRunFilter] = None,
    task_run_filter: Optional[schemas.filters.TaskRunFilter] = None,
    deployment_filter: Optional[schemas.filters.DeploymentFilter] = None,
    work_pool_filter: Optional[schemas.filters.WorkPoolFilter] = None,
    work_queue_filter: Optional[schemas.filters.WorkQueueFilter] = None,
) -> int:
    """
    Count the deployments that satisfy the given filters.

    Args:
        session: A database session
        flow_filter: only count deployments whose flows match these criteria
        flow_run_filter: only count deployments whose flow runs match these criteria
        task_run_filter: only count deployments whose task runs match these criteria
        deployment_filter: only count deployment that match these filters
        work_pool_filter: only count deployments that match these work pool filters
        work_queue_filter: only count deployments that match these work pool queue filters

    Returns:
        int: the number of deployments matching filters
    """
    counting = select(sa.func.count(None)).select_from(db.Deployment)
    counting = await _apply_deployment_filters(
        db,
        query=counting,
        flow_filter=flow_filter,
        flow_run_filter=flow_run_filter,
        task_run_filter=task_run_filter,
        deployment_filter=deployment_filter,
        work_pool_filter=work_pool_filter,
        work_queue_filter=work_queue_filter,
    )

    return (await session.execute(counting)).scalar_one()
@db_injector
async def delete_deployment(
    db: PrefectDBInterface, session: AsyncSession, deployment_id: UUID
) -> bool:
    """
    Remove a deployment, its pending scheduled runs and its related
    concurrency limit.

    Args:
        session: A database session
        deployment_id: a deployment id

    Returns:
        bool: whether or not the deployment was deleted
    """
    # Scheduled runs go first, whether auto-scheduled or user-created.
    await _delete_scheduled_runs(
        session=session, deployment_id=deployment_id, auto_scheduled_only=False
    )

    await _delete_related_concurrency_limit(
        db, session=session, deployment_id=deployment_id
    )

    removal = delete(db.Deployment).where(db.Deployment.id == deployment_id)
    outcome = await session.execute(removal)
    return outcome.rowcount > 0
async def _delete_related_concurrency_limit(
    db: PrefectDBInterface, session: AsyncSession, deployment_id: UUID
):
    """
    Delete the `ConcurrencyLimitV2` row referenced by a deployment's
    `concurrency_limit_id`.

    Returns:
        the result of executing the DELETE statement
    """
    # Scalar subquery yielding the limit id stored on the deployment row.
    limit_id_of_deployment = (
        sa.select(db.Deployment.concurrency_limit_id)
        .where(db.Deployment.id == deployment_id)
        .scalar_subquery()
    )
    removal = delete(db.ConcurrencyLimitV2).where(
        db.ConcurrencyLimitV2.id == limit_id_of_deployment
    )
    return await session.execute(removal)
@db_injector
async def schedule_runs(
    db: PrefectDBInterface,
    session: AsyncSession,
    deployment_id: UUID,
    start_time: Optional[datetime.datetime] = None,
    end_time: Optional[datetime.datetime] = None,
    min_time: Optional[datetime.timedelta] = None,
    min_runs: Optional[int] = None,
    max_runs: Optional[int] = None,
    auto_scheduled: bool = True,
) -> Sequence[UUID]:
    """
    Generate and insert scheduled flow runs for a deployment.

    Any bound left as `None` falls back to the matching scheduler setting
    (`start_time` falls back to the current UTC time).

    Args:
        session: a database session
        deployment_id: the id of the deployment to schedule
        start_time: the time from which to start scheduling runs
        end_time: runs will be scheduled until at most this time
        min_time: runs will be scheduled until at least this far in the future
        min_runs: a minimum amount of runs to schedule
        max_runs: a maximum amount of runs to schedule
        auto_scheduled: passed through to run generation, where it sets the
            `auto_scheduled` flag on the generated runs

    This function will generate the minimum number of runs that satisfy the min
    and max times, and the min and max counts. Specifically, the following order
    will be respected.

        - Runs will be generated starting on or after the `start_time`
        - No more than `max_runs` runs will be generated
        - No runs will be generated after `end_time` is reached
        - At least `min_runs` runs will be generated
        - Runs will be generated until at least `start_time` + `min_time` is reached

    Returns:
        a list of flow run ids scheduled for the deployment
    """
    # Fill in each missing bound from settings; the asserts guard against a
    # setting that itself resolves to None.
    if min_runs is None:
        min_runs = PREFECT_API_SERVICES_SCHEDULER_MIN_RUNS.value()
        assert min_runs is not None
    if max_runs is None:
        max_runs = PREFECT_API_SERVICES_SCHEDULER_MAX_RUNS.value()
        assert max_runs is not None
    if start_time is None:
        start_time = now("UTC")
    if end_time is None:
        end_time = start_time + (
            PREFECT_API_SERVICES_SCHEDULER_MAX_SCHEDULED_TIME.value()
        )
    if min_time is None:
        min_time = PREFECT_API_SERVICES_SCHEDULER_MIN_SCHEDULED_TIME.value()
        assert min_time is not None

    if TYPE_CHECKING:
        # Narrowing for the type checker only; never executed at runtime.
        assert end_time is not None

    generated = await _generate_scheduled_flow_runs(
        db,
        session=session,
        deployment_id=deployment_id,
        start_time=start_time,
        end_time=end_time,
        min_time=min_time,
        min_runs=min_runs,
        max_runs=max_runs,
        auto_scheduled=auto_scheduled,
    )
    return await _insert_scheduled_flow_runs(session=session, runs=generated)
async def _generate_scheduled_flow_runs(
    db: PrefectDBInterface,
    session: AsyncSession,
    deployment_id: UUID,
    start_time: datetime.datetime,
    end_time: datetime.datetime,
    min_time: datetime.timedelta,
    min_runs: int,
    max_runs: int,
    auto_scheduled: bool = True,
) -> list[dict[str, Any]]:
    """
    Build, without inserting, the scheduled flow runs for a deployment.

    For every active schedule of the deployment this produces one dictionary
    per scheduled date, each holding the flow run fields and its Scheduled
    state. Nothing is written to the database, in order to facilitate batch
    operations; call `_insert_scheduled_flow_runs()` to insert the runs.

    Each run carries an idempotency key built from the deployment id, the
    schedule id and the date, which prevents duplicate runs from being
    inserted if the output from this function is used more than once.

    Args:
        session: a database session
        deployment_id: the id of the deployment to schedule
        start_time: the time from which to start scheduling runs
        end_time: runs will be scheduled until at most this time
        min_time: runs will be scheduled until at least this far in the future
        min_runs: a minimum amount of runs to schedule
        max_runs: a maximum amount of runs to schedule
        auto_scheduled: value of the `auto_scheduled` field on every run; when
            True an "auto-scheduled" tag is also prepended to the run tags

    This function will generate the minimum number of runs that satisfy the min
    and max times, and the min and max counts. Specifically, the following order
    will be respected.

        - Runs will be generated starting on or after the `start_time`
        - No more than `max_runs` runs will be generated
        - No runs will be generated after `end_time` is reached
        - At least `min_runs` runs will be generated
        - Runs will be generated until at least `start_time + min_time` is reached

    Returns:
        a list of dictionary representations of the `FlowRun` objects to
        schedule; empty when the deployment does not exist
    """
    generated: list[dict[str, Any]] = []

    deployment = await session.get(db.Deployment, deployment_id)
    if not deployment:
        return []

    only_active = schemas.filters.DeploymentScheduleFilter(
        active=schemas.filters.DeploymentScheduleFilterActive(eq_=True)
    )
    active_schedules = await read_deployment_schedules(
        session=session,
        deployment_id=deployment.id,
        deployment_schedule_filter=only_active,
    )

    for deployment_schedule in active_schedules:
        scheduled_dates: list[DateTime] = []

        # The generator yields at most `max_runs` dates and none past `end_time`.
        for candidate in deployment_schedule.schedule._get_dates_generator(
            n=max_runs, start=start_time, end=end_time
        ):
            scheduled_dates.append(candidate)

            # Stop as soon as both minimums (count and horizon) are met.
            if len(scheduled_dates) >= min_runs and candidate >= (
                start_time + min_time
            ):
                break

        tags = deployment.tags
        if auto_scheduled:
            tags = ["auto-scheduled"] + tags

        # Schedule-level parameters override the deployment's parameters.
        parameters = {
            **deployment.parameters,
            **deployment_schedule.parameters,
        }

        # System labels for flow runs created from this deployment.
        labels = await with_system_labels_for_deployment_flow_run(
            session=session,
            deployment=deployment,
        )

        # Shown as the creator of the run: the schedule's slug if it has one,
        # otherwise the schedule's class name.
        creator_display = (
            deployment_schedule.slug
            or deployment_schedule.schedule.__class__.__name__
        )

        generated.extend(
            [
                {
                    "id": uuid7(),
                    "flow_id": deployment.flow_id,
                    "deployment_id": deployment_id,
                    "deployment_version": deployment.version,
                    "work_queue_name": deployment.work_queue_name,
                    "work_queue_id": deployment.work_queue_id,
                    "parameters": parameters,
                    "infrastructure_document_id": deployment.infrastructure_document_id,
                    "idempotency_key": f"scheduled {deployment.id} {deployment_schedule.id} {when}",
                    "tags": tags,
                    "labels": labels,
                    "auto_scheduled": auto_scheduled,
                    "state": schemas.states.Scheduled(
                        scheduled_time=when,
                        message="Flow run scheduled",
                    ).model_dump(),
                    "state_type": schemas.states.StateType.SCHEDULED,
                    "state_name": "Scheduled",
                    "next_scheduled_start_time": when,
                    "expected_start_time": when,
                    "created_by": {
                        "id": deployment_schedule.id,
                        "display_value": creator_display,
                        "type": "SCHEDULE",
                    },
                }
                for when in scheduled_dates
            ]
        )

    return generated
@db_injector
async def _insert_scheduled_flow_runs(
    db: PrefectDBInterface, session: AsyncSession, runs: list[dict[str, Any]]
) -> Sequence[UUID]:
    """
    Given a list of flow runs to schedule, as generated by `_generate_scheduled_flow_runs`,
    inserts them into the database. Note this is a separate method to facilitate batch
    operations on many scheduled runs.

    Runs that conflict on `db.orm.flow_run_unique_upsert_columns` are skipped.
    For each run that was actually inserted, its state row is inserted as well
    and the run's `state_id` is set.

    Args:
        session: a database session
        runs: a list of dicts representing flow runs to insert

    Returns:
        a list of flow run ids that were created
    """

    if not runs:
        return []

    # gracefully insert the flow runs against the idempotency key
    # this syntax (insert statement, values to insert) is most efficient
    # because it uses a single bind parameter
    await session.execute(
        db.queries.insert(db.FlowRun).on_conflict_do_nothing(
            index_elements=db.orm.flow_run_unique_upsert_columns
        ),
        runs,
    )

    # query for the rows that were newly inserted (by checking for any flow runs with
    # no corresponding flow run states)
    inserted_rows = sa.select(db.FlowRun.id).where(
        db.FlowRun.id.in_([r["id"] for r in runs]),
        ~select(db.FlowRunState.id)
        .where(db.FlowRunState.flow_run_id == db.FlowRun.id)
        .exists(),
    )
    inserted_flow_run_ids = (await session.execute(inserted_rows)).scalars().all()

    # Build the lookup set once: testing membership against the result list
    # inside the comprehension would rescan that list for every run.
    inserted_id_lookup = set(inserted_flow_run_ids)

    # insert flow run states that correspond to the newly-inserted rows
    insert_flow_run_states: list[dict[str, Any]] = [
        {"id": uuid7(), "flow_run_id": r["id"], **r["state"]}
        for r in runs
        if r["id"] in inserted_id_lookup
    ]
    if insert_flow_run_states:
        # this syntax (insert statement, values to insert) is most efficient
        # because it uses a single bind parameter
        await session.execute(
            db.FlowRunState.__table__.insert(),  # type: ignore[attr-defined]
            insert_flow_run_states,
        )

        # set the `state_id` on the newly inserted runs
        stmt = db.queries.set_state_id_on_inserted_flow_runs_statement(
            inserted_flow_run_ids=inserted_flow_run_ids,
            insert_flow_run_states=insert_flow_run_states,
        )

        await session.execute(stmt)

    return inserted_flow_run_ids
@db_injector
async def check_work_queues_for_deployment(
    db: PrefectDBInterface, session: AsyncSession, deployment_id: UUID
) -> Sequence[orm_models.WorkQueue]:
    """
    Find the work queues whose filter accepts the given deployment.

    A work queue is returned when both of the following hold.

    - The deployment has ALL tags that the work queue's filter lists (i.e. the
      queue's tags are a subset of the deployment's tags), or the queue's
      filter has no tags.
    - The work queue's filter lists the deployment's ID among its deployment
      IDs, or the filter has no deployment IDs.

    Notes on the query:

    - "No value" is matched both as null and as an empty list, so each
      criterion checks both cases with "or".
    - `json_contains(a, b)` should be interpreted as "True if a contains b".
    - Both operands are coerced to `JSONB` before the containment test.

    Raises:
        ObjectNotFoundError: if no deployment with the given id exists

    Returns:
        List[orm_models.WorkQueue]: WorkQueues
    """
    deployment = await session.get(db.Deployment, deployment_id)
    if not deployment:
        raise ObjectNotFoundError(f"Deployment with id {deployment_id} not found")

    def json_contains(a: Any, b: Any) -> sa.ColumnElement[bool]:
        return sa.type_coerce(a, type_=JSONB).contains(sa.type_coerce(b, type_=JSONB))

    # work queue tags are a subset of deployment tags (or there are none)
    tags_accepted = or_(
        json_contains(deployment.tags, db.WorkQueue.filter["tags"]),
        json_contains([], db.WorkQueue.filter["tags"]),
        json_contains(None, db.WorkQueue.filter["tags"]),
    )

    # deployment_ids is null/empty or contains the deployment's ID
    deployment_accepted = or_(
        json_contains(
            db.WorkQueue.filter["deployment_ids"],
            str(deployment.id),
        ),
        json_contains(None, db.WorkQueue.filter["deployment_ids"]),
        json_contains([], db.WorkQueue.filter["deployment_ids"]),
    )

    statement = select(db.WorkQueue).where(tags_accepted).where(deployment_accepted)

    rows = await session.execute(statement)
    return rows.scalars().unique().all()
@db_injector
async def create_deployment_schedules(
    db: PrefectDBInterface,
    session: AsyncSession,
    deployment_id: UUID,
    schedules: list[schemas.actions.DeploymentScheduleCreate],
) -> list[schemas.core.DeploymentSchedule]:
    """
    Add schedules to a deployment.

    Args:
        session: A database session
        deployment_id: a deployment id
        schedules: a list of deployment schedule create actions

    Returns:
        list[schemas.core.DeploymentSchedule]: the schedules that were created,
            validated from the flushed ORM objects
    """
    # Attach the owning deployment's id to every schedule payload.
    payloads: list[dict[str, Any]] = [
        {**schedule.model_dump(), "deployment_id": deployment_id}
        for schedule in schedules
    ]

    orm_schedules = [db.DeploymentSchedule(**payload) for payload in payloads]
    session.add_all(orm_schedules)
    # Flush before validating the ORM objects into schema objects.
    await session.flush()

    return [
        schemas.core.DeploymentSchedule.model_validate(row, from_attributes=True)
        for row in orm_schedules
    ]
@db_injector
async def read_deployment_schedules(
    db: PrefectDBInterface,
    session: AsyncSession,
    deployment_id: UUID,
    deployment_schedule_filter: Optional[
        schemas.filters.DeploymentScheduleFilter
    ] = None,
) -> list[schemas.core.DeploymentSchedule]:
    """
    Fetch a deployment's schedules, most recently updated first.

    Args:
        session: A database session
        deployment_id: a deployment id
        deployment_schedule_filter: optional extra criteria the schedules must match

    Returns:
        list[schemas.core.DeploymentSchedule]: the deployment's schedules
    """
    schedule_table = db.DeploymentSchedule
    statement = (
        sa.select(schedule_table)
        .where(schedule_table.deployment_id == deployment_id)
        .order_by(schedule_table.updated.desc())
    )
    if deployment_schedule_filter:
        statement = statement.where(deployment_schedule_filter.as_sql_filter())

    rows = (await session.execute(statement)).scalars().all()
    return [
        schemas.core.DeploymentSchedule.model_validate(row, from_attributes=True)
        for row in rows
    ]
@db_injector
async def update_deployment_schedule(
    db: PrefectDBInterface,
    session: AsyncSession,
    deployment_id: UUID,
    schedule: schemas.actions.DeploymentScheduleUpdate,
    deployment_schedule_id: UUID | None = None,
    deployment_schedule_slug: str | None = None,
) -> bool:
    """
    Update one of a deployment's schedules, located by id or by slug.

    When both an id and a slug are given, the id is used.

    Args:
        session: A database session
        deployment_id: the id of the deployment that owns the schedule
        schedule: a deployment schedule update action; fields that are `None`
            are not written
        deployment_schedule_id: a deployment schedule id
        deployment_schedule_slug: a deployment schedule slug

    Raises:
        ValueError: if neither an id nor a slug is provided

    Returns:
        bool: whether any schedule row was updated
    """
    schedule_table = db.DeploymentSchedule

    # Decide how the target schedule is identified; the id takes precedence.
    if deployment_schedule_id:
        target = schedule_table.id == deployment_schedule_id
    elif deployment_schedule_slug:
        target = schedule_table.slug == deployment_schedule_slug
    else:
        raise ValueError(
            "Either deployment_schedule_id or deployment_schedule_slug must be provided"
        )

    statement = (
        sa.update(schedule_table)
        .where(
            sa.and_(
                target,
                schedule_table.deployment_id == deployment_id,
            )
        )
        .values(**schedule.model_dump(exclude_none=True))
    )
    outcome = await session.execute(statement)

    return outcome.rowcount > 0
@db_injector
async def delete_schedules_for_deployment(
    db: PrefectDBInterface, session: AsyncSession, deployment_id: UUID
) -> bool:
    """
    Deletes all schedules that belong to a deployment.

    The deployment is loaded before the delete and refreshed afterwards.

    Args:
        session: A database session
        deployment_id: a deployment id

    Returns:
        bool: whether at least one schedule row was deleted
    """
    parent = await session.get(db.Deployment, deployment_id)
    assert parent is not None

    schedule_table = db.DeploymentSchedule
    statement = sa.delete(schedule_table).where(
        schedule_table.deployment_id == deployment_id
    )
    outcome = await session.execute(statement)

    # Reload the deployment now that its schedule rows have been removed.
    await session.refresh(parent)
    return outcome.rowcount > 0
@db_injector
async def delete_deployment_schedule(
    db: PrefectDBInterface,
    session: AsyncSession,
    deployment_id: UUID,
    deployment_schedule_id: UUID,
) -> bool:
    """
    Deletes a single deployment schedule.

    The schedule is only deleted when it has the given id and also belongs
    to the given deployment.

    Args:
        session: A database session
        deployment_id: id of the deployment the schedule must belong to
        deployment_schedule_id: a deployment schedule id

    Returns:
        bool: whether a row was deleted
    """
    schedule_table = db.DeploymentSchedule

    is_target_schedule = schedule_table.id == deployment_schedule_id
    belongs_to_deployment = schedule_table.deployment_id == deployment_id

    statement = sa.delete(schedule_table).where(
        sa.and_(is_target_schedule, belongs_to_deployment)
    )
    outcome = await session.execute(statement)

    return outcome.rowcount > 0
async def mark_deployments_ready(
    *,
    db: PrefectDBInterface = Depends(provide_database_interface),
    deployment_ids: Optional[Iterable[UUID]] = None,
    work_queue_ids: Optional[Iterable[UUID]] = None,
    retry: Retry = Retry(attempts=5, delay=datetime.timedelta(seconds=0.5)),
) -> None:
    """
    Marks deployments as READY and records when they were last polled.

    A deployment is targeted when its id is in `deployment_ids` or its work
    queue id is in `work_queue_ids`. Every targeted deployment gets status
    READY and a fresh `last_polled`; a status event is emitted only for the
    deployments that were NOT_READY before the update.

    Args:
        db: the database interface
        deployment_ids: ids of deployments to mark ready
        work_queue_ids: ids of work queues whose deployments to mark ready
        retry: not read in this body; presumably consumed by docket as the
            task's retry policy (TODO confirm)
    """
    # Materialize both inputs once. They are typed as Iterable and are used
    # by two separate statements below; a one-shot iterator would be always
    # truthy (defeating the empty check) and exhausted after its first use.
    deployment_ids = list(deployment_ids or [])
    work_queue_ids = list(work_queue_ids or [])

    if not deployment_ids and not work_queue_ids:
        return

    async with db.session_context(
        begin_transaction=True,
    ) as session:
        result = await session.execute(
            select(db.Deployment.id).where(
                sa.or_(
                    db.Deployment.id.in_(deployment_ids),
                    db.Deployment.work_queue_id.in_(work_queue_ids),
                ),
                db.Deployment.status == DeploymentStatus.NOT_READY,
            )
        )
        # Captured before the update so events are emitted for transitions only.
        unready_deployments = list(result.scalars().unique().all())

        last_polled = now("UTC")

        await session.execute(
            sa.update(db.Deployment)
            .where(
                sa.or_(
                    db.Deployment.id.in_(deployment_ids),
                    db.Deployment.work_queue_id.in_(work_queue_ids),
                )
            )
            .values(status=DeploymentStatus.READY, last_polled=last_polled)
        )

        if not unready_deployments:
            return

        async with PrefectServerEventsClient() as events:
            for deployment_id in unready_deployments:
                await events.emit(
                    await deployment_status_event(
                        session=session,
                        deployment_id=deployment_id,
                        status=DeploymentStatus.READY,
                        occurred=last_polled,
                    )
                )
@db_injector
async def mark_deployments_not_ready(
    db: PrefectDBInterface,
    deployment_ids: Optional[Iterable[UUID]] = None,
    work_queue_ids: Optional[Iterable[UUID]] = None,
) -> None:
    """
    Marks deployments as NOT_READY on a best-effort basis.

    A deployment is targeted when its id is in `deployment_ids` or its work
    queue id is in `work_queue_ids`. Every targeted deployment gets status
    NOT_READY; a status event is emitted only for the deployments that were
    READY before the update. Any exception is logged and swallowed rather
    than propagated to the caller.

    Args:
        db: the database interface
        deployment_ids: ids of deployments to mark not ready
        work_queue_ids: ids of work queues whose deployments to mark not ready
    """
    try:
        # Materialize both inputs once. They are typed as Iterable and are
        # used by two separate statements below; a one-shot iterator would
        # be always truthy (defeating the empty check) and exhausted after
        # its first use.
        deployment_ids = list(deployment_ids or [])
        work_queue_ids = list(work_queue_ids or [])

        if not deployment_ids and not work_queue_ids:
            return

        async with db.session_context(
            begin_transaction=True,
        ) as session:
            result = await session.execute(
                select(db.Deployment.id).where(
                    sa.or_(
                        db.Deployment.id.in_(deployment_ids),
                        db.Deployment.work_queue_id.in_(work_queue_ids),
                    ),
                    db.Deployment.status == DeploymentStatus.READY,
                )
            )
            # Captured before the update so events are emitted for
            # transitions only.
            ready_deployments = list(result.scalars().unique().all())

            await session.execute(
                sa.update(db.Deployment)
                .where(
                    sa.or_(
                        db.Deployment.id.in_(deployment_ids),
                        db.Deployment.work_queue_id.in_(work_queue_ids),
                    )
                )
                .values(status=DeploymentStatus.NOT_READY)
            )

            if not ready_deployments:
                return

            async with PrefectServerEventsClient() as events:
                for deployment_id in ready_deployments:
                    await events.emit(
                        await deployment_status_event(
                            session=session,
                            deployment_id=deployment_id,
                            status=DeploymentStatus.NOT_READY,
                            occurred=now("UTC"),
                        )
                    )
    except Exception as exc:
        # Deliberate best-effort: report the failure but never raise.
        logger.error(f"Error marking deployments as not ready: {exc}", exc_info=True)
async def with_system_labels_for_deployment(
    session: AsyncSession,
    deployment: schemas.core.Deployment,
) -> schemas.core.KeyValueLabels:
    """Augment user supplied labels with system default labels for a deployment.

    Labels are merged in increasing precedence: labels read for the
    deployment's flow, then the system default `prefect.flow.id`, then the
    deployment's own labels.

    Args:
        session: Database session
        deployment: The deployment whose labels are being built

    Returns:
        The merged set of labels for the deployment
    """
    system_defaults = cast(
        schemas.core.KeyValueLabels,
        {"prefect.flow.id": str(deployment.flow_id)},
    )

    own_labels = deployment.labels
    if not own_labels:
        own_labels = {}

    flow_labels = await models.flows.read_flow_labels(session, deployment.flow_id)
    if not flow_labels:
        flow_labels = {}

    # Later operands win on key collisions.
    merged = flow_labels | system_defaults
    return merged | own_labels
async def with_system_labels_for_deployment_flow_run(
    session: AsyncSession,
    deployment: orm_models.Deployment,
    user_supplied_labels: Optional[schemas.core.KeyValueLabels] = None,
) -> schemas.core.KeyValueLabels:
    """Build the labels for a flow run that is created from a deployment.

    Labels are merged in increasing precedence: the deployment's labels,
    then the system labels `prefect.flow.id` and `prefect.deployment.id`,
    then the user-supplied labels.

    Args:
        session: Database session (not read by this function)
        deployment: The deployment the flow run is created from
        user_supplied_labels: Optional user-supplied labels to include

    Returns:
        Complete set of labels for the flow run
    """
    generated = cast(
        schemas.core.KeyValueLabels,
        {
            "prefect.flow.id": str(deployment.flow_id),
            "prefect.deployment.id": str(deployment.id),
        },
    )

    # The deployment's own labels act as the parent labels of the flow run.
    inherited = deployment.labels
    if not inherited:
        inherited = {}

    overrides = user_supplied_labels
    if not overrides:
        overrides = {}

    # Later operands win on key collisions.
    merged = inherited | generated
    return merged | overrides