Coverage for /usr/local/lib/python3.12/site-packages/prefect/results.py: 18%

386 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1from __future__ import annotations 1a

2 

3import asyncio 1a

4import os 1a

5import socket 1a

6import threading 1a

7import uuid 1a

8from functools import partial 1a

9from operator import methodcaller 1a

10from pathlib import Path 1a

11from typing import ( 1a

12 TYPE_CHECKING, 

13 Annotated, 

14 Any, 

15 Callable, 

16 ClassVar, 

17 Optional, 

18 TypeVar, 

19 Union, 

20) 

21from uuid import UUID 1a

22 

23from cachetools import LRUCache 1a

24from pydantic import ( 1a

25 BaseModel, 

26 ConfigDict, 

27 Discriminator, 

28 Field, 

29 Tag, 

30) 

31from typing_extensions import ParamSpec, Self 1a

32 

33import prefect 1a

34import prefect.types._datetime 1a

35from prefect._internal.compatibility.async_dispatch import async_dispatch 1a

36from prefect._internal.compatibility.blocks import call_explicitly_async_block_method 1a

37from prefect._internal.concurrency.event_loop import get_running_loop 1a

38from prefect._result_records import R, ResultRecord, ResultRecordMetadata 1a

39from prefect.blocks.core import Block 1a

40from prefect.exceptions import ( 1a

41 ConfigurationError, 

42 MissingContextError, 

43) 

44from prefect.filesystems import ( 1a

45 LocalFileSystem, 

46 NullFileSystem, 

47 WritableFileSystem, 

48) 

49from prefect.locking.protocol import LockManager 1a

50from prefect.logging import get_logger 1a

51from prefect.serializers import Serializer 1a

52from prefect.settings.context import get_current_settings 1a

53from prefect.types import DateTime 1a

54from prefect.utilities.annotations import NotSet 1a

55from prefect.utilities.asyncutils import sync_compatible 1a

56 

57if TYPE_CHECKING: 57 ↛ 58line 57 didn't jump to line 58 because the condition on line 57 was never true1a

58 import logging 

59 

60 from prefect import Flow, Task 

61 from prefect.transactions import IsolationLevel 

62 

63 

64ResultStorage = Union[WritableFileSystem, str] 1a

65ResultSerializer = Union[Serializer, str] 1a

66LITERAL_TYPES: set[type] = {type(None), bool, UUID} 1a

67 

68 

69def DEFAULT_STORAGE_KEY_FN() -> str: 1a

70 return uuid.uuid4().hex 

71 

72 

73logger: "logging.Logger" = get_logger("results") 1a

74P = ParamSpec("P") 1a

75 

76_default_storages: dict[tuple[str, str], WritableFileSystem] = {} 1a

77 

78 

79async def aget_default_result_storage() -> WritableFileSystem: 1a

80 """ 

81 Generate a default file system for result storage. 

82 """ 

83 settings = get_current_settings() 

84 default_block = settings.results.default_storage_block 

85 basepath = settings.results.local_storage_path 

86 

87 cache_key = (str(default_block), str(basepath)) 

88 

89 if cache_key in _default_storages: 

90 return _default_storages[cache_key] 

91 

92 if default_block is not None: 

93 storage = await aresolve_result_storage(default_block) 

94 else: 

95 # Use the local file system 

96 storage = LocalFileSystem(basepath=str(basepath)) 

97 

98 _default_storages[cache_key] = storage 

99 return storage 

100 

101 

102@async_dispatch(aget_default_result_storage) 1a

103def get_default_result_storage() -> WritableFileSystem: 1a

104 """ 

105 Generate a default file system for result storage. 

106 """ 

107 settings = get_current_settings() 

108 default_block = settings.results.default_storage_block 

109 basepath = settings.results.local_storage_path 

110 

111 cache_key = (str(default_block), str(basepath)) 

112 

113 if cache_key in _default_storages: 

114 return _default_storages[cache_key] 

115 

116 if default_block is not None: 

117 storage = resolve_result_storage(default_block, _sync=True) 

118 if TYPE_CHECKING: 

119 assert isinstance(storage, WritableFileSystem) 

120 else: 

121 # Use the local file system 

122 storage = LocalFileSystem(basepath=str(basepath)) 

123 

124 _default_storages[cache_key] = storage 

125 return storage 

126 

127 

128async def aresolve_result_storage( 1a

129 result_storage: ResultStorage | UUID | Path, 

130) -> WritableFileSystem: 

131 """ 

132 Resolve one of the valid `ResultStorage` input types into a saved block 

133 document id and an instance of the block. 

134 """ 

135 from prefect.client.orchestration import get_client 

136 

137 client = get_client() 

138 storage_block: WritableFileSystem 

139 if isinstance(result_storage, Block): 

140 storage_block = result_storage 

141 elif isinstance(result_storage, Path): 

142 storage_block = LocalFileSystem(basepath=str(result_storage)) 

143 elif isinstance(result_storage, str): 

144 block = await Block.aload(result_storage, client=client) 

145 if TYPE_CHECKING: 

146 assert isinstance(block, WritableFileSystem) 

147 storage_block = block 

148 elif isinstance(result_storage, UUID): # pyright: ignore[reportUnnecessaryIsInstance] 

149 block_document = await client.read_block_document(result_storage) 

150 from_block_document = methodcaller("_from_block_document", block_document) 

151 block = from_block_document(Block) 

152 if TYPE_CHECKING: 

153 assert isinstance(block, WritableFileSystem) 

154 storage_block = block 

155 else: 

156 raise TypeError( 

157 "Result storage must be one of the following types: 'UUID', 'Block', " 

158 f"'str'. Got unsupported type {type(result_storage).__name__!r}." 

159 ) 

160 

161 return storage_block 

162 

163 

164@async_dispatch(aresolve_result_storage) 1a

165def resolve_result_storage( 1a

166 result_storage: ResultStorage | UUID | Path, 

167) -> WritableFileSystem: 

168 """ 

169 Resolve one of the valid `ResultStorage` input types into a saved block 

170 document id and an instance of the block. 

171 """ 

172 from prefect.client.orchestration import get_client 

173 

174 client = get_client(sync_client=True) 

175 storage_block: WritableFileSystem 

176 if isinstance(result_storage, Block): 

177 storage_block = result_storage 

178 elif isinstance(result_storage, Path): 

179 storage_block = LocalFileSystem(basepath=str(result_storage)) 

180 elif isinstance(result_storage, str): 

181 block = Block.load(result_storage, _sync=True) 

182 if TYPE_CHECKING: 

183 assert isinstance(block, WritableFileSystem) 

184 storage_block = block 

185 elif isinstance(result_storage, UUID): # pyright: ignore[reportUnnecessaryIsInstance] 

186 block_document = client.read_block_document(result_storage) 

187 from_block_document = methodcaller("_from_block_document", block_document) 

188 block = from_block_document(Block) 

189 if TYPE_CHECKING: 

190 assert isinstance(block, WritableFileSystem) 

191 storage_block = block 

192 else: 

193 raise TypeError( 

194 "Result storage must be one of the following types: 'UUID', 'Block', " 

195 f"'str'. Got unsupported type {type(result_storage).__name__!r}." 

196 ) 

197 return storage_block 

198 

199 

200def resolve_serializer(serializer: ResultSerializer) -> Serializer: 1a

201 """ 

202 Resolve one of the valid `ResultSerializer` input types into a serializer 

203 instance. 

204 """ 

205 if isinstance(serializer, Serializer): 

206 return serializer 

207 elif isinstance(serializer, str): # pyright: ignore[reportUnnecessaryIsInstance] 

208 return Serializer(type=serializer) 

209 else: 

210 raise TypeError( 

211 "Result serializer must be one of the following types: 'Serializer', " 

212 f"'str'. Got unsupported type {type(serializer).__name__!r}." 

213 ) 

214 

215 

216async def get_or_create_default_task_scheduling_storage() -> ResultStorage: 1a

217 """ 

218 Generate a default file system for background task parameter/result storage. 

219 """ 

220 settings = get_current_settings() 

221 default_block = settings.tasks.scheduling.default_storage_block 

222 

223 if default_block is not None: 

224 block = await Block.aload(default_block) 

225 if TYPE_CHECKING: 

226 assert isinstance(block, WritableFileSystem) 

227 return block 

228 

229 # otherwise, use the local file system 

230 basepath = settings.results.local_storage_path 

231 return LocalFileSystem(basepath=str(basepath)) 

232 

233 

234def get_default_result_serializer() -> Serializer: 1a

235 """ 

236 Generate a default file system for result storage. 

237 """ 

238 settings = get_current_settings() 

239 return resolve_serializer(settings.results.default_serializer) 

240 

241 

242def get_default_persist_setting() -> bool: 1a

243 """ 

244 Return the default option for result persistence. 

245 """ 

246 settings = get_current_settings() 

247 return settings.results.persist_by_default 

248 

249 

250def get_default_persist_setting_for_tasks() -> bool: 1a

251 """ 

252 Return the default option for result persistence for tasks. 

253 """ 

254 settings = get_current_settings() 

255 return ( 

256 settings.tasks.default_persist_result 

257 if settings.tasks.default_persist_result is not None 

258 else settings.results.persist_by_default 

259 ) 

260 

261 

262def should_persist_result() -> bool: 1a

263 """ 

264 Return the default option for result persistence determined by the current run context. 

265 

266 If there is no current run context, the value of `results.persist_by_default` on the 

267 current settings will be returned. 

268 """ 

269 from prefect.context import FlowRunContext, TaskRunContext 

270 

271 task_run_context = TaskRunContext.get() 

272 if task_run_context is not None: 

273 return task_run_context.persist_result 

274 flow_run_context = FlowRunContext.get() 

275 if flow_run_context is not None: 

276 return flow_run_context.persist_result 

277 

278 return get_default_persist_setting() 

279 

280 

281def _format_user_supplied_storage_key(key: str) -> str: 1a

282 # Note here we are pinning to task runs since flow runs do not support storage keys 

283 # yet; we'll need to split logic in the future or have two separate functions 

284 runtime_vars = {key: getattr(prefect.runtime, key) for key in dir(prefect.runtime)} 

285 return key.format(**runtime_vars, parameters=prefect.runtime.task_run.parameters) 

286 

287 

288T = TypeVar("T") 1a

289 

290 

291def default_cache() -> LRUCache[str, "ResultRecord[Any]"]: 1a

292 return LRUCache(maxsize=1000) 

293 

294 

295def result_storage_discriminator(x: Any) -> str: 1a

296 if isinstance(x, dict): 

297 if "block_type_slug" in x: 

298 return "WritableFileSystem" 

299 else: 

300 return "NullFileSystem" 

301 if isinstance(x, WritableFileSystem): 

302 return "WritableFileSystem" 

303 if isinstance(x, NullFileSystem): 

304 return "NullFileSystem" 

305 return "None" 

306 

307 

308class ResultStore(BaseModel): 1a

309 """ 

310 Manages the storage and retrieval of results. 

311 

312 Attributes: 

313 result_storage: The storage for result records. If not provided, the default 

314 result storage will be used. 

315 metadata_storage: The storage for result record metadata. If not provided, 

316 the metadata will be stored alongside the results. 

317 lock_manager: The lock manager to use for locking result records. If not provided, 

318 the store cannot be used in transactions with the SERIALIZABLE isolation level. 

319 cache_result_in_memory: Whether to cache results in memory. 

320 serializer: The serializer to use for results. 

321 storage_key_fn: The function to generate storage keys. 

322 """ 

323 

324 model_config: ClassVar[ConfigDict] = ConfigDict(arbitrary_types_allowed=True) 1a

325 

326 result_storage: Optional[WritableFileSystem] = Field(default=None) 1a

327 metadata_storage: Annotated[ 1a

328 Union[ 

329 Annotated[WritableFileSystem, Tag("WritableFileSystem")], 

330 Annotated[NullFileSystem, Tag("NullFileSystem")], 

331 Annotated[None, Tag("None")], 

332 ], 

333 Discriminator(result_storage_discriminator), 

334 ] = Field(default=None) 

335 lock_manager: Optional[LockManager] = Field(default=None) 1a

336 cache_result_in_memory: bool = Field(default=True) 1a

337 serializer: Serializer = Field(default_factory=get_default_result_serializer) 1a

338 storage_key_fn: Callable[[], str] = Field(default=DEFAULT_STORAGE_KEY_FN) 1a

339 cache: LRUCache[str, "ResultRecord[Any]"] = Field(default_factory=default_cache) 1a

340 

341 @property 1a

342 def result_storage_block_id(self) -> UUID | None: 1a

343 if self.result_storage is None: 

344 return None 

345 return getattr(self.result_storage, "_block_document_id", None) 

346 

347 @classmethod 1a

348 async def _from_metadata(cls, metadata: ResultRecordMetadata) -> "ResultRecord[R]": 1a

349 """ 

350 Create a result record from metadata. 

351 

352 Will use the result record metadata to fetch data via a result store. 

353 

354 Args: 

355 metadata: The metadata to create the result record from. 

356 

357 Returns: 

358 ResultRecord: The result record. 

359 """ 

360 if metadata.storage_block_id is None: 

361 storage_block = None 

362 else: 

363 storage_block = await aresolve_result_storage(metadata.storage_block_id) 

364 store = cls(result_storage=storage_block, serializer=metadata.serializer) 

365 if metadata.storage_key is None: 

366 raise ValueError( 

367 "storage_key is required to hydrate a result record from metadata" 

368 ) 

369 result = await store.aread(metadata.storage_key) 

370 return result 

371 

372 @sync_compatible 1a

373 async def update_for_flow(self, flow: "Flow[..., Any]") -> Self: 1a

374 """ 

375 Create a new result store for a flow with updated settings. 

376 

377 Args: 

378 flow: The flow to update the result store for. 

379 

380 Returns: 

381 An updated result store. 

382 """ 

383 update: dict[str, Any] = {} 

384 update["cache_result_in_memory"] = flow.cache_result_in_memory 

385 if flow.result_storage is not None: 

386 update["result_storage"] = await aresolve_result_storage( 

387 flow.result_storage 

388 ) 

389 if flow.result_serializer is not None: 

390 update["serializer"] = resolve_serializer(flow.result_serializer) 

391 if self.result_storage is None and update.get("result_storage") is None: 

392 update["result_storage"] = await aget_default_result_storage() 

393 update["metadata_storage"] = NullFileSystem() 

394 return self.model_copy(update=update) 

395 

396 @sync_compatible 1a

397 async def update_for_task(self: Self, task: "Task[P, R]") -> Self: 1a

398 """ 

399 Create a new result store for a task. 

400 

401 Args: 

402 task: The task to update the result store for. 

403 

404 Returns: 

405 An updated result store. 

406 """ 

407 from prefect.transactions import get_transaction 

408 

409 update: dict[str, Any] = {} 

410 update["cache_result_in_memory"] = task.cache_result_in_memory 

411 if task.result_storage is not None: 

412 update["result_storage"] = await aresolve_result_storage( 

413 task.result_storage 

414 ) 

415 if task.result_serializer is not None: 

416 update["serializer"] = resolve_serializer(task.result_serializer) 

417 if task.result_storage_key is not None: 

418 update["storage_key_fn"] = partial( 

419 _format_user_supplied_storage_key, task.result_storage_key 

420 ) 

421 

422 # use the lock manager from a parent transaction if it exists 

423 if (current_txn := get_transaction()) and isinstance( 

424 current_txn.store, ResultStore 

425 ): 

426 update["lock_manager"] = current_txn.store.lock_manager 

427 

428 from prefect.cache_policies import CachePolicy 

429 

430 if isinstance(task.cache_policy, CachePolicy): 

431 if task.cache_policy.key_storage is not None: 

432 storage = task.cache_policy.key_storage 

433 if isinstance(storage, str) and not len(storage.split("/")) == 2: 

434 storage = Path(storage) 

435 update["metadata_storage"] = await aresolve_result_storage(storage) 

436 # if the cache policy has a lock manager, it takes precedence over the parent transaction 

437 if task.cache_policy.lock_manager is not None: 

438 update["lock_manager"] = task.cache_policy.lock_manager 

439 

440 if self.result_storage is None and update.get("result_storage") is None: 

441 update["result_storage"] = await aget_default_result_storage() 

442 if ( 

443 isinstance(self.metadata_storage, NullFileSystem) 

444 and update.get("metadata_storage", NotSet) is NotSet 

445 ): 

446 update["metadata_storage"] = None 

447 return self.model_copy(update=update) 

448 

449 @staticmethod 1a

450 def generate_default_holder() -> str: 1a

451 """ 

452 Generate a default holder string using hostname, PID, and thread ID. 

453 

454 Returns: 

455 str: A unique identifier string. 

456 """ 

457 current_loop = get_running_loop() 

458 hostname = socket.gethostname() 

459 pid = os.getpid() 

460 thread_name = threading.current_thread().name 

461 thread_id = threading.get_ident() 

462 if current_loop: 

463 current_task = asyncio.current_task() 

464 if current_task: 

465 # include the task id to ensure uniqueness because there might be 

466 # multiple tasks running in the same thread 

467 return f"{hostname}:{pid}:{thread_id}:{thread_name}:{id(current_task)}" 

468 return f"{hostname}:{pid}:{thread_id}:{thread_name}" 

469 

470 @sync_compatible 1a

471 async def _exists(self, key: str) -> bool: 1a

472 """ 

473 Check if a result record exists in storage. 

474 

475 Args: 

476 key: The key to check for the existence of a result record. 

477 

478 Returns: 

479 bool: True if the result record exists, False otherwise. 

480 """ 

481 if self.metadata_storage is not None: 

482 # TODO: Add an `exists` method to commonly used storage blocks 

483 # so the entire payload doesn't need to be read 

484 try: 

485 metadata_content = await call_explicitly_async_block_method( 

486 self.metadata_storage, "read_path", (key,), {} 

487 ) 

488 if metadata_content is None: 

489 return False 

490 metadata = ResultRecordMetadata.load_bytes(metadata_content) 

491 

492 except Exception: 

493 return False 

494 else: 

495 try: 

496 content = await call_explicitly_async_block_method( 

497 self.result_storage, "read_path", (key,), {} 

498 ) 

499 if content is None: 

500 return False 

501 record: ResultRecord[Any] = ResultRecord.deserialize(content) 

502 metadata = record.metadata 

503 except Exception: 

504 return False 

505 

506 if metadata.expiration: 

507 # if the result has an expiration, 

508 # check if it is still in the future 

509 exists = metadata.expiration > prefect.types._datetime.now("UTC") 

510 else: 

511 exists = True 

512 return exists 

513 

514 def exists(self, key: str) -> bool: 1a

515 """ 

516 Check if a result record exists in storage. 

517 

518 Args: 

519 key: The key to check for the existence of a result record. 

520 

521 Returns: 

522 bool: True if the result record exists, False otherwise. 

523 """ 

524 return self._exists(key=key, _sync=True) 

525 

526 async def aexists(self, key: str) -> bool: 1a

527 """ 

528 Check if a result record exists in storage. 

529 

530 Args: 

531 key: The key to check for the existence of a result record. 

532 

533 Returns: 

534 bool: True if the result record exists, False otherwise. 

535 """ 

536 return await self._exists(key=key, _sync=False) 

537 

538 def _resolved_key_path(self, key: str) -> str: 1a

539 if self.result_storage_block_id is None and ( 

540 _resolve_path := getattr(self.result_storage, "_resolve_path", None) 

541 ): 

542 path_key = _resolve_path(key) 

543 if path_key is not None: 

544 return str(_resolve_path(key)) 

545 else: 

546 return key 

547 return key 

548 

549 @sync_compatible 1a

550 async def _read(self, key: str, holder: str) -> "ResultRecord[Any]": 1a

551 """ 

552 Read a result record from storage. 

553 

554 This is the internal implementation. Use `read` or `aread` for synchronous and 

555 asynchronous result reading respectively. 

556 

557 Args: 

558 key: The key to read the result record from. 

559 holder: The holder of the lock if a lock was set on the record. 

560 

561 Returns: 

562 A result record. 

563 """ 

564 

565 if self.lock_manager is not None and not self.is_lock_holder(key, holder): 

566 await self.await_for_lock(key) 

567 

568 resolved_key_path = self._resolved_key_path(key) 

569 

570 if resolved_key_path in self.cache: 

571 return self.cache[resolved_key_path] 

572 

573 if self.result_storage is None: 

574 self.result_storage = await aget_default_result_storage() 

575 

576 if self.metadata_storage is not None: 

577 metadata_content = await call_explicitly_async_block_method( 

578 self.metadata_storage, 

579 "read_path", 

580 (key,), 

581 {}, 

582 ) 

583 metadata = ResultRecordMetadata.load_bytes(metadata_content) 

584 assert metadata.storage_key is not None, ( 

585 "Did not find storage key in metadata" 

586 ) 

587 result_content = await call_explicitly_async_block_method( 

588 self.result_storage, 

589 "read_path", 

590 (metadata.storage_key,), 

591 {}, 

592 ) 

593 result_record: ResultRecord[Any] = ( 

594 ResultRecord.deserialize_from_result_and_metadata( 

595 result=result_content, metadata=metadata_content 

596 ) 

597 ) 

598 else: 

599 content = await call_explicitly_async_block_method( 

600 self.result_storage, 

601 "read_path", 

602 (key,), 

603 {}, 

604 ) 

605 result_record: ResultRecord[Any] = ResultRecord.deserialize( 

606 content, backup_serializer=self.serializer 

607 ) 

608 

609 if self.cache_result_in_memory: 

610 self.cache[resolved_key_path] = result_record 

611 return result_record 

612 

613 def read( 1a

614 self, 

615 key: str, 

616 holder: str | None = None, 

617 ) -> "ResultRecord[Any]": 

618 """ 

619 Read a result record from storage. 

620 

621 Args: 

622 key: The key to read the result record from. 

623 holder: The holder of the lock if a lock was set on the record. 

624 

625 Returns: 

626 A result record. 

627 """ 

628 holder = holder or self.generate_default_holder() 

629 return self._read(key=key, holder=holder, _sync=True) 

630 

631 async def aread( 1a

632 self, 

633 key: str, 

634 holder: str | None = None, 

635 ) -> "ResultRecord[Any]": 

636 """ 

637 Read a result record from storage. 

638 

639 Args: 

640 key: The key to read the result record from. 

641 holder: The holder of the lock if a lock was set on the record. 

642 

643 Returns: 

644 A result record. 

645 """ 

646 holder = holder or self.generate_default_holder() 

647 return await self._read(key=key, holder=holder, _sync=False) 

648 

649 def create_result_record( 1a

650 self, 

651 obj: Any, 

652 key: str | None = None, 

653 expiration: DateTime | None = None, 

654 ) -> "ResultRecord[Any]": 

655 """ 

656 Create a result record. 

657 

658 Args: 

659 key: The key to create the result record for. 

660 obj: The object to create the result record for. 

661 expiration: The expiration time for the result record. 

662 """ 

663 key = key or self.storage_key_fn() 

664 

665 if self.result_storage is None: 

666 self.result_storage = get_default_result_storage(_sync=True) 

667 if TYPE_CHECKING: 

668 assert isinstance(self.result_storage, WritableFileSystem) 

669 

670 if self.result_storage_block_id is None: 

671 if _resolve_path := getattr(self.result_storage, "_resolve_path", None): 

672 path_key = _resolve_path(key) 

673 if path_key is not None: 

674 key = str(_resolve_path(key)) 

675 

676 return ResultRecord( 

677 result=obj, 

678 metadata=ResultRecordMetadata( 

679 serializer=self.serializer, 

680 expiration=expiration, 

681 storage_key=key, 

682 storage_block_id=self.result_storage_block_id, 

683 ), 

684 ) 

685 

686 def write( 1a

687 self, 

688 obj: Any, 

689 key: str | None = None, 

690 expiration: DateTime | None = None, 

691 holder: str | None = None, 

692 ) -> None: 

693 """ 

694 Write a result to storage. 

695 

696 Handles the creation of a `ResultRecord` and its serialization to storage. 

697 

698 Args: 

699 key: The key to write the result record to. 

700 obj: The object to write to storage. 

701 expiration: The expiration time for the result record. 

702 holder: The holder of the lock if a lock was set on the record. 

703 """ 

704 holder = holder or self.generate_default_holder() 

705 result_record = self.create_result_record( 

706 key=key, obj=obj, expiration=expiration 

707 ) 

708 return self.persist_result_record( 

709 result_record=result_record, 

710 holder=holder, 

711 ) 

712 

713 async def awrite( 1a

714 self, 

715 obj: Any, 

716 key: str | None = None, 

717 expiration: DateTime | None = None, 

718 holder: str | None = None, 

719 ) -> None: 

720 """ 

721 Write a result to storage. 

722 

723 Args: 

724 key: The key to write the result record to. 

725 obj: The object to write to storage. 

726 expiration: The expiration time for the result record. 

727 holder: The holder of the lock if a lock was set on the record. 

728 """ 

729 holder = holder or self.generate_default_holder() 

730 return await self.apersist_result_record( 

731 result_record=self.create_result_record( 

732 key=key, obj=obj, expiration=expiration 

733 ), 

734 holder=holder, 

735 ) 

736 

737 @sync_compatible 1a

738 async def _persist_result_record( 1a

739 self, result_record: "ResultRecord[Any]", holder: str 

740 ) -> None: 

741 """ 

742 Persist a result record to storage. 

743 

744 Args: 

745 result_record: The result record to persist. 

746 holder: The holder of the lock if a lock was set on the record. 

747 """ 

748 assert result_record.metadata.storage_key is not None, ( 

749 "Storage key is required on result record" 

750 ) 

751 

752 key = result_record.metadata.storage_key 

753 if result_record.metadata.storage_block_id is None: 

754 basepath = ( 

755 _resolve_path("") 

756 if ( 

757 _resolve_path := getattr(self.result_storage, "_resolve_path", None) 

758 ) 

759 else Path(".").resolve() 

760 ) 

761 base_key = key if basepath is None else str(Path(key).relative_to(basepath)) 

762 else: 

763 base_key = key 

764 if ( 

765 self.lock_manager is not None 

766 and self.is_locked(base_key) 

767 and not self.is_lock_holder(base_key, holder) 

768 ): 

769 raise RuntimeError( 

770 f"Cannot write to result record with key {base_key} because it is locked by " 

771 f"another holder." 

772 ) 

773 if self.result_storage is None: 

774 self.result_storage = await aget_default_result_storage() 

775 

776 # If metadata storage is configured, write result and metadata separately 

777 if self.metadata_storage is not None: 

778 await call_explicitly_async_block_method( 

779 self.result_storage, 

780 "write_path", 

781 (result_record.metadata.storage_key,), 

782 {"content": result_record.serialize_result()}, 

783 ) 

784 await call_explicitly_async_block_method( 

785 self.metadata_storage, 

786 "write_path", 

787 (base_key,), 

788 {"content": result_record.serialize_metadata()}, 

789 ) 

790 # Otherwise, write the result metadata and result together 

791 else: 

792 await call_explicitly_async_block_method( 

793 self.result_storage, 

794 "write_path", 

795 (result_record.metadata.storage_key,), 

796 {"content": result_record.serialize()}, 

797 ) 

798 if self.cache_result_in_memory: 

799 self.cache[key] = result_record 

800 

801 def persist_result_record( 1a

802 self, result_record: "ResultRecord[Any]", holder: str | None = None 

803 ) -> None: 

804 """ 

805 Persist a result record to storage. 

806 

807 Args: 

808 result_record: The result record to persist. 

809 """ 

810 holder = holder or self.generate_default_holder() 

811 return self._persist_result_record( 

812 result_record=result_record, holder=holder, _sync=True 

813 ) 

814 

815 async def apersist_result_record( 1a

816 self, result_record: "ResultRecord[Any]", holder: str | None = None 

817 ) -> None: 

818 """ 

819 Persist a result record to storage. 

820 

821 Args: 

822 result_record: The result record to persist. 

823 """ 

824 holder = holder or self.generate_default_holder() 

825 return await self._persist_result_record( 

826 result_record=result_record, holder=holder, _sync=False 

827 ) 

828 

829 def supports_isolation_level(self, level: "IsolationLevel") -> bool: 1a

830 """ 

831 Check if the result store supports a given isolation level. 

832 

833 Args: 

834 level: The isolation level to check. 

835 

836 Returns: 

837 bool: True if the isolation level is supported, False otherwise. 

838 """ 

839 from prefect.transactions import IsolationLevel 

840 

841 if level == IsolationLevel.READ_COMMITTED: 

842 return True 

843 elif level == IsolationLevel.SERIALIZABLE: 

844 return self.lock_manager is not None 

845 else: 

846 raise ValueError(f"Unsupported isolation level: {level}") 

847 

848 def acquire_lock( 1a

849 self, key: str, holder: str | None = None, timeout: float | None = None 

850 ) -> bool: 

851 """ 

852 Acquire a lock for a result record. 

853 

854 Args: 

855 key: The key to acquire the lock for. 

856 holder: The holder of the lock. If not provided, a default holder based on the 

857 current host, process, and thread will be used. 

858 timeout: The timeout for the lock. 

859 

860 Returns: 

861 bool: True if the lock was successfully acquired; False otherwise. 

862 """ 

863 holder = holder or self.generate_default_holder() 

864 if self.lock_manager is None: 

865 raise ConfigurationError( 

866 "Result store is not configured with a lock manager. Please set" 

867 " a lock manager when creating the result store to enable locking." 

868 ) 

869 return self.lock_manager.acquire_lock(key, holder, timeout) 

870 

871 async def aacquire_lock( 1a

872 self, key: str, holder: str | None = None, timeout: float | None = None 

873 ) -> bool: 

874 """ 

875 Acquire a lock for a result record. 

876 

877 Args: 

878 key: The key to acquire the lock for. 

879 holder: The holder of the lock. If not provided, a default holder based on the 

880 current host, process, and thread will be used. 

881 timeout: The timeout for the lock. 

882 

883 Returns: 

884 bool: True if the lock was successfully acquired; False otherwise. 

885 """ 

886 holder = holder or self.generate_default_holder() 

887 if self.lock_manager is None: 

888 raise ConfigurationError( 

889 "Result store is not configured with a lock manager. Please set" 

890 " a lock manager when creating the result store to enable locking." 

891 ) 

892 

893 return await self.lock_manager.aacquire_lock(key, holder, timeout) 

894 

895 def release_lock(self, key: str, holder: str | None = None) -> None: 1a

896 """ 

897 Release a lock for a result record. 

898 

899 Args: 

900 key: The key to release the lock for. 

901 holder: The holder of the lock. Must match the holder that acquired the lock. 

902 If not provided, a default holder based on the current host, process, and 

903 thread will be used. 

904 """ 

905 holder = holder or self.generate_default_holder() 

906 if self.lock_manager is None: 

907 raise ConfigurationError( 

908 "Result store is not configured with a lock manager. Please set" 

909 " a lock manager when creating the result store to enable locking." 

910 ) 

911 return self.lock_manager.release_lock(key, holder) 

912 

913 def is_locked(self, key: str) -> bool: 1a

914 """ 

915 Check if a result record is locked. 

916 """ 

917 if self.lock_manager is None: 

918 raise ConfigurationError( 

919 "Result store is not configured with a lock manager. Please set" 

920 " a lock manager when creating the result store to enable locking." 

921 ) 

922 return self.lock_manager.is_locked(key) 

923 

924 def is_lock_holder(self, key: str, holder: str | None = None) -> bool: 1a

925 """ 

926 Check if the current holder is the lock holder for the result record. 

927 

928 Args: 

929 key: The key to check the lock for. 

930 holder: The holder of the lock. If not provided, a default holder based on the 

931 current host, process, and thread will be used. 

932 

933 Returns: 

934 bool: True if the current holder is the lock holder; False otherwise. 

935 """ 

936 holder = holder or self.generate_default_holder() 

937 if self.lock_manager is None: 

938 raise ConfigurationError( 

939 "Result store is not configured with a lock manager. Please set" 

940 " a lock manager when creating the result store to enable locking." 

941 ) 

942 return self.lock_manager.is_lock_holder(key, holder) 

943 

944 def wait_for_lock(self, key: str, timeout: float | None = None) -> bool: 1a

945 """ 

946 Wait for the corresponding transaction record to become free. 

947 """ 

948 if self.lock_manager is None: 

949 raise ConfigurationError( 

950 "Result store is not configured with a lock manager. Please set" 

951 " a lock manager when creating the result store to enable locking." 

952 ) 

953 return self.lock_manager.wait_for_lock(key, timeout) 

954 

955 async def await_for_lock(self, key: str, timeout: float | None = None) -> bool: 1a

956 """ 

957 Wait for the corresponding transaction record to become free. 

958 """ 

959 if self.lock_manager is None: 

960 raise ConfigurationError( 

961 "Result store is not configured with a lock manager. Please set" 

962 " a lock manager when creating the result store to enable locking." 

963 ) 

964 return await self.lock_manager.await_for_lock(key, timeout) 

965 

966 

967def get_result_store() -> ResultStore: 1a

968 """ 

969 Get the current result store. 

970 """ 

971 from prefect.context import get_run_context 

972 

973 try: 

974 run_context = get_run_context() 

975 except MissingContextError: 

976 result_store = ResultStore() 

977 else: 

978 result_store = run_context.result_store 

979 return result_store