Coverage for /usr/local/lib/python3.12/site-packages/prefect/artifacts.py: 0%

196 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1""" 

2Interface for creating and reading artifacts. 

3""" 

4 

5from __future__ import annotations 

6 

7import json 

8import math 

9import warnings 

10from contextlib import nullcontext 

11from typing import TYPE_CHECKING, Any, Optional, Union, cast 

12from uuid import UUID 

13 

14from typing_extensions import Self 

15 

16from prefect._internal.compatibility.async_dispatch import async_dispatch 

17from prefect.client.orchestration import PrefectClient, get_client 

18from prefect.client.schemas.actions import ArtifactCreate as ArtifactRequest 

19from prefect.client.schemas.actions import ArtifactUpdate 

20from prefect.client.schemas.filters import ArtifactFilter, ArtifactFilterKey 

21from prefect.client.schemas.objects import Artifact as ArtifactResponse 

22from prefect.client.schemas.sorting import ArtifactSort 

23from prefect.context import MissingContextError, get_run_context 

24from prefect.logging.loggers import get_logger 

25from prefect.utilities.asyncutils import asyncnullcontext 

26from prefect.utilities.context import get_task_and_flow_run_ids 

27 

if TYPE_CHECKING:
    # `logging` is only needed for the string annotation on `logger` below.
    import logging

# Module-level logger obtained via get_logger("artifacts"); ProgressArtifact
# uses it to warn about out-of-range progress values.
logger: "logging.Logger" = get_logger("artifacts")

32 

33 

class Artifact(ArtifactRequest):
    """
    An artifact is a piece of data that is created by a flow or task run.
    https://docs.prefect.io/latest/develop/artifacts

    Arguments:
        type: A string identifying the type of artifact.
        key: A user-provided string identifier.
            The key must only contain lowercase letters, numbers, and dashes.
        description: A user-specified description of the artifact.
        data: A JSON payload that allows for a result to be retrieved.
    """

    async def acreate(
        self,
        client: "PrefectClient | None" = None,
    ) -> "ArtifactResponse":
        """
        An async method to create an artifact.

        `task_run_id` / `flow_run_id` set on the artifact take precedence;
        otherwise the IDs returned by `get_task_and_flow_run_ids()` are used.
        A `FutureWarning` is emitted when no run context is available.

        Arguments:
            client: The PrefectClient to use. If omitted, a new client is
                obtained from `get_client()` for this call.

        Returns:
            The created artifact.
        """
        # Reuse a caller-supplied client as-is; otherwise open a new one.
        local_client_context = asyncnullcontext(client) if client else get_client()
        async with local_client_context as client:
            task_run_id, flow_run_id = get_task_and_flow_run_ids()

            try:
                get_run_context()
            except MissingContextError:
                warnings.warn(
                    "Artifact creation outside of a flow or task run is deprecated and will be removed in a later version.",
                    FutureWarning,
                )

            return await client.create_artifact(
                artifact=ArtifactRequest(
                    type=self.type,
                    key=self.key,
                    description=self.description,
                    # Explicit IDs on the artifact win over the ambient ones.
                    task_run_id=self.task_run_id or task_run_id,
                    flow_run_id=self.flow_run_id or flow_run_id,
                    data=await self.aformat(),
                )
            )

    @async_dispatch(acreate)
    def create(
        self: Self,
        client: "PrefectClient | None" = None,
    ) -> "ArtifactResponse":
        """
        A method to create an artifact.

        `task_run_id` / `flow_run_id` set on the artifact take precedence;
        otherwise the IDs returned by `get_task_and_flow_run_ids()` are used.
        A `FutureWarning` is emitted when no run context is available.

        Arguments:
            client: Accepted for signature parity with `acreate`. This
                synchronous implementation does not use it; it always creates
                its own sync client.

        Returns:
            The created artifact.
        """

        # Create sync client since this is a sync method.
        sync_client = get_client(sync_client=True)
        task_run_id, flow_run_id = get_task_and_flow_run_ids()

        try:
            get_run_context()
        except MissingContextError:
            warnings.warn(
                "Artifact creation outside of a flow or task run is deprecated and will be removed in a later version.",
                FutureWarning,
            )

        return sync_client.create_artifact(
            artifact=ArtifactRequest(
                type=self.type,
                key=self.key,
                description=self.description,
                task_run_id=self.task_run_id or task_run_id,
                flow_run_id=self.flow_run_id or flow_run_id,
                data=cast(str, self.format(_sync=True)),  # pyright: ignore[reportCallIssue] _sync is valid because .format is wrapped in async_dispatch
            )
        )

    @classmethod
    async def aget(
        cls,
        key: str | None = None,
        client: "PrefectClient | None" = None,
    ) -> "ArtifactResponse | None":
        """
        A async method to get an artifact.

        The most recently updated artifact matching the key filter is returned.

        Arguments:
            key: The key of the artifact to get. When None, the key filter's
                `any_` is left unset, which does not restrict by key.
            client: A client to use when calling the Prefect API.

        Returns:
            The artifact (if found).
        """

        local_client_context = asyncnullcontext(client) if client else get_client()
        async with local_client_context as client:
            filter_key_value = None if key is None else [key]
            artifacts = await client.read_artifacts(
                limit=1,
                sort=ArtifactSort.UPDATED_DESC,
                artifact_filter=ArtifactFilter(
                    key=ArtifactFilterKey(any_=filter_key_value)
                ),
            )
            return None if not artifacts else artifacts[0]

    @classmethod
    @async_dispatch(aget)
    def get(
        cls, key: str | None = None, client: "PrefectClient | None" = None
    ) -> "ArtifactResponse | None":
        """
        A method to get an artifact.

        The most recently updated artifact matching the key filter is returned.

        Arguments:
            key: The key of the artifact to get. When None, the key filter's
                `any_` is left unset, which does not restrict by key.
            client: Accepted for signature parity with `aget`; this
                synchronous implementation creates its own sync client.

        Returns:
            The artifact (if found).
        """

        # Create sync client since this is a sync method.
        sync_client = get_client(sync_client=True)

        filter_key_value = None if key is None else [key]
        artifacts = sync_client.read_artifacts(
            limit=1,
            sort=ArtifactSort.UPDATED_DESC,
            artifact_filter=ArtifactFilter(
                key=ArtifactFilterKey(any_=filter_key_value)
            ),
        )
        return None if not artifacts else artifacts[0]

    @classmethod
    async def aget_or_create(
        cls,
        key: str | None = None,
        description: str | None = None,
        data: dict[str, Any] | Any | None = None,
        client: "PrefectClient | None" = None,
        **kwargs: Any,
    ) -> tuple["ArtifactResponse", bool]:
        """
        A async method to get or create an artifact.

        Arguments:
            key: The key of the artifact to get or create. When None, no
                lookup is performed and a new artifact is always created.
            description: The description of the artifact to create.
            data: The data of the artifact to create.
            client: The PrefectClient
            **kwargs: Additional keyword arguments to use when creating the artifact.

        Returns:
            A tuple of the artifact (either retrieved or created) and a bool
            that is True when the artifact was newly created.
        """
        # Without a key, `aget(None)` applies no key filter and would return
        # the most recently updated artifact of *any* key, so skip the lookup.
        if key is not None:
            artifact = await cls.aget(key, client)
            if artifact:
                return artifact, False

        new_artifact = cls(key=key, description=description, data=data, **kwargs)
        created_artifact = await new_artifact.acreate(client)
        return created_artifact, True

    @classmethod
    @async_dispatch(aget_or_create)
    def get_or_create(
        cls,
        key: str | None = None,
        description: str | None = None,
        data: dict[str, Any] | Any | None = None,
        client: "PrefectClient | None" = None,
        **kwargs: Any,
    ) -> tuple["ArtifactResponse", bool]:
        """
        A method to get or create an artifact.

        Arguments:
            key: The key of the artifact to get or create. When None, no
                lookup is performed and a new artifact is always created.
            description: The description of the artifact to create.
            data: The data of the artifact to create.
            client: Accepted for signature parity with `aget_or_create`; the
                synchronous `get`/`create` calls made here do not use it.
            **kwargs: Additional keyword arguments to use when creating the artifact.

        Returns:
            A tuple of the artifact (either retrieved or created) and a bool
            that is True when the artifact was newly created.
        """
        # Without a key, `get(None)` applies no key filter and would return
        # the most recently updated artifact of *any* key, so skip the lookup.
        if key is not None:
            artifact = cast(
                Optional[ArtifactResponse],
                cls.get(key, _sync=True),  # pyright: ignore[reportCallIssue] _sync is valid because .get is wrapped in async_dispatch
            )
            if artifact:
                return artifact, False

        new_artifact = cls(key=key, description=description, data=data, **kwargs)
        created_artifact = cast(
            ArtifactResponse,
            new_artifact.create(_sync=True),  # pyright: ignore[reportCallIssue] _sync is valid because .create is wrapped in async_dispatch
        )
        return created_artifact, True

    # TODO: Remove this when we remove async_dispatch because it doesn't need to be async
    async def aformat(self) -> str | float | int | dict[str, Any]:
        """Async counterpart of `format`: serialize `self.data` with `json.dumps`."""
        return json.dumps(self.data)

    @async_dispatch(aformat)
    def format(self) -> str | float | int | dict[str, Any]:
        """
        Serialize `self.data` to a JSON string to send as the artifact data.

        Subclasses in this module override this (and `aformat`) to produce
        type-specific data.
        """
        return json.dumps(self.data)

251 

252 

class LinkArtifact(Artifact):
    # Target URL of the link.
    link: str
    # Optional visible text; when falsy the URL itself is shown as the text.
    link_text: Optional[str] = None
    type: Optional[str] = "markdown"

    def _format(self) -> str:
        """Render the link as a markdown hyperlink `[label](link)`."""
        label = self.link_text or self.link
        return f"[{label}]({self.link})"

    async def aformat(self) -> str:
        """Async counterpart of `format`."""
        return self._format()

    @async_dispatch(aformat)
    def format(self) -> str:
        """Return the markdown hyperlink sent to the API as the artifact data."""
        return self._format()

271 

272 

class MarkdownArtifact(Artifact):
    # Raw markdown text; it is sent to the API verbatim as the artifact data.
    markdown: str
    type: Optional[str] = "markdown"

    def _format(self) -> str:
        """Return the markdown body unchanged."""
        return self.markdown

    async def aformat(self) -> str:
        """Async counterpart of `format`."""
        return self._format()

    @async_dispatch(aformat)
    def format(self) -> str:
        """Return the artifact data for the API: the raw markdown string."""
        return self._format()

283 

284 

class TableArtifact(Artifact):
    # Accepted shapes: a dict of lists, a list of dicts, or a list of lists.
    table: Union[dict[str, list[Any]], list[dict[str, Any]], list[list[Any]]]
    type: Optional[str] = "table"

    @classmethod
    def _sanitize(
        cls, item: dict[str, Any] | list[Any] | float
    ) -> dict[str, Any] | list[Any] | int | float | None:
        """
        Recursively replace non-finite floats (NaN, inf, -inf) with None.

        `json.dumps` would otherwise emit the bare tokens `NaN`, `Infinity`
        and `-Infinity`, none of which are valid JSON. Lists, tuples and dicts
        are walked recursively. Tuples come back as lists, which is how
        `json.dumps` serializes them anyway. Every other value is returned
        unchanged.
        """
        if isinstance(item, (list, tuple)):
            return [cls._sanitize(sub_item) for sub_item in item]
        elif isinstance(item, dict):
            return {k: cls._sanitize(v) for k, v in item.items()}
        elif isinstance(item, float) and not math.isfinite(item):
            return None
        else:
            return item

    async def aformat(self) -> str:
        """Async counterpart of `format`."""
        return json.dumps(self._sanitize(self.table))

    @async_dispatch(aformat)
    def format(self) -> str:
        """Return the table as a JSON string with non-finite floats as null."""
        return json.dumps(self._sanitize(self.table))

312 

313 

class ProgressArtifact(Artifact):
    # Percentage complete; values outside [0, 100] are clamped when formatted.
    progress: float
    type: Optional[str] = "progress"

    def _format(self) -> float:
        """
        Clamp `self.progress` into [0, 100] and return it.

        When the value is out of range, a warning is logged and the clamped
        value is written back to `self.progress`. A second warning with the
        clamped value is then logged.
        """
        floor, ceiling = 0.0, 100.0
        # Two separate comparisons rather than a chained range test: a NaN
        # progress compares False on both and therefore passes through as-is.
        if not (self.progress < floor or self.progress > ceiling):
            return self.progress

        logger.warning(
            f"ProgressArtifact received an invalid value, Progress: {self.progress}%"
        )
        self.progress = max(floor, min(self.progress, ceiling))
        logger.warning(f"Interpreting as {self.progress}% progress")
        return self.progress

    async def aformat(self) -> float:
        """Async counterpart of `format`."""
        return self._format()

    @async_dispatch(aformat)
    def format(self) -> float:
        """Return the (clamped) progress value sent to the API as artifact data."""
        return self._format()

337 

338 

class ImageArtifact(Artifact):
    """
    An artifact that will display an image from a publicly accessible URL in the UI.

    Arguments:
        image_url: The URL of the image to display.
    """

    image_url: str
    type: Optional[str] = "image"

    def _format(self) -> str:
        """Return the image URL unchanged; it is the artifact's data."""
        return self.image_url

    async def aformat(self) -> str:
        """Async counterpart of `format`."""
        return self._format()

    @async_dispatch(aformat)
    def format(self) -> str:
        """
        Produce the data sent to the API when `.create()` is called.

        Returns:
            str: The image URL.
        """
        return self._format()

363 

364 

async def acreate_link_artifact(
    link: str,
    link_text: str | None = None,
    key: str | None = None,
    description: str | None = None,
    client: "PrefectClient | None" = None,
) -> UUID:
    """
    Create a link artifact.

    Arguments:
        link: The link to create.
        link_text: The link text. If omitted, the link itself is shown as the text.
        key: A user-provided string identifier.
            Required for the artifact to show in the Artifacts page in the UI.
            The key must only contain lowercase letters, numbers, and dashes.
        description: A user-specified description of the artifact.
        client: The PrefectClient to use. If omitted, a new client is obtained
            for this call.

    Returns:
        The link artifact ID.
    """
    new_artifact = LinkArtifact(
        key=key,
        description=description,
        link=link,
        link_text=link_text,
    )
    artifact = await new_artifact.acreate(client)

    return artifact.id

396 

397 

@async_dispatch(acreate_link_artifact)
def create_link_artifact(
    link: str,
    link_text: str | None = None,
    key: str | None = None,
    description: str | None = None,
    client: "PrefectClient | None" = None,
) -> UUID:
    """
    Create a link artifact.

    Arguments:
        link: The link to create.
        link_text: The link text. If omitted, the link itself is shown as the text.
        key: A user-provided string identifier.
            Required for the artifact to show in the Artifacts page in the UI.
            The key must only contain lowercase letters, numbers, and dashes.
        description: A user-specified description of the artifact.
        client: Accepted for signature parity with `acreate_link_artifact`;
            this synchronous implementation does not use it.

    Returns:
        The link artifact ID.

    Example:
        ```python
        from prefect import flow
        from prefect.artifacts import create_link_artifact

        @flow
        def my_flow():
            create_link_artifact(
                link="https://www.prefect.io",
                link_text="Prefect",
                key="prefect-link",
                description="This is a link to the Prefect website",
            )

        my_flow()
        ```
    """
    new_artifact = LinkArtifact(
        key=key,
        description=description,
        link=link,
        link_text=link_text,
    )
    artifact = cast(ArtifactResponse, new_artifact.create(_sync=True))  # pyright: ignore[reportCallIssue] _sync is valid because .create is wrapped in async_dispatch

    return artifact.id

447 

448 

async def acreate_markdown_artifact(
    markdown: str,
    key: str | None = None,
    description: str | None = None,
) -> UUID:
    """
    Create a markdown artifact.

    Arguments:
        markdown: The markdown to create.
        key: A user-provided string identifier.
            Required for the artifact to show in the Artifacts page in the UI.
            The key must only contain lowercase letters, numbers, and dashes.
        description: A user-specified description of the artifact.

    Returns:
        The markdown artifact ID.
    """
    new_artifact = MarkdownArtifact(
        key=key,
        description=description,
        markdown=markdown,
    )
    artifact = await new_artifact.acreate()

    return artifact.id

475 

476 

@async_dispatch(acreate_markdown_artifact)
def create_markdown_artifact(
    markdown: str,
    key: str | None = None,
    description: str | None = None,
) -> UUID:
    """
    Create a markdown artifact.

    Arguments:
        markdown: The markdown to create.
        key: A user-provided string identifier.
            Required for the artifact to show in the Artifacts page in the UI.
            The key must only contain lowercase letters, numbers, and dashes.
        description: A user-specified description of the artifact.

    Returns:
        The markdown artifact ID.

    Example:
        ```python
        from prefect import flow
        from prefect.artifacts import create_markdown_artifact

        @flow
        def my_flow():
            create_markdown_artifact(
                markdown="## Prefect",
                key="prefect-markdown",
                description="This is a markdown artifact",
            )

        my_flow()
        ```
    """
    new_artifact = MarkdownArtifact(
        key=key,
        description=description,
        markdown=markdown,
    )
    artifact = cast(ArtifactResponse, new_artifact.create(_sync=True))  # pyright: ignore[reportCallIssue] _sync is valid because .create is wrapped in async_dispatch

    return artifact.id

520 

521 

async def acreate_table_artifact(
    table: dict[str, list[Any]] | list[dict[str, Any]] | list[list[Any]],
    key: str | None = None,
    description: str | None = None,
) -> UUID:
    """
    Asynchronously create a table artifact.

    Arguments:
        table: The table data, given as a dict of lists, a list of dicts, or
            a list of lists.
        key: A user-provided string identifier.
            Required for the artifact to show in the Artifacts page in the UI.
            The key must only contain lowercase letters, numbers, and dashes.
        description: A user-specified description of the artifact.

    Returns:
        The table artifact ID.
    """
    created = await TableArtifact(
        key=key,
        description=description,
        table=table,
    ).acreate()
    return created.id

549 

550 

@async_dispatch(acreate_table_artifact)
def create_table_artifact(
    table: dict[str, list[Any]] | list[dict[str, Any]] | list[list[Any]],
    key: str | None = None,
    description: str | None = None,
) -> UUID:
    """
    Create a table artifact.

    Arguments:
        table: The table data, given as a dict of lists, a list of dicts, or
            a list of lists.
        key: A user-provided string identifier.
            Required for the artifact to show in the Artifacts page in the UI.
            The key must only contain lowercase letters, numbers, and dashes.
        description: A user-specified description of the artifact.

    Returns:
        The table artifact ID.

    Example:
        ```python
        from prefect import flow
        from prefect.artifacts import create_table_artifact

        @flow
        def my_flow():
            create_table_artifact(
                table=[{"name": "John", "age": 30}, {"name": "Jane", "age": 25}],
                key="prefect-table",
                description="This is a table artifact",
            )

        my_flow()
        ```
    """
    table_artifact = TableArtifact(key=key, description=description, table=table)
    response = table_artifact.create(_sync=True)  # pyright: ignore[reportCallIssue] _sync is valid because .create is wrapped in async_dispatch
    return cast(ArtifactResponse, response).id

595 

596 

async def acreate_progress_artifact(
    progress: float,
    key: str | None = None,
    description: str | None = None,
) -> UUID:
    """
    Asynchronously create a progress artifact.

    Arguments:
        progress: The percentage of progress represented by a float between 0 and 100.
        key: A user-provided string identifier.
            Required for the artifact to show in the Artifacts page in the UI.
            The key must only contain lowercase letters, numbers, and dashes.
        description: A user-specified description of the artifact.

    Returns:
        The progress artifact ID.
    """
    created = await ProgressArtifact(
        key=key,
        description=description,
        progress=progress,
    ).acreate()
    return created.id

624 

625 

@async_dispatch(acreate_progress_artifact)
def create_progress_artifact(
    progress: float,
    key: str | None = None,
    description: str | None = None,
) -> UUID:
    """
    Create a progress artifact.

    Arguments:
        progress: The percentage of progress represented by a float between 0 and 100.
        key: A user-provided string identifier.
            Required for the artifact to show in the Artifacts page in the UI.
            The key must only contain lowercase letters, numbers, and dashes.
        description: A user-specified description of the artifact.

    Returns:
        The progress artifact ID.
    """
    progress_artifact = ProgressArtifact(
        key=key, description=description, progress=progress
    )
    response = progress_artifact.create(_sync=True)  # pyright: ignore[reportCallIssue] _sync is valid because .create is wrapped in async_dispatch
    return cast(ArtifactResponse, response).id

654 

655 

async def aupdate_progress_artifact(
    artifact_id: UUID,
    progress: float,
    description: str | None = None,
    client: "PrefectClient | None" = None,
) -> UUID:
    """
    Update a progress artifact asynchronously.

    Arguments:
        artifact_id: The ID of the artifact to update.
        progress: The percentage of progress represented by a float between 0 and 100.
        description: A user-specified description of the artifact. When falsy,
            the artifact's description is not included in the update.
        client: The PrefectClient to use. If omitted, a new client is obtained
            for this call.

    Returns:
        The progress artifact ID.
    """

    # Use `asyncnullcontext`, as the rest of this module does. The async
    # context-manager support of `contextlib.nullcontext` only exists on
    # Python 3.10+.
    local_client_context = asyncnullcontext(client) if client else get_client()
    async with local_client_context as client:
        artifact = ProgressArtifact(
            description=description,
            progress=progress,
        )
        # Only send `description` when one was given, so an existing
        # description is not overwritten with None.
        update = (
            ArtifactUpdate(
                description=artifact.description,
                data=await artifact.aformat(),
            )
            if description
            else ArtifactUpdate(data=await artifact.aformat())
        )

        await client.update_artifact(
            artifact_id=artifact_id,
            artifact=update,
        )

    return artifact_id

695 

696 

@async_dispatch(aupdate_progress_artifact)
def update_progress_artifact(
    artifact_id: UUID,
    progress: float,
    description: str | None = None,
    client: "PrefectClient | None" = None,
) -> UUID:
    """
    Update a progress artifact.

    Arguments:
        artifact_id: The ID of the artifact to update.
        progress: The percentage of progress represented by a float between 0 and 100.
        description: A user-specified description of the artifact. When falsy,
            the artifact's description is not included in the update.
        client: Accepted for signature parity with `aupdate_progress_artifact`;
            this synchronous implementation creates its own sync client.

    Returns:
        The progress artifact ID.
    """

    sync_client = get_client(sync_client=True)

    progress_artifact = ProgressArtifact(
        description=description,
        progress=progress,
    )
    formatted = cast(float, progress_artifact.format(_sync=True))  # pyright: ignore[reportCallIssue] _sync is valid because .format is wrapped in async_dispatch

    # Only pass `description` when one was given.
    if description:
        update = ArtifactUpdate(
            description=progress_artifact.description,
            data=formatted,
        )
    else:
        update = ArtifactUpdate(data=formatted)

    sync_client.update_artifact(artifact_id=artifact_id, artifact=update)
    return artifact_id

737 

738 

async def acreate_image_artifact(
    image_url: str,
    key: str | None = None,
    description: str | None = None,
) -> UUID:
    """
    Asynchronously create an image artifact.

    Arguments:
        image_url: The URL of the image to display.
        key: A user-provided string identifier.
            Required for the artifact to show in the Artifacts page in the UI.
            The key must only contain lowercase letters, numbers, and dashes.
        description: A user-specified description of the artifact.

    Returns:
        The image artifact ID.
    """
    created = await ImageArtifact(
        key=key,
        description=description,
        image_url=image_url,
    ).acreate()
    return created.id

766 

767 

@async_dispatch(acreate_image_artifact)
def create_image_artifact(
    image_url: str,
    key: str | None = None,
    description: str | None = None,
) -> UUID:
    """
    Create an image artifact.

    Arguments:
        image_url: The URL of the image to display.
        key: A user-provided string identifier.
            Required for the artifact to show in the Artifacts page in the UI.
            The key must only contain lowercase letters, numbers, and dashes.
        description: A user-specified description of the artifact.

    Returns:
        The image artifact ID.
    """
    image_artifact = ImageArtifact(
        key=key, description=description, image_url=image_url
    )
    response = image_artifact.create(_sync=True)  # pyright: ignore[reportCallIssue] _sync is valid because .create is wrapped in async_dispatch
    return cast(ArtifactResponse, response).id