Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/models/flows.py: 88%

72 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1""" 

2Functions for interacting with flow ORM objects. 

3Intended for internal use by the Prefect REST API. 

4""" 

5 

6from typing import Optional, Sequence, TypeVar, Union 1d

7from uuid import UUID 1d

8 

9import sqlalchemy as sa 1d

10from sqlalchemy import delete, select 1d

11from sqlalchemy.ext.asyncio import AsyncSession 1d

12from sqlalchemy.sql import Select 1d

13 

14import prefect.server.schemas as schemas 1d

15from prefect.server.database import PrefectDBInterface, db_injector, orm_models 1d

16 

17T = TypeVar("T", bound=tuple) 1d

18 

19 

@db_injector
async def create_flow(
    db: PrefectDBInterface, session: AsyncSession, flow: schemas.core.Flow
) -> orm_models.Flow:
    """
    Insert a flow, falling back to the already-stored flow on conflict.

    The insert is issued with "on conflict do nothing" against the columns
    listed in `db.orm.flow_unique_upsert_columns`; the flow is then read back
    by name, so a row is returned whether or not the insert took effect.

    Args:
        session: a database session
        flow: a flow model

    Returns:
        orm_models.Flow: the newly-created or existing flow
    """
    base_insert = db.queries.insert(db.Flow)
    # Only fields explicitly set on the incoming model are written.
    column_values = flow.model_dump_for_orm(exclude_unset=True)
    upsert = base_insert.values(**column_values).on_conflict_do_nothing(
        index_elements=db.orm.flow_unique_upsert_columns,
    )
    await session.execute(upsert)

    lookup = sa.select(db.Flow).where(db.Flow.name == flow.name).limit(1)
    # populate_existing makes the ORM refresh an instance the session may
    # already hold instead of handing back stale attribute values.
    lookup = lookup.execution_options(populate_existing=True)
    rows = await session.execute(lookup)
    return rows.scalar_one()

55 

56 

@db_injector
async def update_flow(
    db: PrefectDBInterface,
    session: AsyncSession,
    flow_id: UUID,
    flow: schemas.actions.FlowUpdate,
) -> bool:
    """
    Apply the provided field changes to the flow with the given id.

    Args:
        session: a database session
        flow_id: the flow id to update
        flow: a flow update model

    Returns:
        bool: True when the UPDATE statement matched at least one row
    """
    targeted_update = sa.update(db.Flow).where(db.Flow.id == flow_id)
    # exclude_unset=True restricts the update to values the user actually
    # supplied, so model defaults never overwrite stored data
    supplied_values = flow.model_dump_for_orm(exclude_unset=True)
    outcome = await session.execute(targeted_update.values(**supplied_values))
    return outcome.rowcount > 0

84 

85 

@db_injector
async def read_flow(
    db: PrefectDBInterface, session: AsyncSession, flow_id: UUID
) -> Optional[orm_models.Flow]:
    """
    Look up a single flow by its primary key.

    Args:
        session: A database session
        flow_id: a flow id

    Returns:
        Optional[orm_models.Flow]: whatever `session.get` yields for the id
            (the flow, or None when there is no such row)
    """
    found = await session.get(db.Flow, flow_id)
    return found

101 

102 

@db_injector
async def read_flow_by_name(
    db: PrefectDBInterface, session: AsyncSession, name: str
) -> Optional[orm_models.Flow]:
    """
    Look up a single flow by its name.

    Args:
        session: A database session
        name: a flow name

    Returns:
        Optional[orm_models.Flow]: the first matching flow, or None when the
            query produces no rows
    """
    by_name = select(db.Flow).filter_by(name=name)
    rows = await session.execute(by_name)
    return rows.scalar()

120 

121 

async def _apply_flow_filters(
    db: PrefectDBInterface,
    query: Select[T],
    flow_filter: Union[schemas.filters.FlowFilter, None] = None,
    flow_run_filter: Union[schemas.filters.FlowRunFilter, None] = None,
    task_run_filter: Union[schemas.filters.TaskRunFilter, None] = None,
    deployment_filter: Union[schemas.filters.DeploymentFilter, None] = None,
    work_pool_filter: Union[schemas.filters.WorkPoolFilter, None] = None,
) -> Select[T]:
    """
    Narrow a flow query using the supplied filters.

    The flow filter is applied directly; filters on related objects
    (deployments / work pools, flow runs / task runs) are each folded into a
    correlated EXISTS subquery so the outer query still selects from flows only.

    Args:
        query: the flow query to narrow
        flow_filter: criteria on the flow itself
        flow_run_filter: criteria a flow run of the flow must satisfy
        task_run_filter: criteria a task run of one of the flow's runs must satisfy
        deployment_filter: criteria a deployment of the flow must satisfy
        work_pool_filter: criteria the work pool behind a deployment must satisfy

    Returns:
        Select[T]: the query with every applicable WHERE clause added
    """
    if flow_filter:
        query = query.where(flow_filter.as_sql_filter())

    if deployment_filter or work_pool_filter:
        # Correlate deployments to the flow row of the outer query.
        deployment_subquery = select(db.Deployment).where(
            db.Deployment.flow_id == db.Flow.id
        )

        if deployment_filter:
            deployment_subquery = deployment_subquery.where(
                deployment_filter.as_sql_filter(),
            )

        if work_pool_filter:
            # Deployments reach their work pool through the work queue.
            deployment_subquery = (
                deployment_subquery.join(
                    db.WorkQueue,
                    db.WorkQueue.id == db.Deployment.work_queue_id,
                )
                .join(
                    db.WorkPool,
                    db.WorkPool.id == db.WorkQueue.work_pool_id,
                )
                .where(work_pool_filter.as_sql_filter())
            )

        query = query.where(deployment_subquery.exists())

    if flow_run_filter or task_run_filter:
        # Correlate flow runs to the flow row of the outer query.
        flow_run_subquery = select(db.FlowRun).where(db.FlowRun.flow_id == db.Flow.id)

        if flow_run_filter:
            flow_run_subquery = flow_run_subquery.where(
                flow_run_filter.as_sql_filter()
            )

        if task_run_filter:
            task_run_join = flow_run_subquery.join(
                db.TaskRun,
                db.TaskRun.flow_run_id == db.FlowRun.id,
            )
            flow_run_subquery = task_run_join.where(task_run_filter.as_sql_filter())

        query = query.where(flow_run_subquery.exists())

    return query

179 

180 

@db_injector
async def read_flows(
    db: PrefectDBInterface,
    session: AsyncSession,
    flow_filter: Union[schemas.filters.FlowFilter, None] = None,
    flow_run_filter: Union[schemas.filters.FlowRunFilter, None] = None,
    task_run_filter: Union[schemas.filters.TaskRunFilter, None] = None,
    deployment_filter: Union[schemas.filters.DeploymentFilter, None] = None,
    work_pool_filter: Union[schemas.filters.WorkPoolFilter, None] = None,
    sort: schemas.sorting.FlowSort = schemas.sorting.FlowSort.NAME_ASC,
    offset: Union[int, None] = None,
    limit: Union[int, None] = None,
) -> Sequence[orm_models.Flow]:
    """
    Fetch the flows that satisfy every supplied filter.

    Args:
        session: A database session
        flow_filter: only select flows that match these filters
        flow_run_filter: only select flows whose flow runs match these filters
        task_run_filter: only select flows whose task runs match these filters
        deployment_filter: only select flows whose deployments match these filters
        work_pool_filter: only select flows whose work pools match these filters
        sort: ordering applied to the result (defaults to FlowSort.NAME_ASC)
        offset: Query offset; skipped when None
        limit: Query limit; skipped when None

    Returns:
        Sequence[orm_models.Flow]: the matching flows, de-duplicated
    """
    ordered_flows = select(db.Flow).order_by(*sort.as_sql_sort())

    statement = await _apply_flow_filters(
        db,
        ordered_flows,
        flow_filter=flow_filter,
        flow_run_filter=flow_run_filter,
        task_run_filter=task_run_filter,
        deployment_filter=deployment_filter,
        work_pool_filter=work_pool_filter,
    )

    # Explicit None checks: an offset or limit of 0 must still be applied.
    if offset is not None:
        statement = statement.offset(offset)
    if limit is not None:
        statement = statement.limit(limit)

    rows = await session.execute(statement)
    flows = rows.scalars().unique().all()
    return flows

231 

232 

@db_injector
async def count_flows(
    db: PrefectDBInterface,
    session: AsyncSession,
    flow_filter: Union[schemas.filters.FlowFilter, None] = None,
    flow_run_filter: Union[schemas.filters.FlowRunFilter, None] = None,
    task_run_filter: Union[schemas.filters.TaskRunFilter, None] = None,
    deployment_filter: Union[schemas.filters.DeploymentFilter, None] = None,
    work_pool_filter: Union[schemas.filters.WorkPoolFilter, None] = None,
) -> int:
    """
    Return how many flows satisfy every supplied filter.

    Args:
        session: A database session
        flow_filter: only count flows that match these filters
        flow_run_filter: only count flows whose flow runs match these filters
        task_run_filter: only count flows whose task runs match these filters
        deployment_filter: only count flows whose deployments match these filters
        work_pool_filter: only count flows whose work pools match these filters

    Returns:
        int: count of flows
    """
    counting_query = select(sa.func.count(None)).select_from(db.Flow)

    # Same filter pipeline as read_flows, applied to the aggregate query.
    counting_query = await _apply_flow_filters(
        db,
        counting_query,
        flow_filter=flow_filter,
        flow_run_filter=flow_run_filter,
        task_run_filter=task_run_filter,
        deployment_filter=deployment_filter,
        work_pool_filter=work_pool_filter,
    )

    rows = await session.execute(counting_query)
    total = rows.scalar_one()
    return total

272 

273 

@db_injector
async def delete_flow(
    db: PrefectDBInterface, session: AsyncSession, flow_id: UUID
) -> bool:
    """
    Remove the flow with the given id.

    Args:
        session: A database session
        flow_id: a flow id

    Returns:
        bool: True when the DELETE statement affected at least one row
    """
    removal = delete(db.Flow).where(db.Flow.id == flow_id)
    outcome = await session.execute(removal)
    return outcome.rowcount > 0

291 

292 

293@db_injector 1d

294async def read_flow_labels( 1d

295 db: PrefectDBInterface, 

296 session: AsyncSession, 

297 flow_id: UUID, 

298) -> Union[schemas.core.KeyValueLabels, None]: 

299 result = await session.execute(select(db.Flow.labels).where(db.Flow.id == flow_id)) 1acb

300 

301 return result.scalar()