Coverage for /usr/local/lib/python3.12/site-packages/prefect/serializers.py: 63%

120 statements  

coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1""" 

2Serializer implementations for converting objects to bytes and bytes to objects. 

3 

4All serializers are based on the `Serializer` class and include a `type` string that 

5allows them to be referenced without referencing the actual class. For example, you 

6can get often specify the `JSONSerializer` with the string "json". Some serializers 

7support additional settings for configuration of serialization. These are stored on 

8the instance so the same settings can be used to load saved objects. 

9 

10All serializers must implement `dumps` and `loads` which convert objects to bytes and 

11bytes to an object respectively. 

12""" 

13 
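# Usage sketch (illustrative; not part of the module source). The `type` string is
# the dispatch key described above: "json" resolves to `JSONSerializer` without
# importing the class directly. Assumes `prefect` is installed so that
# `prefect.serializers` is importable.
from prefect.serializers import JSONSerializer, Serializer

serializer = Serializer(type="json")        # dispatches to the registered subclass
assert isinstance(serializer, JSONSerializer)

blob = serializer.dumps({"answer": 42})     # bytes, UTF-8 encoded JSON
assert serializer.loads(blob) == {"answer": 42}
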

import base64
import io
from typing import Any, ClassVar, Generic, Optional, Union, overload

from pydantic import (
    BaseModel,
    ConfigDict,
    Field,
    TypeAdapter,
    ValidationError,
    field_validator,
)
from typing_extensions import Self, TypeVar

from prefect._internal.schemas.validators import (
    cast_type_names_to_serializers,
    validate_compressionlib,
    validate_dump_kwargs,
    validate_load_kwargs,
    validate_picklelib,
)
from prefect.utilities.dispatch import get_dispatch_key, lookup_type, register_base_type
from prefect.utilities.importtools import from_qualified_name, to_qualified_name
from prefect.utilities.pydantic import custom_pydantic_encoder

D = TypeVar("D", default=Any)

_TYPE_ADAPTER_CACHE: dict[str, TypeAdapter[Any]] = {}


def prefect_json_object_encoder(obj: Any) -> Any:
    """
    `JSONEncoder.default` for encoding objects into JSON with extended type support.

    Raises a `TypeError` to fall back on other encoders on failure.
    """
    if isinstance(obj, BaseException):
        return {"__exc_type__": to_qualified_name(obj.__class__), "message": str(obj)}
    elif isinstance(obj, io.IOBase):
        return {
            "__class__": to_qualified_name(obj.__class__),
            "data": (
                f"<Prefect IOStream Placeholder: type={obj.__class__.__name__}, "
                f"repr={repr(obj)} (original content not read)>"
            ),
        }
    else:
        return {
            "__class__": to_qualified_name(obj.__class__),
            "data": custom_pydantic_encoder({}, obj),
        }


def prefect_json_object_decoder(result: dict[str, Any]) -> Any:
    """
    `JSONDecoder.object_hook` for decoding objects from JSON when previously encoded
    with `prefect_json_object_encoder`.
    """
    if "__class__" in result:
        class_name = result["__class__"]
        if class_name not in _TYPE_ADAPTER_CACHE:
            _TYPE_ADAPTER_CACHE[class_name] = TypeAdapter(
                from_qualified_name(class_name)
            )
        return _TYPE_ADAPTER_CACHE[class_name].validate_python(result["data"])
    elif "__exc_type__" in result:
        return from_qualified_name(result["__exc_type__"])(result["message"])
    else:
        return result

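# Sketch of the encoder/decoder hooks with the stdlib `json` module (illustrative;
# not part of the module source). Exceptions round-trip through the
# {"__exc_type__", "message"} form; other objects use the pydantic-backed
# "__class__"/"data" form.
import json

from prefect.serializers import (
    prefect_json_object_decoder,
    prefect_json_object_encoder,
)

payload = json.dumps(ValueError("bad input"), default=prefect_json_object_encoder)
restored = json.loads(payload, object_hook=prefect_json_object_decoder)
assert isinstance(restored, ValueError)
assert str(restored) == "bad input"
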

@register_base_type
class Serializer(BaseModel, Generic[D]):
    """
    A serializer that can encode objects of type 'D' into bytes.
    """

    def __init__(self, **data: Any) -> None:
        type_string = (
            get_dispatch_key(self) if type(self) is not Serializer else "__base__"
        )
        data.setdefault("type", type_string)
        super().__init__(**data)

    @overload
    def __new__(cls, *, type: str, **kwargs: Any) -> "Serializer[Any]": ...

    @overload
    def __new__(cls, *, type: None = ..., **kwargs: Any) -> Self: ...

    def __new__(cls, **kwargs: Any) -> Union[Self, "Serializer[Any]"]:
        if type_ := kwargs.get("type"):
            try:
                subcls = lookup_type(cls, dispatch_key=type_)
            except KeyError as exc:
                raise ValidationError.from_exception_data(
                    title=cls.__name__,
                    line_errors=[{"type": str(exc), "input": kwargs["type"]}],
                    input_type="python",
                )

            return super().__new__(subcls)
        else:
            return super().__new__(cls)

    type: str

    def dumps(self, obj: D) -> bytes:
        """Encode the object into a blob of bytes."""
        raise NotImplementedError

    def loads(self, blob: bytes) -> D:
        """Decode the blob of bytes into an object."""
        raise NotImplementedError

    model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")

    @classmethod
    def __dispatch_key__(cls) -> Optional[str]:
        type_str = cls.model_fields["type"].default
        return type_str if isinstance(type_str, str) else None

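# Sketch of a custom serializer (illustrative; the class and its "newline" key are
# hypothetical). Subclasses appear to be registered by the `register_base_type`
# machinery using the default of their `type` field as the dispatch key, so they
# can later be resolved with `Serializer(type=...)`.
from pydantic import Field

from prefect.serializers import Serializer


class NewlineSerializer(Serializer[list[str]]):
    """Joins a list of strings with newlines; hypothetical example only."""

    type: str = Field(default="newline", frozen=True)

    def dumps(self, obj: list[str]) -> bytes:
        return "\n".join(obj).encode()

    def loads(self, blob: bytes) -> list[str]:
        return blob.decode().split("\n")


serializer = Serializer(type="newline")
assert isinstance(serializer, NewlineSerializer)
assert serializer.loads(serializer.dumps(["a", "b"])) == ["a", "b"]
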

class PickleSerializer(Serializer[D]):
    """
    Serializes objects using the pickle protocol.

    - Uses `cloudpickle` by default. See `picklelib` for using alternative libraries.
    - Stores the version of the pickle library to check for compatibility during
      deserialization.
    - Wraps pickles in base64 for safe transmission.
    """

    type: str = Field(default="pickle", frozen=True)

    picklelib: str = "cloudpickle"
    picklelib_version: Optional[str] = None

    @field_validator("picklelib")
    def check_picklelib(cls, value: str) -> str:
        return validate_picklelib(value)

    def dumps(self, obj: D) -> bytes:
        pickler = from_qualified_name(self.picklelib)
        blob = pickler.dumps(obj)
        return base64.encodebytes(blob)

    def loads(self, blob: bytes) -> D:
        pickler = from_qualified_name(self.picklelib)
        return pickler.loads(base64.decodebytes(blob))

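# Usage sketch (illustrative; not part of the module source). The default
# `picklelib` is "cloudpickle", which must be importable; the stdlib "pickle"
# module also works since only `dumps`/`loads` are needed. Output is the pickle
# wrapped in base64.
import base64

from prefect.serializers import PickleSerializer

serializer = PickleSerializer(picklelib="pickle")
blob = serializer.dumps({"x": [1, 2, 3]})
raw_pickle = base64.decodebytes(blob)            # the unwrapped pickle bytes
assert raw_pickle.startswith(b"\x80")            # PROTO opcode for pickle protocol 2+
assert serializer.loads(blob) == {"x": [1, 2, 3]}
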

class JSONSerializer(Serializer[D]):
    """
    Serializes data to JSON.

    Input types must be compatible with the stdlib json library.

    Wraps the `json` library to serialize to UTF-8 bytes instead of string types.
    """

    type: str = Field(default="json", frozen=True)

    jsonlib: str = "json"
    object_encoder: Optional[str] = Field(
        default="prefect.serializers.prefect_json_object_encoder",
        description=(
            "An optional callable to use when serializing objects that are not "
            "supported by the JSON encoder. By default, this is set to a callable that "
            "adds support for all types supported by Pydantic."
        ),
    )
    object_decoder: Optional[str] = Field(
        default="prefect.serializers.prefect_json_object_decoder",
        description=(
            "An optional callable to use when deserializing objects. This callable "
            "is passed each dictionary encountered during JSON deserialization. "
            "By default, this is set to a callable that deserializes content created "
            "by our default `object_encoder`."
        ),
    )
    dumps_kwargs: dict[str, Any] = Field(default_factory=dict)
    loads_kwargs: dict[str, Any] = Field(default_factory=dict)

    @field_validator("dumps_kwargs")
    def dumps_kwargs_cannot_contain_default(
        cls, value: dict[str, Any]
    ) -> dict[str, Any]:
        return validate_dump_kwargs(value)

    @field_validator("loads_kwargs")
    def loads_kwargs_cannot_contain_object_hook(
        cls, value: dict[str, Any]
    ) -> dict[str, Any]:
        return validate_load_kwargs(value)

    def dumps(self, obj: D) -> bytes:
        json = from_qualified_name(self.jsonlib)
        kwargs = self.dumps_kwargs.copy()
        if self.object_encoder:
            kwargs["default"] = from_qualified_name(self.object_encoder)
        result = json.dumps(obj, **kwargs)
        if isinstance(result, str):
            # The standard library returns str but others may return bytes directly
            result = result.encode()
        return result

    def loads(self, blob: bytes) -> D:
        json = from_qualified_name(self.jsonlib)
        kwargs = self.loads_kwargs.copy()
        if self.object_decoder:
            kwargs["object_hook"] = from_qualified_name(self.object_decoder)
        return json.loads(blob.decode(), **kwargs)

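# Usage sketch (illustrative; not part of the module source). Extra keyword
# arguments are forwarded to the underlying json library; "default" and
# "object_hook" are reserved for the configured encoder/decoder (see the
# validators above).
from prefect.serializers import JSONSerializer

serializer = JSONSerializer(dumps_kwargs={"sort_keys": True, "indent": 2})
blob = serializer.dumps({"b": 2, "a": 1})   # pretty-printed, key-sorted JSON bytes
assert serializer.loads(blob) == {"a": 1, "b": 2}
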

class CompressedSerializer(Serializer[D]):
    """
    Wraps another serializer, compressing its output.
    Uses `lzma` by default. See `compressionlib` for using alternative libraries.

    Attributes:
        serializer: The serializer to use before compression.
        compressionlib: The import path of a compression module to use.
            Must have methods `compress(bytes) -> bytes` and `decompress(bytes) -> bytes`.
        level: If not null, the level of compression to pass to `compress`.
    """

    type: str = Field(default="compressed", frozen=True)

    serializer: Serializer[D]
    compressionlib: str = "lzma"

    @field_validator("serializer", mode="before")
    def validate_serializer(cls, value: Union[str, Serializer[D]]) -> Serializer[D]:
        return cast_type_names_to_serializers(value)

    @field_validator("compressionlib")
    def check_compressionlib(cls, value: str) -> str:
        return validate_compressionlib(value)

    def dumps(self, obj: D) -> bytes:
        blob = self.serializer.dumps(obj)
        compressor = from_qualified_name(self.compressionlib)
        return base64.encodebytes(compressor.compress(blob))

    def loads(self, blob: bytes) -> D:
        compressor = from_qualified_name(self.compressionlib)
        uncompressed = compressor.decompress(base64.decodebytes(blob))
        return self.serializer.loads(uncompressed)

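# Usage sketch (illustrative; not part of the module source). The wrapped
# serializer produces the bytes and `compressionlib` (lzma by default) compresses
# them; any module exposing `compress`/`decompress`, such as zlib, fits the
# required interface.
from prefect.serializers import CompressedSerializer, JSONSerializer

serializer = CompressedSerializer(serializer=JSONSerializer())
payload = {"rows": list(range(100))}
blob = serializer.dumps(payload)            # base64-wrapped, lzma-compressed JSON
assert serializer.loads(blob) == payload
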

class CompressedPickleSerializer(CompressedSerializer[D]):
    """
    A compressed serializer preconfigured to use the pickle serializer.
    """

    type: str = Field(default="compressed/pickle", frozen=True)

    serializer: Serializer[D] = Field(default_factory=PickleSerializer)


class CompressedJSONSerializer(CompressedSerializer[D]):
    """
    A compressed serializer preconfigured to use the json serializer.
    """

    type: str = Field(default="compressed/json", frozen=True)

    serializer: Serializer[D] = Field(default_factory=JSONSerializer)
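
# Usage sketch (illustrative; not part of the module source). The preconfigured
# variants behave like `CompressedSerializer` with the wrapped serializer already
# chosen; `CompressedPickleSerializer` additionally needs `cloudpickle` importable
# because of `PickleSerializer`'s default `picklelib`.
from prefect.serializers import CompressedJSONSerializer

serializer = CompressedJSONSerializer()
assert serializer.type == "compressed/json"
blob = serializer.dumps({"compressed": True})
assert serializer.loads(blob) == {"compressed": True}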