Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/models/artifacts.py: 75%
139 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1from typing import Any, Optional, Sequence, TypeVar, Union 1c
2from uuid import UUID 1c
4import sqlalchemy as sa 1c
5from sqlalchemy import select 1c
6from sqlalchemy.ext.asyncio import AsyncSession 1c
7from sqlalchemy.sql import Select 1c
9from prefect.server.database import PrefectDBInterface, db_injector, orm_models 1c
10from prefect.server.schemas import actions, filters, sorting 1c
11from prefect.server.schemas.core import Artifact 1c
12from prefect.types._datetime import DateTime, now 1c
14T = TypeVar("T", bound=tuple[Any, ...]) 1c
@db_injector
async def _insert_into_artifact_collection(
    db: PrefectDBInterface,
    session: AsyncSession,
    artifact: Artifact,
    now: Optional[DateTime] = None,
) -> orm_models.ArtifactCollection:
    """
    Upserts the artifact into the artifact_collection table as the latest entry.

    Args:
        session: A database session
        artifact: The artifact to record as the latest one
        now: Timestamp written to `created` and `updated` on insert, and to
            `updated` when an existing row is updated

    Returns:
        The artifact collection row read back by the artifact's key

    Raises:
        ValueError: If no row exists for the artifact's key after the upsert, or
            if the row's `latest_id` is not the id of `artifact`
    """
    # `id`, `created` and `updated` are excluded from the dump: the row tracks the
    # artifact through `latest_id`, and both timestamps are taken from `now`.
    collection_values = artifact.model_dump_for_orm(
        exclude_unset=True, exclude={"id", "updated", "created"}
    )

    insert_stmt = db.queries.insert(db.ArtifactCollection).values(
        latest_id=artifact.id, updated=now, created=now, **collection_values
    )
    upsert_stmt = insert_stmt.on_conflict_do_update(
        index_elements=db.orm.artifact_collection_unique_upsert_columns,
        # The conflict branch sets no `created` value.
        set_={"latest_id": artifact.id, "updated": now, **collection_values},
    )
    await session.execute(upsert_stmt)

    # Read the row back by key to verify the upsert took effect.
    select_stmt = (
        sa.select(db.ArtifactCollection)
        .where(sa.and_(db.ArtifactCollection.key == artifact.key))
        .execution_options(populate_existing=True)
    )
    collection_row = (await session.execute(select_stmt)).scalar()

    if collection_row is None or collection_row.latest_id != artifact.id:
        raise ValueError(
            f"Artifact {artifact.id} was not inserted into the artifact collection"
            " table."
        )

    return collection_row
@db_injector
async def _insert_into_artifact(
    db: PrefectDBInterface,
    session: AsyncSession,
    artifact: Artifact,
    now: Optional[DateTime] = None,
) -> orm_models.Artifact:
    """
    Inserts a new row into the artifact table and reads it back.

    Args:
        session: A database session
        artifact: The artifact to insert
        now: Timestamp written to both `created` and `updated`

    Returns:
        The inserted artifact row, selected by the artifact's id
    """
    inserted_id = artifact.id

    # The artifact's own timestamps are dropped in favour of `now`.
    row_values = artifact.model_dump_for_orm(exclude={"created", "updated"})
    await session.execute(
        db.queries.insert(db.Artifact).values(
            created=now,
            updated=now,
            **row_values,
        )
    )

    select_stmt = (
        sa.select(db.Artifact)
        .where(db.Artifact.id == inserted_id)
        .limit(1)
        .execution_options(populate_existing=True)
    )
    inserted = await session.execute(select_stmt)
    return inserted.scalar_one()
async def create_artifact(
    session: AsyncSession,
    artifact: Artifact,
) -> orm_models.Artifact:
    """
    Creates an artifact.

    An artifact with a key is first upserted into the artifact collection table;
    every artifact is then inserted into the artifact table. Both writes use the
    same timestamp.

    Args:
        session: A database session
        artifact: The artifact to create

    Returns:
        The newly inserted artifact row
    """
    timestamp = now("UTC")

    has_key = artifact.key is not None
    if has_key:
        await _insert_into_artifact_collection(
            session=session, now=timestamp, artifact=artifact
        )

    return await _insert_into_artifact(
        session=session,
        now=timestamp,
        artifact=artifact,
    )
@db_injector
async def read_latest_artifact(
    db: PrefectDBInterface,
    session: AsyncSession,
    key: str,
) -> Union[orm_models.ArtifactCollection, None]:
    """
    Reads the artifact collection entry for a key.

    Args:
        session: A database session
        key: The artifact key

    Returns:
        The artifact collection row for `key`, or None if there is none
    """
    matches_key = db.ArtifactCollection.key == key
    stmt = sa.select(db.ArtifactCollection).where(matches_key)
    rows = await session.execute(stmt)
    return rows.scalar()
@db_injector
async def read_artifact(
    db: PrefectDBInterface,
    session: AsyncSession,
    artifact_id: UUID,
) -> Union[orm_models.Artifact, None]:
    """
    Reads a single artifact by id.

    Args:
        session: A database session
        artifact_id: The id of the artifact to read

    Returns:
        The matching artifact row, or None if no artifact has that id
    """
    matches_id = db.Artifact.id == artifact_id
    rows = await session.execute(sa.select(db.Artifact).where(matches_id))
    return rows.scalar()
async def _apply_artifact_filters(
    db: PrefectDBInterface,
    query: Select[T],
    flow_run_filter: Optional[filters.FlowRunFilter] = None,
    task_run_filter: Optional[filters.TaskRunFilter] = None,
    artifact_filter: Optional[filters.ArtifactFilter] = None,
    deployment_filter: Optional[filters.DeploymentFilter] = None,
    flow_filter: Optional[filters.FlowFilter] = None,
) -> Select[T]:
    """
    Narrows an artifact query with the given filters.

    The artifact filter is applied directly. Flow run, flow and deployment filters
    are combined into one EXISTS subquery over flow runs correlated on the
    artifact's `flow_run_id`; the task run filter becomes a second EXISTS subquery
    correlated on `task_run_id`.

    Args:
        db: The database interface providing the ORM models
        query: The artifact query to narrow
        flow_run_filter: Restricts the flow run referenced by the artifact
        task_run_filter: Restricts the task run referenced by the artifact
        artifact_filter: Restricts the artifacts themselves
        deployment_filter: Restricts the deployment of the artifact's flow run
        flow_filter: Restricts the flow of the artifact's flow run

    Returns:
        The query with every provided filter applied
    """
    if artifact_filter:
        query = query.where(artifact_filter.as_sql_filter())

    if flow_filter or flow_run_filter or deployment_filter:
        same_flow_run = db.Artifact.flow_run_id == db.FlowRun.id
        flow_run_subquery = select(db.FlowRun).where(same_flow_run)

        if flow_run_filter:
            flow_run_subquery = flow_run_subquery.where(
                flow_run_filter.as_sql_filter()
            )

        if flow_filter:
            flow_join = db.Flow.id == db.FlowRun.flow_id
            flow_run_subquery = flow_run_subquery.join(db.Flow, flow_join).where(
                flow_filter.as_sql_filter()
            )

        if deployment_filter:
            deployment_join = db.Deployment.id == db.FlowRun.deployment_id
            flow_run_subquery = flow_run_subquery.join(
                db.Deployment, deployment_join
            ).where(deployment_filter.as_sql_filter())

        query = query.where(flow_run_subquery.exists())

    if task_run_filter:
        same_task_run = db.Artifact.task_run_id == db.TaskRun.id
        task_run_subquery = (
            select(db.TaskRun)
            .where(same_task_run)
            .where(task_run_filter.as_sql_filter())
        )
        query = query.where(task_run_subquery.exists())

    return query
async def _apply_artifact_collection_filters(
    db: PrefectDBInterface,
    query: Select[T],
    flow_run_filter: Optional[filters.FlowRunFilter] = None,
    task_run_filter: Optional[filters.TaskRunFilter] = None,
    artifact_filter: Optional[filters.ArtifactCollectionFilter] = None,
    deployment_filter: Optional[filters.DeploymentFilter] = None,
    flow_filter: Optional[filters.FlowFilter] = None,
) -> Select[T]:
    """
    Narrows an artifact collection query with the given filters.

    The artifact collection filter is applied directly. Flow run, flow and
    deployment filters are combined into one EXISTS subquery over flow runs
    correlated on the collection row's `flow_run_id`; the task run filter becomes a
    second EXISTS subquery correlated on `task_run_id`.

    Args:
        db: The database interface providing the ORM models
        query: The artifact collection query to narrow
        flow_run_filter: Restricts the flow run referenced by the collection row
        task_run_filter: Restricts the task run referenced by the collection row
        artifact_filter: Restricts the artifact collection rows themselves
        deployment_filter: Restricts the deployment of the row's flow run
        flow_filter: Restricts the flow of the row's flow run

    Returns:
        The query with every provided filter applied
    """
    if artifact_filter:
        query = query.where(artifact_filter.as_sql_filter())

    if flow_filter or flow_run_filter or deployment_filter:
        same_flow_run = db.ArtifactCollection.flow_run_id == db.FlowRun.id
        flow_run_subquery = select(db.FlowRun).where(same_flow_run)

        if flow_run_filter:
            flow_run_subquery = flow_run_subquery.where(
                flow_run_filter.as_sql_filter()
            )

        if flow_filter:
            flow_join = db.Flow.id == db.FlowRun.flow_id
            flow_run_subquery = flow_run_subquery.join(db.Flow, flow_join).where(
                flow_filter.as_sql_filter()
            )

        if deployment_filter:
            deployment_join = db.Deployment.id == db.FlowRun.deployment_id
            flow_run_subquery = flow_run_subquery.join(
                db.Deployment, deployment_join
            ).where(deployment_filter.as_sql_filter())

        query = query.where(flow_run_subquery.exists())

    if task_run_filter:
        same_task_run = db.ArtifactCollection.task_run_id == db.TaskRun.id
        task_run_subquery = (
            select(db.TaskRun)
            .where(same_task_run)
            .where(task_run_filter.as_sql_filter())
        )
        query = query.where(task_run_subquery.exists())

    return query
@db_injector
async def read_artifacts(
    db: PrefectDBInterface,
    session: AsyncSession,
    offset: Optional[int] = None,
    limit: Optional[int] = None,
    artifact_filter: Optional[filters.ArtifactFilter] = None,
    flow_run_filter: Optional[filters.FlowRunFilter] = None,
    task_run_filter: Optional[filters.TaskRunFilter] = None,
    deployment_filter: Optional[filters.DeploymentFilter] = None,
    flow_filter: Optional[filters.FlowFilter] = None,
    sort: sorting.ArtifactSort = sorting.ArtifactSort.ID_DESC,
) -> Sequence[orm_models.Artifact]:
    """
    Reads artifacts, optionally filtered, sorted and paginated.

    Args:
        session: A database session
        offset: Query offset; not applied when None
        limit: Query limit; not applied when None
        artifact_filter: Only select artifacts matching this filter
        flow_run_filter: Only select artifacts whose flow runs match this filter
        task_run_filter: Only select artifacts whose task runs match this filter
        deployment_filter: Only select artifacts whose flow runs belong to
            deployments matching this filter
        flow_filter: Only select artifacts whose flow runs belong to flows matching
            this filter
        sort: The ordering applied to the results

    Returns:
        The matching artifact rows, de-duplicated
    """
    ordered = sa.select(db.Artifact).order_by(*sort.as_sql_sort())

    stmt = await _apply_artifact_filters(
        db,
        ordered,
        artifact_filter=artifact_filter,
        flow_run_filter=flow_run_filter,
        task_run_filter=task_run_filter,
        deployment_filter=deployment_filter,
        flow_filter=flow_filter,
    )

    if offset is not None:
        stmt = stmt.offset(offset)
    if limit is not None:
        stmt = stmt.limit(limit)

    rows = await session.execute(stmt)
    return rows.scalars().unique().all()
@db_injector
async def read_latest_artifacts(
    db: PrefectDBInterface,
    session: AsyncSession,
    offset: Optional[int] = None,
    limit: Optional[int] = None,
    artifact_filter: Optional[filters.ArtifactCollectionFilter] = None,
    flow_run_filter: Optional[filters.FlowRunFilter] = None,
    task_run_filter: Optional[filters.TaskRunFilter] = None,
    deployment_filter: Optional[filters.DeploymentFilter] = None,
    flow_filter: Optional[filters.FlowFilter] = None,
    sort: sorting.ArtifactCollectionSort = sorting.ArtifactCollectionSort.ID_DESC,
) -> Sequence[orm_models.ArtifactCollection]:
    """
    Reads artifact collection rows, optionally filtered, sorted and paginated.

    Args:
        session: A database session
        offset: Query offset; not applied when None
        limit: Query limit; not applied when None
        artifact_filter: Only select artifact collection rows matching this filter
        flow_run_filter: Only select rows whose flow runs match this filter
        task_run_filter: Only select rows whose task runs match this filter
        deployment_filter: Only select rows whose flow runs belong to deployments
            matching this filter
        flow_filter: Only select rows whose flow runs belong to flows matching this
            filter
        sort: The ordering applied to the results

    Returns:
        The matching artifact collection rows, de-duplicated
    """
    ordered = sa.select(db.ArtifactCollection).order_by(*sort.as_sql_sort())

    stmt = await _apply_artifact_collection_filters(
        db,
        ordered,
        artifact_filter=artifact_filter,
        flow_run_filter=flow_run_filter,
        task_run_filter=task_run_filter,
        deployment_filter=deployment_filter,
        flow_filter=flow_filter,
    )

    if offset is not None:
        stmt = stmt.offset(offset)
    if limit is not None:
        stmt = stmt.limit(limit)

    rows = await session.execute(stmt)
    return rows.scalars().unique().all()
@db_injector
async def count_artifacts(
    db: PrefectDBInterface,
    session: AsyncSession,
    artifact_filter: Optional[filters.ArtifactFilter] = None,
    flow_run_filter: Optional[filters.FlowRunFilter] = None,
    task_run_filter: Optional[filters.TaskRunFilter] = None,
    deployment_filter: Optional[filters.DeploymentFilter] = None,
    flow_filter: Optional[filters.FlowFilter] = None,
) -> int:
    """
    Counts the artifacts matching the given filters.

    Args:
        session: A database session
        artifact_filter: Only count artifacts matching this filter
        flow_run_filter: Only count artifacts whose flow runs match this filter
        task_run_filter: Only count artifacts whose task runs match this filter
        deployment_filter: Only count artifacts whose flow runs belong to
            deployments matching this filter
        flow_filter: Only count artifacts whose flow runs belong to flows matching
            this filter

    Returns:
        The number of matching artifacts
    """
    count_stmt = await _apply_artifact_filters(
        db,
        sa.select(sa.func.count(db.Artifact.id)),
        artifact_filter=artifact_filter,
        flow_run_filter=flow_run_filter,
        task_run_filter=task_run_filter,
        deployment_filter=deployment_filter,
        flow_filter=flow_filter,
    )

    counted = await session.execute(count_stmt)
    return counted.scalar_one()
@db_injector
async def count_latest_artifacts(
    db: PrefectDBInterface,
    session: AsyncSession,
    artifact_filter: Optional[filters.ArtifactCollectionFilter] = None,
    flow_run_filter: Optional[filters.FlowRunFilter] = None,
    task_run_filter: Optional[filters.TaskRunFilter] = None,
    deployment_filter: Optional[filters.DeploymentFilter] = None,
    flow_filter: Optional[filters.FlowFilter] = None,
) -> int:
    """
    Counts the artifact collection rows matching the given filters.

    Args:
        session: A database session
        artifact_filter: Only count artifact collection rows matching this filter
        flow_run_filter: Only count rows whose flow runs match this filter
        task_run_filter: Only count rows whose task runs match this filter
        deployment_filter: Only count rows whose flow runs belong to deployments
            matching this filter
        flow_filter: Only count rows whose flow runs belong to flows matching this
            filter

    Returns:
        The number of matching artifact collection rows
    """
    count_stmt = await _apply_artifact_collection_filters(
        db,
        sa.select(sa.func.count(db.ArtifactCollection.id)),
        artifact_filter=artifact_filter,
        flow_run_filter=flow_run_filter,
        task_run_filter=task_run_filter,
        deployment_filter=deployment_filter,
        flow_filter=flow_filter,
    )

    counted = await session.execute(count_stmt)
    return counted.scalar_one()
@db_injector
async def update_artifact(
    db: PrefectDBInterface,
    session: AsyncSession,
    artifact_id: UUID,
    artifact: actions.ArtifactUpdate,
) -> bool:
    """
    Updates an artifact by id, along with any artifact collection row that points
    at it as its latest version.

    Args:
        session: A database session
        artifact_id (UUID): The artifact id to update
        artifact: The update to apply; only fields that were explicitly set are
            written

    Returns:
        bool: True if at least one row was updated in either table, False otherwise
    """
    artifact_values = artifact.model_dump_for_orm(exclude_unset=True)
    artifact_result = await session.execute(
        sa.update(db.Artifact)
        .where(db.Artifact.id == artifact_id)
        .values(**artifact_values)
    )

    # Apply the same update to the collection row whose latest version is this
    # artifact.
    collection_values = artifact.model_dump_for_orm(exclude_unset=True)
    collection_result = await session.execute(
        sa.update(db.ArtifactCollection)
        .where(db.ArtifactCollection.latest_id == artifact_id)
        .values(**collection_values)
    )

    rows_updated = artifact_result.rowcount + collection_result.rowcount
    return rows_updated > 0
@db_injector
async def delete_artifact(
    db: PrefectDBInterface, session: AsyncSession, artifact_id: UUID
) -> bool:
    """
    Deletes an artifact by id, keeping the artifact collection table consistent.

    The ArtifactCollection table tracks the latest version of an artifact per key.
    When the artifact being deleted is that latest version, the collection row is
    first repointed to the most recently created remaining artifact with the same
    key. If no other artifact shares the key, the collection row is deleted
    instead.

    Example:
        Given these rows in Artifact:
            - key: "foo", id: 1, created: 2020-01-01
            - key: "foo", id: 2, created: 2020-01-02
            - key: "foo", id: 3, created: 2020-01-03

        and this row in ArtifactCollection:
            - key: "foo", latest_id: 3

        deleting the artifact with id 3 repoints the "foo" collection row to the
        artifact with id 2.

    Args:
        session: A database session
        artifact_id (UUID): The artifact id to delete

    Returns:
        bool: True if an artifact row was deleted, False if no artifact has that id
    """
    artifact = await session.get(db.Artifact, artifact_id)
    if artifact is None:
        return False

    # Is there a collection row for this key that currently points at this artifact?
    collection_entry = (
        await session.execute(
            sa.select(db.ArtifactCollection)
            .where(db.ArtifactCollection.key == artifact.key)
            .where(db.ArtifactCollection.latest_id == artifact_id)
        )
    ).scalar_one_or_none()

    if collection_entry is not None:
        # Most recently created artifact with the same key, excluding this one.
        successor = (
            await session.execute(
                sa.select(db.Artifact)
                .where(db.Artifact.key == artifact.key)
                .where(db.Artifact.id != artifact_id)
                .order_by(db.Artifact.created.desc())
                .limit(1)
            )
        ).scalar_one_or_none()

        if successor is None:
            # This was the only artifact with the key: drop the collection row.
            await session.execute(
                sa.delete(db.ArtifactCollection)
                .where(db.ArtifactCollection.key == artifact.key)
                .where(db.ArtifactCollection.latest_id == artifact_id)
            )
        else:
            # Copy the successor's fields onto the collection row.
            await session.execute(
                sa.update(db.ArtifactCollection)
                .where(db.ArtifactCollection.key == artifact.key)
                .values(
                    latest_id=successor.id,
                    data=successor.data,
                    description=successor.description,
                    type=successor.type,
                    created=successor.created,
                    updated=successor.updated,
                    flow_run_id=successor.flow_run_id,
                    task_run_id=successor.task_run_id,
                    metadata_=successor.metadata_,
                )
            )

    deleted = await session.execute(
        sa.delete(db.Artifact).where(db.Artifact.id == artifact_id)
    )
    return deleted.rowcount > 0