Coverage for /usr/local/lib/python3.12/site-packages/prefect/artifacts.py: 0%
196 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1"""
2Interface for creating and reading artifacts.
3"""
5from __future__ import annotations
7import json
8import math
9import warnings
10from contextlib import nullcontext
11from typing import TYPE_CHECKING, Any, Optional, Union, cast
12from uuid import UUID
14from typing_extensions import Self
16from prefect._internal.compatibility.async_dispatch import async_dispatch
17from prefect.client.orchestration import PrefectClient, get_client
18from prefect.client.schemas.actions import ArtifactCreate as ArtifactRequest
19from prefect.client.schemas.actions import ArtifactUpdate
20from prefect.client.schemas.filters import ArtifactFilter, ArtifactFilterKey
21from prefect.client.schemas.objects import Artifact as ArtifactResponse
22from prefect.client.schemas.sorting import ArtifactSort
23from prefect.context import MissingContextError, get_run_context
24from prefect.logging.loggers import get_logger
25from prefect.utilities.asyncutils import asyncnullcontext
26from prefect.utilities.context import get_task_and_flow_run_ids
if TYPE_CHECKING:
    # Needed only for the annotation on `logger` below, so it is not imported
    # at runtime.
    import logging

# Module-level logger used by the artifact classes in this file.
logger: logging.Logger = get_logger("artifacts")
class Artifact(ArtifactRequest):
    """
    An artifact is a piece of data that is created by a flow or task run.
    https://docs.prefect.io/latest/develop/artifacts

    Arguments:
        type: A string identifying the type of artifact.
        key: A user-provided string identifier.
            The key must only contain lowercase letters, numbers, and dashes.
        description: A user-specified description of the artifact.
        data: A JSON payload that allows for a result to be retrieved.
    """

    async def acreate(
        self,
        client: "PrefectClient | None" = None,
    ) -> "ArtifactResponse":
        """
        Create this artifact through the asynchronous API client.

        Arguments:
            client: The PrefectClient to use. When omitted, a client from
                `get_client()` is used for the duration of the call.

        Returns:
            The created artifact.
        """
        # Wrap a caller-supplied client so both cases share one `async with`.
        client_context = get_client() if not client else asyncnullcontext(client)
        async with client_context as active_client:
            context_task_run_id, context_flow_run_id = get_task_and_flow_run_ids()

            # The context lookup is only used to detect "no active run".
            try:
                get_run_context()
            except MissingContextError:
                warnings.warn(
                    "Artifact creation outside of a flow or task run is deprecated and will be removed in a later version.",
                    FutureWarning,
                )

            # IDs set on this instance win over the ones taken from context.
            request = ArtifactRequest(
                type=self.type,
                key=self.key,
                description=self.description,
                task_run_id=self.task_run_id or context_task_run_id,
                flow_run_id=self.flow_run_id or context_flow_run_id,
                data=await self.aformat(),
            )
            return await active_client.create_artifact(artifact=request)

    @async_dispatch(acreate)
    def create(
        self: Self,
        client: "PrefectClient | None" = None,
    ) -> "ArtifactResponse":
        """
        Create this artifact through a synchronous API client.

        Arguments:
            client: The PrefectClient. This synchronous implementation does
                not read it; it always uses `get_client(sync_client=True)`.

        Returns:
            The created artifact.
        """
        # Create sync client since this is a sync method.
        api = get_client(sync_client=True)
        context_task_run_id, context_flow_run_id = get_task_and_flow_run_ids()

        # The context lookup is only used to detect "no active run".
        try:
            get_run_context()
        except MissingContextError:
            warnings.warn(
                "Artifact creation outside of a flow or task run is deprecated and will be removed in a later version.",
                FutureWarning,
            )

        # IDs set on this instance win over the ones taken from context.
        request = ArtifactRequest(
            type=self.type,
            key=self.key,
            description=self.description,
            task_run_id=self.task_run_id or context_task_run_id,
            flow_run_id=self.flow_run_id or context_flow_run_id,
            data=cast(str, self.format(_sync=True)),  # pyright: ignore[reportCallIssue] _sync is valid because .format is wrapped in async_dispatch
        )
        return api.create_artifact(artifact=request)

    @classmethod
    async def aget(
        cls,
        key: str | None = None,
        client: "PrefectClient | None" = None,
    ) -> "ArtifactResponse | None":
        """
        An async method to get the most recently updated artifact for a key.

        Arguments:
            key: The key of the artifact to get.
            client: A client to use when calling the Prefect API.

        Returns:
            The artifact (if found), otherwise None.
        """
        client_context = get_client() if not client else asyncnullcontext(client)
        async with client_context as active_client:
            # With no key, the key filter is built with `any_=None`.
            key_filter = ArtifactFilterKey(any_=None if key is None else [key])
            matches = await active_client.read_artifacts(
                limit=1,
                sort=ArtifactSort.UPDATED_DESC,
                artifact_filter=ArtifactFilter(key=key_filter),
            )
            return matches[0] if matches else None

    @classmethod
    @async_dispatch(aget)
    def get(
        cls, key: str | None = None, client: "PrefectClient | None" = None
    ) -> "ArtifactResponse | None":
        """
        A method to get the most recently updated artifact for a key.

        Arguments:
            key: The key of the artifact to get.
            client: A client to use when calling the Prefect API. This
                synchronous implementation does not read it; it always uses
                `get_client(sync_client=True)`.

        Returns:
            The artifact (if found), otherwise None.
        """
        # Create sync client since this is a sync method.
        api = get_client(sync_client=True)

        # With no key, the key filter is built with `any_=None`.
        key_filter = ArtifactFilterKey(any_=None if key is None else [key])
        matches = api.read_artifacts(
            limit=1,
            sort=ArtifactSort.UPDATED_DESC,
            artifact_filter=ArtifactFilter(key=key_filter),
        )
        return matches[0] if matches else None

    @classmethod
    async def aget_or_create(
        cls,
        key: str | None = None,
        description: str | None = None,
        data: dict[str, Any] | Any | None = None,
        client: "PrefectClient | None" = None,
        **kwargs: Any,
    ) -> tuple["ArtifactResponse", bool]:
        """
        An async method to get or create an artifact.

        Arguments:
            key: The key of the artifact to get or create.
            description: The description of the artifact to create.
            data: The data of the artifact to create.
            client: The PrefectClient
            **kwargs: Additional keyword arguments to use when creating the artifact.

        Returns:
            A tuple of the artifact and a flag: False when an existing
            artifact was retrieved, True when a new one was created.
        """
        existing = await cls.aget(key, client)
        if existing:
            return existing, False

        candidate = cls(key=key, description=description, data=data, **kwargs)
        created = await candidate.acreate(client)
        return created, True

    @classmethod
    @async_dispatch(aget_or_create)
    def get_or_create(
        cls,
        key: str | None = None,
        description: str | None = None,
        data: dict[str, Any] | Any | None = None,
        client: "PrefectClient | None" = None,
        **kwargs: Any,
    ) -> tuple["ArtifactResponse", bool]:
        """
        A method to get or create an artifact.

        Arguments:
            key: The key of the artifact to get or create.
            description: The description of the artifact to create.
            data: The data of the artifact to create.
            client: The PrefectClient. This synchronous implementation does
                not forward it to `get` or `create`.
            **kwargs: Additional keyword arguments to use when creating the artifact.

        Returns:
            A tuple of the artifact and a flag: False when an existing
            artifact was retrieved, True when a new one was created.
        """
        existing = cast(ArtifactResponse, cls.get(key, _sync=True))  # pyright: ignore[reportCallIssue] _sync is valid because .get is wrapped in async_dispatch
        if existing:
            return existing, False

        candidate = cls(key=key, description=description, data=data, **kwargs)
        created = cast(
            ArtifactResponse,
            candidate.create(_sync=True),  # pyright: ignore[reportCallIssue] _sync is valid because .create is wrapped in async_dispatch
        )
        return created, True

    # TODO: Remove this when we remove async_dispatch because it doesn't need to be async
    async def aformat(self) -> str | float | int | dict[str, Any]:
        """Serialize `self.data` to a JSON string (async variant)."""
        serialized = json.dumps(self.data)
        return serialized

    @async_dispatch(aformat)
    def format(self) -> str | float | int | dict[str, Any]:
        """Serialize `self.data` to a JSON string."""
        serialized = json.dumps(self.data)
        return serialized
class LinkArtifact(Artifact):
    # Artifact whose formatted payload is a markdown link.

    # Target of the link.
    link: str
    # Optional display text; when empty or None the link itself is shown.
    link_text: Optional[str] = None
    type: Optional[str] = "markdown"

    def _format(self) -> str:
        """Render the link as a markdown `[text](target)` string."""
        if self.link_text:
            return f"[{self.link_text}]({self.link})"
        return f"[{self.link}]({self.link})"

    async def aformat(self) -> str:
        """Return the markdown link (async variant)."""
        rendered = self._format()
        return rendered

    @async_dispatch(aformat)
    def format(self) -> str:
        """Return the markdown link."""
        rendered = self._format()
        return rendered
class MarkdownArtifact(Artifact):
    # Artifact whose formatted payload is the markdown text, unmodified.

    # Markdown source to store.
    markdown: str
    type: Optional[str] = "markdown"

    async def aformat(self) -> str:
        """Return the markdown text as-is (async variant)."""
        content = self.markdown
        return content

    @async_dispatch(aformat)
    def format(self) -> str:
        """Return the markdown text as-is."""
        content = self.markdown
        return content
class TableArtifact(Artifact):
    # Artifact whose formatted payload is the table serialized as JSON.

    # Accepted shapes: dict of columns, list of row dicts, or list of row lists.
    table: Union[dict[str, list[Any]], list[dict[str, Any]], list[list[Any]]]
    type: Optional[str] = "table"

    @classmethod
    def _sanitize(
        cls, item: dict[str, Any] | list[Any] | float
    ) -> dict[str, Any] | list[Any] | int | float | None:
        """
        Recursively replace NaN floats with None.

        Lists and dicts are rebuilt with each element or value sanitized;
        any other value is returned unchanged.
        """
        if isinstance(item, list):
            return [cls._sanitize(element) for element in item]
        if isinstance(item, dict):
            return {name: cls._sanitize(value) for name, value in item.items()}
        if isinstance(item, float) and math.isnan(item):
            return None
        return item

    async def aformat(self) -> str:
        """Return the sanitized table as a JSON string (async variant)."""
        cleaned = self._sanitize(self.table)
        return json.dumps(cleaned)

    @async_dispatch(aformat)
    def format(self) -> str:
        """Return the sanitized table as a JSON string."""
        cleaned = self._sanitize(self.table)
        return json.dumps(cleaned)
class ProgressArtifact(Artifact):
    # Artifact whose formatted payload is a progress percentage.

    # Percentage of progress; values outside 0-100 are clamped by `_format`.
    progress: float
    type: Optional[str] = "progress"

    def _format(self) -> float:
        """
        Return the progress value, clamping it into the range 0 to 100.

        When the value is out of range, two warnings are logged and
        `self.progress` is overwritten with the clamped value.
        """
        lower_bound, upper_bound = 0.0, 100.0

        out_of_range = self.progress < lower_bound or self.progress > upper_bound
        if not out_of_range:
            return self.progress

        logger.warning(
            f"ProgressArtifact received an invalid value, Progress: {self.progress}%"
        )
        self.progress = max(lower_bound, min(self.progress, upper_bound))
        logger.warning(f"Interpreting as {self.progress}% progress")
        return self.progress

    async def aformat(self) -> float:
        """Return the (possibly clamped) progress value (async variant)."""
        value = self._format()
        return value

    @async_dispatch(aformat)
    def format(self) -> float:
        """Return the (possibly clamped) progress value."""
        value = self._format()
        return value
class ImageArtifact(Artifact):
    """
    An artifact that will display an image from a publicly accessible URL in the UI.

    Arguments:
        image_url: The URL of the image to display.
    """

    image_url: str
    type: Optional[str] = "image"

    async def aformat(self) -> str:
        """
        Async variant of `format`.

        Returns:
            str: The image URL.
        """
        url = self.image_url
        return url

    @async_dispatch(aformat)
    def format(self) -> str:
        """
        Produce the artifact data that is sent to the API when the
        .create() method is called.

        Returns:
            str: The image URL.
        """
        url = self.image_url
        return url
async def acreate_link_artifact(
    link: str,
    link_text: str | None = None,
    key: str | None = None,
    description: str | None = None,
    client: "PrefectClient | None" = None,
) -> UUID:
    """
    Create a link artifact asynchronously.

    Arguments:
        link: The link to create.
        link_text: The link text.
        key: A user-provided string identifier.
            Required for the artifact to show in the Artifacts page in the UI.
            The key must only contain lowercase letters, numbers, and dashes.
        description: A user-specified description of the artifact.
        client: The PrefectClient passed through to `LinkArtifact.acreate`.

    Returns:
        The link artifact ID.
    """
    link_artifact = LinkArtifact(
        link=link,
        link_text=link_text,
        key=key,
        description=description,
    )
    created = await link_artifact.acreate(client)
    return created.id
@async_dispatch(acreate_link_artifact)
def create_link_artifact(
    link: str,
    link_text: str | None = None,
    key: str | None = None,
    description: str | None = None,
    client: "PrefectClient | None" = None,
) -> UUID:
    """
    Create a link artifact.

    Arguments:
        link: The link to create.
        link_text: The link text.
        key: A user-provided string identifier.
            Required for the artifact to show in the Artifacts page in the UI.
            The key must only contain lowercase letters, numbers, and dashes.
        description: A user-specified description of the artifact.
        client: The PrefectClient. This synchronous implementation does not
            use it.

    Returns:
        The link artifact ID.

    Example:
        ```python
        from prefect import flow
        from prefect.artifacts import create_link_artifact

        @flow
        def my_flow():
            create_link_artifact(
                link="https://www.prefect.io",
                link_text="Prefect",
                key="prefect-link",
                description="This is a link to the Prefect website",
            )

        my_flow()
        ```
    """
    link_artifact = LinkArtifact(
        link=link,
        link_text=link_text,
        key=key,
        description=description,
    )
    created = cast(ArtifactResponse, link_artifact.create(_sync=True))  # pyright: ignore[reportCallIssue] _sync is valid because .create is wrapped in async_dispatch
    return created.id
async def acreate_markdown_artifact(
    markdown: str,
    key: str | None = None,
    description: str | None = None,
) -> UUID:
    """
    Create a markdown artifact asynchronously.

    Arguments:
        markdown: The markdown to create.
        key: A user-provided string identifier.
            Required for the artifact to show in the Artifacts page in the UI.
            The key must only contain lowercase letters, numbers, and dashes.
        description: A user-specified description of the artifact.

    Returns:
        The markdown artifact ID.
    """
    markdown_artifact = MarkdownArtifact(
        markdown=markdown,
        key=key,
        description=description,
    )
    created = await markdown_artifact.acreate()
    return created.id
@async_dispatch(acreate_markdown_artifact)
def create_markdown_artifact(
    markdown: str,
    key: str | None = None,
    description: str | None = None,
) -> UUID:
    """
    Create a markdown artifact.

    Arguments:
        markdown: The markdown to create.
        key: A user-provided string identifier.
            Required for the artifact to show in the Artifacts page in the UI.
            The key must only contain lowercase letters, numbers, and dashes.
        description: A user-specified description of the artifact.

    Returns:
        The markdown artifact ID.

    Example:
        ```python
        from prefect import flow
        from prefect.artifacts import create_markdown_artifact

        @flow
        def my_flow():
            create_markdown_artifact(
                markdown="## Prefect",
                key="prefect-markdown",
                description="This is a markdown artifact",
            )

        my_flow()
        ```
    """
    markdown_artifact = MarkdownArtifact(
        markdown=markdown,
        key=key,
        description=description,
    )
    created = cast(ArtifactResponse, markdown_artifact.create(_sync=True))  # pyright: ignore[reportCallIssue] _sync is valid because .create is wrapped in async_dispatch
    return created.id
async def acreate_table_artifact(
    table: dict[str, list[Any]] | list[dict[str, Any]] | list[list[Any]],
    key: str | None = None,
    description: str | None = None,
) -> UUID:
    """
    Asynchronously create a table artifact.

    Arguments:
        table: The table to create.
        key: A user-provided string identifier.
            Required for the artifact to show in the Artifacts page in the UI.
            The key must only contain lowercase letters, numbers, and dashes.
        description: A user-specified description of the artifact.

    Returns:
        The table artifact ID.
    """
    table_artifact = TableArtifact(
        table=table,
        key=key,
        description=description,
    )
    created = await table_artifact.acreate()
    return created.id
@async_dispatch(acreate_table_artifact)
def create_table_artifact(
    table: dict[str, list[Any]] | list[dict[str, Any]] | list[list[Any]],
    key: str | None = None,
    description: str | None = None,
) -> UUID:
    """
    Create a table artifact.

    Arguments:
        table: The table to create.
        key: A user-provided string identifier.
            Required for the artifact to show in the Artifacts page in the UI.
            The key must only contain lowercase letters, numbers, and dashes.
        description: A user-specified description of the artifact.

    Returns:
        The table artifact ID.

    Example:
        ```python
        from prefect import flow
        from prefect.artifacts import create_table_artifact

        @flow
        def my_flow():
            create_table_artifact(
                table=[{"name": "John", "age": 30}, {"name": "Jane", "age": 25}],
                key="prefect-table",
                description="This is a table artifact",
            )

        my_flow()
        ```
    """
    table_artifact = TableArtifact(
        table=table,
        key=key,
        description=description,
    )
    created = cast(ArtifactResponse, table_artifact.create(_sync=True))  # pyright: ignore[reportCallIssue] _sync is valid because .create is wrapped in async_dispatch
    return created.id
async def acreate_progress_artifact(
    progress: float,
    key: str | None = None,
    description: str | None = None,
) -> UUID:
    """
    Asynchronously create a progress artifact.

    Arguments:
        progress: The percentage of progress represented by a float between 0 and 100.
        key: A user-provided string identifier.
            Required for the artifact to show in the Artifacts page in the UI.
            The key must only contain lowercase letters, numbers, and dashes.
        description: A user-specified description of the artifact.

    Returns:
        The progress artifact ID.
    """
    progress_artifact = ProgressArtifact(
        progress=progress,
        key=key,
        description=description,
    )
    created = await progress_artifact.acreate()
    return created.id
@async_dispatch(acreate_progress_artifact)
def create_progress_artifact(
    progress: float,
    key: str | None = None,
    description: str | None = None,
) -> UUID:
    """
    Create a progress artifact.

    Arguments:
        progress: The percentage of progress represented by a float between 0 and 100.
        key: A user-provided string identifier.
            Required for the artifact to show in the Artifacts page in the UI.
            The key must only contain lowercase letters, numbers, and dashes.
        description: A user-specified description of the artifact.

    Returns:
        The progress artifact ID.
    """
    progress_artifact = ProgressArtifact(
        progress=progress,
        key=key,
        description=description,
    )
    created = cast(ArtifactResponse, progress_artifact.create(_sync=True))  # pyright: ignore[reportCallIssue] _sync is valid because .create is wrapped in async_dispatch
    return created.id
async def aupdate_progress_artifact(
    artifact_id: UUID,
    progress: float,
    description: str | None = None,
    client: "PrefectClient | None" = None,
) -> UUID:
    """
    Update a progress artifact asynchronously.

    Arguments:
        artifact_id: The ID of the artifact to update.
        progress: The percentage of progress represented by a float between 0 and 100.
        description: A user-specified description of the artifact.
        client: The PrefectClient to use. When omitted, a client from
            `get_client()` is used for the duration of the call.

    Returns:
        The progress artifact ID.
    """
    # Use the async-aware null context, matching the other async helpers in
    # this module. `contextlib.nullcontext` only supports `async with` on
    # Python 3.10 and later.
    local_client_context = asyncnullcontext(client) if client else get_client()
    async with local_client_context as client:
        artifact = ProgressArtifact(
            description=description,
            progress=progress,
        )
        # Format once; both update shapes below carry the same data value.
        data = await artifact.aformat()

        # Only pass `description` to the update when one was provided, so it
        # stays unset on the update model otherwise.
        if description:
            update = ArtifactUpdate(description=artifact.description, data=data)
        else:
            update = ArtifactUpdate(data=data)

        await client.update_artifact(
            artifact_id=artifact_id,
            artifact=update,
        )

        return artifact_id
@async_dispatch(aupdate_progress_artifact)
def update_progress_artifact(
    artifact_id: UUID,
    progress: float,
    description: str | None = None,
    client: "PrefectClient | None" = None,
) -> UUID:
    """
    Update a progress artifact.

    Arguments:
        artifact_id: The ID of the artifact to update.
        progress: The percentage of progress represented by a float between 0 and 100.
        description: A user-specified description of the artifact.
        client: The PrefectClient. This synchronous implementation does not
            read it; it always uses `get_client(sync_client=True)`.

    Returns:
        The progress artifact ID.
    """
    api = get_client(sync_client=True)

    progress_artifact = ProgressArtifact(
        description=description,
        progress=progress,
    )
    formatted = cast(float, progress_artifact.format(_sync=True))  # pyright: ignore[reportCallIssue] _sync is valid because .format is wrapped in async_dispatch

    # Only pass `description` to the update when one was provided.
    if description:
        update = ArtifactUpdate(
            description=progress_artifact.description,
            data=formatted,
        )
    else:
        update = ArtifactUpdate(data=formatted)

    api.update_artifact(
        artifact_id=artifact_id,
        artifact=update,
    )
    return artifact_id
async def acreate_image_artifact(
    image_url: str,
    key: str | None = None,
    description: str | None = None,
) -> UUID:
    """
    Asynchronously create an image artifact.

    Arguments:
        image_url: The URL of the image to display.
        key: A user-provided string identifier.
            Required for the artifact to show in the Artifacts page in the UI.
            The key must only contain lowercase letters, numbers, and dashes.
        description: A user-specified description of the artifact.

    Returns:
        The image artifact ID.
    """
    image_artifact = ImageArtifact(
        image_url=image_url,
        key=key,
        description=description,
    )
    created = await image_artifact.acreate()
    return created.id
@async_dispatch(acreate_image_artifact)
def create_image_artifact(
    image_url: str,
    key: str | None = None,
    description: str | None = None,
) -> UUID:
    """
    Create an image artifact.

    Arguments:
        image_url: The URL of the image to display.
        key: A user-provided string identifier.
            Required for the artifact to show in the Artifacts page in the UI.
            The key must only contain lowercase letters, numbers, and dashes.
        description: A user-specified description of the artifact.

    Returns:
        The image artifact ID.
    """
    image_artifact = ImageArtifact(
        image_url=image_url,
        key=key,
        description=description,
    )
    created = cast(ArtifactResponse, image_artifact.create(_sync=True))  # pyright: ignore[reportCallIssue] _sync is valid because .create is wrapped in async_dispatch
    return created.id