Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/artifacts.py: 96%
61 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1"""
2Routes for interacting with artifact objects.
3"""
5from typing import List 1c
6from uuid import UUID 1c
8from fastapi import Body, Depends, HTTPException, Path, Response, status 1c
10import prefect.server.api.dependencies as dependencies 1c
11from prefect.server import models 1c
12from prefect.server.database import PrefectDBInterface, provide_database_interface 1c
13from prefect.server.schemas import actions, core, filters, sorting 1c
14from prefect.server.utilities.server import PrefectRouter 1c
15from prefect.types._datetime import now 1c
# Router that every artifact endpoint in this module registers on; all
# routes are served under the "/artifacts" prefix and tagged "Artifacts".
router: PrefectRouter = PrefectRouter(prefix="/artifacts", tags=["Artifacts"])
@router.post("/")
async def create_artifact(
    artifact: actions.ArtifactCreate,
    response: Response,
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> core.Artifact:
    """
    Create an artifact.

    For more information, see https://docs.prefect.io/v3/concepts/artifacts.

    The response status is set to 201 only when the stored artifact's
    ``created`` timestamp is not earlier than the moment this handler started
    the insert; otherwise the response's default status code is left as is.
    """
    # Convert the incoming action payload into the core schema object.
    new_artifact = core.Artifact(**artifact.model_dump())

    # Captured before the insert so it can be compared with ``created`` below.
    started_at = now("UTC")

    async with db.session_context(begin_transaction=True) as session:
        stored = await models.artifacts.create_artifact(
            artifact=new_artifact,
            session=session,
        )

    # NOTE(review): an older ``created`` presumably means an existing row was
    # returned rather than a new one inserted -- confirm against the model layer.
    freshly_created = stored.created >= started_at
    if freshly_created:
        response.status_code = status.HTTP_201_CREATED
    return stored
@router.get("/{id:uuid}")
async def read_artifact(
    artifact_id: UUID = Path(
        ..., description="The ID of the artifact to retrieve.", alias="id"
    ),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> core.Artifact:
    """
    Retrieve an artifact from the database.

    Responds with HTTP 404 ("Artifact not found.") when the lookup by ID
    yields nothing.
    """
    async with db.session_context() as session:
        found = await models.artifacts.read_artifact(
            artifact_id=artifact_id, session=session
        )

    if found is not None:
        return found
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND, detail="Artifact not found."
    )
@router.get("/{key}/latest")
async def read_latest_artifact(
    key: str = Path(
        ...,
        description="The key of the artifact to retrieve.",
    ),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> core.Artifact:
    """
    Retrieve the latest artifact from the artifact table.

    Responds with HTTP 404 ("Artifact not found.") when the lookup by key
    yields nothing.
    """
    async with db.session_context() as session:
        latest = await models.artifacts.read_latest_artifact(
            key=key, session=session
        )

    if latest is not None:
        return latest
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND, detail="Artifact not found."
    )
@router.post("/filter")
async def read_artifacts(
    sort: sorting.ArtifactSort = Body(sorting.ArtifactSort.ID_DESC),
    limit: int = dependencies.LimitBody(),
    offset: int = Body(0, ge=0),
    artifacts: filters.ArtifactFilter = None,
    flow_runs: filters.FlowRunFilter = None,
    task_runs: filters.TaskRunFilter = None,
    flows: filters.FlowFilter = None,
    deployments: filters.DeploymentFilter = None,
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> List[core.Artifact]:
    """
    Retrieve artifacts from the database.

    The request body may supply a sort order (``ID_DESC`` by default), a
    limit, a non-negative offset (0 by default), and optional artifact,
    flow run, task run, flow, and deployment filters. All of them are handed
    unchanged to ``models.artifacts.read_artifacts``.
    """
    async with db.session_context() as session:
        matching = await models.artifacts.read_artifacts(
            session=session,
            sort=sort,
            limit=limit,
            offset=offset,
            artifact_filter=artifacts,
            flow_run_filter=flow_runs,
            task_run_filter=task_runs,
            flow_filter=flows,
            deployment_filter=deployments,
        )
        return matching
@router.post("/latest/filter")
async def read_latest_artifacts(
    sort: sorting.ArtifactCollectionSort = Body(sorting.ArtifactCollectionSort.ID_DESC),
    limit: int = dependencies.LimitBody(),
    offset: int = Body(0, ge=0),
    artifacts: filters.ArtifactCollectionFilter = None,
    flow_runs: filters.FlowRunFilter = None,
    task_runs: filters.TaskRunFilter = None,
    flows: filters.FlowFilter = None,
    deployments: filters.DeploymentFilter = None,
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> List[core.ArtifactCollection]:
    """
    Retrieve artifacts from the database.

    This endpoint returns ``ArtifactCollection`` records obtained from
    ``models.artifacts.read_latest_artifacts``. The request body may supply a
    sort order (``ID_DESC`` by default), a limit, a non-negative offset
    (0 by default), and optional artifact-collection, flow run, task run,
    flow, and deployment filters.
    """
    async with db.session_context() as session:
        latest_matching = await models.artifacts.read_latest_artifacts(
            session=session,
            sort=sort,
            limit=limit,
            offset=offset,
            artifact_filter=artifacts,
            flow_run_filter=flow_runs,
            task_run_filter=task_runs,
            flow_filter=flows,
            deployment_filter=deployments,
        )
        return latest_matching
@router.post("/count")
async def count_artifacts(
    artifacts: filters.ArtifactFilter = None,
    flow_runs: filters.FlowRunFilter = None,
    task_runs: filters.TaskRunFilter = None,
    flows: filters.FlowFilter = None,
    deployments: filters.DeploymentFilter = None,
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> int:
    """
    Count artifacts from the database.

    The optional artifact, flow run, task run, flow, and deployment filters
    are handed unchanged to ``models.artifacts.count_artifacts``, whose
    result is returned.
    """
    async with db.session_context() as session:
        total = await models.artifacts.count_artifacts(
            session=session,
            artifact_filter=artifacts,
            flow_run_filter=flow_runs,
            task_run_filter=task_runs,
            flow_filter=flows,
            deployment_filter=deployments,
        )
        return total
@router.post("/latest/count")
async def count_latest_artifacts(
    artifacts: filters.ArtifactCollectionFilter = None,
    flow_runs: filters.FlowRunFilter = None,
    task_runs: filters.TaskRunFilter = None,
    flows: filters.FlowFilter = None,
    deployments: filters.DeploymentFilter = None,
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> int:
    """
    Count artifacts from the database.

    The optional artifact-collection, flow run, task run, flow, and
    deployment filters are handed unchanged to
    ``models.artifacts.count_latest_artifacts``, whose result is returned.
    """
    async with db.session_context() as session:
        total = await models.artifacts.count_latest_artifacts(
            session=session,
            artifact_filter=artifacts,
            flow_run_filter=flow_runs,
            task_run_filter=task_runs,
            flow_filter=flows,
            deployment_filter=deployments,
        )
        return total
@router.patch("/{id:uuid}", status_code=204)
async def update_artifact(
    artifact: actions.ArtifactUpdate,
    artifact_id: UUID = Path(
        ..., description="The ID of the artifact to update.", alias="id"
    ),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    """
    Update an artifact in the database.

    Responds with HTTP 404 ("Artifact not found.") when the model layer
    reports a falsy result for the update.
    """
    async with db.session_context(begin_transaction=True) as session:
        was_updated = await models.artifacts.update_artifact(
            artifact=artifact,
            artifact_id=artifact_id,
            session=session,
        )
    if was_updated:
        return
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND, detail="Artifact not found."
    )
@router.delete("/{id:uuid}", status_code=204)
async def delete_artifact(
    artifact_id: UUID = Path(
        ..., description="The ID of the artifact to delete.", alias="id"
    ),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> None:
    """
    Delete an artifact from the database.

    Responds with HTTP 404 ("Artifact not found.") when the model layer
    reports a falsy result for the deletion.
    """
    async with db.session_context(begin_transaction=True) as session:
        was_deleted = await models.artifacts.delete_artifact(
            artifact_id=artifact_id,
            session=session,
        )
    if was_deleted:
        return
    raise HTTPException(
        status_code=status.HTTP_404_NOT_FOUND, detail="Artifact not found."
    )