Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/run_history.py: 33%

36 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

"""
Utilities for querying flow and task run history.
"""

4 

5import datetime 1a

6import json 1a

7from typing import TYPE_CHECKING, List, Optional 1a

8 

9import pydantic 1a

10import sqlalchemy as sa 1a

11from typing_extensions import Literal 1a

12 

13import prefect.server.models as models 1a

14import prefect.server.schemas as schemas 1a

15from prefect.logging import get_logger 1a

16from prefect.server.database import PrefectDBInterface, db_injector 1a

17from prefect.types import DateTime 1a

18 

19if TYPE_CHECKING: 19 ↛ 20line 19 didn't jump to line 20 because the condition on line 19 was never true1a

20 import logging 

21 

22logger: "logging.Logger" = get_logger("server.api") 1a

23 

24 

25@db_injector 1a

26async def run_history( 1a

27 db: PrefectDBInterface, 

28 session: sa.orm.Session, 

29 run_type: Literal["flow_run", "task_run"], 

30 history_start: DateTime, 

31 history_end: DateTime, 

32 history_interval: datetime.timedelta, 

33 flows: Optional[schemas.filters.FlowFilter] = None, 

34 flow_runs: Optional[schemas.filters.FlowRunFilter] = None, 

35 task_runs: Optional[schemas.filters.TaskRunFilter] = None, 

36 deployments: Optional[schemas.filters.DeploymentFilter] = None, 

37 work_pools: Optional[schemas.filters.WorkPoolFilter] = None, 

38 work_queues: Optional[schemas.filters.WorkQueueFilter] = None, 

39) -> list[schemas.responses.HistoryResponse]: 

40 """ 

41 Produce a history of runs aggregated by interval and state 

42 """ 

43 

44 # SQLite has issues with very small intervals 

45 # (by 0.001 seconds it stops incrementing the interval) 

46 if history_interval < datetime.timedelta(seconds=1): 

47 raise ValueError("History interval must not be less than 1 second.") 

48 

49 # prepare run-specific models 

50 if run_type == "flow_run": 

51 run_model = db.FlowRun 

52 run_filter_function = models.flow_runs._apply_flow_run_filters 

53 elif run_type == "task_run": 

54 run_model = db.TaskRun 

55 run_filter_function = models.task_runs._apply_task_run_filters 

56 else: 

57 raise ValueError( 

58 f"Unknown run type {run_type!r}. Expected 'flow_run' or 'task_run'." 

59 ) 

60 

61 # create a CTE for timestamp intervals 

62 intervals = db.queries.make_timestamp_intervals( 

63 history_start, 

64 history_end, 

65 history_interval, 

66 ).cte("intervals") 

67 

68 # apply filters to the flow runs (and related states) 

69 runs = ( 

70 await run_filter_function( 

71 db, 

72 sa.select( 

73 run_model.id, 

74 run_model.expected_start_time, 

75 run_model.estimated_run_time, 

76 run_model.estimated_start_time_delta, 

77 run_model.state_type, 

78 run_model.state_name, 

79 ).select_from(run_model), 

80 flow_filter=flows, 

81 flow_run_filter=flow_runs, 

82 task_run_filter=task_runs, 

83 deployment_filter=deployments, 

84 work_pool_filter=work_pools, 

85 work_queue_filter=work_queues, 

86 ) 

87 ).alias("runs") 

88 # outer join intervals to the filtered runs to create a dataset composed of 

89 # every interval and the aggregate of all its runs. The runs aggregate is represented 

90 # by a descriptive JSON object 

91 counts = ( 

92 sa.select( 

93 intervals.c.interval_start, 

94 intervals.c.interval_end, 

95 # build a JSON object, ignoring the case where the count of runs is 0 

96 sa.case( 

97 (sa.func.count(runs.c.id) == 0, None), 

98 else_=db.queries.build_json_object( 

99 "state_type", 

100 runs.c.state_type, 

101 "state_name", 

102 runs.c.state_name, 

103 "count_runs", 

104 sa.func.count(runs.c.id), 

105 # estimated run times only includes positive run times (to avoid any unexpected corner cases) 

106 "sum_estimated_run_time", 

107 sa.func.sum( 

108 sa.func.greatest( 

109 0, sa.extract("epoch", runs.c.estimated_run_time) 

110 ) 

111 ), 

112 # estimated lateness is the sum of any positive start time deltas 

113 "sum_estimated_lateness", 

114 sa.func.sum( 

115 sa.func.greatest( 

116 0, sa.extract("epoch", runs.c.estimated_start_time_delta) 

117 ) 

118 ), 

119 ), 

120 ).label("state_agg"), 

121 ) 

122 .select_from(intervals) 

123 .join( 

124 runs, 

125 sa.and_( 

126 runs.c.expected_start_time >= intervals.c.interval_start, 

127 runs.c.expected_start_time < intervals.c.interval_end, 

128 ), 

129 isouter=True, 

130 ) 

131 .group_by( 

132 intervals.c.interval_start, 

133 intervals.c.interval_end, 

134 runs.c.state_type, 

135 runs.c.state_name, 

136 ) 

137 ).alias("counts") 

138 

139 # aggregate all state-aggregate objects into a single array for each interval, 

140 # ensuring that intervals with no runs have an empty array 

141 query = ( 

142 sa.select( 

143 counts.c.interval_start, 

144 counts.c.interval_end, 

145 sa.func.coalesce( 

146 db.queries.json_arr_agg( 

147 db.queries.cast_to_json(counts.c.state_agg) 

148 ).filter(counts.c.state_agg.is_not(None)), 

149 sa.literal("[]", literal_execute=True), 

150 ).label("states"), 

151 ) 

152 .group_by(counts.c.interval_start, counts.c.interval_end) 

153 .order_by(counts.c.interval_start) 

154 # return no more than 500 bars 

155 .limit(500) 

156 ) 

157 

158 # issue the query 

159 result = await session.execute(query) 

160 records = result.mappings() 

161 

162 # load and parse the record if the database returns JSON as strings 

163 if db.queries.uses_json_strings: 

164 records = [dict(r) for r in records] 

165 for r in records: 

166 r["states"] = json.loads(r["states"]) 

167 

168 return pydantic.TypeAdapter( 

169 List[schemas.responses.HistoryResponse] 

170 ).validate_python(records)