Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/models/flows.py: 88%
72 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1"""
2Functions for interacting with flow ORM objects.
3Intended for internal use by the Prefect REST API.
4"""
6from typing import Optional, Sequence, TypeVar, Union 1d
7from uuid import UUID 1d
9import sqlalchemy as sa 1d
10from sqlalchemy import delete, select 1d
11from sqlalchemy.ext.asyncio import AsyncSession 1d
12from sqlalchemy.sql import Select 1d
14import prefect.server.schemas as schemas 1d
15from prefect.server.database import PrefectDBInterface, db_injector, orm_models 1d
17T = TypeVar("T", bound=tuple) 1d
20@db_injector 1d
21async def create_flow( 1d
22 db: PrefectDBInterface, session: AsyncSession, flow: schemas.core.Flow
23) -> orm_models.Flow:
24 """
25 Creates a new flow.
27 If a flow with the same name already exists, the existing flow is returned.
29 Args:
30 session: a database session
31 flow: a flow model
33 Returns:
34 orm_models.Flow: the newly-created or existing flow
35 """
37 insert_stmt = ( 1acb
38 db.queries.insert(db.Flow)
39 .values(**flow.model_dump_for_orm(exclude_unset=True))
40 .on_conflict_do_nothing(
41 index_elements=db.orm.flow_unique_upsert_columns,
42 )
43 )
44 await session.execute(insert_stmt) 1acb
46 query = (
47 sa.select(db.Flow)
48 .where(db.Flow.name == flow.name)
49 .limit(1)
50 .execution_options(populate_existing=True)
51 )
52 result = await session.execute(query) 1acb
53 model = result.scalar_one()
54 return model
57@db_injector 1d
58async def update_flow( 1d
59 db: PrefectDBInterface,
60 session: AsyncSession,
61 flow_id: UUID,
62 flow: schemas.actions.FlowUpdate,
63) -> bool:
64 """
65 Updates a flow.
67 Args:
68 session: a database session
69 flow_id: the flow id to update
70 flow: a flow update model
72 Returns:
73 bool: whether or not matching rows were found to update
74 """
75 update_stmt = ( 1ab
76 sa.update(db.Flow)
77 .where(db.Flow.id == flow_id)
78 # exclude_unset=True allows us to only update values provided by
79 # the user, ignoring any defaults on the model
80 .values(**flow.model_dump_for_orm(exclude_unset=True))
81 )
82 result = await session.execute(update_stmt) 1ab
83 return result.rowcount > 0
86@db_injector 1d
87async def read_flow( 1d
88 db: PrefectDBInterface, session: AsyncSession, flow_id: UUID
89) -> Optional[orm_models.Flow]:
90 """
91 Reads a flow by id.
93 Args:
94 session: A database session
95 flow_id: a flow id
97 Returns:
98 orm_models.Flow: the flow
99 """
100 return await session.get(db.Flow, flow_id) 1acb
103@db_injector 1d
104async def read_flow_by_name( 1d
105 db: PrefectDBInterface, session: AsyncSession, name: str
106) -> Optional[orm_models.Flow]:
107 """
108 Reads a flow by name.
110 Args:
111 session: A database session
112 name: a flow name
114 Returns:
115 orm_models.Flow: the flow
116 """
118 result = await session.execute(select(db.Flow).filter_by(name=name)) 1ab
119 return result.scalar()
122async def _apply_flow_filters( 1d
123 db: PrefectDBInterface,
124 query: Select[T],
125 flow_filter: Union[schemas.filters.FlowFilter, None] = None,
126 flow_run_filter: Union[schemas.filters.FlowRunFilter, None] = None,
127 task_run_filter: Union[schemas.filters.TaskRunFilter, None] = None,
128 deployment_filter: Union[schemas.filters.DeploymentFilter, None] = None,
129 work_pool_filter: Union[schemas.filters.WorkPoolFilter, None] = None,
130) -> Select[T]:
131 """
132 Applies filters to a flow query as a combination of EXISTS subqueries.
133 """
135 if flow_filter: 1aceb
136 query = query.where(flow_filter.as_sql_filter()) 1acb
138 if deployment_filter or work_pool_filter: 1aceb
139 deployment_exists_clause = select(db.Deployment).where( 1acb
140 db.Deployment.flow_id == db.Flow.id
141 )
143 if deployment_filter: 1acb
144 deployment_exists_clause = deployment_exists_clause.where( 1acb
145 deployment_filter.as_sql_filter(),
146 )
148 if work_pool_filter: 1acb
149 deployment_exists_clause = deployment_exists_clause.join( 1acb
150 db.WorkQueue,
151 db.WorkQueue.id == db.Deployment.work_queue_id,
152 )
153 deployment_exists_clause = deployment_exists_clause.join( 1acb
154 db.WorkPool,
155 db.WorkPool.id == db.WorkQueue.work_pool_id,
156 ).where(work_pool_filter.as_sql_filter())
158 query = query.where(deployment_exists_clause.exists()) 1acb
160 if flow_run_filter or task_run_filter: 1aceb
161 flow_run_exists_clause = select(db.FlowRun).where( 1aceb
162 db.FlowRun.flow_id == db.Flow.id
163 )
165 if flow_run_filter: 1aceb
166 flow_run_exists_clause = flow_run_exists_clause.where( 1acb
167 flow_run_filter.as_sql_filter()
168 )
170 if task_run_filter: 1aceb
171 flow_run_exists_clause = flow_run_exists_clause.join( 1aceb
172 db.TaskRun,
173 db.TaskRun.flow_run_id == db.FlowRun.id,
174 ).where(task_run_filter.as_sql_filter())
176 query = query.where(flow_run_exists_clause.exists()) 1aceb
178 return query 1aceb
181@db_injector 1d
182async def read_flows( 1d
183 db: PrefectDBInterface,
184 session: AsyncSession,
185 flow_filter: Union[schemas.filters.FlowFilter, None] = None,
186 flow_run_filter: Union[schemas.filters.FlowRunFilter, None] = None,
187 task_run_filter: Union[schemas.filters.TaskRunFilter, None] = None,
188 deployment_filter: Union[schemas.filters.DeploymentFilter, None] = None,
189 work_pool_filter: Union[schemas.filters.WorkPoolFilter, None] = None,
190 sort: schemas.sorting.FlowSort = schemas.sorting.FlowSort.NAME_ASC,
191 offset: Union[int, None] = None,
192 limit: Union[int, None] = None,
193) -> Sequence[orm_models.Flow]:
194 """
195 Read multiple flows.
197 Args:
198 session: A database session
199 flow_filter: only select flows that match these filters
200 flow_run_filter: only select flows whose flow runs match these filters
201 task_run_filter: only select flows whose task runs match these filters
202 deployment_filter: only select flows whose deployments match these filters
203 work_pool_filter: only select flows whose work pools match these filters
204 offset: Query offset
205 limit: Query limit
207 Returns:
208 List[orm_models.Flow]: flows
209 """
211 query = select(db.Flow).order_by(*sort.as_sql_sort()) 1aceb
213 query = await _apply_flow_filters( 1aceb
214 db,
215 query,
216 flow_filter=flow_filter,
217 flow_run_filter=flow_run_filter,
218 task_run_filter=task_run_filter,
219 deployment_filter=deployment_filter,
220 work_pool_filter=work_pool_filter,
221 )
223 if offset is not None: 223 ↛ 226line 223 didn't jump to line 226 because the condition on line 223 was always true1aceb
224 query = query.offset(offset) 1aceb
226 if limit is not None: 226 ↛ 229line 226 didn't jump to line 229 because the condition on line 226 was always true1aceb
227 query = query.limit(limit) 1aceb
229 result = await session.execute(query) 1aceb
230 return result.scalars().unique().all()
233@db_injector 1d
234async def count_flows( 1d
235 db: PrefectDBInterface,
236 session: AsyncSession,
237 flow_filter: Union[schemas.filters.FlowFilter, None] = None,
238 flow_run_filter: Union[schemas.filters.FlowRunFilter, None] = None,
239 task_run_filter: Union[schemas.filters.TaskRunFilter, None] = None,
240 deployment_filter: Union[schemas.filters.DeploymentFilter, None] = None,
241 work_pool_filter: Union[schemas.filters.WorkPoolFilter, None] = None,
242) -> int:
243 """
244 Count flows.
246 Args:
247 session: A database session
248 flow_filter: only count flows that match these filters
249 flow_run_filter: only count flows whose flow runs match these filters
250 task_run_filter: only count flows whose task runs match these filters
251 deployment_filter: only count flows whose deployments match these filters
252 work_pool_filter: only count flows whose work pools match these filters
254 Returns:
255 int: count of flows
256 """
258 query = select(sa.func.count(None)).select_from(db.Flow) 1acb
260 query = await _apply_flow_filters( 1acb
261 db,
262 query,
263 flow_filter=flow_filter,
264 flow_run_filter=flow_run_filter,
265 task_run_filter=task_run_filter,
266 deployment_filter=deployment_filter,
267 work_pool_filter=work_pool_filter,
268 )
270 result = await session.execute(query) 1acb
271 return result.scalar_one()
274@db_injector 1d
275async def delete_flow( 1d
276 db: PrefectDBInterface, session: AsyncSession, flow_id: UUID
277) -> bool:
278 """
279 Delete a flow by id.
281 Args:
282 session: A database session
283 flow_id: a flow id
285 Returns:
286 bool: whether or not the flow was deleted
287 """
289 result = await session.execute(delete(db.Flow).where(db.Flow.id == flow_id)) 1acb
290 return result.rowcount > 0
293@db_injector 1d
294async def read_flow_labels( 1d
295 db: PrefectDBInterface,
296 session: AsyncSession,
297 flow_id: UUID,
298) -> Union[schemas.core.KeyValueLabels, None]:
299 result = await session.execute(select(db.Flow.labels).where(db.Flow.id == flow_id)) 1acb
301 return result.scalar()