Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/models/flow_runs.py: 25%
191 statements
1"""
2Functions for interacting with flow run ORM objects.
3Intended for internal use by the Prefect REST API.
4"""
6import contextlib 1a
7import datetime 1a
8from itertools import chain 1a
9from typing import ( 1a
10 TYPE_CHECKING,
11 Any,
12 Dict,
13 List,
14 Optional,
15 Sequence,
16 Tuple,
17 Type,
18 TypeVar,
19 Union,
20 cast,
21)
22from uuid import UUID 1a
24import sqlalchemy as sa 1a
25from sqlalchemy import delete, select 1a
26from sqlalchemy.ext.asyncio import AsyncSession 1a
27from sqlalchemy.orm import load_only, selectinload 1a
28from sqlalchemy.sql import Select 1a
30import prefect.server.models as models 1a
31import prefect.server.schemas as schemas 1a
32from prefect.logging.loggers import get_logger 1a
33from prefect.server.database import PrefectDBInterface, db_injector, orm_models 1a
34from prefect.server.exceptions import ObjectNotFoundError 1a
35from prefect.server.orchestration.core_policy import MinimalFlowPolicy 1a
36from prefect.server.orchestration.global_policy import GlobalFlowPolicy 1a
37from prefect.server.orchestration.policies import ( 1a
38 FlowRunOrchestrationPolicy,
39)
40from prefect.server.orchestration.rules import FlowOrchestrationContext 1a
41from prefect.server.schemas.core import TaskRunResult 1a
42from prefect.server.schemas.graph import Graph 1a
43from prefect.server.schemas.responses import OrchestrationResult 1a
44from prefect.server.schemas.states import State 1a
45from prefect.server.utilities.schemas import PrefectBaseModel 1a
46from prefect.settings import ( 1a
47 PREFECT_API_MAX_FLOW_RUN_GRAPH_ARTIFACTS,
48 PREFECT_API_MAX_FLOW_RUN_GRAPH_NODES,
49)
50from prefect.types import KeyValueLabels 1a
51from prefect.types._datetime import DateTime, earliest_possible_datetime, now 1a
53if TYPE_CHECKING: 53 ↛ 54line 53 didn't jump to line 54 because the condition on line 53 was never true1a
54 import logging
56logger: "logging.Logger" = get_logger("flow_runs") 1a
59T = TypeVar("T", bound=tuple[Any, ...]) 1a


@db_injector
async def create_flow_run(
    db: PrefectDBInterface,
    session: AsyncSession,
    flow_run: schemas.core.FlowRun,
    orchestration_parameters: Optional[dict[str, Any]] = None,
) -> orm_models.FlowRun:
    """Creates a new flow run.

    If the provided flow run has a state attached, it will also be created.

    Args:
        session: a database session
        flow_run: a flow run model

    Returns:
        orm_models.FlowRun: the newly-created flow run
    """
    right_now = now("UTC")
    # model: Union[orm_models.FlowRun, None] = None

    flow_run.labels = await with_system_labels_for_flow_run(
        session=session, flow_run=flow_run
    )

    flow_run_dict = dict(
        **flow_run.model_dump_for_orm(
            exclude={
                "created",
                "state",
                "estimated_run_time",
                "estimated_start_time_delta",
            },
            exclude_unset=True,
        ),
        created=right_now,
    )

    # if no idempotency key was provided, create the run directly
    if not flow_run.idempotency_key:
        model = db.FlowRun(**flow_run_dict)
        session.add(model)
        await session.flush()

    # otherwise let the database take care of enforcing idempotency
    else:
        insert_stmt = (
            db.queries.insert(db.FlowRun)
            .values(**flow_run_dict)
            .on_conflict_do_nothing(
                index_elements=db.orm.flow_run_unique_upsert_columns,
            )
        )
        await session.execute(insert_stmt)

        # read the run to see if idempotency was applied or not
        query = (
            sa.select(db.FlowRun)
            .where(
                sa.and_(
                    db.FlowRun.flow_id == flow_run.flow_id,
                    db.FlowRun.idempotency_key == flow_run.idempotency_key,
                )
            )
            .limit(1)
            .execution_options(populate_existing=True)
            .options(
                selectinload(db.FlowRun.work_queue).selectinload(db.WorkQueue.work_pool)
            )
        )
        result = await session.execute(query)
        model = result.scalar_one()

    # if the flow run was created in this function call then we need to set the
    # state. If it was created idempotently, the created time won't match.
    if model.created == right_now and flow_run.state:
        await models.flow_runs.set_flow_run_state(
            session=session,
            flow_run_id=model.id,
            state=flow_run.state,
            force=True,
            orchestration_parameters=orchestration_parameters,
        )
    return model
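

# Usage sketch (illustrative, not part of the module's API): how the idempotency-key
# branch above behaves when the same key is submitted twice. Assumes the caller holds
# an open AsyncSession and an existing flow id; the helper name and key value are
# assumptions for illustration only.
async def _example_idempotent_create(session: AsyncSession, flow_id: UUID) -> None:
    first = await create_flow_run(
        session=session,
        flow_run=schemas.core.FlowRun(flow_id=flow_id, idempotency_key="nightly"),
    )
    # The second call takes the ON CONFLICT DO NOTHING path and reads back the
    # existing row, so both calls resolve to the same flow run.
    second = await create_flow_run(
        session=session,
        flow_run=schemas.core.FlowRun(flow_id=flow_id, idempotency_key="nightly"),
    )
    assert first.id == second.id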


@db_injector
async def update_flow_run(
    db: PrefectDBInterface,
    session: AsyncSession,
    flow_run_id: UUID,
    flow_run: schemas.actions.FlowRunUpdate,
) -> bool:
    """
    Updates a flow run.

    Args:
        session: a database session
        flow_run_id: the flow run id to update
        flow_run: a flow run model

    Returns:
        bool: whether or not matching rows were found to update
    """
    update_stmt = (
        sa.update(db.FlowRun)
        .where(db.FlowRun.id == flow_run_id)
        # exclude_unset=True allows us to only update values provided by
        # the user, ignoring any defaults on the model
        .values(**flow_run.model_dump_for_orm(exclude_unset=True))
    )
    result = await session.execute(update_stmt)
    return result.rowcount > 0


@db_injector
async def read_flow_run(
    db: PrefectDBInterface,
    session: AsyncSession,
    flow_run_id: UUID,
    for_update: bool = False,
) -> Optional[orm_models.FlowRun]:
    """
    Reads a flow run by id.

    Args:
        session: A database session
        flow_run_id: a flow run id

    Returns:
        orm_models.FlowRun: the flow run
    """
    select = (
        sa.select(db.FlowRun)
        .where(db.FlowRun.id == flow_run_id)
        .options(
            selectinload(db.FlowRun.work_queue).selectinload(db.WorkQueue.work_pool)
        )
    )

    if for_update:
        select = select.with_for_update()

    result = await session.execute(select)
    return result.scalar()


async def _apply_flow_run_filters(
    db: PrefectDBInterface,
    query: Select[T],
    flow_filter: Optional[schemas.filters.FlowFilter] = None,
    flow_run_filter: Optional[schemas.filters.FlowRunFilter] = None,
    task_run_filter: Optional[schemas.filters.TaskRunFilter] = None,
    deployment_filter: Optional[schemas.filters.DeploymentFilter] = None,
    work_pool_filter: Optional[schemas.filters.WorkPoolFilter] = None,
    work_queue_filter: Optional[schemas.filters.WorkQueueFilter] = None,
) -> Select[T]:
    """
    Applies filters to a flow run query as a combination of EXISTS subqueries.
    """

    if flow_run_filter:
        query = query.where(flow_run_filter.as_sql_filter())

    if deployment_filter:
        deployment_exists_clause = select(db.Deployment).where(
            db.Deployment.id == db.FlowRun.deployment_id,
            deployment_filter.as_sql_filter(),
        )
        query = query.where(deployment_exists_clause.exists())

    if work_pool_filter:
        work_pool_exists_clause = select(db.WorkPool).where(
            db.WorkQueue.id == db.FlowRun.work_queue_id,
            db.WorkPool.id == db.WorkQueue.work_pool_id,
            work_pool_filter.as_sql_filter(),
        )

        query = query.where(work_pool_exists_clause.exists())

    if work_queue_filter:
        work_queue_exists_clause = select(db.WorkQueue).where(
            db.WorkQueue.id == db.FlowRun.work_queue_id,
            work_queue_filter.as_sql_filter(),
        )
        query = query.where(work_queue_exists_clause.exists())

    if flow_filter or task_run_filter:
        flow_or_task_run_exists_clause: Union[
            Select[Tuple[db.Flow]],
            Select[Tuple[db.TaskRun]],
        ]

        if flow_filter:
            flow_or_task_run_exists_clause = select(db.Flow).where(
                db.Flow.id == db.FlowRun.flow_id,
                flow_filter.as_sql_filter(),
            )

        if task_run_filter:
            if not flow_filter:
                flow_or_task_run_exists_clause = select(db.TaskRun).where(
                    db.TaskRun.flow_run_id == db.FlowRun.id
                )
            else:
                flow_or_task_run_exists_clause = flow_or_task_run_exists_clause.join(
                    db.TaskRun,
                    db.TaskRun.flow_run_id == db.FlowRun.id,
                )
            flow_or_task_run_exists_clause = flow_or_task_run_exists_clause.where(
                db.FlowRun.id == db.TaskRun.flow_run_id,
                task_run_filter.as_sql_filter(),
            )

        query = query.where(flow_or_task_run_exists_clause.exists())

    return query


@db_injector
async def read_flow_runs(
    db: PrefectDBInterface,
    session: AsyncSession,
    columns: Optional[list[str]] = None,
    flow_filter: Optional[schemas.filters.FlowFilter] = None,
    flow_run_filter: Optional[schemas.filters.FlowRunFilter] = None,
    task_run_filter: Optional[schemas.filters.TaskRunFilter] = None,
    deployment_filter: Optional[schemas.filters.DeploymentFilter] = None,
    work_pool_filter: Optional[schemas.filters.WorkPoolFilter] = None,
    work_queue_filter: Optional[schemas.filters.WorkQueueFilter] = None,
    offset: Optional[int] = None,
    limit: Optional[int] = None,
    sort: schemas.sorting.FlowRunSort = schemas.sorting.FlowRunSort.ID_DESC,
) -> Sequence[orm_models.FlowRun]:
    """
    Read flow runs.

    Args:
        session: a database session
        columns: a list of the flow run ORM columns to load, for performance
        flow_filter: only select flow runs whose flows match these filters
        flow_run_filter: only select flow runs that match these filters
        task_run_filter: only select flow runs whose task runs match these filters
        deployment_filter: only select flow runs whose deployments match these filters
        offset: Query offset
        limit: Query limit
        sort: Query sort

    Returns:
        List[orm_models.FlowRun]: flow runs
    """
    query = (
        select(db.FlowRun)
        .order_by(*sort.as_sql_sort())
        .options(
            selectinload(db.FlowRun.work_queue).selectinload(db.WorkQueue.work_pool)
        )
    )

    if columns:
        query = query.options(load_only(*columns))

    query = await _apply_flow_run_filters(
        db,
        query,
        flow_filter=flow_filter,
        flow_run_filter=flow_run_filter,
        task_run_filter=task_run_filter,
        deployment_filter=deployment_filter,
        work_pool_filter=work_pool_filter,
        work_queue_filter=work_queue_filter,
    )

    if offset is not None:
        query = query.offset(offset)

    if limit is not None:
        query = query.limit(limit)

    result = await session.execute(query)
    return result.scalars().unique().all()
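

# Usage sketch (illustrative, not part of the module's API): paginating a filtered
# query. The id filter mirrors the `FlowRunFilterId` usage elsewhere in this module;
# the session and id list are assumed to be supplied by the caller.
async def _example_read_flow_runs_page(
    session: AsyncSession, flow_run_ids: list[UUID]
) -> Sequence[orm_models.FlowRun]:
    # The flow_run_filter becomes a direct WHERE clause; filters on related objects
    # (flows, task runs, deployments, work pools) are applied as EXISTS subqueries
    # by `_apply_flow_run_filters`.
    return await read_flow_runs(
        session=session,
        flow_run_filter=schemas.filters.FlowRunFilter(
            id=schemas.filters.FlowRunFilterId(any_=flow_run_ids)
        ),
        offset=0,
        limit=50,
        sort=schemas.sorting.FlowRunSort.ID_DESC,
    )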


async def cleanup_flow_run_concurrency_slots(
    session: AsyncSession,
    flow_run: orm_models.FlowRun,
) -> None:
    """
    Cleanup flow run related resources, such as releasing concurrency slots.
    All operations should be idempotent and safe to call multiple times.
    IMPORTANT: This run may no longer exist in the database when this operation occurs.
    """

    if (
        flow_run.deployment_id
        and flow_run.state
        and flow_run.state.type
        in (
            schemas.states.StateType.PENDING,
            schemas.states.StateType.RUNNING,
            schemas.states.StateType.CANCELLING,
        )
    ):
        deployment = await models.deployments.read_deployment(
            session, flow_run.deployment_id
        )
        if deployment and deployment.concurrency_limit_id:
            await models.concurrency_limits_v2.bulk_decrement_active_slots(
                session, [deployment.concurrency_limit_id], 1
            )


class DependencyResult(PrefectBaseModel):
    id: UUID
    name: str
    upstream_dependencies: List[TaskRunResult]
    state: Optional[State]
    expected_start_time: Optional[datetime.datetime]
    start_time: Optional[datetime.datetime]
    end_time: Optional[datetime.datetime]
    total_run_time: Optional[datetime.timedelta]
    estimated_run_time: Optional[datetime.timedelta]
    untrackable_result: bool


async def read_task_run_dependencies(
    session: AsyncSession,
    flow_run_id: UUID,
) -> List[DependencyResult]:
    """
    Get a task run dependency map for a given flow run.
    """
    flow_run = await models.flow_runs.read_flow_run(
        session=session, flow_run_id=flow_run_id
    )
    if not flow_run:
        raise ObjectNotFoundError(f"Flow run with id {flow_run_id} not found")

    task_runs = await models.task_runs.read_task_runs(
        session=session,
        flow_run_filter=schemas.filters.FlowRunFilter(
            id=schemas.filters.FlowRunFilterId(any_=[flow_run_id])
        ),
    )

    dependency_graph = []

    for task_run in task_runs:
        inputs = list(set(chain(*task_run.task_inputs.values())))
        untrackable_result_status = (
            False
            if task_run.state is None
            else task_run.state.state_details.untrackable_result
        )
        dependency_graph.append(
            DependencyResult(
                id=task_run.id,
                upstream_dependencies=inputs,
                state=task_run.state,
                expected_start_time=task_run.expected_start_time,
                name=task_run.name,
                start_time=task_run.start_time,
                end_time=task_run.end_time,
                total_run_time=task_run.total_run_time,
                estimated_run_time=task_run.estimated_run_time,
                untrackable_result=untrackable_result_status,
            )
        )

    return dependency_graph


@db_injector
async def count_flow_runs(
    db: PrefectDBInterface,
    session: AsyncSession,
    flow_filter: Optional[schemas.filters.FlowFilter] = None,
    flow_run_filter: Optional[schemas.filters.FlowRunFilter] = None,
    task_run_filter: Optional[schemas.filters.TaskRunFilter] = None,
    deployment_filter: Optional[schemas.filters.DeploymentFilter] = None,
    work_pool_filter: Optional[schemas.filters.WorkPoolFilter] = None,
    work_queue_filter: Optional[schemas.filters.WorkQueueFilter] = None,
) -> int:
    """
    Count flow runs.

    Args:
        session: a database session
        flow_filter: only count flow runs whose flows match these filters
        flow_run_filter: only count flow runs that match these filters
        task_run_filter: only count flow runs whose task runs match these filters
        deployment_filter: only count flow runs whose deployments match these filters

    Returns:
        int: count of flow runs
    """

    query = select(sa.func.count(None)).select_from(db.FlowRun)

    query = await _apply_flow_run_filters(
        db,
        query,
        flow_filter=flow_filter,
        flow_run_filter=flow_run_filter,
        task_run_filter=task_run_filter,
        deployment_filter=deployment_filter,
        work_pool_filter=work_pool_filter,
        work_queue_filter=work_queue_filter,
    )

    result = await session.execute(query)
    return result.scalar_one()


@db_injector
async def delete_flow_run(
    db: PrefectDBInterface, session: AsyncSession, flow_run_id: UUID
) -> bool:
    """
    Delete a flow run by flow_run_id, handling concurrency limits if applicable.

    Args:
        session: A database session
        flow_run_id: a flow run id

    Returns:
        bool: whether or not the flow run was deleted
    """
    flow_run = await read_flow_run(session, flow_run_id)
    if not flow_run:
        return False

    deployment_id = flow_run.deployment_id

    if deployment_id:
        await cleanup_flow_run_concurrency_slots(session=session, flow_run=flow_run)

    # Delete the flow run
    result = await session.execute(
        delete(db.FlowRun).where(db.FlowRun.id == flow_run_id)
    )

    return result.rowcount > 0


async def set_flow_run_state(
    session: AsyncSession,
    flow_run_id: UUID,
    state: schemas.states.State,
    force: bool = False,
    flow_policy: Optional[Type[FlowRunOrchestrationPolicy]] = None,
    orchestration_parameters: Optional[Dict[str, Any]] = None,
    client_version: Optional[str] = None,
) -> OrchestrationResult:
    """
    Creates a new orchestrated flow run state.

    Setting a new state on a run is one of the principal actions that is governed by
    Prefect's orchestration logic. Setting a new run state will not guarantee creation,
    but instead trigger orchestration rules to govern the proposed `state` input. If
    the state is considered valid, it will be written to the database. Otherwise,
    it's possible a different state, or no state, will be created. A `force` flag is
    supplied to bypass a subset of orchestration logic.

    Args:
        session: a database session
        flow_run_id: the flow run id
        state: a flow run state model
        force: if False, orchestration rules will be applied that may alter or prevent
            the state transition. If True, orchestration rules are not applied.

    Returns:
        OrchestrationResult object
    """
    # load the flow run
    run = await models.flow_runs.read_flow_run(
        session=session,
        flow_run_id=flow_run_id,
        # Lock the row to prevent orchestration race conditions
        for_update=True,
    )

    if not run:
        raise ObjectNotFoundError(f"Flow run with id {flow_run_id} not found")

    initial_state = run.state.as_state() if run.state else None
    initial_state_type = initial_state.type if initial_state else None
    proposed_state_type = state.type if state else None
    intended_transition = (initial_state_type, proposed_state_type)

    if force or flow_policy is None:
        flow_policy = MinimalFlowPolicy

    orchestration_rules = flow_policy.compile_transition_rules(*intended_transition)  # type: ignore
    global_rules = GlobalFlowPolicy.compile_transition_rules(*intended_transition)

    context = FlowOrchestrationContext(
        session=session,
        run=run,
        initial_state=initial_state,
        proposed_state=state,
        client_version=client_version,
    )

    if orchestration_parameters is not None:
        context.parameters = orchestration_parameters

    # apply orchestration rules and create the new flow run state
    async with contextlib.AsyncExitStack() as stack:
        for rule in orchestration_rules:
            context = await stack.enter_async_context(
                rule(context, *intended_transition)
            )

        for rule in global_rules:
            context = await stack.enter_async_context(
                rule(context, *intended_transition)
            )

        await context.validate_proposed_state()

    if context.orchestration_error is not None:
        raise context.orchestration_error

    result = OrchestrationResult(
        state=context.validated_state,
        status=context.response_status,
        details=context.response_details,
    )

    return result
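

# Usage sketch (illustrative, not part of the module's API): proposing a RUNNING
# state for an existing run and inspecting the orchestration outcome. The session
# and flow run id are assumed to be supplied by the caller.
async def _example_propose_running_state(
    session: AsyncSession, flow_run_id: UUID
) -> OrchestrationResult:
    result = await set_flow_run_state(
        session=session,
        flow_run_id=flow_run_id,
        state=schemas.states.State(type=schemas.states.StateType.RUNNING),
    )
    # `result.state` is the validated state that was actually written (it may differ
    # from the proposed state), and `result.status` reports how orchestration rules
    # handled the transition.
    return result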


@db_injector
async def read_flow_run_graph(
    db: PrefectDBInterface,
    session: AsyncSession,
    flow_run_id: UUID,
    since: datetime.datetime = earliest_possible_datetime(),
) -> Graph:
    """Given a flow run, return the graph of its task and subflow runs. If a `since`
    datetime is provided, only return items that may have changed since that time."""
    if isinstance(since, str):
        since = DateTime.fromisoformat(since)

    return await db.queries.flow_run_graph_v2(
        session=session,
        flow_run_id=flow_run_id,
        since=since,
        max_nodes=PREFECT_API_MAX_FLOW_RUN_GRAPH_NODES.value(),
        max_artifacts=PREFECT_API_MAX_FLOW_RUN_GRAPH_ARTIFACTS.value(),
    )


async def with_system_labels_for_flow_run(
    session: AsyncSession,
    flow_run: Union[schemas.core.FlowRun, schemas.actions.FlowRunCreate],
) -> schemas.core.KeyValueLabels:
    """Augment user supplied labels with system default labels for a flow
    run."""

    user_supplied_labels = flow_run.labels or {}

    # `deployment_id` is deprecated on `schemas.actions.FlowRunCreate`. Only
    # check `deployment_id` if given an instance of a `schemas.core.FlowRun`.
    if isinstance(flow_run, schemas.core.FlowRun) and flow_run.deployment_id:
        deployment = await models.deployments.read_deployment(
            session, deployment_id=flow_run.deployment_id
        )
        if deployment:
            # Use the deployment flow run utility for consistent label generation
            return await models.deployments.with_system_labels_for_deployment_flow_run(
                session=session,
                deployment=deployment,
                user_supplied_labels=user_supplied_labels,
            )

    # If the flow run is not part of a deployment, generate basic flow labels
    default_labels = cast(
        schemas.core.KeyValueLabels,
        {
            "prefect.flow.id": str(flow_run.flow_id),
        },
    )

    parent_labels = await models.flows.read_flow_labels(session, flow_run.flow_id) or {}

    return parent_labels | default_labels | user_supplied_labels
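

# Worked example of the merge precedence above (values are illustrative): with dict
# union, later operands win, so user-supplied labels override the flow-id default,
# which in turn overrides labels inherited from the parent flow:
#
#   {"team": "data", "env": "dev"} | {"prefect.flow.id": "f-1"} | {"env": "prod"}
#   == {"team": "data", "env": "prod", "prefect.flow.id": "f-1"}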


@db_injector
async def update_flow_run_labels(
    db: PrefectDBInterface,
    session: AsyncSession,
    flow_run_id: UUID,
    labels: KeyValueLabels,
) -> bool:
    """
    Update flow run labels by patching existing labels with new values.
    Args:
        session: A database session
        flow_run_id: the flow run id to update
        labels: the new labels to patch into existing labels
    Returns:
        bool: whether the update was successful
    """
    # First read the existing flow run to get current labels
    flow_run: Optional[orm_models.FlowRun] = await read_flow_run(session, flow_run_id)
    if not flow_run:
        raise ObjectNotFoundError(f"Flow run with id {flow_run_id} not found")

    # Merge existing labels with new labels
    current_labels = flow_run.labels or {}
    updated_labels = {**current_labels, **labels}

    try:
        # Update the flow run with merged labels
        result = await session.execute(
            sa.update(db.FlowRun)
            .where(db.FlowRun.id == flow_run_id)
            .values(labels=updated_labels)
        )
        success = result.rowcount > 0
        if success:
            await session.commit()  # Explicitly commit
        return success
    except Exception:
        raise
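

# Usage sketch (illustrative, not part of the module's API): patching a single label
# onto an existing run. Existing labels are preserved and only the supplied keys are
# overwritten; the session, run id, and label key are assumptions for illustration.
async def _example_patch_run_labels(session: AsyncSession, flow_run_id: UUID) -> bool:
    return await update_flow_run_labels(
        session=session,
        flow_run_id=flow_run_id,
        labels={"retried-by": "oncall"},
    )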