Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/artifacts.py: 39%
61 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1"""
2Routes for interacting with artifact objects.
3"""
5from typing import List 1a
6from uuid import UUID 1a
8from fastapi import Body, Depends, HTTPException, Path, Response, status 1a
10import prefect.server.api.dependencies as dependencies 1a
11from prefect.server import models 1a
12from prefect.server.database import PrefectDBInterface, provide_database_interface 1a
13from prefect.server.schemas import actions, core, filters, sorting 1a
14from prefect.server.utilities.server import PrefectRouter 1a
15from prefect.types._datetime import now 1a
17router: PrefectRouter = PrefectRouter( 1a
18 prefix="/artifacts",
19 tags=["Artifacts"],
20)
23@router.post("/") 1a
24async def create_artifact( 1a
25 artifact: actions.ArtifactCreate,
26 response: Response,
27 db: PrefectDBInterface = Depends(provide_database_interface),
28) -> core.Artifact:
29 """
30 Create an artifact.
32 For more information, see https://docs.prefect.io/v3/concepts/artifacts.
33 """
34 artifact = core.Artifact(**artifact.model_dump())
36 right_now = now("UTC")
38 async with db.session_context(begin_transaction=True) as session:
39 model = await models.artifacts.create_artifact(
40 session=session,
41 artifact=artifact,
42 )
44 if model.created >= right_now:
45 response.status_code = status.HTTP_201_CREATED
46 return model
49@router.get("/{id:uuid}") 1a
50async def read_artifact( 1a
51 artifact_id: UUID = Path(
52 ..., description="The ID of the artifact to retrieve.", alias="id"
53 ),
54 db: PrefectDBInterface = Depends(provide_database_interface),
55) -> core.Artifact:
56 """
57 Retrieve an artifact from the database.
58 """
59 async with db.session_context() as session:
60 artifact = await models.artifacts.read_artifact(
61 session=session, artifact_id=artifact_id
62 )
64 if artifact is None:
65 raise HTTPException(status_code=404, detail="Artifact not found.")
66 return artifact
69@router.get("/{key}/latest") 1a
70async def read_latest_artifact( 1a
71 key: str = Path(
72 ...,
73 description="The key of the artifact to retrieve.",
74 ),
75 db: PrefectDBInterface = Depends(provide_database_interface),
76) -> core.Artifact:
77 """
78 Retrieve the latest artifact from the artifact table.
79 """
80 async with db.session_context() as session:
81 artifact = await models.artifacts.read_latest_artifact(session=session, key=key)
83 if artifact is None:
84 raise HTTPException(status_code=404, detail="Artifact not found.")
85 return artifact
88@router.post("/filter") 1a
89async def read_artifacts( 1a
90 sort: sorting.ArtifactSort = Body(sorting.ArtifactSort.ID_DESC),
91 limit: int = dependencies.LimitBody(),
92 offset: int = Body(0, ge=0),
93 artifacts: filters.ArtifactFilter = None,
94 flow_runs: filters.FlowRunFilter = None,
95 task_runs: filters.TaskRunFilter = None,
96 flows: filters.FlowFilter = None,
97 deployments: filters.DeploymentFilter = None,
98 db: PrefectDBInterface = Depends(provide_database_interface),
99) -> List[core.Artifact]:
100 """
101 Retrieve artifacts from the database.
102 """
103 async with db.session_context() as session:
104 return await models.artifacts.read_artifacts(
105 session=session,
106 artifact_filter=artifacts,
107 flow_run_filter=flow_runs,
108 task_run_filter=task_runs,
109 flow_filter=flows,
110 deployment_filter=deployments,
111 offset=offset,
112 limit=limit,
113 sort=sort,
114 )
117@router.post("/latest/filter") 1a
118async def read_latest_artifacts( 1a
119 sort: sorting.ArtifactCollectionSort = Body(sorting.ArtifactCollectionSort.ID_DESC),
120 limit: int = dependencies.LimitBody(),
121 offset: int = Body(0, ge=0),
122 artifacts: filters.ArtifactCollectionFilter = None,
123 flow_runs: filters.FlowRunFilter = None,
124 task_runs: filters.TaskRunFilter = None,
125 flows: filters.FlowFilter = None,
126 deployments: filters.DeploymentFilter = None,
127 db: PrefectDBInterface = Depends(provide_database_interface),
128) -> List[core.ArtifactCollection]:
129 """
130 Retrieve artifacts from the database.
131 """
132 async with db.session_context() as session:
133 return await models.artifacts.read_latest_artifacts(
134 session=session,
135 artifact_filter=artifacts,
136 flow_run_filter=flow_runs,
137 task_run_filter=task_runs,
138 flow_filter=flows,
139 deployment_filter=deployments,
140 offset=offset,
141 limit=limit,
142 sort=sort,
143 )
146@router.post("/count") 1a
147async def count_artifacts( 1a
148 artifacts: filters.ArtifactFilter = None,
149 flow_runs: filters.FlowRunFilter = None,
150 task_runs: filters.TaskRunFilter = None,
151 flows: filters.FlowFilter = None,
152 deployments: filters.DeploymentFilter = None,
153 db: PrefectDBInterface = Depends(provide_database_interface),
154) -> int:
155 """
156 Count artifacts from the database.
157 """
158 async with db.session_context() as session:
159 return await models.artifacts.count_artifacts(
160 session=session,
161 artifact_filter=artifacts,
162 flow_run_filter=flow_runs,
163 task_run_filter=task_runs,
164 flow_filter=flows,
165 deployment_filter=deployments,
166 )
169@router.post("/latest/count") 1a
170async def count_latest_artifacts( 1a
171 artifacts: filters.ArtifactCollectionFilter = None,
172 flow_runs: filters.FlowRunFilter = None,
173 task_runs: filters.TaskRunFilter = None,
174 flows: filters.FlowFilter = None,
175 deployments: filters.DeploymentFilter = None,
176 db: PrefectDBInterface = Depends(provide_database_interface),
177) -> int:
178 """
179 Count artifacts from the database.
180 """
181 async with db.session_context() as session:
182 return await models.artifacts.count_latest_artifacts(
183 session=session,
184 artifact_filter=artifacts,
185 flow_run_filter=flow_runs,
186 task_run_filter=task_runs,
187 flow_filter=flows,
188 deployment_filter=deployments,
189 )
192@router.patch("/{id:uuid}", status_code=204) 1a
193async def update_artifact( 1a
194 artifact: actions.ArtifactUpdate,
195 artifact_id: UUID = Path(
196 ..., description="The ID of the artifact to update.", alias="id"
197 ),
198 db: PrefectDBInterface = Depends(provide_database_interface),
199) -> None:
200 """
201 Update an artifact in the database.
202 """
203 async with db.session_context(begin_transaction=True) as session:
204 result = await models.artifacts.update_artifact(
205 session=session,
206 artifact_id=artifact_id,
207 artifact=artifact,
208 )
209 if not result:
210 raise HTTPException(status_code=404, detail="Artifact not found.")
213@router.delete("/{id:uuid}", status_code=204) 1a
214async def delete_artifact( 1a
215 artifact_id: UUID = Path(
216 ..., description="The ID of the artifact to delete.", alias="id"
217 ),
218 db: PrefectDBInterface = Depends(provide_database_interface),
219) -> None:
220 """
221 Delete an artifact from the database.
222 """
223 async with db.session_context(begin_transaction=True) as session:
224 result = await models.artifacts.delete_artifact(
225 session=session,
226 artifact_id=artifact_id,
227 )
228 if not result:
229 raise HTTPException(status_code=404, detail="Artifact not found.")