Coverage for /usr/local/lib/python3.12/site-packages/prefect/client/orchestration/_artifacts/client.py: 32%
94 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1from typing import TYPE_CHECKING, Annotated, Optional 1a
3from httpx import HTTPStatusError 1a
4from pydantic import Field 1a
5from typing_extensions import TypedDict, Unpack 1a
7from prefect.client.orchestration.base import BaseAsyncClient, BaseClient 1a
8from prefect.exceptions import ObjectNotFound 1a
10if TYPE_CHECKING: 10 ↛ 11line 10 didn't jump to line 11 because the condition on line 10 was never true1a
11 from uuid import UUID
13 from prefect.client.schemas.actions import ArtifactCreate, ArtifactUpdate
14 from prefect.client.schemas.filters import (
15 ArtifactCollectionFilter,
16 ArtifactFilter,
17 FlowRunFilter,
18 TaskRunFilter,
19 )
20 from prefect.client.schemas.objects import Artifact, ArtifactCollection
21 from prefect.client.schemas.sorting import ArtifactCollectionSort, ArtifactSort
24class BaseArtifactReadParams(TypedDict, total=False): 1a
25 flow_run_filter: Annotated[Optional["FlowRunFilter"], Field(default=None)] 1a
26 task_run_filter: Annotated[Optional["TaskRunFilter"], Field(default=None)] 1a
27 limit: Annotated[Optional[int], Field(default=None)] 1a
28 offset: Annotated[int, Field(default=0)] 1a
31class ArtifactReadParams(BaseArtifactReadParams, total=False): 1a
32 artifact_filter: Annotated[Optional["ArtifactFilter"], Field(default=None)] 1a
33 sort: Annotated[Optional["ArtifactSort"], Field(default=None)] 1a
36class ArtifactCollectionReadParams(BaseArtifactReadParams, total=False): 1a
37 artifact_filter: Annotated[ 1a
38 Optional["ArtifactCollectionFilter"], Field(default=None)
39 ]
40 sort: Annotated[Optional["ArtifactCollectionSort"], Field(default=None)] 1a
43class ArtifactClient(BaseClient): 1a
44 def create_artifact(self, artifact: "ArtifactCreate") -> "Artifact": 1a
45 response = self.request(
46 "POST",
47 "/artifacts/",
48 json=artifact.model_dump(mode="json", exclude_unset=True),
49 )
50 from prefect.client.schemas.objects import Artifact
51 from prefect.events.utilities import emit_event
53 created = Artifact.model_validate(response.json())
55 # Emit an event for artifact creation
56 resource = {
57 "prefect.resource.id": f"prefect.artifact.{created.id}",
58 }
59 if created.key:
60 resource["prefect.resource.name"] = created.key
62 payload = {
63 k: v
64 for k, v in {
65 "key": created.key,
66 "type": created.type,
67 "description": created.description,
68 }.items()
69 }
71 emit_event(
72 event="prefect.artifact.created",
73 resource=resource,
74 payload=payload,
75 )
77 return created
79 def update_artifact(self, artifact_id: "UUID", artifact: "ArtifactUpdate") -> None: 1a
80 from prefect.events.utilities import emit_event
82 self.request(
83 "PATCH",
84 "/artifacts/{id}",
85 json=artifact.model_dump(mode="json", exclude_unset=True),
86 path_params={"id": artifact_id},
87 )
88 # Emit an event for artifact update
89 resource = {
90 "prefect.resource.id": f"prefect.artifact.{artifact_id}",
91 }
92 payload = artifact.model_dump(mode="json", exclude_unset=True)
93 emit_event(
94 event="prefect.artifact.updated",
95 resource=resource,
96 payload=payload,
97 )
98 return None
100 def delete_artifact(self, artifact_id: "UUID") -> None: 1a
101 try:
102 self.request(
103 "DELETE",
104 "/artifacts/{id}",
105 path_params={"id": artifact_id},
106 )
107 except HTTPStatusError as e:
108 if e.response.status_code == 404:
109 raise ObjectNotFound(http_exc=e) from e
110 else:
111 raise
112 return None
114 def read_artifacts( 1a
115 self, **kwargs: Unpack["ArtifactReadParams"]
116 ) -> list["Artifact"]:
117 response = self.request(
118 "POST",
119 "/artifacts/filter",
120 json={
121 "artifacts": (
122 artifact_filter.model_dump(mode="json", exclude_unset=True)
123 if (artifact_filter := kwargs.get("artifact_filter"))
124 else None
125 ),
126 "flow_runs": (
127 flow_run_filter.model_dump(mode="json", exclude_unset=True)
128 if (flow_run_filter := kwargs.get("flow_run_filter"))
129 else None
130 ),
131 "task_runs": (
132 task_run_filter.model_dump(mode="json", exclude_unset=True)
133 if (task_run_filter := kwargs.get("task_run_filter"))
134 else None
135 ),
136 "limit": kwargs.get("limit"),
137 "offset": kwargs.get("offset"),
138 "sort": kwargs.get("sort"),
139 },
140 )
141 from prefect.client.schemas.objects import Artifact
143 return Artifact.model_validate_list(response.json())
146class ArtifactAsyncClient(BaseAsyncClient): 1a
147 async def create_artifact(self, artifact: "ArtifactCreate") -> "Artifact": 1a
148 response = await self.request(
149 "POST",
150 "/artifacts/",
151 json=artifact.model_dump(mode="json", exclude_unset=True),
152 )
153 from prefect.client.schemas.objects import Artifact
154 from prefect.events.utilities import emit_event
156 created = Artifact.model_validate(response.json())
158 # Emit an event for artifact creation
159 resource = {
160 "prefect.resource.id": f"prefect.artifact.{created.id}",
161 }
162 if created.key:
163 resource["prefect.resource.name"] = created.key
165 payload = {
166 k: v
167 for k, v in {
168 "key": created.key,
169 "type": created.type,
170 "description": created.description,
171 }.items()
172 if v is not None
173 }
175 emit_event(
176 event="prefect.artifact.created",
177 resource=resource,
178 payload=payload,
179 )
181 return created
183 async def update_artifact( 1a
184 self, artifact_id: "UUID", artifact: "ArtifactUpdate"
185 ) -> None:
186 from prefect.events.utilities import emit_event
188 await self.request(
189 "PATCH",
190 "/artifacts/{id}",
191 path_params={"id": artifact_id},
192 json=artifact.model_dump(mode="json", exclude_unset=True),
193 )
194 # Emit an event for artifact update
195 resource = {
196 "prefect.resource.id": f"prefect.artifact.{artifact_id}",
197 }
198 payload = artifact.model_dump(mode="json", exclude_unset=True)
199 emit_event(
200 event="prefect.artifact.updated",
201 resource=resource,
202 payload=payload or None,
203 )
204 return None
206 async def read_artifacts( 1a
207 self, **kwargs: Unpack["ArtifactReadParams"]
208 ) -> list["Artifact"]:
209 response = await self.request(
210 "POST",
211 "/artifacts/filter",
212 json={
213 "artifacts": (
214 artifact_filter.model_dump(mode="json", exclude_unset=True)
215 if (artifact_filter := kwargs.get("artifact_filter"))
216 else None
217 ),
218 "flow_runs": (
219 flow_run_filter.model_dump(mode="json", exclude_unset=True)
220 if (flow_run_filter := kwargs.get("flow_run_filter"))
221 else None
222 ),
223 "task_runs": (
224 task_run_filter.model_dump(mode="json", exclude_unset=True)
225 if (task_run_filter := kwargs.get("task_run_filter"))
226 else None
227 ),
228 "limit": kwargs.get("limit", None),
229 "offset": kwargs.get("offset", 0),
230 "sort": kwargs.get("sort", None),
231 },
232 )
233 from prefect.client.schemas.objects import Artifact
235 return Artifact.model_validate_list(response.json())
237 async def delete_artifact(self, artifact_id: "UUID") -> None: 1a
238 try:
239 await self.request(
240 "DELETE",
241 "/artifacts/{id}",
242 path_params={"id": artifact_id},
243 )
244 except HTTPStatusError as e:
245 if e.response.status_code == 404:
246 raise ObjectNotFound(http_exc=e) from e
247 else:
248 raise
251class ArtifactCollectionClient(BaseClient): 1a
252 def read_latest_artifacts( 1a
253 self, **kwargs: Unpack["ArtifactCollectionReadParams"]
254 ) -> list["ArtifactCollection"]:
255 response = self.request(
256 "POST",
257 "/artifacts/latest/filter",
258 json={
259 "artifacts": (
260 artifact_filter.model_dump(mode="json", exclude_unset=True)
261 if (artifact_filter := kwargs.get("artifact_filter"))
262 else None
263 ),
264 "flow_runs": (
265 flow_run_filter.model_dump(mode="json", exclude_unset=True)
266 if (flow_run_filter := kwargs.get("flow_run_filter"))
267 else None
268 ),
269 "task_runs": (
270 task_run_filter.model_dump(mode="json", exclude_unset=True)
271 if (task_run_filter := kwargs.get("task_run_filter"))
272 else None
273 ),
274 "limit": kwargs.get("limit", None),
275 "offset": kwargs.get("offset", 0),
276 "sort": kwargs.get("sort", None),
277 },
278 )
279 from prefect.client.schemas.objects import ArtifactCollection
281 return ArtifactCollection.model_validate_list(response.json())
284class ArtifactCollectionAsyncClient(BaseAsyncClient): 1a
285 async def read_latest_artifacts( 1a
286 self, **kwargs: Unpack["ArtifactCollectionReadParams"]
287 ) -> list["ArtifactCollection"]:
288 response = await self.request(
289 "POST",
290 "/artifacts/latest/filter",
291 json={
292 "artifacts": (
293 artifact_filter.model_dump(mode="json", exclude_unset=True)
294 if (artifact_filter := kwargs.get("artifact_filter"))
295 else None
296 ),
297 "flow_runs": (
298 flow_run_filter.model_dump(mode="json", exclude_unset=True)
299 if (flow_run_filter := kwargs.get("flow_run_filter"))
300 else None
301 ),
302 "task_runs": (
303 task_run_filter.model_dump(mode="json", exclude_unset=True)
304 if (task_run_filter := kwargs.get("task_run_filter"))
305 else None
306 ),
307 "limit": kwargs.get("limit", None),
308 "offset": kwargs.get("offset", 0),
309 "sort": kwargs.get("sort", None),
310 },
311 )
312 from prefect.client.schemas.objects import ArtifactCollection
314 return ArtifactCollection.model_validate_list(response.json())