Coverage for /usr/local/lib/python3.12/site-packages/prefect/results.py: 18%
386 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1from __future__ import annotations 1a
3import asyncio 1a
4import os 1a
5import socket 1a
6import threading 1a
7import uuid 1a
8from functools import partial 1a
9from operator import methodcaller 1a
10from pathlib import Path 1a
11from typing import ( 1a
12 TYPE_CHECKING,
13 Annotated,
14 Any,
15 Callable,
16 ClassVar,
17 Optional,
18 TypeVar,
19 Union,
20)
21from uuid import UUID 1a
23from cachetools import LRUCache 1a
24from pydantic import ( 1a
25 BaseModel,
26 ConfigDict,
27 Discriminator,
28 Field,
29 Tag,
30)
31from typing_extensions import ParamSpec, Self 1a
33import prefect 1a
34import prefect.types._datetime 1a
35from prefect._internal.compatibility.async_dispatch import async_dispatch 1a
36from prefect._internal.compatibility.blocks import call_explicitly_async_block_method 1a
37from prefect._internal.concurrency.event_loop import get_running_loop 1a
38from prefect._result_records import R, ResultRecord, ResultRecordMetadata 1a
39from prefect.blocks.core import Block 1a
40from prefect.exceptions import ( 1a
41 ConfigurationError,
42 MissingContextError,
43)
44from prefect.filesystems import ( 1a
45 LocalFileSystem,
46 NullFileSystem,
47 WritableFileSystem,
48)
49from prefect.locking.protocol import LockManager 1a
50from prefect.logging import get_logger 1a
51from prefect.serializers import Serializer 1a
52from prefect.settings.context import get_current_settings 1a
53from prefect.types import DateTime 1a
54from prefect.utilities.annotations import NotSet 1a
55from prefect.utilities.asyncutils import sync_compatible 1a
57if TYPE_CHECKING: 57 ↛ 58line 57 didn't jump to line 58 because the condition on line 57 was never true1a
58 import logging
60 from prefect import Flow, Task
61 from prefect.transactions import IsolationLevel
# Public alias types accepted anywhere result storage / serialization is
# configured: either a concrete instance or a string to be resolved later.
ResultStorage = Union[WritableFileSystem, str]
ResultSerializer = Union[Serializer, str]
# Types whose values are small/safe enough to store inline as literals rather
# than being serialized out to result storage.
LITERAL_TYPES: set[type] = {type(None), bool, UUID}
def DEFAULT_STORAGE_KEY_FN() -> str:
    """Default storage-key factory: a random 32-character hexadecimal string."""
    random_id = uuid.uuid4()
    return random_id.hex
# Module-level logger for result-store operations.
logger: "logging.Logger" = get_logger("results")

P = ParamSpec("P")

# Process-wide cache of resolved default storage blocks, keyed by
# (default storage block setting, local storage basepath) as strings.
_default_storages: dict[tuple[str, str], WritableFileSystem] = {}
async def aget_default_result_storage() -> WritableFileSystem:
    """
    Generate a default file system for result storage.

    Resolves `results.default_storage_block` from the current settings when
    set; otherwise falls back to a `LocalFileSystem` rooted at
    `results.local_storage_path`. Resolved storages are memoized in
    `_default_storages` per settings pair.
    """
    settings = get_current_settings()
    default_block = settings.results.default_storage_block
    basepath = settings.results.local_storage_path

    cache_key = (str(default_block), str(basepath))

    # Reuse a previously-resolved storage for identical settings.
    if cache_key in _default_storages:
        return _default_storages[cache_key]

    if default_block is not None:
        storage = await aresolve_result_storage(default_block)
    else:
        # Use the local file system
        storage = LocalFileSystem(basepath=str(basepath))

    _default_storages[cache_key] = storage
    return storage
@async_dispatch(aget_default_result_storage)
def get_default_result_storage() -> WritableFileSystem:
    """
    Generate a default file system for result storage.

    Synchronous counterpart of `aget_default_result_storage`; shares the same
    `_default_storages` memoization cache.
    """
    settings = get_current_settings()
    default_block = settings.results.default_storage_block
    basepath = settings.results.local_storage_path

    cache_key = (str(default_block), str(basepath))

    # Reuse a previously-resolved storage for identical settings.
    if cache_key in _default_storages:
        return _default_storages[cache_key]

    if default_block is not None:
        storage = resolve_result_storage(default_block, _sync=True)
        if TYPE_CHECKING:
            assert isinstance(storage, WritableFileSystem)
    else:
        # Use the local file system
        storage = LocalFileSystem(basepath=str(basepath))

    _default_storages[cache_key] = storage
    return storage
async def aresolve_result_storage(
    result_storage: ResultStorage | UUID | Path,
) -> WritableFileSystem:
    """
    Resolve one of the valid `ResultStorage` input types into a saved block
    document id and an instance of the block.

    Args:
        result_storage: A storage block instance, a filesystem `Path`, a block
            document slug (string), or a block document id (`UUID`).

    Returns:
        A writable file system block.

    Raises:
        TypeError: If `result_storage` is not one of the supported types.
    """
    from prefect.client.orchestration import get_client

    client = get_client()
    storage_block: WritableFileSystem
    if isinstance(result_storage, Block):
        storage_block = result_storage
    elif isinstance(result_storage, Path):
        # Wrap a bare path in a local filesystem block rooted at that path.
        storage_block = LocalFileSystem(basepath=str(result_storage))
    elif isinstance(result_storage, str):
        # Strings are block document slugs; load the saved block.
        block = await Block.aload(result_storage, client=client)
        if TYPE_CHECKING:
            assert isinstance(block, WritableFileSystem)
        storage_block = block
    elif isinstance(result_storage, UUID):  # pyright: ignore[reportUnnecessaryIsInstance]
        # Hydrate the block from its stored block document by id.
        block_document = await client.read_block_document(result_storage)
        from_block_document = methodcaller("_from_block_document", block_document)
        block = from_block_document(Block)
        if TYPE_CHECKING:
            assert isinstance(block, WritableFileSystem)
        storage_block = block
    else:
        # Fix: the message previously omitted 'Path', which is accepted above.
        raise TypeError(
            "Result storage must be one of the following types: 'UUID', 'Block', "
            f"'str', 'Path'. Got unsupported type {type(result_storage).__name__!r}."
        )

    return storage_block
@async_dispatch(aresolve_result_storage)
def resolve_result_storage(
    result_storage: ResultStorage | UUID | Path,
) -> WritableFileSystem:
    """
    Resolve one of the valid `ResultStorage` input types into a saved block
    document id and an instance of the block.

    Args:
        result_storage: A storage block instance, a filesystem `Path`, a block
            document slug (string), or a block document id (`UUID`).

    Returns:
        A writable file system block.

    Raises:
        TypeError: If `result_storage` is not one of the supported types.
    """
    from prefect.client.orchestration import get_client

    client = get_client(sync_client=True)
    storage_block: WritableFileSystem
    if isinstance(result_storage, Block):
        storage_block = result_storage
    elif isinstance(result_storage, Path):
        # Wrap a bare path in a local filesystem block rooted at that path.
        storage_block = LocalFileSystem(basepath=str(result_storage))
    elif isinstance(result_storage, str):
        # Strings are block document slugs; load the saved block.
        block = Block.load(result_storage, _sync=True)
        if TYPE_CHECKING:
            assert isinstance(block, WritableFileSystem)
        storage_block = block
    elif isinstance(result_storage, UUID):  # pyright: ignore[reportUnnecessaryIsInstance]
        # Hydrate the block from its stored block document by id.
        block_document = client.read_block_document(result_storage)
        from_block_document = methodcaller("_from_block_document", block_document)
        block = from_block_document(Block)
        if TYPE_CHECKING:
            assert isinstance(block, WritableFileSystem)
        storage_block = block
    else:
        # Fix: the message previously omitted 'Path', which is accepted above.
        raise TypeError(
            "Result storage must be one of the following types: 'UUID', 'Block', "
            f"'str', 'Path'. Got unsupported type {type(result_storage).__name__!r}."
        )
    return storage_block
def resolve_serializer(serializer: ResultSerializer) -> Serializer:
    """
    Resolve one of the valid `ResultSerializer` input types into a serializer
    instance.
    """
    # A string is treated as a serializer type slug; an instance passes
    # through untouched. The two cases are mutually exclusive.
    if isinstance(serializer, str):
        return Serializer(type=serializer)
    if isinstance(serializer, Serializer):  # pyright: ignore[reportUnnecessaryIsInstance]
        return serializer
    raise TypeError(
        "Result serializer must be one of the following types: 'Serializer', "
        f"'str'. Got unsupported type {type(serializer).__name__!r}."
    )
async def get_or_create_default_task_scheduling_storage() -> ResultStorage:
    """
    Generate a default file system for background task parameter/result storage.

    Loads `tasks.scheduling.default_storage_block` when configured; otherwise
    returns a `LocalFileSystem` rooted at `results.local_storage_path`.
    """
    settings = get_current_settings()
    default_block = settings.tasks.scheduling.default_storage_block

    if default_block is not None:
        block = await Block.aload(default_block)
        if TYPE_CHECKING:
            assert isinstance(block, WritableFileSystem)
        return block

    # otherwise, use the local file system
    basepath = settings.results.local_storage_path
    return LocalFileSystem(basepath=str(basepath))
def get_default_result_serializer() -> Serializer:
    """
    Return the default serializer for results, resolved from the
    `results.default_serializer` setting.
    """
    settings = get_current_settings()
    return resolve_serializer(settings.results.default_serializer)
def get_default_persist_setting() -> bool:
    """
    Return the default option for result persistence.
    """
    # Read straight from the `results.persist_by_default` setting.
    return get_current_settings().results.persist_by_default
def get_default_persist_setting_for_tasks() -> bool:
    """
    Return the default option for result persistence for tasks.
    """
    settings = get_current_settings()
    # A task-specific override wins; otherwise fall back to the global default.
    task_default = settings.tasks.default_persist_result
    if task_default is not None:
        return task_default
    return settings.results.persist_by_default
def should_persist_result() -> bool:
    """
    Return the default option for result persistence determined by the current run context.

    If there is no current run context, the value of `results.persist_by_default` on the
    current settings will be returned.
    """
    from prefect.context import FlowRunContext, TaskRunContext

    # Task run context takes precedence over flow run context.
    for context_cls in (TaskRunContext, FlowRunContext):
        ctx = context_cls.get()
        if ctx is not None:
            return ctx.persist_result

    return get_default_persist_setting()
def _format_user_supplied_storage_key(key: str) -> str:
    # Note here we are pinning to task runs since flow runs do not support storage keys
    # yet; we'll need to split logic in the future or have two separate functions
    runtime_vars = {
        attr: getattr(prefect.runtime, attr) for attr in dir(prefect.runtime)
    }
    return key.format(**runtime_vars, parameters=prefect.runtime.task_run.parameters)
T = TypeVar("T")


def default_cache() -> LRUCache[str, "ResultRecord[Any]"]:
    # Bounded LRU cache so long-running processes don't accumulate result
    # records in memory without limit.
    return LRUCache(maxsize=1000)
def result_storage_discriminator(x: Any) -> str:
    """Return the pydantic union tag for a `metadata_storage` value."""
    if isinstance(x, dict):
        # Serialized writable-filesystem blocks carry a `block_type_slug`;
        # any other dict is treated as a serialized `NullFileSystem`.
        return "WritableFileSystem" if "block_type_slug" in x else "NullFileSystem"
    if isinstance(x, WritableFileSystem):
        return "WritableFileSystem"
    if isinstance(x, NullFileSystem):
        return "NullFileSystem"
    return "None"
class ResultStore(BaseModel):
    """
    Manages the storage and retrieval of results.

    Attributes:
        result_storage: The storage for result records. If not provided, the default
            result storage will be used.
        metadata_storage: The storage for result record metadata. If not provided,
            the metadata will be stored alongside the results.
        lock_manager: The lock manager to use for locking result records. If not provided,
            the store cannot be used in transactions with the SERIALIZABLE isolation level.
        cache_result_in_memory: Whether to cache results in memory.
        serializer: The serializer to use for results.
        storage_key_fn: The function to generate storage keys.
    """

    model_config: ClassVar[ConfigDict] = ConfigDict(arbitrary_types_allowed=True)

    result_storage: Optional[WritableFileSystem] = Field(default=None)
    # Discriminated union so serialized stores round-trip: dicts with a
    # `block_type_slug` deserialize as writable file systems, other dicts as
    # `NullFileSystem` (see `result_storage_discriminator`).
    metadata_storage: Annotated[
        Union[
            Annotated[WritableFileSystem, Tag("WritableFileSystem")],
            Annotated[NullFileSystem, Tag("NullFileSystem")],
            Annotated[None, Tag("None")],
        ],
        Discriminator(result_storage_discriminator),
    ] = Field(default=None)
    lock_manager: Optional[LockManager] = Field(default=None)
    cache_result_in_memory: bool = Field(default=True)
    serializer: Serializer = Field(default_factory=get_default_result_serializer)
    storage_key_fn: Callable[[], str] = Field(default=DEFAULT_STORAGE_KEY_FN)
    # In-memory cache of result records, keyed by resolved storage path.
    cache: LRUCache[str, "ResultRecord[Any]"] = Field(default_factory=default_cache)

    @property
    def result_storage_block_id(self) -> UUID | None:
        # Only blocks saved to the API have a `_block_document_id`; unsaved
        # (e.g. ad-hoc local) storage yields None.
        if self.result_storage is None:
            return None
        return getattr(self.result_storage, "_block_document_id", None)

    @classmethod
    async def _from_metadata(cls, metadata: ResultRecordMetadata) -> "ResultRecord[R]":
        """
        Create a result record from metadata.

        Will use the result record metadata to fetch data via a result store.

        Args:
            metadata: The metadata to create the result record from.

        Returns:
            ResultRecord: The result record.

        Raises:
            ValueError: If the metadata has no `storage_key`.
        """
        if metadata.storage_block_id is None:
            storage_block = None
        else:
            storage_block = await aresolve_result_storage(metadata.storage_block_id)
        store = cls(result_storage=storage_block, serializer=metadata.serializer)
        if metadata.storage_key is None:
            raise ValueError(
                "storage_key is required to hydrate a result record from metadata"
            )
        result = await store.aread(metadata.storage_key)
        return result

    @sync_compatible
    async def update_for_flow(self, flow: "Flow[..., Any]") -> Self:
        """
        Create a new result store for a flow with updated settings.

        Args:
            flow: The flow to update the result store for.

        Returns:
            An updated result store.
        """
        update: dict[str, Any] = {}
        update["cache_result_in_memory"] = flow.cache_result_in_memory
        if flow.result_storage is not None:
            update["result_storage"] = await aresolve_result_storage(
                flow.result_storage
            )
        if flow.result_serializer is not None:
            update["serializer"] = resolve_serializer(flow.result_serializer)
        if self.result_storage is None and update.get("result_storage") is None:
            update["result_storage"] = await aget_default_result_storage()
        # NOTE(review): flows always get a NullFileSystem for metadata —
        # presumably to suppress separate metadata writes; confirm against
        # NullFileSystem's write semantics.
        update["metadata_storage"] = NullFileSystem()
        return self.model_copy(update=update)

    @sync_compatible
    async def update_for_task(self: Self, task: "Task[P, R]") -> Self:
        """
        Create a new result store for a task.

        Args:
            task: The task to update the result store for.

        Returns:
            An updated result store.
        """
        from prefect.transactions import get_transaction

        update: dict[str, Any] = {}
        update["cache_result_in_memory"] = task.cache_result_in_memory
        if task.result_storage is not None:
            update["result_storage"] = await aresolve_result_storage(
                task.result_storage
            )
        if task.result_serializer is not None:
            update["serializer"] = resolve_serializer(task.result_serializer)
        if task.result_storage_key is not None:
            update["storage_key_fn"] = partial(
                _format_user_supplied_storage_key, task.result_storage_key
            )

        # use the lock manager from a parent transaction if it exists
        if (current_txn := get_transaction()) and isinstance(
            current_txn.store, ResultStore
        ):
            update["lock_manager"] = current_txn.store.lock_manager

        from prefect.cache_policies import CachePolicy

        if isinstance(task.cache_policy, CachePolicy):
            if task.cache_policy.key_storage is not None:
                storage = task.cache_policy.key_storage
                # A string that does not look like a "block-type/name" slug is
                # treated as a filesystem path — TODO confirm slug format.
                if isinstance(storage, str) and not len(storage.split("/")) == 2:
                    storage = Path(storage)
                update["metadata_storage"] = await aresolve_result_storage(storage)
            # if the cache policy has a lock manager, it takes precedence over the parent transaction
            if task.cache_policy.lock_manager is not None:
                update["lock_manager"] = task.cache_policy.lock_manager

        if self.result_storage is None and update.get("result_storage") is None:
            update["result_storage"] = await aget_default_result_storage()
        if (
            isinstance(self.metadata_storage, NullFileSystem)
            and update.get("metadata_storage", NotSet) is NotSet
        ):
            # Reset an inherited NullFileSystem so task metadata is stored
            # alongside results by default.
            update["metadata_storage"] = None
        return self.model_copy(update=update)

    @staticmethod
    def generate_default_holder() -> str:
        """
        Generate a default holder string using hostname, PID, and thread ID.

        Returns:
            str: A unique identifier string.
        """
        current_loop = get_running_loop()
        hostname = socket.gethostname()
        pid = os.getpid()
        thread_name = threading.current_thread().name
        thread_id = threading.get_ident()
        if current_loop:
            current_task = asyncio.current_task()
            if current_task:
                # include the task id to ensure uniqueness because there might be
                # multiple tasks running in the same thread
                return f"{hostname}:{pid}:{thread_id}:{thread_name}:{id(current_task)}"
        return f"{hostname}:{pid}:{thread_id}:{thread_name}"

    @sync_compatible
    async def _exists(self, key: str) -> bool:
        """
        Check if a result record exists in storage.

        Args:
            key: The key to check for the existence of a result record.

        Returns:
            bool: True if the result record exists, False otherwise.
        """
        if self.metadata_storage is not None:
            # TODO: Add an `exists` method to commonly used storage blocks
            # so the entire payload doesn't need to be read
            try:
                metadata_content = await call_explicitly_async_block_method(
                    self.metadata_storage, "read_path", (key,), {}
                )
                if metadata_content is None:
                    return False
                metadata = ResultRecordMetadata.load_bytes(metadata_content)

            except Exception:
                # Any read/parse failure is treated as "does not exist".
                return False
        else:
            try:
                content = await call_explicitly_async_block_method(
                    self.result_storage, "read_path", (key,), {}
                )
                if content is None:
                    return False
                record: ResultRecord[Any] = ResultRecord.deserialize(content)
                metadata = record.metadata
            except Exception:
                # Any read/parse failure is treated as "does not exist".
                return False

        if metadata.expiration:
            # if the result has an expiration,
            # check if it is still in the future
            exists = metadata.expiration > prefect.types._datetime.now("UTC")
        else:
            exists = True
        return exists

    def exists(self, key: str) -> bool:
        """
        Check if a result record exists in storage.

        Args:
            key: The key to check for the existence of a result record.

        Returns:
            bool: True if the result record exists, False otherwise.
        """
        return self._exists(key=key, _sync=True)

    async def aexists(self, key: str) -> bool:
        """
        Check if a result record exists in storage.

        Args:
            key: The key to check for the existence of a result record.

        Returns:
            bool: True if the result record exists, False otherwise.
        """
        return await self._exists(key=key, _sync=False)

    def _resolved_key_path(self, key: str) -> str:
        # For unsaved storage that can resolve paths, normalize the key to its
        # resolved path so the in-memory cache is keyed consistently.
        if self.result_storage_block_id is None and (
            _resolve_path := getattr(self.result_storage, "_resolve_path", None)
        ):
            path_key = _resolve_path(key)
            if path_key is not None:
                return str(_resolve_path(key))
            else:
                return key
        return key

    @sync_compatible
    async def _read(self, key: str, holder: str) -> "ResultRecord[Any]":
        """
        Read a result record from storage.

        This is the internal implementation. Use `read` or `aread` for synchronous and
        asynchronous result reading respectively.

        Args:
            key: The key to read the result record from.
            holder: The holder of the lock if a lock was set on the record.

        Returns:
            A result record.
        """

        # Wait for any other holder's lock on this record to be released.
        if self.lock_manager is not None and not self.is_lock_holder(key, holder):
            await self.await_for_lock(key)

        resolved_key_path = self._resolved_key_path(key)

        if resolved_key_path in self.cache:
            return self.cache[resolved_key_path]

        if self.result_storage is None:
            self.result_storage = await aget_default_result_storage()

        if self.metadata_storage is not None:
            # Metadata is stored separately and carries the storage key of the
            # result payload itself.
            metadata_content = await call_explicitly_async_block_method(
                self.metadata_storage,
                "read_path",
                (key,),
                {},
            )
            metadata = ResultRecordMetadata.load_bytes(metadata_content)
            assert metadata.storage_key is not None, (
                "Did not find storage key in metadata"
            )
            result_content = await call_explicitly_async_block_method(
                self.result_storage,
                "read_path",
                (metadata.storage_key,),
                {},
            )
            result_record: ResultRecord[Any] = (
                ResultRecord.deserialize_from_result_and_metadata(
                    result=result_content, metadata=metadata_content
                )
            )
        else:
            # Result and metadata were written together as one payload.
            content = await call_explicitly_async_block_method(
                self.result_storage,
                "read_path",
                (key,),
                {},
            )
            result_record: ResultRecord[Any] = ResultRecord.deserialize(
                content, backup_serializer=self.serializer
            )

        if self.cache_result_in_memory:
            self.cache[resolved_key_path] = result_record
        return result_record

    def read(
        self,
        key: str,
        holder: str | None = None,
    ) -> "ResultRecord[Any]":
        """
        Read a result record from storage.

        Args:
            key: The key to read the result record from.
            holder: The holder of the lock if a lock was set on the record.

        Returns:
            A result record.
        """
        holder = holder or self.generate_default_holder()
        return self._read(key=key, holder=holder, _sync=True)

    async def aread(
        self,
        key: str,
        holder: str | None = None,
    ) -> "ResultRecord[Any]":
        """
        Read a result record from storage.

        Args:
            key: The key to read the result record from.
            holder: The holder of the lock if a lock was set on the record.

        Returns:
            A result record.
        """
        holder = holder or self.generate_default_holder()
        return await self._read(key=key, holder=holder, _sync=False)

    def create_result_record(
        self,
        obj: Any,
        key: str | None = None,
        expiration: DateTime | None = None,
    ) -> "ResultRecord[Any]":
        """
        Create a result record.

        Args:
            key: The key to create the result record for.
            obj: The object to create the result record for.
            expiration: The expiration time for the result record.
        """
        key = key or self.storage_key_fn()

        if self.result_storage is None:
            self.result_storage = get_default_result_storage(_sync=True)
            if TYPE_CHECKING:
                assert isinstance(self.result_storage, WritableFileSystem)

        if self.result_storage_block_id is None:
            # Resolve relative keys against unsaved local storage so the
            # stored key matches the cache key used by `_resolved_key_path`.
            if _resolve_path := getattr(self.result_storage, "_resolve_path", None):
                path_key = _resolve_path(key)
                if path_key is not None:
                    key = str(_resolve_path(key))

        return ResultRecord(
            result=obj,
            metadata=ResultRecordMetadata(
                serializer=self.serializer,
                expiration=expiration,
                storage_key=key,
                storage_block_id=self.result_storage_block_id,
            ),
        )

    def write(
        self,
        obj: Any,
        key: str | None = None,
        expiration: DateTime | None = None,
        holder: str | None = None,
    ) -> None:
        """
        Write a result to storage.

        Handles the creation of a `ResultRecord` and its serialization to storage.

        Args:
            key: The key to write the result record to.
            obj: The object to write to storage.
            expiration: The expiration time for the result record.
            holder: The holder of the lock if a lock was set on the record.
        """
        holder = holder or self.generate_default_holder()
        result_record = self.create_result_record(
            key=key, obj=obj, expiration=expiration
        )
        return self.persist_result_record(
            result_record=result_record,
            holder=holder,
        )

    async def awrite(
        self,
        obj: Any,
        key: str | None = None,
        expiration: DateTime | None = None,
        holder: str | None = None,
    ) -> None:
        """
        Write a result to storage.

        Args:
            key: The key to write the result record to.
            obj: The object to write to storage.
            expiration: The expiration time for the result record.
            holder: The holder of the lock if a lock was set on the record.
        """
        holder = holder or self.generate_default_holder()
        return await self.apersist_result_record(
            result_record=self.create_result_record(
                key=key, obj=obj, expiration=expiration
            ),
            holder=holder,
        )

    @sync_compatible
    async def _persist_result_record(
        self, result_record: "ResultRecord[Any]", holder: str
    ) -> None:
        """
        Persist a result record to storage.

        Args:
            result_record: The result record to persist.
            holder: The holder of the lock if a lock was set on the record.

        Raises:
            RuntimeError: If the record is locked by a different holder.
        """
        assert result_record.metadata.storage_key is not None, (
            "Storage key is required on result record"
        )

        key = result_record.metadata.storage_key
        if result_record.metadata.storage_block_id is None:
            # For unsaved storage the stored key is a resolved path; locking
            # and metadata writes use the key relative to the storage basepath.
            basepath = (
                _resolve_path("")
                if (
                    _resolve_path := getattr(self.result_storage, "_resolve_path", None)
                )
                else Path(".").resolve()
            )
            base_key = key if basepath is None else str(Path(key).relative_to(basepath))
        else:
            base_key = key
        if (
            self.lock_manager is not None
            and self.is_locked(base_key)
            and not self.is_lock_holder(base_key, holder)
        ):
            raise RuntimeError(
                f"Cannot write to result record with key {base_key} because it is locked by "
                f"another holder."
            )
        if self.result_storage is None:
            self.result_storage = await aget_default_result_storage()

        # If metadata storage is configured, write result and metadata separately
        if self.metadata_storage is not None:
            await call_explicitly_async_block_method(
                self.result_storage,
                "write_path",
                (result_record.metadata.storage_key,),
                {"content": result_record.serialize_result()},
            )
            await call_explicitly_async_block_method(
                self.metadata_storage,
                "write_path",
                (base_key,),
                {"content": result_record.serialize_metadata()},
            )
        # Otherwise, write the result metadata and result together
        else:
            await call_explicitly_async_block_method(
                self.result_storage,
                "write_path",
                (result_record.metadata.storage_key,),
                {"content": result_record.serialize()},
            )
        if self.cache_result_in_memory:
            self.cache[key] = result_record

    def persist_result_record(
        self, result_record: "ResultRecord[Any]", holder: str | None = None
    ) -> None:
        """
        Persist a result record to storage.

        Args:
            result_record: The result record to persist.
        """
        holder = holder or self.generate_default_holder()
        return self._persist_result_record(
            result_record=result_record, holder=holder, _sync=True
        )

    async def apersist_result_record(
        self, result_record: "ResultRecord[Any]", holder: str | None = None
    ) -> None:
        """
        Persist a result record to storage.

        Args:
            result_record: The result record to persist.
        """
        holder = holder or self.generate_default_holder()
        return await self._persist_result_record(
            result_record=result_record, holder=holder, _sync=False
        )

    def supports_isolation_level(self, level: "IsolationLevel") -> bool:
        """
        Check if the result store supports a given isolation level.

        Args:
            level: The isolation level to check.

        Returns:
            bool: True if the isolation level is supported, False otherwise.

        Raises:
            ValueError: If `level` is not a known isolation level.
        """
        from prefect.transactions import IsolationLevel

        if level == IsolationLevel.READ_COMMITTED:
            return True
        elif level == IsolationLevel.SERIALIZABLE:
            # SERIALIZABLE requires locking, which requires a lock manager.
            return self.lock_manager is not None
        else:
            raise ValueError(f"Unsupported isolation level: {level}")

    def acquire_lock(
        self, key: str, holder: str | None = None, timeout: float | None = None
    ) -> bool:
        """
        Acquire a lock for a result record.

        Args:
            key: The key to acquire the lock for.
            holder: The holder of the lock. If not provided, a default holder based on the
                current host, process, and thread will be used.
            timeout: The timeout for the lock.

        Returns:
            bool: True if the lock was successfully acquired; False otherwise.

        Raises:
            ConfigurationError: If no lock manager is configured.
        """
        holder = holder or self.generate_default_holder()
        if self.lock_manager is None:
            raise ConfigurationError(
                "Result store is not configured with a lock manager. Please set"
                " a lock manager when creating the result store to enable locking."
            )
        return self.lock_manager.acquire_lock(key, holder, timeout)

    async def aacquire_lock(
        self, key: str, holder: str | None = None, timeout: float | None = None
    ) -> bool:
        """
        Acquire a lock for a result record.

        Args:
            key: The key to acquire the lock for.
            holder: The holder of the lock. If not provided, a default holder based on the
                current host, process, and thread will be used.
            timeout: The timeout for the lock.

        Returns:
            bool: True if the lock was successfully acquired; False otherwise.

        Raises:
            ConfigurationError: If no lock manager is configured.
        """
        holder = holder or self.generate_default_holder()
        if self.lock_manager is None:
            raise ConfigurationError(
                "Result store is not configured with a lock manager. Please set"
                " a lock manager when creating the result store to enable locking."
            )

        return await self.lock_manager.aacquire_lock(key, holder, timeout)

    def release_lock(self, key: str, holder: str | None = None) -> None:
        """
        Release a lock for a result record.

        Args:
            key: The key to release the lock for.
            holder: The holder of the lock. Must match the holder that acquired the lock.
                If not provided, a default holder based on the current host, process, and
                thread will be used.

        Raises:
            ConfigurationError: If no lock manager is configured.
        """
        holder = holder or self.generate_default_holder()
        if self.lock_manager is None:
            raise ConfigurationError(
                "Result store is not configured with a lock manager. Please set"
                " a lock manager when creating the result store to enable locking."
            )
        return self.lock_manager.release_lock(key, holder)

    def is_locked(self, key: str) -> bool:
        """
        Check if a result record is locked.

        Raises:
            ConfigurationError: If no lock manager is configured.
        """
        if self.lock_manager is None:
            raise ConfigurationError(
                "Result store is not configured with a lock manager. Please set"
                " a lock manager when creating the result store to enable locking."
            )
        return self.lock_manager.is_locked(key)

    def is_lock_holder(self, key: str, holder: str | None = None) -> bool:
        """
        Check if the current holder is the lock holder for the result record.

        Args:
            key: The key to check the lock for.
            holder: The holder of the lock. If not provided, a default holder based on the
                current host, process, and thread will be used.

        Returns:
            bool: True if the current holder is the lock holder; False otherwise.

        Raises:
            ConfigurationError: If no lock manager is configured.
        """
        holder = holder or self.generate_default_holder()
        if self.lock_manager is None:
            raise ConfigurationError(
                "Result store is not configured with a lock manager. Please set"
                " a lock manager when creating the result store to enable locking."
            )
        return self.lock_manager.is_lock_holder(key, holder)

    def wait_for_lock(self, key: str, timeout: float | None = None) -> bool:
        """
        Wait for the corresponding transaction record to become free.

        Raises:
            ConfigurationError: If no lock manager is configured.
        """
        if self.lock_manager is None:
            raise ConfigurationError(
                "Result store is not configured with a lock manager. Please set"
                " a lock manager when creating the result store to enable locking."
            )
        return self.lock_manager.wait_for_lock(key, timeout)

    async def await_for_lock(self, key: str, timeout: float | None = None) -> bool:
        """
        Wait for the corresponding transaction record to become free.

        Raises:
            ConfigurationError: If no lock manager is configured.
        """
        if self.lock_manager is None:
            raise ConfigurationError(
                "Result store is not configured with a lock manager. Please set"
                " a lock manager when creating the result store to enable locking."
            )
        return await self.lock_manager.await_for_lock(key, timeout)
def get_result_store() -> ResultStore:
    """
    Get the current result store.
    """
    from prefect.context import get_run_context

    try:
        run_context = get_run_context()
    except MissingContextError:
        # No active flow/task run: fall back to a default-configured store.
        return ResultStore()
    return run_context.result_store