Coverage for /usr/local/lib/python3.12/site-packages/prefect/serializers.py: 63%
120 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
"""
Serializer implementations for converting objects to bytes and bytes to objects.

All serializers are based on the `Serializer` class and include a `type` string that
allows them to be referenced without referencing the actual class. For example, you
can often specify the `JSONSerializer` with the string "json". Some serializers
support additional settings for configuration of serialization. These are stored on
the instance so the same settings can be used to load saved objects.

All serializers must implement `dumps` and `loads` which convert objects to bytes and
bytes to an object respectively.
"""
14import base64 1a
15import io 1a
16from typing import Any, ClassVar, Generic, Optional, Union, overload 1a
18from pydantic import ( 1a
19 BaseModel,
20 ConfigDict,
21 Field,
22 TypeAdapter,
23 ValidationError,
24 field_validator,
25)
26from typing_extensions import Self, TypeVar 1a
28from prefect._internal.schemas.validators import ( 1a
29 cast_type_names_to_serializers,
30 validate_compressionlib,
31 validate_dump_kwargs,
32 validate_load_kwargs,
33 validate_picklelib,
34)
35from prefect.utilities.dispatch import get_dispatch_key, lookup_type, register_base_type 1a
36from prefect.utilities.importtools import from_qualified_name, to_qualified_name 1a
37from prefect.utilities.pydantic import custom_pydantic_encoder 1a
# Type parameter for the kind of object a serializer handles; it falls back to
# `Any` when a serializer class is used without parameterization.
D = TypeVar("D", default=Any)

# Memoized pydantic `TypeAdapter`s, keyed by the qualified class name stored in
# the "__class__" entry of encoded JSON objects, so repeated decodes of the same
# class reuse a single adapter.
_TYPE_ADAPTER_CACHE: dict[str, TypeAdapter[Any]] = dict()
def prefect_json_object_encoder(obj: Any) -> Any:
    """
    Fallback `JSONEncoder.default` hook adding extended type support to JSON encoding.

    The returned value is a JSON-compatible dictionary describing `obj`:

    - exceptions -> ``{"__exc_type__": <qualified class name>, "message": str(obj)}``
    - IO streams -> ``{"__class__": <qualified class name>, "data": <placeholder str>}``;
      the stream's content is never read
    - anything else -> ``{"__class__": <qualified class name>, "data": ...}`` where
      ``data`` comes from `custom_pydantic_encoder`

    Raises a `TypeError` to fall back on other encoders on failure.
    """
    qualified_name = to_qualified_name(obj.__class__)

    if isinstance(obj, BaseException):
        return {"__exc_type__": qualified_name, "message": str(obj)}

    if isinstance(obj, io.IOBase):
        # Only a descriptive placeholder is stored; the stream itself is left
        # untouched, as the placeholder text states.
        placeholder = (
            f"<Prefect IOStream Placeholder: type={obj.__class__.__name__}, "
            f"repr={repr(obj)} (original content not read)>"
        )
        return {"__class__": qualified_name, "data": placeholder}

    encoded = custom_pydantic_encoder({}, obj)
    return {"__class__": qualified_name, "data": encoded}
def prefect_json_object_decoder(result: dict[str, Any]) -> Any:
    """
    `JSONDecoder.object_hook` for decoding objects from JSON when previously encoded
    with `prefect_json_object_encoder`.

    Args:
        result: The dictionary the JSON decoder built for one JSON object.

    Returns:
        - For ``{"__class__": ..., "data": ...}`` payloads: ``data`` validated into the
          named class through a cached pydantic `TypeAdapter`.
        - For ``{"__exc_type__": ..., "message": ...}`` payloads: an instance of the
          named exception class constructed with ``message``.
        - Otherwise: ``result`` unchanged.

    Raises:
        ValueError: If an ``__exc_type__`` payload names something that is not an
            exception class.

    Note:
        Qualified names are taken from the payload itself and resolved with
        `from_qualified_name`, so only decode data from trusted sources.
    """
    if "__class__" in result:
        class_name = result["__class__"]
        adapter = _TYPE_ADAPTER_CACHE.get(class_name)
        if adapter is None:
            adapter = TypeAdapter(from_qualified_name(class_name))
            _TYPE_ADAPTER_CACHE[class_name] = adapter
        return adapter.validate_python(result["data"])
    elif "__exc_type__" in result:
        exc_type = from_qualified_name(result["__exc_type__"])
        # Only instantiate real exception classes. Calling an arbitrary resolved
        # object (e.g. a function such as `os.system`) with payload-controlled input
        # would let crafted JSON execute code while it is being decoded.
        if not (isinstance(exc_type, type) and issubclass(exc_type, BaseException)):
            raise ValueError(
                f"Cannot decode {result['__exc_type__']!r}: not an exception type."
            )
        return exc_type(result["message"])
    else:
        return result
@register_base_type
class Serializer(BaseModel, Generic[D]):
    """
    A serializer that can encode objects of type 'D' into bytes.

    Subclasses give the `type` field a default; `__dispatch_key__` reports that
    default as the class's dispatch key. Constructing `Serializer(type="<key>", ...)`
    allocates the subclass that `lookup_type` returns for that key.
    """

    def __init__(self, **data: Any) -> None:
        # Fill in `type` automatically so callers never have to pass it. The bare
        # base class uses the sentinel "__base__".
        type_string = (
            get_dispatch_key(self) if type(self) is not Serializer else "__base__"
        )
        data.setdefault("type", type_string)
        super().__init__(**data)

    @overload
    def __new__(cls, *, type: str, **kwargs: Any) -> "Serializer[Any]": ...

    @overload
    def __new__(cls, *, type: None = ..., **kwargs: Any) -> Self: ...

    def __new__(cls, **kwargs: Any) -> Union[Self, "Serializer[Any]"]:
        """
        Allocate an instance, dispatching to the subclass registered for a truthy
        `type` keyword when one is given.

        Raises:
            ValidationError: If no subclass is registered for the given `type`.
        """
        if type_ := kwargs.get("type"):
            try:
                subcls = lookup_type(cls, dispatch_key=type_)
            except KeyError as exc:
                # `from_exception_data` only accepts known pydantic error-type names
                # (or a `PydanticCustomError`). Passing the lookup message as the type
                # would raise `KeyError("Invalid error type ...")` instead of a
                # `ValidationError`, so report it as a standard `value_error`.
                raise ValidationError.from_exception_data(
                    title=cls.__name__,
                    line_errors=[
                        {
                            "type": "value_error",
                            "loc": ("type",),
                            "input": type_,
                            "ctx": {"error": str(exc)},
                        }
                    ],
                    input_type="python",
                ) from exc

            return super().__new__(subcls)
        else:
            return super().__new__(cls)

    type: str

    def dumps(self, obj: D) -> bytes:
        """Encode the object into a blob of bytes."""
        raise NotImplementedError

    def loads(self, blob: bytes) -> D:
        """Decode the blob of bytes into an object."""
        raise NotImplementedError

    model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")

    @classmethod
    def __dispatch_key__(cls) -> Optional[str]:
        """Return the default of the `type` field, or None if it is not a string."""
        type_str = cls.model_fields["type"].default
        return type_str if isinstance(type_str, str) else None
class PickleSerializer(Serializer[D]):
    """
    Serializes objects using the pickle protocol.

    - Uses `cloudpickle` by default. See `picklelib` for using alternative libraries.
    - Wraps pickles in base64 for safe transmission.
    - Accepts a `picklelib_version` field, but this class performs no compatibility
      check against it when deserializing.

    Warning:
        `loads` unpickles its input, which can execute arbitrary code. Only load
        blobs that come from a trusted source.
    """

    type: str = Field(default="pickle", frozen=True)

    # Import path of the pickling module. `dumps`/`loads` below call its
    # `dumps(obj)` and `loads(bytes)` functions.
    picklelib: str = "cloudpickle"
    # Not read by `dumps` or `loads` in this class.
    picklelib_version: Optional[str] = None

    @field_validator("picklelib")
    def check_picklelib(cls, value: str) -> str:
        """Delegate validation of the configured pickle library to `validate_picklelib`."""
        return validate_picklelib(value)

    def dumps(self, obj: D) -> bytes:
        """Pickle `obj` with `picklelib` and return the base64-encoded bytes."""
        pickler = from_qualified_name(self.picklelib)
        blob = pickler.dumps(obj)
        return base64.encodebytes(blob)

    def loads(self, blob: bytes) -> D:
        """Base64-decode `blob` and unpickle the result with `picklelib`."""
        # SECURITY: unpickling untrusted data can execute arbitrary code.
        pickler = from_qualified_name(self.picklelib)
        return pickler.loads(base64.decodebytes(blob))
class JSONSerializer(Serializer[D]):
    """
    Serializes data to JSON.

    Input types must be compatible with the configured `jsonlib` (the stdlib `json`
    library by default), extended by whatever `object_encoder` supports.

    Wraps the JSON library to serialize to UTF-8 bytes instead of string types.
    """

    type: str = Field(default="json", frozen=True)

    # Import path of the JSON module; `dumps`/`loads` below call its `dumps`/`loads`.
    jsonlib: str = "json"
    object_encoder: Optional[str] = Field(
        default="prefect.serializers.prefect_json_object_encoder",
        description=(
            "An optional callable to use when serializing objects that are not "
            "supported by the JSON encoder. By default, this is set to a callable that "
            "adds support for exceptions, IO streams (encoded as placeholders without "
            "reading them), and any type that "
            "`prefect.utilities.pydantic.custom_pydantic_encoder` can encode."
        ),
    )
    object_decoder: Optional[str] = Field(
        default="prefect.serializers.prefect_json_object_decoder",
        description=(
            "An optional callable to use when deserializing objects. This callable "
            "is passed each dictionary encountered during JSON deserialization. "
            "By default, this is set to a callable that deserializes content created "
            "by our default `object_encoder`."
        ),
    )
    # Extra keyword arguments forwarded to `jsonlib.dumps` / `jsonlib.loads`.
    dumps_kwargs: dict[str, Any] = Field(default_factory=dict)
    loads_kwargs: dict[str, Any] = Field(default_factory=dict)

    @field_validator("dumps_kwargs")
    def dumps_kwargs_cannot_contain_default(
        cls, value: dict[str, Any]
    ) -> dict[str, Any]:
        """Validate `dumps_kwargs` via `validate_dump_kwargs`."""
        return validate_dump_kwargs(value)

    @field_validator("loads_kwargs")
    def loads_kwargs_cannot_contain_object_hook(
        cls, value: dict[str, Any]
    ) -> dict[str, Any]:
        """Validate `loads_kwargs` via `validate_load_kwargs`."""
        return validate_load_kwargs(value)

    def dumps(self, obj: D) -> bytes:
        """
        Serialize `obj` with `jsonlib` and return UTF-8 bytes.

        `object_encoder`, when set, is passed as the `default` hook.
        """
        json = from_qualified_name(self.jsonlib)
        # Copy so the configured kwargs are never mutated by adding `default`.
        kwargs = self.dumps_kwargs.copy()
        if self.object_encoder:
            kwargs["default"] = from_qualified_name(self.object_encoder)
        result = json.dumps(obj, **kwargs)
        if isinstance(result, str):
            # The standard library returns str but others may return bytes directly
            result = result.encode()
        return result

    def loads(self, blob: bytes) -> D:
        """
        Decode `blob` as UTF-8 and parse it with `jsonlib`.

        `object_decoder`, when set, is passed as the `object_hook`.
        """
        json = from_qualified_name(self.jsonlib)
        kwargs = self.loads_kwargs.copy()
        if self.object_decoder:
            kwargs["object_hook"] = from_qualified_name(self.object_decoder)
        return json.loads(blob.decode(), **kwargs)
class CompressedSerializer(Serializer[D]):
    """
    Wraps another serializer, compressing its output.
    Uses `lzma` by default. See `compressionlib` for using alternative libraries.

    `dumps` base64-encodes the compressed bytes, and `loads` reverses both steps.
    `compress` is called without a level argument, so the compression library's
    default level applies.

    Attributes:
        serializer: The serializer to use before compression. A type string may also
            be given; the `mode="before"` validator passes it through
            `cast_type_names_to_serializers`.
        compressionlib: The import path of a compression module to use.
            Must have methods `compress(bytes) -> bytes` and `decompress(bytes) -> bytes`.
    """

    type: str = Field(default="compressed", frozen=True)

    serializer: Serializer[D]
    compressionlib: str = "lzma"

    @field_validator("serializer", mode="before")
    def validate_serializer(cls, value: Union[str, Serializer[D]]) -> Serializer[D]:
        """Accept a serializer instance or a type string naming one."""
        return cast_type_names_to_serializers(value)

    @field_validator("compressionlib")
    def check_compressionlib(cls, value: str) -> str:
        """Delegate validation of the compression library to `validate_compressionlib`."""
        return validate_compressionlib(value)

    def dumps(self, obj: D) -> bytes:
        """Serialize with the inner serializer, compress, then base64-encode."""
        blob = self.serializer.dumps(obj)
        compressor = from_qualified_name(self.compressionlib)
        return base64.encodebytes(compressor.compress(blob))

    def loads(self, blob: bytes) -> D:
        """Base64-decode, decompress, then deserialize with the inner serializer."""
        compressor = from_qualified_name(self.compressionlib)
        uncompressed = compressor.decompress(base64.decodebytes(blob))
        return self.serializer.loads(uncompressed)
class CompressedPickleSerializer(CompressedSerializer[D]):
    """
    Preset `CompressedSerializer` whose inner serializer defaults to a new
    `PickleSerializer`; identified by the type string "compressed/pickle".
    """

    # Fixed type string for this preset; frozen so it cannot be reassigned.
    type: str = Field(frozen=True, default="compressed/pickle")

    # Each instance gets its own freshly constructed PickleSerializer by default.
    serializer: Serializer[D] = Field(default_factory=PickleSerializer)
class CompressedJSONSerializer(CompressedSerializer[D]):
    """
    Preset `CompressedSerializer` whose inner serializer defaults to a new
    `JSONSerializer`; identified by the type string "compressed/json".
    """

    # Fixed type string for this preset; frozen so it cannot be reassigned.
    type: str = Field(frozen=True, default="compressed/json")

    # Each instance gets its own freshly constructed JSONSerializer by default.
    serializer: Serializer[D] = Field(default_factory=JSONSerializer)