Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/models/flows.py: 29%

72 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1""" 

2Functions for interacting with flow ORM objects. 

3Intended for internal use by the Prefect REST API. 

4""" 

5 

6from typing import Optional, Sequence, TypeVar, Union 1a

7from uuid import UUID 1a

8 

9import sqlalchemy as sa 1a

10from sqlalchemy import delete, select 1a

11from sqlalchemy.ext.asyncio import AsyncSession 1a

12from sqlalchemy.sql import Select 1a

13 

14import prefect.server.schemas as schemas 1a

15from prefect.server.database import PrefectDBInterface, db_injector, orm_models 1a

16 

17T = TypeVar("T", bound=tuple) 1a

18 

19 

20@db_injector 1a

21async def create_flow( 1a

22 db: PrefectDBInterface, session: AsyncSession, flow: schemas.core.Flow 

23) -> orm_models.Flow: 

24 """ 

25 Creates a new flow. 

26 

27 If a flow with the same name already exists, the existing flow is returned. 

28 

29 Args: 

30 session: a database session 

31 flow: a flow model 

32 

33 Returns: 

34 orm_models.Flow: the newly-created or existing flow 

35 """ 

36 

37 insert_stmt = ( 

38 db.queries.insert(db.Flow) 

39 .values(**flow.model_dump_for_orm(exclude_unset=True)) 

40 .on_conflict_do_nothing( 

41 index_elements=db.orm.flow_unique_upsert_columns, 

42 ) 

43 ) 

44 await session.execute(insert_stmt) 

45 

46 query = ( 

47 sa.select(db.Flow) 

48 .where(db.Flow.name == flow.name) 

49 .limit(1) 

50 .execution_options(populate_existing=True) 

51 ) 

52 result = await session.execute(query) 

53 model = result.scalar_one() 

54 return model 

55 

56 

57@db_injector 1a

58async def update_flow( 1a

59 db: PrefectDBInterface, 

60 session: AsyncSession, 

61 flow_id: UUID, 

62 flow: schemas.actions.FlowUpdate, 

63) -> bool: 

64 """ 

65 Updates a flow. 

66 

67 Args: 

68 session: a database session 

69 flow_id: the flow id to update 

70 flow: a flow update model 

71 

72 Returns: 

73 bool: whether or not matching rows were found to update 

74 """ 

75 update_stmt = ( 

76 sa.update(db.Flow) 

77 .where(db.Flow.id == flow_id) 

78 # exclude_unset=True allows us to only update values provided by 

79 # the user, ignoring any defaults on the model 

80 .values(**flow.model_dump_for_orm(exclude_unset=True)) 

81 ) 

82 result = await session.execute(update_stmt) 

83 return result.rowcount > 0 

84 

85 

86@db_injector 1a

87async def read_flow( 1a

88 db: PrefectDBInterface, session: AsyncSession, flow_id: UUID 

89) -> Optional[orm_models.Flow]: 

90 """ 

91 Reads a flow by id. 

92 

93 Args: 

94 session: A database session 

95 flow_id: a flow id 

96 

97 Returns: 

98 orm_models.Flow: the flow 

99 """ 

100 return await session.get(db.Flow, flow_id) 

101 

102 

103@db_injector 1a

104async def read_flow_by_name( 1a

105 db: PrefectDBInterface, session: AsyncSession, name: str 

106) -> Optional[orm_models.Flow]: 

107 """ 

108 Reads a flow by name. 

109 

110 Args: 

111 session: A database session 

112 name: a flow name 

113 

114 Returns: 

115 orm_models.Flow: the flow 

116 """ 

117 

118 result = await session.execute(select(db.Flow).filter_by(name=name)) 

119 return result.scalar() 

120 

121 

122async def _apply_flow_filters( 1a

123 db: PrefectDBInterface, 

124 query: Select[T], 

125 flow_filter: Union[schemas.filters.FlowFilter, None] = None, 

126 flow_run_filter: Union[schemas.filters.FlowRunFilter, None] = None, 

127 task_run_filter: Union[schemas.filters.TaskRunFilter, None] = None, 

128 deployment_filter: Union[schemas.filters.DeploymentFilter, None] = None, 

129 work_pool_filter: Union[schemas.filters.WorkPoolFilter, None] = None, 

130) -> Select[T]: 

131 """ 

132 Applies filters to a flow query as a combination of EXISTS subqueries. 

133 """ 

134 

135 if flow_filter: 

136 query = query.where(flow_filter.as_sql_filter()) 

137 

138 if deployment_filter or work_pool_filter: 

139 deployment_exists_clause = select(db.Deployment).where( 

140 db.Deployment.flow_id == db.Flow.id 

141 ) 

142 

143 if deployment_filter: 

144 deployment_exists_clause = deployment_exists_clause.where( 

145 deployment_filter.as_sql_filter(), 

146 ) 

147 

148 if work_pool_filter: 

149 deployment_exists_clause = deployment_exists_clause.join( 

150 db.WorkQueue, 

151 db.WorkQueue.id == db.Deployment.work_queue_id, 

152 ) 

153 deployment_exists_clause = deployment_exists_clause.join( 

154 db.WorkPool, 

155 db.WorkPool.id == db.WorkQueue.work_pool_id, 

156 ).where(work_pool_filter.as_sql_filter()) 

157 

158 query = query.where(deployment_exists_clause.exists()) 

159 

160 if flow_run_filter or task_run_filter: 

161 flow_run_exists_clause = select(db.FlowRun).where( 

162 db.FlowRun.flow_id == db.Flow.id 

163 ) 

164 

165 if flow_run_filter: 

166 flow_run_exists_clause = flow_run_exists_clause.where( 

167 flow_run_filter.as_sql_filter() 

168 ) 

169 

170 if task_run_filter: 

171 flow_run_exists_clause = flow_run_exists_clause.join( 

172 db.TaskRun, 

173 db.TaskRun.flow_run_id == db.FlowRun.id, 

174 ).where(task_run_filter.as_sql_filter()) 

175 

176 query = query.where(flow_run_exists_clause.exists()) 

177 

178 return query 

179 

180 

181@db_injector 1a

182async def read_flows( 1a

183 db: PrefectDBInterface, 

184 session: AsyncSession, 

185 flow_filter: Union[schemas.filters.FlowFilter, None] = None, 

186 flow_run_filter: Union[schemas.filters.FlowRunFilter, None] = None, 

187 task_run_filter: Union[schemas.filters.TaskRunFilter, None] = None, 

188 deployment_filter: Union[schemas.filters.DeploymentFilter, None] = None, 

189 work_pool_filter: Union[schemas.filters.WorkPoolFilter, None] = None, 

190 sort: schemas.sorting.FlowSort = schemas.sorting.FlowSort.NAME_ASC, 

191 offset: Union[int, None] = None, 

192 limit: Union[int, None] = None, 

193) -> Sequence[orm_models.Flow]: 

194 """ 

195 Read multiple flows. 

196 

197 Args: 

198 session: A database session 

199 flow_filter: only select flows that match these filters 

200 flow_run_filter: only select flows whose flow runs match these filters 

201 task_run_filter: only select flows whose task runs match these filters 

202 deployment_filter: only select flows whose deployments match these filters 

203 work_pool_filter: only select flows whose work pools match these filters 

204 offset: Query offset 

205 limit: Query limit 

206 

207 Returns: 

208 List[orm_models.Flow]: flows 

209 """ 

210 

211 query = select(db.Flow).order_by(*sort.as_sql_sort()) 

212 

213 query = await _apply_flow_filters( 

214 db, 

215 query, 

216 flow_filter=flow_filter, 

217 flow_run_filter=flow_run_filter, 

218 task_run_filter=task_run_filter, 

219 deployment_filter=deployment_filter, 

220 work_pool_filter=work_pool_filter, 

221 ) 

222 

223 if offset is not None: 

224 query = query.offset(offset) 

225 

226 if limit is not None: 

227 query = query.limit(limit) 

228 

229 result = await session.execute(query) 

230 return result.scalars().unique().all() 

231 

232 

233@db_injector 1a

234async def count_flows( 1a

235 db: PrefectDBInterface, 

236 session: AsyncSession, 

237 flow_filter: Union[schemas.filters.FlowFilter, None] = None, 

238 flow_run_filter: Union[schemas.filters.FlowRunFilter, None] = None, 

239 task_run_filter: Union[schemas.filters.TaskRunFilter, None] = None, 

240 deployment_filter: Union[schemas.filters.DeploymentFilter, None] = None, 

241 work_pool_filter: Union[schemas.filters.WorkPoolFilter, None] = None, 

242) -> int: 

243 """ 

244 Count flows. 

245 

246 Args: 

247 session: A database session 

248 flow_filter: only count flows that match these filters 

249 flow_run_filter: only count flows whose flow runs match these filters 

250 task_run_filter: only count flows whose task runs match these filters 

251 deployment_filter: only count flows whose deployments match these filters 

252 work_pool_filter: only count flows whose work pools match these filters 

253 

254 Returns: 

255 int: count of flows 

256 """ 

257 

258 query = select(sa.func.count(None)).select_from(db.Flow) 

259 

260 query = await _apply_flow_filters( 

261 db, 

262 query, 

263 flow_filter=flow_filter, 

264 flow_run_filter=flow_run_filter, 

265 task_run_filter=task_run_filter, 

266 deployment_filter=deployment_filter, 

267 work_pool_filter=work_pool_filter, 

268 ) 

269 

270 result = await session.execute(query) 

271 return result.scalar_one() 

272 

273 

274@db_injector 1a

275async def delete_flow( 1a

276 db: PrefectDBInterface, session: AsyncSession, flow_id: UUID 

277) -> bool: 

278 """ 

279 Delete a flow by id. 

280 

281 Args: 

282 session: A database session 

283 flow_id: a flow id 

284 

285 Returns: 

286 bool: whether or not the flow was deleted 

287 """ 

288 

289 result = await session.execute(delete(db.Flow).where(db.Flow.id == flow_id)) 

290 return result.rowcount > 0 

291 

292 

293@db_injector 1a

294async def read_flow_labels( 1a

295 db: PrefectDBInterface, 

296 session: AsyncSession, 

297 flow_id: UUID, 

298) -> Union[schemas.core.KeyValueLabels, None]: 

299 result = await session.execute(select(db.Flow.labels).where(db.Flow.id == flow_id)) 

300 

301 return result.scalar()