Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/models/artifacts.py: 18%
139 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1from typing import Any, Optional, Sequence, TypeVar, Union 1a
2from uuid import UUID 1a
4import sqlalchemy as sa 1a
5from sqlalchemy import select 1a
6from sqlalchemy.ext.asyncio import AsyncSession 1a
7from sqlalchemy.sql import Select 1a
9from prefect.server.database import PrefectDBInterface, db_injector, orm_models 1a
10from prefect.server.schemas import actions, filters, sorting 1a
11from prefect.server.schemas.core import Artifact 1a
12from prefect.types._datetime import DateTime, now 1a
14T = TypeVar("T", bound=tuple[Any, ...]) 1a
@db_injector
async def _insert_into_artifact_collection(
    db: PrefectDBInterface,
    session: AsyncSession,
    artifact: Artifact,
    now: Optional[DateTime] = None,
) -> orm_models.ArtifactCollection:
    """
    Upserts the artifact_collection row for `artifact.key` so that its
    `latest_id` refers to `artifact`.

    Args:
        session: A database session
        artifact: The artifact to record as the latest version for its key
        now: Timestamp written to `created` (insert only) and `updated`

    Returns:
        The artifact collection row read back after the upsert

    Raises:
        ValueError: If no row is found for the key after the upsert, or the
            row found does not have this artifact's id as `latest_id`
    """
    column_values = artifact.model_dump_for_orm(
        exclude_unset=True, exclude={"id", "updated", "created"}
    )

    # On conflict every column except `created` is overwritten.
    upsert_stmt = (
        db.queries.insert(db.ArtifactCollection)
        .values(latest_id=artifact.id, updated=now, created=now, **column_values)
        .on_conflict_do_update(
            index_elements=db.orm.artifact_collection_unique_upsert_columns,
            set_=dict(latest_id=artifact.id, updated=now, **column_values),
        )
    )
    await session.execute(upsert_stmt)

    # Read the row back to verify the upsert took effect.
    readback_stmt = (
        sa.select(db.ArtifactCollection)
        .where(sa.and_(db.ArtifactCollection.key == artifact.key))
        .execution_options(populate_existing=True)
    )
    collection_row = (await session.execute(readback_stmt)).scalar()

    if collection_row is None or collection_row.latest_id != artifact.id:
        raise ValueError(
            f"Artifact {artifact.id} was not inserted into the artifact collection"
            " table."
        )
    return collection_row
@db_injector
async def _insert_into_artifact(
    db: PrefectDBInterface,
    session: AsyncSession,
    artifact: Artifact,
    now: Optional[DateTime] = None,
) -> orm_models.Artifact:
    """
    Inserts `artifact` as a new row in the artifact table and returns the
    row as read back from the database.

    Args:
        session: A database session
        artifact: The artifact to insert
        now: Timestamp written to both `created` and `updated`

    Returns:
        The inserted artifact row
    """
    new_id = artifact.id
    row_values = artifact.model_dump_for_orm(exclude={"created", "updated"})

    await session.execute(
        db.queries.insert(db.Artifact).values(
            created=now, updated=now, **row_values
        )
    )

    # Select the row that was just written, by id.
    readback_stmt = (
        sa.select(db.Artifact)
        .where(db.Artifact.id == new_id)
        .limit(1)
        .execution_options(populate_existing=True)
    )
    stored = await session.execute(readback_stmt)
    return stored.scalar_one()
async def create_artifact(
    session: AsyncSession,
    artifact: Artifact,
) -> orm_models.Artifact:
    """
    Creates an artifact.

    When the artifact has a key, the artifact collection table is upserted
    first; the artifact itself is then inserted into the artifact table. Both
    writes use the same timestamp.

    Args:
        session: A database session
        artifact: The artifact to create

    Returns:
        The inserted artifact row
    """
    timestamp = now("UTC")

    # Artifacts without a key are not recorded in the collection table.
    if artifact.key is not None:
        await _insert_into_artifact_collection(
            session=session, now=timestamp, artifact=artifact
        )

    return await _insert_into_artifact(
        session=session,
        now=timestamp,
        artifact=artifact,
    )
@db_injector
async def read_latest_artifact(
    db: PrefectDBInterface,
    session: AsyncSession,
    key: str,
) -> Union[orm_models.ArtifactCollection, None]:
    """
    Reads the artifact collection row for a key.

    Args:
        session: A database session
        key: The artifact key

    Returns:
        The artifact collection row matching `key`, or None if the query
        returns no rows
    """
    stmt = sa.select(db.ArtifactCollection).where(db.ArtifactCollection.key == key)
    rows = await session.execute(stmt)
    return rows.scalar()
@db_injector
async def read_artifact(
    db: PrefectDBInterface,
    session: AsyncSession,
    artifact_id: UUID,
) -> Union[orm_models.Artifact, None]:
    """
    Reads a single artifact by id.

    Args:
        session: A database session
        artifact_id: The id of the artifact to read

    Returns:
        The matching artifact row, or None if the query returns no rows
    """
    stmt = sa.select(db.Artifact).where(db.Artifact.id == artifact_id)
    rows = await session.execute(stmt)
    return rows.scalar()
async def _apply_artifact_filters(
    db: PrefectDBInterface,
    query: Select[T],
    flow_run_filter: Optional[filters.FlowRunFilter] = None,
    task_run_filter: Optional[filters.TaskRunFilter] = None,
    artifact_filter: Optional[filters.ArtifactFilter] = None,
    deployment_filter: Optional[filters.DeploymentFilter] = None,
    flow_filter: Optional[filters.FlowFilter] = None,
) -> Select[T]:
    """
    Narrows a query over the artifact table using the supplied filters.

    The artifact filter is applied directly. Flow run, flow and deployment
    filters are combined into one EXISTS subquery on the artifact's flow run;
    the task run filter becomes a separate EXISTS subquery on the artifact's
    task run. Filters that are falsy are skipped.

    Returns:
        The query with the additional WHERE clauses attached
    """
    if artifact_filter:
        query = query.where(artifact_filter.as_sql_filter())

    if any((flow_filter, flow_run_filter, deployment_filter)):
        # Correlate the subquery with the artifact's own flow run.
        flow_run_subquery = select(db.FlowRun).where(
            db.Artifact.flow_run_id == db.FlowRun.id
        )

        if flow_run_filter:
            flow_run_subquery = flow_run_subquery.where(
                flow_run_filter.as_sql_filter()
            )

        if flow_filter:
            flow_run_subquery = flow_run_subquery.join(
                db.Flow, db.Flow.id == db.FlowRun.flow_id
            ).where(flow_filter.as_sql_filter())

        if deployment_filter:
            flow_run_subquery = flow_run_subquery.join(
                db.Deployment, db.Deployment.id == db.FlowRun.deployment_id
            ).where(deployment_filter.as_sql_filter())

        query = query.where(flow_run_subquery.exists())

    if task_run_filter:
        task_run_subquery = (
            select(db.TaskRun)
            .where(db.Artifact.task_run_id == db.TaskRun.id)
            .where(task_run_filter.as_sql_filter())
        )
        query = query.where(task_run_subquery.exists())

    return query
async def _apply_artifact_collection_filters(
    db: PrefectDBInterface,
    query: Select[T],
    flow_run_filter: Optional[filters.FlowRunFilter] = None,
    task_run_filter: Optional[filters.TaskRunFilter] = None,
    artifact_filter: Optional[filters.ArtifactCollectionFilter] = None,
    deployment_filter: Optional[filters.DeploymentFilter] = None,
    flow_filter: Optional[filters.FlowFilter] = None,
) -> Select[T]:
    """
    Narrows a query over the artifact collection table using the supplied
    filters.

    The artifact collection filter is applied directly. Flow run, flow and
    deployment filters are combined into one EXISTS subquery on the collection
    row's flow run; the task run filter becomes a separate EXISTS subquery on
    the collection row's task run. Filters that are falsy are skipped.

    Returns:
        The query with the additional WHERE clauses attached
    """
    if artifact_filter:
        query = query.where(artifact_filter.as_sql_filter())

    if any((flow_filter, flow_run_filter, deployment_filter)):
        # Correlate the subquery with the collection row's flow run.
        flow_run_subquery = select(db.FlowRun).where(
            db.ArtifactCollection.flow_run_id == db.FlowRun.id
        )

        if flow_run_filter:
            flow_run_subquery = flow_run_subquery.where(
                flow_run_filter.as_sql_filter()
            )

        if flow_filter:
            flow_run_subquery = flow_run_subquery.join(
                db.Flow, db.Flow.id == db.FlowRun.flow_id
            ).where(flow_filter.as_sql_filter())

        if deployment_filter:
            flow_run_subquery = flow_run_subquery.join(
                db.Deployment, db.Deployment.id == db.FlowRun.deployment_id
            ).where(deployment_filter.as_sql_filter())

        query = query.where(flow_run_subquery.exists())

    if task_run_filter:
        task_run_subquery = (
            select(db.TaskRun)
            .where(db.ArtifactCollection.task_run_id == db.TaskRun.id)
            .where(task_run_filter.as_sql_filter())
        )
        query = query.where(task_run_subquery.exists())

    return query
@db_injector
async def read_artifacts(
    db: PrefectDBInterface,
    session: AsyncSession,
    offset: Optional[int] = None,
    limit: Optional[int] = None,
    artifact_filter: Optional[filters.ArtifactFilter] = None,
    flow_run_filter: Optional[filters.FlowRunFilter] = None,
    task_run_filter: Optional[filters.TaskRunFilter] = None,
    deployment_filter: Optional[filters.DeploymentFilter] = None,
    flow_filter: Optional[filters.FlowFilter] = None,
    sort: sorting.ArtifactSort = sorting.ArtifactSort.ID_DESC,
) -> Sequence[orm_models.Artifact]:
    """
    Reads artifacts from the artifact table.

    Args:
        session: A database session
        offset: Query offset; not applied when None
        limit: Query limit; not applied when None
        artifact_filter: Only select artifacts matching this filter
        flow_run_filter: Only select artifacts whose flow runs match this filter
        task_run_filter: Only select artifacts whose task runs match this filter
        deployment_filter: Only select artifacts whose flow runs belong to
            deployments matching this filter
        flow_filter: Only select artifacts whose flow runs belong to flows
            matching this filter
        sort: Ordering applied to the query (defaults to `ArtifactSort.ID_DESC`)

    Returns:
        The matching artifact rows
    """
    stmt = sa.select(db.Artifact).order_by(*sort.as_sql_sort())
    stmt = await _apply_artifact_filters(
        db,
        stmt,
        artifact_filter=artifact_filter,
        flow_run_filter=flow_run_filter,
        task_run_filter=task_run_filter,
        deployment_filter=deployment_filter,
        flow_filter=flow_filter,
    )

    # Pagination is optional; None leaves the query unbounded on that side.
    if offset is not None:
        stmt = stmt.offset(offset)
    if limit is not None:
        stmt = stmt.limit(limit)

    rows = await session.execute(stmt)
    return rows.scalars().unique().all()
@db_injector
async def read_latest_artifacts(
    db: PrefectDBInterface,
    session: AsyncSession,
    offset: Optional[int] = None,
    limit: Optional[int] = None,
    artifact_filter: Optional[filters.ArtifactCollectionFilter] = None,
    flow_run_filter: Optional[filters.FlowRunFilter] = None,
    task_run_filter: Optional[filters.TaskRunFilter] = None,
    deployment_filter: Optional[filters.DeploymentFilter] = None,
    flow_filter: Optional[filters.FlowFilter] = None,
    sort: sorting.ArtifactCollectionSort = sorting.ArtifactCollectionSort.ID_DESC,
) -> Sequence[orm_models.ArtifactCollection]:
    """
    Reads rows from the artifact collection table.

    Args:
        session: A database session
        offset: Query offset; not applied when None
        limit: Query limit; not applied when None
        artifact_filter: Only select collection rows matching this filter
        flow_run_filter: Only select rows whose flow runs match this filter
        task_run_filter: Only select rows whose task runs match this filter
        deployment_filter: Only select rows whose flow runs belong to
            deployments matching this filter
        flow_filter: Only select rows whose flow runs belong to flows
            matching this filter
        sort: Ordering applied to the query (defaults to
            `ArtifactCollectionSort.ID_DESC`)

    Returns:
        The matching artifact collection rows
    """
    stmt = sa.select(db.ArtifactCollection).order_by(*sort.as_sql_sort())
    stmt = await _apply_artifact_collection_filters(
        db,
        stmt,
        artifact_filter=artifact_filter,
        flow_run_filter=flow_run_filter,
        task_run_filter=task_run_filter,
        deployment_filter=deployment_filter,
        flow_filter=flow_filter,
    )

    # Pagination is optional; None leaves the query unbounded on that side.
    if offset is not None:
        stmt = stmt.offset(offset)
    if limit is not None:
        stmt = stmt.limit(limit)

    rows = await session.execute(stmt)
    return rows.scalars().unique().all()
@db_injector
async def count_artifacts(
    db: PrefectDBInterface,
    session: AsyncSession,
    artifact_filter: Optional[filters.ArtifactFilter] = None,
    flow_run_filter: Optional[filters.FlowRunFilter] = None,
    task_run_filter: Optional[filters.TaskRunFilter] = None,
    deployment_filter: Optional[filters.DeploymentFilter] = None,
    flow_filter: Optional[filters.FlowFilter] = None,
) -> int:
    """
    Counts rows in the artifact table that match the given filters.

    Args:
        session: A database session
        artifact_filter: Only count artifacts matching this filter
        flow_run_filter: Only count artifacts whose flow runs match this filter
        task_run_filter: Only count artifacts whose task runs match this filter
        deployment_filter: Only count artifacts whose flow runs belong to
            deployments matching this filter
        flow_filter: Only count artifacts whose flow runs belong to flows
            matching this filter

    Returns:
        The number of matching artifacts
    """
    count_stmt = await _apply_artifact_filters(
        db,
        sa.select(sa.func.count(db.Artifact.id)),
        artifact_filter=artifact_filter,
        flow_run_filter=flow_run_filter,
        task_run_filter=task_run_filter,
        deployment_filter=deployment_filter,
        flow_filter=flow_filter,
    )
    return (await session.execute(count_stmt)).scalar_one()
@db_injector
async def count_latest_artifacts(
    db: PrefectDBInterface,
    session: AsyncSession,
    artifact_filter: Optional[filters.ArtifactCollectionFilter] = None,
    flow_run_filter: Optional[filters.FlowRunFilter] = None,
    task_run_filter: Optional[filters.TaskRunFilter] = None,
    deployment_filter: Optional[filters.DeploymentFilter] = None,
    flow_filter: Optional[filters.FlowFilter] = None,
) -> int:
    """
    Counts rows in the artifact collection table that match the given filters.

    Args:
        session: A database session
        artifact_filter: Only count collection rows matching this filter
        flow_run_filter: Only count rows whose flow runs match this filter
        task_run_filter: Only count rows whose task runs match this filter
        deployment_filter: Only count rows whose flow runs belong to
            deployments matching this filter
        flow_filter: Only count rows whose flow runs belong to flows
            matching this filter

    Returns:
        The number of matching artifact collection rows
    """
    count_stmt = await _apply_artifact_collection_filters(
        db,
        sa.select(sa.func.count(db.ArtifactCollection.id)),
        artifact_filter=artifact_filter,
        flow_run_filter=flow_run_filter,
        task_run_filter=task_run_filter,
        deployment_filter=deployment_filter,
        flow_filter=flow_filter,
    )
    return (await session.execute(count_stmt)).scalar_one()
@db_injector
async def update_artifact(
    db: PrefectDBInterface,
    session: AsyncSession,
    artifact_id: UUID,
    artifact: actions.ArtifactUpdate,
) -> bool:
    """
    Updates an artifact by id.

    The same field values are written to the artifact row with this id and to
    any artifact collection row whose `latest_id` is this id.

    Args:
        session: A database session
        artifact_id (UUID): The artifact id to update
        artifact: An artifact update model; only fields that were explicitly
            set are written

    Returns:
        bool: True if the combined row count of the two UPDATE statements is
            greater than zero, False otherwise
    """
    # Dump once: both tables receive exactly the same values.
    update_values = artifact.model_dump_for_orm(exclude_unset=True)

    artifact_result = await session.execute(
        sa.update(db.Artifact)
        .where(db.Artifact.id == artifact_id)
        .values(**update_values)
    )

    # Mirror the change onto the collection row when this artifact is the
    # one it currently references.
    collection_result = await session.execute(
        sa.update(db.ArtifactCollection)
        .where(db.ArtifactCollection.latest_id == artifact_id)
        .values(**update_values)
    )

    return artifact_result.rowcount + collection_result.rowcount > 0
@db_injector
async def delete_artifact(
    db: PrefectDBInterface, session: AsyncSession, artifact_id: UUID
) -> bool:
    """
    Deletes an artifact by id.

    The ArtifactCollection table holds one row per key that references the
    latest artifact for that key. If the artifact being deleted is the one
    referenced there, the collection row is first repointed at the most
    recently created remaining artifact with the same key, or removed when no
    other artifact with that key exists.

    Example:
        Given these rows in Artifact:
            - key: "foo", id: 1, created: 2020-01-01
            - key: "foo", id: 2, created: 2020-01-02
            - key: "foo", id: 3, created: 2020-01-03

        and this row in ArtifactCollection:
            - key: "foo", latest_id: 3

        deleting artifact 3 changes the collection row for "foo" to
        reference artifact 2.

    Args:
        session: A database session
        artifact_id (UUID): The artifact id to delete

    Returns:
        bool: True if the delete was successful, False otherwise (including
            when no artifact with this id exists)
    """
    artifact = await session.get(db.Artifact, artifact_id)
    if artifact is None:
        return False

    # Is this artifact the one its key's collection row currently references?
    collection_lookup = await session.execute(
        sa.select(db.ArtifactCollection)
        .where(db.ArtifactCollection.key == artifact.key)
        .where(db.ArtifactCollection.latest_id == artifact_id)
    )

    if collection_lookup.scalar_one_or_none() is not None:
        # Most recently created other artifact sharing the key, if any.
        replacement_lookup = await session.execute(
            sa.select(db.Artifact)
            .where(db.Artifact.key == artifact.key)
            .where(db.Artifact.id != artifact_id)
            .order_by(db.Artifact.created.desc())
            .limit(1)
        )
        replacement = replacement_lookup.scalar_one_or_none()

        if replacement is None:
            # Nothing left for this key: remove the collection row.
            await session.execute(
                sa.delete(db.ArtifactCollection)
                .where(db.ArtifactCollection.key == artifact.key)
                .where(db.ArtifactCollection.latest_id == artifact_id)
            )
        else:
            # Copy the replacement's fields onto the collection row.
            await session.execute(
                sa.update(db.ArtifactCollection)
                .where(db.ArtifactCollection.key == artifact.key)
                .values(
                    latest_id=replacement.id,
                    data=replacement.data,
                    description=replacement.description,
                    type=replacement.type,
                    created=replacement.created,
                    updated=replacement.updated,
                    flow_run_id=replacement.flow_run_id,
                    task_run_id=replacement.task_run_id,
                    metadata_=replacement.metadata_,
                )
            )

    deletion = await session.execute(
        sa.delete(db.Artifact).where(db.Artifact.id == artifact_id)
    )
    return deletion.rowcount > 0