Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/ui/flows.py: 55%

60 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1from __future__ import annotations 1a

2 

3from datetime import datetime 1a

4from typing import TYPE_CHECKING, Dict, List, Optional 1a

5from uuid import UUID 1a

6from zoneinfo import ZoneInfo 1a

7 

8import sqlalchemy as sa 1a

9from fastapi import Body, Depends 1a

10from pydantic import Field, field_validator 1a

11 

12from prefect.logging import get_logger 1a

13from prefect.server.database import PrefectDBInterface, provide_database_interface 1a

14from prefect.server.schemas.states import StateType 1a

15from prefect.server.utilities.database import UUID as UUIDTypeDecorator 1a

16from prefect.server.utilities.schemas import PrefectBaseModel 1a

17from prefect.server.utilities.server import PrefectRouter 1a

18from prefect.types import DateTime 1a

19from prefect.types._datetime import create_datetime_instance, parse_datetime 1a

20 

21if TYPE_CHECKING: 21 ↛ 22line 21 didn't jump to line 22 because the condition on line 21 was never true1a

22 import logging 

23 

24logger: "logging.Logger" = get_logger() 1a

25 

26router: PrefectRouter = PrefectRouter(prefix="/ui/flows", tags=["Flows", "UI"]) 1a

27 

28 

29class SimpleNextFlowRun(PrefectBaseModel): 1a

30 id: UUID = Field(default=..., description="The flow run id.") 1a

31 flow_id: UUID = Field(default=..., description="The flow id.") 1a

32 name: str = Field(default=..., description="The flow run name") 1a

33 state_name: str = Field(default=..., description="The state name.") 1a

34 state_type: StateType = Field(default=..., description="The state type.") 1a

35 next_scheduled_start_time: DateTime = Field( 1a

36 default=..., description="The next scheduled start time" 

37 ) 

38 

39 @field_validator("next_scheduled_start_time", mode="before") 1a

40 @classmethod 1a

41 def validate_next_scheduled_start_time(cls, v: DateTime | datetime) -> DateTime: 1a

42 if isinstance(v, datetime): 

43 return create_datetime_instance(v) 

44 return v 

45 

46 

47@router.post("/count-deployments") 1a

48async def count_deployments_by_flow( 1a

49 flow_ids: List[UUID] = Body(default=..., embed=True, max_items=200), 

50 db: PrefectDBInterface = Depends(provide_database_interface), 

51) -> Dict[UUID, int]: 

52 """ 

53 Get deployment counts by flow id. 

54 """ 

55 async with db.session_context() as session: 

56 query = ( 

57 sa.select( 

58 db.Deployment.flow_id, 

59 sa.func.count(db.Deployment.id).label("deployment_count"), 

60 ) 

61 .where(db.Deployment.flow_id.in_(flow_ids)) 

62 .group_by(db.Deployment.flow_id) 

63 ) 

64 

65 results = await session.execute(query) 

66 

67 deployment_counts_by_flow = { 

68 flow_id: deployment_count for flow_id, deployment_count in results.all() 

69 } 

70 

71 return { 

72 flow_id: deployment_counts_by_flow.get(flow_id, 0) for flow_id in flow_ids 

73 } 

74 

75 

76def _get_postgres_next_runs_query(flow_ids: List[UUID]): 1a

77 # Here we use the raw query because CROSS LATERAL JOINS are very 

78 # difficult to express correctly in sqlalchemy. 

79 raw_query = sa.text( 

80 """ 

81 SELECT fr.id, fr.name, fr.flow_id, fr.state_name, fr.state_type, fr.state_name, fr.next_scheduled_start_time 

82 FROM ( 

83 SELECT DISTINCT flow_id FROM flow_run 

84 WHERE flow_id IN :flow_ids 

85 AND state_type = 'SCHEDULED' 

86 ) AS unique_flows 

87 CROSS JOIN LATERAL ( 

88 SELECT * 

89 FROM flow_run fr 

90 WHERE fr.flow_id = unique_flows.flow_id 

91 AND fr.state_type = 'SCHEDULED' 

92 ORDER BY fr.next_scheduled_start_time ASC 

93 LIMIT 1 

94 ) fr; 

95 """ 

96 ) 

97 

98 bindparams = [ 

99 sa.bindparam( 

100 "flow_ids", 

101 flow_ids, 

102 expanding=True, 

103 type_=UUIDTypeDecorator, 

104 ), 

105 ] 

106 

107 query = raw_query.bindparams(*bindparams) 

108 return query 

109 

110 

111def _get_sqlite_next_runs_query(flow_ids: List[UUID]): 1a

112 raw_query = sa.text( 

113 """ 

114 WITH min_times AS ( 

115 SELECT flow_id, MIN(next_scheduled_start_time) AS min_next_scheduled_start_time 

116 FROM flow_run 

117 WHERE flow_id IN :flow_ids 

118 AND state_type = 'SCHEDULED' 

119 GROUP BY flow_id 

120 ) 

121 SELECT fr.id, fr.name, fr.flow_id, fr.state_name, fr.state_type, fr.next_scheduled_start_time 

122 FROM flow_run fr 

123 JOIN min_times mt ON fr.flow_id = mt.flow_id AND fr.next_scheduled_start_time = mt.min_next_scheduled_start_time 

124 WHERE fr.state_type = 'SCHEDULED'; 

125 

126 """ 

127 ) 

128 

129 bindparams = [ 

130 sa.bindparam( 

131 "flow_ids", 

132 flow_ids, 

133 expanding=True, 

134 type_=UUIDTypeDecorator, 

135 ), 

136 ] 

137 

138 query = raw_query.bindparams(*bindparams) 

139 return query 

140 

141 

142@router.post("/next-runs") 1a

143async def next_runs_by_flow( 1a

144 flow_ids: List[UUID] = Body(default=..., embed=True, max_items=200), 

145 db: PrefectDBInterface = Depends(provide_database_interface), 

146) -> Dict[UUID, Optional[SimpleNextFlowRun]]: 

147 """ 

148 Get the next flow run by flow id. 

149 """ 

150 

151 async with db.session_context() as session: 

152 if db.dialect.name == "postgresql": 

153 query = _get_postgres_next_runs_query(flow_ids=flow_ids) 

154 else: 

155 query = _get_sqlite_next_runs_query(flow_ids=flow_ids) 

156 

157 results = await session.execute(query) 

158 

159 results_by_flow_id = { 

160 UUID(str(result.flow_id)): SimpleNextFlowRun( 

161 id=result.id, 

162 flow_id=result.flow_id, 

163 name=result.name, 

164 state_name=result.state_name, 

165 state_type=result.state_type, 

166 next_scheduled_start_time=parse_datetime( 

167 result.next_scheduled_start_time 

168 ).replace(tzinfo=ZoneInfo("UTC")) 

169 if isinstance(result.next_scheduled_start_time, str) 

170 else result.next_scheduled_start_time, 

171 ) 

172 for result in results.all() 

173 } 

174 

175 response = { 

176 flow_id: results_by_flow_id.get(flow_id, None) for flow_id in flow_ids 

177 } 

178 return response