Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/run_history.py: 33%
36 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1"""
2Utilities for querying flow and task run history.
3"""
5import datetime 1a
6import json 1a
7from typing import TYPE_CHECKING, List, Optional 1a
9import pydantic 1a
10import sqlalchemy as sa 1a
11from typing_extensions import Literal 1a
13import prefect.server.models as models 1a
14import prefect.server.schemas as schemas 1a
15from prefect.logging import get_logger 1a
16from prefect.server.database import PrefectDBInterface, db_injector 1a
17from prefect.types import DateTime 1a
19if TYPE_CHECKING: 19 ↛ 20line 19 didn't jump to line 20 because the condition on line 19 was never true1a
20 import logging
22logger: "logging.Logger" = get_logger("server.api") 1a
@db_injector
async def run_history(
    db: PrefectDBInterface,
    session: sa.orm.Session,
    run_type: Literal["flow_run", "task_run"],
    history_start: DateTime,
    history_end: DateTime,
    history_interval: datetime.timedelta,
    flows: Optional[schemas.filters.FlowFilter] = None,
    flow_runs: Optional[schemas.filters.FlowRunFilter] = None,
    task_runs: Optional[schemas.filters.TaskRunFilter] = None,
    deployments: Optional[schemas.filters.DeploymentFilter] = None,
    work_pools: Optional[schemas.filters.WorkPoolFilter] = None,
    work_queues: Optional[schemas.filters.WorkQueueFilter] = None,
) -> list[schemas.responses.HistoryResponse]:
    """
    Produce a history of runs aggregated by interval and state.

    Buckets every flow run or task run whose ``expected_start_time`` falls in
    ``[history_start, history_end)`` into fixed-width intervals of
    ``history_interval``, and for each interval aggregates the runs by
    (state_type, state_name) into a JSON summary object containing the run
    count, the summed positive estimated run time, and the summed positive
    estimated lateness (start-time delta).

    Args:
        db: injected database interface (supplied by the ``@db_injector``
            decorator, not by callers).
        session: the database session used to execute the final query.
        run_type: which run table to aggregate, ``"flow_run"`` or
            ``"task_run"``.
        history_start: inclusive start of the history window.
        history_end: exclusive end of the history window.
        history_interval: width of each aggregation bucket; must be at least
            one second.
        flows, flow_runs, task_runs, deployments, work_pools, work_queues:
            optional filters applied to the runs before aggregation.

    Returns:
        A list of ``HistoryResponse`` objects, one per interval (at most 500),
        ordered by interval start. Intervals with no runs have an empty
        ``states`` list.

    Raises:
        ValueError: if ``history_interval`` is less than one second, or if
            ``run_type`` is not one of the two supported values.
    """
    # SQLite has issues with very small intervals
    # (by 0.001 seconds it stops incrementing the interval)
    if history_interval < datetime.timedelta(seconds=1):
        raise ValueError("History interval must not be less than 1 second.")

    # prepare run-specific models: the ORM model to aggregate and the
    # module-level helper that applies the filter set to a select statement
    if run_type == "flow_run":
        run_model = db.FlowRun
        run_filter_function = models.flow_runs._apply_flow_run_filters
    elif run_type == "task_run":
        run_model = db.TaskRun
        run_filter_function = models.task_runs._apply_task_run_filters
    else:
        raise ValueError(
            f"Unknown run type {run_type!r}. Expected 'flow_run' or 'task_run'."
        )

    # create a CTE for timestamp intervals (dialect-specific: generated by
    # the db's query helpers)
    intervals = db.queries.make_timestamp_intervals(
        history_start,
        history_end,
        history_interval,
    ).cte("intervals")

    # apply filters to the flow runs (and related states)
    runs = (
        await run_filter_function(
            db,
            sa.select(
                run_model.id,
                run_model.expected_start_time,
                run_model.estimated_run_time,
                run_model.estimated_start_time_delta,
                run_model.state_type,
                run_model.state_name,
            ).select_from(run_model),
            flow_filter=flows,
            flow_run_filter=flow_runs,
            task_run_filter=task_runs,
            deployment_filter=deployments,
            work_pool_filter=work_pools,
            work_queue_filter=work_queues,
        )
    ).alias("runs")
    # outer join intervals to the filtered runs to create a dataset composed of
    # every interval and the aggregate of all its runs. The runs aggregate is represented
    # by a descriptive JSON object
    counts = (
        sa.select(
            intervals.c.interval_start,
            intervals.c.interval_end,
            # build a JSON object, ignoring the case where the count of runs is 0
            # (NULL here lets the final query's filter drop empty groups)
            sa.case(
                (sa.func.count(runs.c.id) == 0, None),
                else_=db.queries.build_json_object(
                    "state_type",
                    runs.c.state_type,
                    "state_name",
                    runs.c.state_name,
                    "count_runs",
                    sa.func.count(runs.c.id),
                    # estimated run times only includes positive run times (to avoid any unexpected corner cases)
                    "sum_estimated_run_time",
                    sa.func.sum(
                        sa.func.greatest(
                            0, sa.extract("epoch", runs.c.estimated_run_time)
                        )
                    ),
                    # estimated lateness is the sum of any positive start time deltas
                    "sum_estimated_lateness",
                    sa.func.sum(
                        sa.func.greatest(
                            0, sa.extract("epoch", runs.c.estimated_start_time_delta)
                        )
                    ),
                ),
            ).label("state_agg"),
        )
        .select_from(intervals)
        .join(
            runs,
            sa.and_(
                # half-open bucket: [interval_start, interval_end)
                runs.c.expected_start_time >= intervals.c.interval_start,
                runs.c.expected_start_time < intervals.c.interval_end,
            ),
            # outer join so intervals with no runs still appear in the output
            isouter=True,
        )
        .group_by(
            intervals.c.interval_start,
            intervals.c.interval_end,
            runs.c.state_type,
            runs.c.state_name,
        )
    ).alias("counts")

    # aggregate all state-aggregate objects into a single array for each interval,
    # ensuring that intervals with no runs have an empty array
    query = (
        sa.select(
            counts.c.interval_start,
            counts.c.interval_end,
            sa.func.coalesce(
                db.queries.json_arr_agg(
                    db.queries.cast_to_json(counts.c.state_agg)
                ).filter(counts.c.state_agg.is_not(None)),
                sa.literal("[]", literal_execute=True),
            ).label("states"),
        )
        .group_by(counts.c.interval_start, counts.c.interval_end)
        .order_by(counts.c.interval_start)
        # return no more than 500 bars
        .limit(500)
    )

    # issue the query
    result = await session.execute(query)
    records = result.mappings()

    # load and parse the record if the database returns JSON as strings
    # (e.g. backends without a native JSON aggregate type)
    if db.queries.uses_json_strings:
        records = [dict(r) for r in records]
        for r in records:
            r["states"] = json.loads(r["states"])

    return pydantic.TypeAdapter(
        List[schemas.responses.HistoryResponse]
    ).validate_python(records)