Coverage for /usr/local/lib/python3.12/site-packages/prefect/_result_records.py: 34%
96 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1from __future__ import annotations 1a
3import inspect 1a
4import uuid 1a
5from typing import ( 1a
6 TYPE_CHECKING,
7 Any,
8 Generic,
9 Optional,
10 TypeVar,
11 Union,
12 cast,
13)
14from uuid import UUID 1a
16from pydantic import ( 1a
17 BaseModel,
18 Field,
19 ValidationError,
20 model_validator,
21)
23import prefect 1a
24from prefect.exceptions import ( 1a
25 SerializationError,
26)
27from prefect.serializers import PickleSerializer, Serializer 1a
28from prefect.types import DateTime 1a
30if TYPE_CHECKING: 30 ↛ 31line 30 didn't jump to line 31 because the condition on line 30 was never true1a
31 pass
# A result serializer may be supplied either as a Serializer or as a string.
ResultSerializer = Union[Serializer, str]

# Grouping of "literal" types; not referenced by the definitions below.
# NOTE(review): presumably consumed by other modules - confirm before removing.
LITERAL_TYPES: set[type] = {type(None), bool, UUID}

# Type variable for the value carried by a generic ResultRecord.
R = TypeVar("R")
class ResultRecordMetadata(BaseModel):
    """
    Metadata for a result record.

    Holds where a result is stored, when it expires, which serializer
    produced it and which Prefect version wrote it.
    """

    # optional for backwards compatibility
    storage_key: Optional[str] = Field(default=None)
    expiration: Optional[DateTime] = Field(default=None)
    # A fresh PickleSerializer is created per instance when none is given.
    serializer: Serializer = Field(default_factory=PickleSerializer)
    # Defaults to the version of the `prefect` package imported by this module.
    prefect_version: str = Field(default=prefect.__version__)
    storage_block_id: Optional[uuid.UUID] = Field(default=None)

    def dump_bytes(self) -> bytes:
        """
        Serialize the metadata to bytes.

        The model is dumped to JSON (with `serialize_as_any=True`) and the
        resulting string is encoded to bytes.

        Returns:
            bytes: the serialized metadata
        """
        as_json = self.model_dump_json(serialize_as_any=True)
        return as_json.encode()

    @classmethod
    def load_bytes(cls, data: bytes) -> "ResultRecordMetadata":
        """
        Deserialize metadata from bytes.

        Args:
            data: the serialized metadata, as JSON

        Returns:
            ResultRecordMetadata: the deserialized metadata
        """
        loaded = cls.model_validate_json(data)
        return loaded

    def __eq__(self, other: Any) -> bool:
        """
        Compare two metadata objects field by field.

        Anything that is not a `ResultRecordMetadata` compares unequal.
        """
        if not isinstance(other, ResultRecordMetadata):
            return False
        # Compared in declaration order; stops at the first mismatch.
        compared_fields = (
            "storage_key",
            "expiration",
            "serializer",
            "prefect_version",
            "storage_block_id",
        )
        return all(
            getattr(self, field_name) == getattr(other, field_name)
            for field_name in compared_fields
        )
class ResultRecord(BaseModel, Generic[R]):
    """
    A record of a result.

    Pairs a result value with the `ResultRecordMetadata` describing it and
    converts the pair to and from bytes.
    """

    metadata: ResultRecordMetadata
    result: R

    @property
    def expiration(self) -> DateTime | None:
        """The expiration stored in this record's metadata, if any."""
        return self.metadata.expiration

    @property
    def serializer(self) -> Serializer:
        """The serializer stored in this record's metadata."""
        return self.metadata.serializer

    def serialize_result(self) -> bytes:
        """
        Serialize the result value with this record's serializer.

        Returns:
            bytes: the serialized result

        Raises:
            SerializationError: if the serializer raises anything; the
                original exception is chained as the cause and the message
                carries recovery hints for the user.
        """
        try:
            data = self.serializer.dumps(self.result)
        except Exception as exc:
            extra_info = (
                'You can try a different serializer (e.g. result_serializer="json") '
                "or disabling persistence (persist_result=False) for this flow or task."
            )
            # check if this is a known issue with cloudpickle and pydantic
            # and add extra information to help the user recover

            if (
                isinstance(exc, TypeError)
                and isinstance(self.result, BaseModel)
                and str(exc).startswith("cannot pickle")
            ):
                try:
                    from IPython.core.getipython import get_ipython

                    if get_ipython() is not None:
                        extra_info = inspect.cleandoc(
                            """
                            This is a known issue in Pydantic that prevents
                            locally-defined (non-imported) models from being
                            serialized by cloudpickle in IPython/Jupyter
                            environments. Please see
                            https://github.com/pydantic/pydantic/issues/8232 for
                            more information. To fix the issue, either: (1) move
                            your Pydantic class definition to an importable
                            location, (2) use the JSON serializer for your flow
                            or task (`result_serializer="json"`), or (3)
                            disable result persistence for your flow or task
                            (`persist_result=False`).
                            """
                        ).replace("\n", " ")
                except ImportError:
                    # IPython is optional; without it the generic hint is used.
                    pass
            raise SerializationError(
                f"Failed to serialize object of type {type(self.result).__name__!r} with "
                f"serializer {self.serializer.type!r}. {extra_info}"
            ) from exc

        return data

    @model_validator(mode="before")
    @classmethod
    def coerce_old_format(cls, value: dict[str, Any] | Any) -> dict[str, Any]:
        """
        Translate the old flat record layout into the current nested one.

        For dict input, a top-level `data` key is renamed to `result`, and
        top-level `expiration`, `serializer` and `prefect_version` keys are
        moved under `metadata` (created as an empty dict when absent).
        Non-dict input is returned unchanged.

        The input mapping is never modified: the migration is applied to a
        shallow copy (and to a copy of a dict-valued `metadata` entry), so a
        caller that passes its own dict to validation keeps it intact.

        Args:
            value: the raw input handed to model validation

        Returns:
            The migrated copy for dict input, otherwise `value` itself.
        """
        if not isinstance(value, dict):
            return value
        if TYPE_CHECKING:  # isinstance doesn't accept generic parameters
            value = cast(dict[str, Any], value)

        # Copy before migrating so the caller's dict is not rewritten in place.
        migrated = dict(value)
        if "data" in migrated:
            migrated["result"] = migrated.pop("data")
        if "metadata" not in migrated:
            migrated["metadata"] = {}
        elif isinstance(migrated["metadata"], dict):
            # The nested dict receives the legacy keys below; copy it as well.
            migrated["metadata"] = dict(migrated["metadata"])
        for legacy_key in ("expiration", "serializer", "prefect_version"):
            if legacy_key in migrated:
                migrated["metadata"][legacy_key] = migrated.pop(legacy_key)
        return migrated

    def serialize_metadata(self) -> bytes:
        """
        Serialize only the metadata of this record.

        Returns:
            bytes: the serialized metadata
        """
        return self.metadata.dump_bytes()

    def serialize(
        self,
    ) -> bytes:
        """
        Serialize the record to bytes.

        The result is first serialized with `serialize_result`; a copy of the
        record carrying that serialized result is then dumped to JSON and
        encoded.

        Returns:
            bytes: the serialized record

        Raises:
            SerializationError: if the result cannot be serialized.
        """
        return (
            self.model_copy(update={"result": self.serialize_result()})
            .model_dump_json(serialize_as_any=True)
            .encode()
        )

    @classmethod
    def deserialize(
        cls, data: bytes, backup_serializer: Serializer | None = None
    ) -> "ResultRecord[R]":
        """
        Deserialize a record from bytes.

        Args:
            data: the serialized record
            backup_serializer: The serializer to use to deserialize the result record. Only
                necessary if the provided data does not specify a serializer.

        Returns:
            ResultRecord: the deserialized record

        Raises:
            ValidationError: if `data` is not a valid record and no
                `backup_serializer` was given.
        """
        # NOTE(review): `loads` on a pickle-based serializer can execute
        # arbitrary code from the payload - only deserialize trusted data.
        try:
            instance = cls.model_validate_json(data)
        except ValidationError:
            if backup_serializer is None:
                raise
            else:
                # Treat the whole payload as a bare serialized result.
                result = backup_serializer.loads(data)
                return cls(
                    metadata=ResultRecordMetadata(serializer=backup_serializer),
                    result=result,
                )
        # The validated record still holds the serialized result; decode it
        # with the serializer named in its own metadata.
        if isinstance(instance.result, bytes):
            instance.result = instance.serializer.loads(instance.result)
        elif isinstance(instance.result, str):
            instance.result = instance.serializer.loads(instance.result.encode())
        return instance

    @classmethod
    def deserialize_from_result_and_metadata(
        cls, result: bytes, metadata: bytes
    ) -> "ResultRecord[R]":
        """
        Deserialize a record from separate result and metadata bytes.

        The metadata is loaded first, and its serializer is used to load the
        result.

        Args:
            result: the result
            metadata: the serialized metadata

        Returns:
            ResultRecord: the deserialized record
        """
        result_record_metadata = ResultRecordMetadata.load_bytes(metadata)
        return cls(
            metadata=result_record_metadata,
            result=result_record_metadata.serializer.loads(result),
        )

    def __eq__(self, other: Any | "ResultRecord[Any]") -> bool:
        """
        Compare two records by their dumped `metadata` and `result` fields.

        Anything that is not a `ResultRecord` compares unequal.
        """
        if not isinstance(other, ResultRecord):
            return False
        return self.model_dump(include={"metadata", "result"}) == other.model_dump(
            include={"metadata", "result"}
        )