Coverage for /usr/local/lib/python3.12/site-packages/prefect/utilities/pydantic.py: 34%
132 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1import warnings 1a
2from functools import partial 1a
3from typing import ( 1a
4 Any,
5 Callable,
6 Generic,
7 Optional,
8 TypeVar,
9 Union,
10 cast,
11 get_origin,
12 overload,
13)
15from pydantic import ( 1a
16 BaseModel,
17 Secret,
18 TypeAdapter,
19 ValidationError,
20)
21from pydantic_core import to_jsonable_python 1a
22from typing_extensions import Literal 1a
24from prefect.utilities.collections import visit_collection 1a
25from prefect.utilities.dispatch import get_dispatch_key, lookup_type, register_base_type 1a
26from prefect.utilities.importtools import from_qualified_name, to_qualified_name 1a
27from prefect.utilities.names import obfuscate 1a
# Type variables shared by the generic utilities in this module.
D = TypeVar("D", bound=Any)  # arbitrary data payload (no uses visible in this view)
M = TypeVar("M", bound=BaseModel)  # any pydantic model subclass
T = TypeVar("T", bound=Any)  # target type for `parse_obj_as`
def _reduce_model(self: BaseModel) -> tuple[Any, ...]:
    """
    Helper for serializing a cythonized model with cloudpickle.

    Keyword arguments can provide additional settings to the `json` call. Since
    `__reduce__` takes no arguments, these are set on the `__reduce_kwargs__` attr.
    """
    # Serialize to JSON using any per-class kwargs stashed by
    # `add_cloudpickle_reduction`, and record the model's qualified name so
    # `_unreduce_model` can re-import the class on the other side.
    dump_kwargs = getattr(self, "__reduce_kwargs__", {})
    payload = self.model_dump_json(**dump_kwargs)
    return (_unreduce_model, (to_qualified_name(type(self)), payload))
def _unreduce_model(model_name: str, json: str) -> Any:
    """Helper for restoring model after serialization"""
    # Re-import the class by its qualified name, then rebuild from JSON.
    model_cls = from_qualified_name(model_name)
    return model_cls.model_validate_json(json)
@overload
def add_cloudpickle_reduction(__model_cls: type[M]) -> type[M]: ...


@overload
def add_cloudpickle_reduction(
    __model_cls: None = None, **kwargs: Any
) -> Callable[[type[M]], type[M]]: ...


def add_cloudpickle_reduction(
    __model_cls: Optional[type[M]] = None, **kwargs: Any
) -> Union[type[M], Callable[[type[M]], type[M]]]:
    """
    Adds a `__reducer__` to the given class that ensures it is cloudpickle compatible.

    Workaround for issues with cloudpickle when using cythonized pydantic which
    throws exceptions when attempting to pickle the class which has "compiled"
    validator methods dynamically attached to it.

    We cannot define this utility in the model class itself because the class is the
    type that contains unserializable methods.

    Any model using some features of Pydantic (e.g. `Path` validation) with a Cython
    compiled Pydantic installation may encounter pickling issues.

    See related issue at https://github.com/cloudpipe/cloudpickle/issues/408
    """
    if not __model_cls:
        # Used as `@add_cloudpickle_reduction(**kwargs)`: return a decorator
        # that re-enters this function with the class once it is known.
        def apply_reduction(model_cls: type[M]) -> type[M]:
            return add_cloudpickle_reduction(model_cls, **kwargs)

        return apply_reduction

    # Used directly as `@add_cloudpickle_reduction`: install the pickle hook
    # and stash the kwargs for `_reduce_model` to read later.
    __model_cls.__reduce__ = _reduce_model
    setattr(__model_cls, "__reduce_kwargs__", kwargs)
    return __model_cls
def get_class_fields_only(model: type[BaseModel]) -> set[str]:
    """
    Gets all the field names defined on the model class but not any parent classes.
    Any fields that are on the parent but redefined on the subclass are included.

    Args:
        model: The pydantic model class to inspect.

    Returns:
        The set of field names annotated directly on `model`.
    """
    subclass_class_fields = set(model.__annotations__.keys())
    parent_class_fields: set[str] = set()

    # FIX: `model` is itself a class, so its parents are `model.__bases__`.
    # The previous `model.__class__.__bases__` walked the *metaclass*'s bases
    # and therefore never collected any pydantic parent fields. The returned
    # value is unchanged either way, because (A - B) | (A & B) == A.
    for base in model.__bases__:
        if issubclass(base, BaseModel):
            parent_class_fields.update(base.__annotations__.keys())

    # Fields unique to the subclass, plus parent fields it redefines.
    return (subclass_class_fields - parent_class_fields) | (
        subclass_class_fields & parent_class_fields
    )
def add_type_dispatch(model_cls: type[M]) -> type[M]:
    """
    Extend a Pydantic model to add a 'type' field that is used as a discriminator field
    to dynamically determine the subtype that when deserializing models.

    This allows automatic resolution to subtypes of the decorated model.

    If a type field already exists, it should be a string literal field that has a
    constant value for each subclass. The default value of this field will be used as
    the dispatch key.

    If a type field does not exist, one will be added. In this case, the value of the
    field will be set to the value of the `__dispatch_key__`. The base class should
    define a `__dispatch_key__` class method that is used to determine the unique key
    for each subclass. Alternatively, each subclass can define the `__dispatch_key__`
    as a string literal.

    The base class must not define a 'type' field. If it is not desirable to add a field
    to the model and the dispatch key can be tracked separately, the lower level
    utilities in `prefect.utilities.dispatch` should be used directly.

    Args:
        model_cls: The base model class to register for dispatch.

    Returns:
        The same class, mutated in place with patched `__init__`/`__new__`.

    Raises:
        ValueError: If neither, or both, of `__dispatch_key__` and a `type`
            field are defined on `model_cls`.
        TypeError: If the existing `type` field is not annotated as `str`.
    """
    # A dispatch key may appear as a concrete attribute/classmethod or only as
    # a class-level annotation (e.g. `__dispatch_key__: str` on a subclass).
    defines_dispatch_key = hasattr(
        model_cls, "__dispatch_key__"
    ) or "__dispatch_key__" in getattr(model_cls, "__annotations__", {})

    defines_type_field = "type" in model_cls.model_fields

    if not defines_dispatch_key and not defines_type_field:
        raise ValueError(
            f"Model class {model_cls.__name__!r} does not define a `__dispatch_key__` "
            "or a type field. One of these is required for dispatch."
        )

    elif not defines_dispatch_key and defines_type_field:
        field_type_annotation = model_cls.model_fields["type"].annotation
        if field_type_annotation is not str and field_type_annotation is not None:
            raise TypeError(
                f"Model class {model_cls.__name__!r} defines a 'type' field with "
                f"type {field_type_annotation.__name__!r} but it must be 'str'."
            )

        # Set the dispatch key to retrieve the value from the type field
        @classmethod
        def dispatch_key_from_type_field(cls: type[M]) -> str:
            return cls.model_fields["type"].default

        setattr(model_cls, "__dispatch_key__", dispatch_key_from_type_field)

    else:
        # NOTE(review): this branch is also reached when only `__dispatch_key__`
        # is defined (no `type` field), in which case the message is misleading —
        # confirm whether that combination should be supported instead.
        raise ValueError(
            f"Model class {model_cls.__name__!r} defines a `__dispatch_key__` "
            "and a type field. Only one of these may be defined for dispatch."
        )

    # Capture the original hooks so the wrappers below can delegate to them.
    cls_init = model_cls.__init__
    cls_new = model_cls.__new__

    def __init__(__pydantic_self__: M, **data: Any) -> None:
        # Stamp the dispatch key into `data` so it round-trips through
        # serialization; the base class itself is marked "__base__".
        type_string = (
            get_dispatch_key(__pydantic_self__)
            if type(__pydantic_self__) is not model_cls
            else "__base__"
        )
        data.setdefault("type", type_string)
        cls_init(__pydantic_self__, **data)

    def __new__(cls: type[M], **kwargs: Any) -> M:
        if "type" in kwargs:
            # Resolve the concrete subclass registered for this dispatch key.
            try:
                subcls = lookup_type(cls, dispatch_key=kwargs["type"])
            except KeyError as exc:
                # FIX: chain the original KeyError (`from exc`) so the lookup
                # failure is preserved in the traceback (B904).
                raise ValidationError.from_exception_data(
                    title=cls.__name__,
                    line_errors=[{"type": str(exc), "input": kwargs["type"]}],
                    input_type="python",
                ) from exc
            return cls_new(subcls)
        else:
            return cls_new(cls)

    model_cls.__init__ = __init__
    model_cls.__new__ = __new__

    register_base_type(model_cls)

    return model_cls
class PartialModel(Generic[M]):
    """
    A utility for constructing a Pydantic model in several steps.

    Fields may be supplied at initialization, assigned one at a time as
    attributes, or provided when `finalize` builds the concrete model.
    Pydantic validation is deferred until finalization.

    Each field can only be set once; assigning a field that already has a
    value, or one the model does not declare, raises a `ValueError`.

    Example:
        ```python
        class MyModel(BaseModel):
            x: int
            y: str
            z: float

        partial_model = PartialModel(MyModel, x=1)
        partial_model.y = "two"
        model = partial_model.finalize(z=3.0)
        ```
    """

    def __init__(self, __model_cls: type[M], **kwargs: Any) -> None:
        self.fields = kwargs
        # Set fields first to avoid issues if `fields` is also set on the `model_cls`
        # in our custom `setattr` implementation.
        self.model_cls = __model_cls

        for field_name in kwargs:
            self.raise_if_not_in_model(field_name)

    def finalize(self, **kwargs: Any) -> M:
        # Validate the late-supplied fields before handing everything to pydantic.
        for field_name in kwargs:
            self.raise_if_already_set(field_name)
            self.raise_if_not_in_model(field_name)
        return self.model_cls(**self.fields, **kwargs)

    def raise_if_already_set(self, name: str) -> None:
        if name in self.fields:
            raise ValueError(f"Field {name!r} has already been set.")

    def raise_if_not_in_model(self, name: str) -> None:
        if name not in self.model_cls.model_fields:
            raise ValueError(f"Field {name!r} is not present in the model.")

    def __setattr__(self, __name: str, __value: Any) -> None:
        # `fields` and `model_cls` are internal bookkeeping, not model fields.
        if __name in {"fields", "model_cls"}:
            return super().__setattr__(__name, __value)

        self.raise_if_already_set(__name)
        self.raise_if_not_in_model(__name)
        self.fields[__name] = __value

    def __repr__(self) -> str:
        shown = ", ".join(f"{k}={v!r}" for k, v in self.fields.items())
        return f"PartialModel(cls={self.model_cls.__name__}, {shown})"
def custom_pydantic_encoder(
    type_encoders: dict[Any, Callable[[type[Any]], Any]], obj: Any
) -> Any:
    """
    Encode `obj` with the first encoder registered for its class or any of its
    superclasses (excluding `object`); otherwise fall back to pydantic-aware
    JSON serialization.
    """
    # Walk the MRO, most-specific class first, skipping the trailing `object`.
    for klass in obj.__class__.__mro__[:-1]:
        if klass in type_encoders:
            return type_encoders[klass](obj)

    # No encoder matched anywhere in the hierarchy.
    if isinstance(obj, BaseModel):
        return obj.model_dump(mode="json")
    return to_jsonable_python(obj)
def parse_obj_as(
    type_: type[T],
    data: Any,
    mode: Literal["python", "json", "strings"] = "python",
) -> T:
    """Parse a given data structure as a Pydantic model via `TypeAdapter`.

    Read more about `TypeAdapter` [here](https://docs.pydantic.dev/latest/concepts/type_adapter/).

    Args:
        type_: The type to parse the data as.
        data: The data to be parsed.
        mode: The mode to use for parsing, either `python`, `json`, or `strings`.
            Defaults to `python`, where `data` should be a Python object (e.g. `dict`).

    Returns:
        The parsed `data` as the given `type_`.

    Example:
        Basic Usage of `parse_obj_as`
        ```python
        from prefect.utilities.pydantic import parse_obj_as
        from pydantic import BaseModel

        class ExampleModel(BaseModel):
            name: str

        # parsing python objects
        parsed = parse_obj_as(ExampleModel, {"name": "Marvin"})
        assert isinstance(parsed, ExampleModel)
        assert parsed.name == "Marvin"

        # parsing json strings
        parsed = parse_obj_as(
            list[ExampleModel],
            '[{"name": "Marvin"}, {"name": "Arthur"}]',
            mode="json"
        )
        assert all(isinstance(item, ExampleModel) for item in parsed)
        assert parsed[0].name == "Marvin"
        assert parsed[1].name == "Arthur"

        # parsing raw strings
        parsed = parse_obj_as(int, '123', mode="strings")
        assert isinstance(parsed, int)
        assert parsed == 123
        ```
    """
    adapter = TypeAdapter(type_)

    # When a list type is requested but a dict arrives, unwrap the dict's first
    # value before validating.
    # NOTE(review): this silently discards all but the first value — presumably
    # intentional for a specific caller; confirm before relying on it.
    origin: Optional[Any] = get_origin(type_)
    if origin is list and isinstance(data, dict):
        values_dict: dict[Any, Any] = data
        data = next(iter(values_dict.values()))

    # Dispatch to `validate_python` / `validate_json` / `validate_strings`.
    parser: Callable[[Any], T] = getattr(adapter, f"validate_{mode}")

    return parser(data)
def handle_secret_render(value: object, context: dict[str, Any]) -> object:
    """
    Render `value` for serialization, obfuscating secrets unless the context
    sets `include_secrets`; nested pydantic models are dumped with the same
    context so their secret fields are handled consistently.
    """
    # Secret-like values (anything exposing `get_secret_value`) are revealed
    # only when explicitly requested; otherwise they are obfuscated.
    if hasattr(value, "get_secret_value"):
        if context.get("include_secrets", False):
            return cast(Secret[object], value).get_secret_value()
        return obfuscate(value)

    if isinstance(value, BaseModel):
        # Pass the serialization mode if available in context
        mode = context.get("serialization_mode", "python")
        if mode != "json":
            return value.model_dump(context=context)

        # For JSON mode with nested models, recursively process each field,
        # since plain pydantic dumping does not understand `include_secrets`.
        json_data = value.model_dump(mode="json")
        visitor = partial(handle_secret_render, context=context)
        for field_name in type(value).model_fields:
            json_data[field_name] = visit_collection(
                expr=getattr(value, field_name),
                visit_fn=visitor,
                return_data=True,
            )
        return json_data

    return value
def __getattr__(name: str) -> Any:
    """
    Handles imports from this module that are deprecated.
    """
    if name != "JsonPatch":
        raise AttributeError(f"module '{__name__}' has no attribute '{name}'")

    # `JsonPatch` is still importable for backwards compatibility, but warn
    # callers to migrate before it is removed.
    warnings.warn(
        "JsonPatch is deprecated and will be removed after March 2025. "
        "Please use `JsonPatch` from the `jsonpatch` package instead.",
        DeprecationWarning,
        stacklevel=2,
    )
    from ._deprecated import JsonPatch

    return JsonPatch