Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/models/flows.py: 29%
72 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1"""
2Functions for interacting with flow ORM objects.
3Intended for internal use by the Prefect REST API.
4"""
6from typing import Optional, Sequence, TypeVar, Union 1a
7from uuid import UUID 1a
9import sqlalchemy as sa 1a
10from sqlalchemy import delete, select 1a
11from sqlalchemy.ext.asyncio import AsyncSession 1a
12from sqlalchemy.sql import Select 1a
14import prefect.server.schemas as schemas 1a
15from prefect.server.database import PrefectDBInterface, db_injector, orm_models 1a
17T = TypeVar("T", bound=tuple) 1a
20@db_injector 1a
21async def create_flow( 1a
22 db: PrefectDBInterface, session: AsyncSession, flow: schemas.core.Flow
23) -> orm_models.Flow:
24 """
25 Creates a new flow.
27 If a flow with the same name already exists, the existing flow is returned.
29 Args:
30 session: a database session
31 flow: a flow model
33 Returns:
34 orm_models.Flow: the newly-created or existing flow
35 """
37 insert_stmt = (
38 db.queries.insert(db.Flow)
39 .values(**flow.model_dump_for_orm(exclude_unset=True))
40 .on_conflict_do_nothing(
41 index_elements=db.orm.flow_unique_upsert_columns,
42 )
43 )
44 await session.execute(insert_stmt)
46 query = (
47 sa.select(db.Flow)
48 .where(db.Flow.name == flow.name)
49 .limit(1)
50 .execution_options(populate_existing=True)
51 )
52 result = await session.execute(query)
53 model = result.scalar_one()
54 return model
57@db_injector 1a
58async def update_flow( 1a
59 db: PrefectDBInterface,
60 session: AsyncSession,
61 flow_id: UUID,
62 flow: schemas.actions.FlowUpdate,
63) -> bool:
64 """
65 Updates a flow.
67 Args:
68 session: a database session
69 flow_id: the flow id to update
70 flow: a flow update model
72 Returns:
73 bool: whether or not matching rows were found to update
74 """
75 update_stmt = (
76 sa.update(db.Flow)
77 .where(db.Flow.id == flow_id)
78 # exclude_unset=True allows us to only update values provided by
79 # the user, ignoring any defaults on the model
80 .values(**flow.model_dump_for_orm(exclude_unset=True))
81 )
82 result = await session.execute(update_stmt)
83 return result.rowcount > 0
86@db_injector 1a
87async def read_flow( 1a
88 db: PrefectDBInterface, session: AsyncSession, flow_id: UUID
89) -> Optional[orm_models.Flow]:
90 """
91 Reads a flow by id.
93 Args:
94 session: A database session
95 flow_id: a flow id
97 Returns:
98 orm_models.Flow: the flow
99 """
100 return await session.get(db.Flow, flow_id)
103@db_injector 1a
104async def read_flow_by_name( 1a
105 db: PrefectDBInterface, session: AsyncSession, name: str
106) -> Optional[orm_models.Flow]:
107 """
108 Reads a flow by name.
110 Args:
111 session: A database session
112 name: a flow name
114 Returns:
115 orm_models.Flow: the flow
116 """
118 result = await session.execute(select(db.Flow).filter_by(name=name))
119 return result.scalar()
122async def _apply_flow_filters( 1a
123 db: PrefectDBInterface,
124 query: Select[T],
125 flow_filter: Union[schemas.filters.FlowFilter, None] = None,
126 flow_run_filter: Union[schemas.filters.FlowRunFilter, None] = None,
127 task_run_filter: Union[schemas.filters.TaskRunFilter, None] = None,
128 deployment_filter: Union[schemas.filters.DeploymentFilter, None] = None,
129 work_pool_filter: Union[schemas.filters.WorkPoolFilter, None] = None,
130) -> Select[T]:
131 """
132 Applies filters to a flow query as a combination of EXISTS subqueries.
133 """
135 if flow_filter:
136 query = query.where(flow_filter.as_sql_filter())
138 if deployment_filter or work_pool_filter:
139 deployment_exists_clause = select(db.Deployment).where(
140 db.Deployment.flow_id == db.Flow.id
141 )
143 if deployment_filter:
144 deployment_exists_clause = deployment_exists_clause.where(
145 deployment_filter.as_sql_filter(),
146 )
148 if work_pool_filter:
149 deployment_exists_clause = deployment_exists_clause.join(
150 db.WorkQueue,
151 db.WorkQueue.id == db.Deployment.work_queue_id,
152 )
153 deployment_exists_clause = deployment_exists_clause.join(
154 db.WorkPool,
155 db.WorkPool.id == db.WorkQueue.work_pool_id,
156 ).where(work_pool_filter.as_sql_filter())
158 query = query.where(deployment_exists_clause.exists())
160 if flow_run_filter or task_run_filter:
161 flow_run_exists_clause = select(db.FlowRun).where(
162 db.FlowRun.flow_id == db.Flow.id
163 )
165 if flow_run_filter:
166 flow_run_exists_clause = flow_run_exists_clause.where(
167 flow_run_filter.as_sql_filter()
168 )
170 if task_run_filter:
171 flow_run_exists_clause = flow_run_exists_clause.join(
172 db.TaskRun,
173 db.TaskRun.flow_run_id == db.FlowRun.id,
174 ).where(task_run_filter.as_sql_filter())
176 query = query.where(flow_run_exists_clause.exists())
178 return query
181@db_injector 1a
182async def read_flows( 1a
183 db: PrefectDBInterface,
184 session: AsyncSession,
185 flow_filter: Union[schemas.filters.FlowFilter, None] = None,
186 flow_run_filter: Union[schemas.filters.FlowRunFilter, None] = None,
187 task_run_filter: Union[schemas.filters.TaskRunFilter, None] = None,
188 deployment_filter: Union[schemas.filters.DeploymentFilter, None] = None,
189 work_pool_filter: Union[schemas.filters.WorkPoolFilter, None] = None,
190 sort: schemas.sorting.FlowSort = schemas.sorting.FlowSort.NAME_ASC,
191 offset: Union[int, None] = None,
192 limit: Union[int, None] = None,
193) -> Sequence[orm_models.Flow]:
194 """
195 Read multiple flows.
197 Args:
198 session: A database session
199 flow_filter: only select flows that match these filters
200 flow_run_filter: only select flows whose flow runs match these filters
201 task_run_filter: only select flows whose task runs match these filters
202 deployment_filter: only select flows whose deployments match these filters
203 work_pool_filter: only select flows whose work pools match these filters
204 offset: Query offset
205 limit: Query limit
207 Returns:
208 List[orm_models.Flow]: flows
209 """
211 query = select(db.Flow).order_by(*sort.as_sql_sort())
213 query = await _apply_flow_filters(
214 db,
215 query,
216 flow_filter=flow_filter,
217 flow_run_filter=flow_run_filter,
218 task_run_filter=task_run_filter,
219 deployment_filter=deployment_filter,
220 work_pool_filter=work_pool_filter,
221 )
223 if offset is not None:
224 query = query.offset(offset)
226 if limit is not None:
227 query = query.limit(limit)
229 result = await session.execute(query)
230 return result.scalars().unique().all()
233@db_injector 1a
234async def count_flows( 1a
235 db: PrefectDBInterface,
236 session: AsyncSession,
237 flow_filter: Union[schemas.filters.FlowFilter, None] = None,
238 flow_run_filter: Union[schemas.filters.FlowRunFilter, None] = None,
239 task_run_filter: Union[schemas.filters.TaskRunFilter, None] = None,
240 deployment_filter: Union[schemas.filters.DeploymentFilter, None] = None,
241 work_pool_filter: Union[schemas.filters.WorkPoolFilter, None] = None,
242) -> int:
243 """
244 Count flows.
246 Args:
247 session: A database session
248 flow_filter: only count flows that match these filters
249 flow_run_filter: only count flows whose flow runs match these filters
250 task_run_filter: only count flows whose task runs match these filters
251 deployment_filter: only count flows whose deployments match these filters
252 work_pool_filter: only count flows whose work pools match these filters
254 Returns:
255 int: count of flows
256 """
258 query = select(sa.func.count(None)).select_from(db.Flow)
260 query = await _apply_flow_filters(
261 db,
262 query,
263 flow_filter=flow_filter,
264 flow_run_filter=flow_run_filter,
265 task_run_filter=task_run_filter,
266 deployment_filter=deployment_filter,
267 work_pool_filter=work_pool_filter,
268 )
270 result = await session.execute(query)
271 return result.scalar_one()
274@db_injector 1a
275async def delete_flow( 1a
276 db: PrefectDBInterface, session: AsyncSession, flow_id: UUID
277) -> bool:
278 """
279 Delete a flow by id.
281 Args:
282 session: A database session
283 flow_id: a flow id
285 Returns:
286 bool: whether or not the flow was deleted
287 """
289 result = await session.execute(delete(db.Flow).where(db.Flow.id == flow_id))
290 return result.rowcount > 0
293@db_injector 1a
294async def read_flow_labels( 1a
295 db: PrefectDBInterface,
296 session: AsyncSession,
297 flow_id: UUID,
298) -> Union[schemas.core.KeyValueLabels, None]:
299 result = await session.execute(select(db.Flow.labels).where(db.Flow.id == flow_id))
301 return result.scalar()