Coverage for /usr/local/lib/python3.12/site-packages/prefect/utilities/pydantic.py: 34%

132 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1import warnings 1a

2from functools import partial 1a

3from typing import ( 1a

4 Any, 

5 Callable, 

6 Generic, 

7 Optional, 

8 TypeVar, 

9 Union, 

10 cast, 

11 get_origin, 

12 overload, 

13) 

14 

15from pydantic import ( 1a

16 BaseModel, 

17 Secret, 

18 TypeAdapter, 

19 ValidationError, 

20) 

21from pydantic_core import to_jsonable_python 1a

22from typing_extensions import Literal 1a

23 

24from prefect.utilities.collections import visit_collection 1a

25from prefect.utilities.dispatch import get_dispatch_key, lookup_type, register_base_type 1a

26from prefect.utilities.importtools import from_qualified_name, to_qualified_name 1a

27from prefect.utilities.names import obfuscate 1a

28 

# Type variables shared by the generic utilities in this module.
D = TypeVar("D", bound=Any)  # generic value type (unconstrained)
M = TypeVar("M", bound=BaseModel)  # any pydantic BaseModel subclass
T = TypeVar("T", bound=Any)  # target type for `parse_obj_as`

32 

33 

def _reduce_model(self: BaseModel) -> tuple[Any, ...]:
    """
    Serialize a cythonized pydantic model for cloudpickle.

    Returns the `(callable, args)` pair expected by the pickle protocol.
    Since `__reduce__` takes no arguments, extra settings for the JSON dump
    are read from the optional `__reduce_kwargs__` attribute on the instance.
    """
    dump_kwargs = getattr(self, "__reduce_kwargs__", {})
    qualified_name = to_qualified_name(type(self))
    payload = self.model_dump_json(**dump_kwargs)
    return (_unreduce_model, (qualified_name, payload))

48 

49 

def _unreduce_model(model_name: str, json: str) -> Any:
    """Restore a model previously serialized by `_reduce_model`."""
    model_cls = from_qualified_name(model_name)
    return model_cls.model_validate_json(json)

54 

55 

@overload
def add_cloudpickle_reduction(__model_cls: type[M]) -> type[M]: ...


@overload
def add_cloudpickle_reduction(
    __model_cls: None = None, **kwargs: Any
) -> Callable[[type[M]], type[M]]: ...


def add_cloudpickle_reduction(
    __model_cls: Optional[type[M]] = None, **kwargs: Any
) -> Union[type[M], Callable[[type[M]], type[M]]]:
    """
    Adds a `__reduce__` to the given class that ensures it is cloudpickle
    compatible.

    Workaround for issues with cloudpickle when using cythonized pydantic which
    throws exceptions when attempting to pickle the class which has "compiled"
    validator methods dynamically attached to it.

    We cannot define this utility in the model class itself because the class
    is the type that contains unserializable methods.

    Any model using some features of Pydantic (e.g. `Path` validation) with a
    Cython compiled Pydantic installation may encounter pickling issues.

    See related issue at https://github.com/cloudpipe/cloudpickle/issues/408
    """
    if __model_cls is None:
        # Invoked as `@add_cloudpickle_reduction(**kwargs)`: return a
        # decorator that re-enters this function once the class is known.
        def reducer_with_kwargs(__model_cls: type[M]) -> type[M]:
            return add_cloudpickle_reduction(__model_cls, **kwargs)

        return reducer_with_kwargs

    # Direct decoration: attach the reducer and stash the dump settings where
    # `_reduce_model` can find them (since `__reduce__` takes no arguments).
    __model_cls.__reduce__ = _reduce_model
    setattr(__model_cls, "__reduce_kwargs__", kwargs)
    return __model_cls

93 

94 

def get_class_fields_only(model: type[BaseModel]) -> set[str]:
    """
    Gets all the field names defined on the model class but not any parent classes.
    Any fields that are on the parent but redefined on the subclass are included.

    Args:
        model: The model class whose own (re)declared field names to collect.

    Returns:
        The set of field names annotated directly on `model`.
    """
    # NOTE: an earlier version built a `parent_class_fields` set from
    # `model.__class__.__bases__` — the *metaclass* bases, which are never
    # BaseModel subclasses, so that set was always empty — and then combined
    # the sets as `(A - B) | (A & B)`, which is identically `A` for any `B`.
    # A class's own `__annotations__` already contains exactly the fields
    # declared (or redeclared) on the class itself, which is the documented
    # contract, so return that directly.
    return set(model.__annotations__.keys())

110 

111 

def add_type_dispatch(model_cls: type[M]) -> type[M]:
    """
    Extend a Pydantic model to add a 'type' field that is used as a discriminator
    field to dynamically determine the subtype when deserializing models.

    This allows automatic resolution to subtypes of the decorated model.

    If a type field already exists, it should be a string literal field that has a
    constant value for each subclass. The default value of this field will be used
    as the dispatch key.

    If a type field does not exist, one will be added. In this case, the value of
    the field will be set to the value of the `__dispatch_key__`. The base class
    should define a `__dispatch_key__` class method that is used to determine the
    unique key for each subclass. Alternatively, each subclass can define the
    `__dispatch_key__` as a string literal.

    The base class must not define a 'type' field. If it is not desirable to add a
    field to the model and the dispatch key can be tracked separately, the lower
    level utilities in `prefect.utilities.dispatch` should be used directly.
    """
    # A dispatch key may exist either as a real attribute/classmethod or merely
    # as an annotation (e.g. a string-literal key declared on a subclass body).
    defines_dispatch_key = hasattr(
        model_cls, "__dispatch_key__"
    ) or "__dispatch_key__" in getattr(model_cls, "__annotations__", {})

    defines_type_field = "type" in model_cls.model_fields

    if not defines_dispatch_key and not defines_type_field:
        raise ValueError(
            f"Model class {model_cls.__name__!r} does not define a `__dispatch_key__` "
            "or a type field. One of these is required for dispatch."
        )

    elif not defines_dispatch_key and defines_type_field:
        # The existing 'type' field must be a plain string (or unannotated) so
        # that its default value can serve as the dispatch key.
        field_type_annotation = model_cls.model_fields["type"].annotation
        if field_type_annotation is not str and field_type_annotation is not None:
            raise TypeError(
                f"Model class {model_cls.__name__!r} defines a 'type' field with "
                f"type {field_type_annotation.__name__!r} but it must be 'str'."
            )

        # Set the dispatch key to retrieve the value from the type field
        @classmethod
        def dispatch_key_from_type_field(cls: type[M]) -> str:
            return cls.model_fields["type"].default

        setattr(model_cls, "__dispatch_key__", dispatch_key_from_type_field)

    else:
        # NOTE(review): this branch also catches a class that defines a
        # `__dispatch_key__` but has NO 'type' field, in which case the error
        # message ("defines ... and a type field") is inaccurate — confirm
        # whether that combination is meant to be supported, given the
        # docstring says a missing type field "will be added".
        raise ValueError(
            f"Model class {model_cls.__name__!r} defines a `__dispatch_key__` "
            "and a type field. Only one of these may be defined for dispatch."
        )

    # Capture the original constructors so the wrappers below can delegate.
    cls_init = model_cls.__init__
    cls_new = model_cls.__new__

    def __init__(__pydantic_self__: M, **data: Any) -> None:
        # Inject the dispatch key as the default 'type' value: subclasses get
        # their registered key; the base class itself is tagged "__base__".
        type_string = (
            get_dispatch_key(__pydantic_self__)
            if type(__pydantic_self__) is not model_cls
            else "__base__"
        )
        data.setdefault("type", type_string)
        cls_init(__pydantic_self__, **data)

    def __new__(cls: type[M], **kwargs: Any) -> M:
        if "type" in kwargs:
            # A 'type' value was provided: resolve and instantiate the
            # registered subclass for that dispatch key.
            try:
                subcls = lookup_type(cls, dispatch_key=kwargs["type"])
            except KeyError as exc:
                # Surface an unknown dispatch key as a pydantic validation
                # error rather than a bare KeyError.
                raise ValidationError.from_exception_data(
                    title=cls.__name__,
                    line_errors=[{"type": str(exc), "input": kwargs["type"]}],
                    input_type="python",
                )
            return cls_new(subcls)
        else:
            return cls_new(cls)

    model_cls.__init__ = __init__
    model_cls.__new__ = __new__

    # Register as a dispatch base so subclasses are tracked in the registry.
    register_base_type(model_cls)

    return model_cls

198 

199 

class PartialModel(Generic[M]):
    """
    A utility for constructing a Pydantic model in several steps.

    Fields may be provided at initialization, assigned as attributes, or passed
    at finalization when the concrete model is created.

    Pydantic validation does not occur until finalization.

    Each field may only be set once; assigning a field that already has a value
    raises a `ValueError`.

    Example:
    ```python
    class MyModel(BaseModel):
        x: int
        y: str
        z: float

    partial_model = PartialModel(MyModel, x=1)
    partial_model.y = "two"
    model = partial_model.finalize(z=3.0)
    ```
    """

    def __init__(self, __model_cls: type[M], **kwargs: Any) -> None:
        self.fields = kwargs
        # Assign `fields` before `model_cls` so the custom `__setattr__` below
        # does not misroute these bookkeeping attributes during construction.
        self.model_cls = __model_cls

        for field_name in kwargs:
            self.raise_if_not_in_model(field_name)

    def finalize(self, **kwargs: Any) -> M:
        # Validate the late-provided fields, then build (and validate) the
        # concrete model from everything collected so far.
        for field_name in kwargs:
            self.raise_if_already_set(field_name)
            self.raise_if_not_in_model(field_name)
        return self.model_cls(**self.fields, **kwargs)

    def raise_if_already_set(self, name: str) -> None:
        # Enforce the write-once contract for each field.
        if name in self.fields:
            raise ValueError(f"Field {name!r} has already been set.")

    def raise_if_not_in_model(self, name: str) -> None:
        # Reject names the target model does not declare.
        if name not in self.model_cls.model_fields:
            raise ValueError(f"Field {name!r} is not present in the model.")

    def __setattr__(self, __name: str, __value: Any) -> None:
        # `fields` and `model_cls` belong to this wrapper itself; every other
        # assignment is treated as setting a model field.
        if __name in {"fields", "model_cls"}:
            super().__setattr__(__name, __value)
            return

        self.raise_if_already_set(__name)
        self.raise_if_not_in_model(__name)
        self.fields[__name] = __value

    def __repr__(self) -> str:
        rendered = ", ".join(f"{k}={v!r}" for k, v in self.fields.items())
        return f"PartialModel(cls={self.model_cls.__name__}, {rendered})"

261 

262 

def custom_pydantic_encoder(
    type_encoders: dict[Any, Callable[[type[Any]], Any]], obj: Any
) -> Any:
    """
    Encode `obj` with the first encoder registered for its class or any
    superclass, walking the MRO so subclass encoders win over base ones.

    Falls back to pydantic's JSON-compatible serialization when no registered
    encoder matches.
    """
    # `__mro__[:-1]` skips `object`, which is never a registered encoder key.
    for candidate in obj.__class__.__mro__[:-1]:
        if candidate in type_encoders:
            return type_encoders[candidate](obj)

    # No encoder matched anywhere in the MRO.
    if isinstance(obj, BaseModel):
        return obj.model_dump(mode="json")
    return to_jsonable_python(obj)

279 

280 

def parse_obj_as(
    type_: type[T],
    data: Any,
    mode: Literal["python", "json", "strings"] = "python",
) -> T:
    """Parse a given data structure as a Pydantic model via `TypeAdapter`.

    Read more about `TypeAdapter` [here](https://docs.pydantic.dev/latest/concepts/type_adapter/).

    Args:
        type_: The type to parse the data as.
        data: The data to be parsed.
        mode: The mode to use for parsing, either `python`, `json`, or `strings`.
            Defaults to `python`, where `data` should be a Python object (e.g. `dict`).

    Returns:
        The parsed `data` as the given `type_`.

    Example:
        Basic usage of `parse_obj_as`
        ```python
        from prefect.utilities.pydantic import parse_obj_as
        from pydantic import BaseModel

        class ExampleModel(BaseModel):
            name: str

        # parsing python objects
        parsed = parse_obj_as(ExampleModel, {"name": "Marvin"})
        assert isinstance(parsed, ExampleModel)
        assert parsed.name == "Marvin"

        # parsing json strings
        parsed = parse_obj_as(
            list[ExampleModel],
            '[{"name": "Marvin"}, {"name": "Arthur"}]',
            mode="json"
        )
        assert all(isinstance(item, ExampleModel) for item in parsed)
        assert parsed[0].name == "Marvin"
        assert parsed[1].name == "Arthur"

        # parsing raw strings
        parsed = parse_obj_as(int, '123', mode="strings")
        assert isinstance(parsed, int)
        assert parsed == 123
        ```
    """
    adapter = TypeAdapter(type_)

    # When a list is expected but a mapping arrives, unwrap the first value of
    # the mapping. NOTE(review): assumes the dict wraps the list under a single
    # key — an empty dict would raise StopIteration; verify against callers.
    if get_origin(type_) is list and isinstance(data, dict):
        data = next(iter(data.values()))

    # Dispatch to adapter.validate_python / validate_json / validate_strings.
    validate = cast(Callable[[Any], T], getattr(adapter, f"validate_{mode}"))
    return validate(data)

341 

342 

def handle_secret_render(value: object, context: dict[str, Any]) -> object:
    """
    Render a value for serialization, obfuscating secrets unless the context
    sets `include_secrets`; nested models are dumped with the same context.
    """
    # Anything exposing `get_secret_value` is treated as a secret wrapper.
    if hasattr(value, "get_secret_value"):
        if context.get("include_secrets", False):
            return cast(Secret[object], value).get_secret_value()
        return obfuscate(value)

    if isinstance(value, BaseModel):
        mode = context.get("serialization_mode", "python")
        if mode != "json":
            return value.model_dump(context=context)

        # JSON mode with nested models: recursively visit each field, because
        # plain pydantic serialization does not understand `include_secrets`.
        dumped = value.model_dump(mode="json")
        for field_name in type(value).model_fields:
            dumped[field_name] = visit_collection(
                expr=getattr(value, field_name),
                visit_fn=partial(handle_secret_render, context=context),
                return_data=True,
            )
        return dumped

    # Not a secret and not a model: pass through unchanged.
    return value

369 

370 

def __getattr__(name: str) -> Any:
    """
    Module-level `__getattr__` (PEP 562) serving deprecated imports from this
    module with a deprecation warning.
    """
    if name != "JsonPatch":
        raise AttributeError(f"module '{__name__}' has no attribute '{name}'")

    warnings.warn(
        "JsonPatch is deprecated and will be removed after March 2025. "
        "Please use `JsonPatch` from the `jsonpatch` package instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    from ._deprecated import JsonPatch

    return JsonPatch