Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/ui/task_runs.py: 64%

58 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1from datetime import datetime 1a

2from typing import TYPE_CHECKING, List, Optional 1a

3from uuid import UUID 1a

4 

5import sqlalchemy as sa 1a

6from fastapi import Depends, HTTPException, Path 1a

7from pydantic import Field, model_serializer 1a

8 

9import prefect.server.schemas as schemas 1a

10from prefect._internal.compatibility.starlette import status 1a

11from prefect.logging import get_logger 1a

12from prefect.server import models 1a

13from prefect.server.database import PrefectDBInterface, provide_database_interface 1a

14from prefect.server.utilities.schemas.bases import PrefectBaseModel 1a

15from prefect.server.utilities.server import PrefectRouter 1a

16from prefect.types._datetime import end_of_period, now 1a

17 

18if TYPE_CHECKING: 18 ↛ 19line 18 didn't jump to line 19 because the condition on line 18 was never true1a

19 import logging 

20 

# Module-level logger for the UI task-run endpoints.
logger: "logging.Logger" = get_logger("server.api.ui.task_runs")

# Every route declared in this module is registered under /ui/task_runs.
router: PrefectRouter = PrefectRouter(
    prefix="/ui/task_runs",
    tags=["Task Runs", "UI"],
)

# State types that the dashboard counts endpoint tallies as "failed".
FAILED_STATES = [
    schemas.states.StateType.CRASHED,
    schemas.states.StateType.FAILED,
]

26 

27 

class TaskRunCount(PrefectBaseModel):
    """
    Pair of task-run counters (completed and failed).

    This is the element type of the list returned by the
    ``/dashboard/counts`` endpoint in this module.
    """

    completed: int = Field(
        default=...,
        description="The number of completed task runs.",
    )
    failed: int = Field(
        default=...,
        description="The number of failed task runs.",
    )

    @model_serializer
    def ser_model(self) -> dict[str, int]:
        """Serialize both counters, coercing each value to a plain ``int``."""
        counters = (("completed", self.completed), ("failed", self.failed))
        return {label: int(value) for label, value in counters}

40 

41 

@router.post("/dashboard/counts")
async def read_dashboard_task_run_counts(
    task_runs: schemas.filters.TaskRunFilter,
    flows: Optional[schemas.filters.FlowFilter] = None,
    flow_runs: Optional[schemas.filters.FlowRunFilter] = None,
    deployments: Optional[schemas.filters.DeploymentFilter] = None,
    work_pools: Optional[schemas.filters.WorkPoolFilter] = None,
    work_queues: Optional[schemas.filters.WorkQueueFilter] = None,
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> List[TaskRunCount]:
    """
    Count failed and non-failed terminal task runs in 20 equal time buckets.

    The window starts at ``task_runs.start_time.after_`` (truncated to the
    minute) and ends at the end of the minute containing
    ``task_runs.start_time.before_``, or of the current UTC minute when no
    upper bound is given.

    Args:
        task_runs: Task run filter. ``start_time.after_`` is required. Its
            ``state`` attribute is overwritten so that only terminal states
            are matched.
        flows: Optional flow filter.
        flow_runs: Optional flow run filter.
        deployments: Optional deployment filter.
        work_pools: Optional work pool filter.
        work_queues: Optional work queue filter.
        db: Database interface (injected).

    Returns:
        Exactly 20 ``TaskRunCount`` items in chronological bucket order.
        Buckets without matching task runs hold zeros.

    Raises:
        HTTPException: 422 if ``task_runs.start_time.after_`` is missing.
    """
    if task_runs.start_time is None or task_runs.start_time.after_ is None:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail="task_runs.start_time.after_ is required",
        )

    # We only care about task runs that are in a terminal state, all others
    # should be ignored. This replaces any state filter the caller supplied.
    task_runs.state = schemas.filters.TaskRunFilterState(
        type=schemas.filters.TaskRunFilterStateType(
            any_=list(schemas.states.TERMINAL_STATES)
        )
    )

    bucket_count = 20
    start_time = task_runs.start_time.after_.replace(microsecond=0, second=0)
    end_time = (
        end_of_period(task_runs.start_time.before_, "minute")
        if task_runs.start_time.before_
        else end_of_period(now("UTC"), "minute")
    )
    window = end_time - start_time
    delta = window / bucket_count  # width of a single bucket

    async with db.session_context(begin_transaction=False) as session:
        # Gather the raw counts. The counts are divided into buckets of time
        # and each bucket contains the number of successful and failed task
        # runs.
        # SQLAlchemy doesn't play nicely with our DateTime type so we convert it
        # to a datetime object.
        start_datetime = datetime(
            start_time.year,
            start_time.month,
            start_time.day,
            start_time.hour,
            start_time.minute,
            start_time.second,
            start_time.microsecond,
            start_time.tzinfo,
        )
        # Bucket index = floor(offset from window start / bucket width).
        # NOTE(review): date_diff_seconds is resolved on the database side;
        # presumably it yields the seconds between the two timestamps --
        # confirm its argument order against the DB function definition.
        bucket_expression = sa.func.floor(
            sa.func.date_diff_seconds(db.TaskRun.start_time, start_datetime)
            / delta.total_seconds()
        ).label("bucket")

        failed_count = sa.func.sum(
            sa.case(
                (db.TaskRun.state_type.in_(FAILED_STATES), 1),
                else_=0,
            )
        ).label("failed_count")
        # Every terminal state that is not in FAILED_STATES is tallied here.
        successful_count = sa.func.sum(
            sa.case(
                (db.TaskRun.state_type.notin_(FAILED_STATES), 1),
                else_=0,
            )
        ).label("successful_count")

        filtered_select = await models.task_runs._apply_task_run_filters(
            db,
            sa.select(
                bucket_expression,
                sa.func.min(db.TaskRun.end_time).label("oldest"),
                failed_count,
                successful_count,
            ),
            flow_filter=flows,
            flow_run_filter=flow_runs,
            task_run_filter=task_runs,
            deployment_filter=deployments,
            work_pool_filter=work_pools,
            work_queue_filter=work_queues,
        )
        raw_counts = filtered_select.group_by(
            "bucket", db.TaskRun.start_time
        ).subquery()

        # Aggregate the raw counts by bucket
        query = (
            sa.select(
                raw_counts.c.bucket.label("bucket"),
                sa.func.min(raw_counts.c.oldest).label("oldest"),
                sa.func.sum(raw_counts.c.failed_count).label("failed_count"),
                sa.func.sum(raw_counts.c.successful_count).label("successful_count"),
            )
            .select_from(raw_counts)
            .group_by(raw_counts.c.bucket)
            .order_by(sa.asc("oldest"))
        )

        # Fetch every row while the session is still open.
        rows = (await session.execute(query)).all()

    # Ensure that all buckets of time are present in the result even if no
    # matching task runs occurred during the given time period.
    buckets = [TaskRunCount(completed=0, failed=0) for _ in range(bucket_count)]

    last_bucket = bucket_count - 1
    for row in rows:
        # The computed index can land outside [0, bucket_count). With no
        # `before_` bound the filter has no upper limit, so a task run whose
        # start_time is later than `end_time` gives an index >= bucket_count.
        # Unclamped that raises IndexError, and a negative index would
        # silently wrap around to the wrong bucket.
        index = min(max(int(row.bucket), 0), last_bucket)
        # Accumulate rather than assign: after clamping, several rows may
        # share one bucket.
        buckets[index].completed += int(row.successful_count)
        buckets[index].failed += int(row.failed_count)

    return buckets

160 

161 

@router.post("/count")
async def read_task_run_counts_by_state(
    flows: Optional[schemas.filters.FlowFilter] = None,
    flow_runs: Optional[schemas.filters.FlowRunFilter] = None,
    task_runs: Optional[schemas.filters.TaskRunFilter] = None,
    deployments: Optional[schemas.filters.DeploymentFilter] = None,
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> schemas.states.CountByState:
    """
    Count the task runs matching the given filters, broken down by state.

    All filters are optional and are passed unchanged to
    ``models.task_runs.count_task_runs_by_state``.
    """
    filter_kwargs = {
        "flow_filter": flows,
        "flow_run_filter": flow_runs,
        "task_run_filter": task_runs,
        "deployment_filter": deployments,
    }
    async with db.session_context(begin_transaction=False) as db_session:
        counts = await models.task_runs.count_task_runs_by_state(
            session=db_session,
            **filter_kwargs,
        )
    return counts

178 

179 

@router.get("/{id:uuid}")
async def read_task_run_with_flow_run_name(
    task_run_id: UUID = Path(..., description="The task run id", alias="id"),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> schemas.ui.UITaskRun:
    """
    Look up a single task run by its id.

    The record is loaded through
    ``models.task_runs.read_task_run_with_flow_run_name`` and validated into
    a ``UITaskRun``. Responds with HTTP 404 when the lookup returns nothing.
    """
    async with db.session_context() as db_session:
        found = await models.task_runs.read_task_run_with_flow_run_name(
            session=db_session,
            task_run_id=task_run_id,
        )

    if found:
        return schemas.ui.UITaskRun.model_validate(found)

    raise HTTPException(status.HTTP_404_NOT_FOUND, detail="Task not found")