Coverage for /usr/local/lib/python3.12/site-packages/prefect/utilities/schema_tools/validation.py: 16%
140 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1from collections import defaultdict, deque 1a
2from collections.abc import Callable, Iterable, Iterator 1a
3from copy import deepcopy 1a
4from typing import TYPE_CHECKING, Any, cast 1a
6import jsonschema 1a
7from jsonschema.exceptions import ValidationError as JSONSchemaValidationError 1a
8from jsonschema.validators import Draft202012Validator, create 1a
9from referencing.jsonschema import ObjectSchema, Schema 1a
11from prefect.utilities.collections import remove_nested_keys 1a
12from prefect.utilities.schema_tools.hydration import HydrationError, Placeholder 1a
14if TYPE_CHECKING: 14 ↛ 15line 14 didn't jump to line 15 because the condition on line 14 was never true1a
15 from jsonschema.validators import _Validator # type: ignore
class CircularSchemaRefError(Exception):
    """Raised by ``validate`` when validation hits a ``RecursionError``."""
class ValidationError(Exception):
    """Raised by ``validate`` when ``raise_on_error`` is set and validation fails."""
# Keyword under which the placeholder check is registered in the validator and
# by which its errors are recognised (`error.validator`).
PLACEHOLDERS_VALIDATOR_NAME = "_placeholders"
def _build_validator() -> type["_Validator"]:
    """
    Build the validator class used by this module.

    The class reuses the Draft 2020-12 meta-schema, keyword validators, type
    checker and format checker, and adds one extra keyword
    (`PLACEHOLDERS_VALIDATOR_NAME`) that is injected into every schema the
    validator applies, so the placeholder check always runs.
    """

    def _report_hydration_errors(
        _validator: "_Validator", _property: object, instance: Any, _schema: Schema
    ) -> Iterator[JSONSchemaValidationError]:
        # An instance that is itself a HydrationError is surfaced as a
        # validation error carrying the hydration error's message.
        if isinstance(instance, HydrationError):
            yield JSONSchemaValidationError(instance.message)

    def _with_placeholder_keyword(schema: Schema) -> Iterable[tuple[str, Any]]:
        # The default implementation returns `schema.items()`; this variant
        # works on a copy that also carries the placeholder keyword.
        assert not isinstance(schema, bool)
        keywords = dict(schema)
        keywords[PLACEHOLDERS_VALIDATOR_NAME] = None
        return keywords.items()

    keyword_handlers = {
        **Draft202012Validator.VALIDATORS,
        PLACEHOLDERS_VALIDATOR_NAME: _report_hydration_errors,
    }

    # It is necessary to `create` a new validator instead of using `extend`
    # because `extend` does not accept an `applicable_validators` parameter.
    # The placeholder check must always be applicable, without needing to
    # modify the schema itself.
    return create(
        meta_schema=Draft202012Validator.META_SCHEMA,
        validators=keyword_handlers,
        version="prefect",
        type_checker=Draft202012Validator.TYPE_CHECKER,
        format_checker=Draft202012Validator.FORMAT_CHECKER,
        id_of=cast(  # the stub for create() is wrong here; id_of accepts (Schema) -> str | None
            Callable[[Schema], str], Draft202012Validator.ID_OF
        ),
        applicable_validators=_with_placeholder_keyword,
    )
# Built once at import time; used by `is_valid_schema` and `validate`.
_VALIDATOR = _build_validator()
def is_valid_schema(schema: ObjectSchema, preprocess: bool = True) -> None:
    """
    Check that `schema` is itself a valid schema for the module's validator.

    Args:
        schema: The schema to check.
        preprocess: When true, the schema is run through `preprocess_schema`
            before being checked.

    Raises:
        ValueError: If the schema check fails; the original
            `jsonschema.SchemaError` is attached as the cause.
    """
    candidate = preprocess_schema(schema) if preprocess else schema

    try:
        _VALIDATOR.check_schema(candidate, format_checker=_VALIDATOR.FORMAT_CHECKER)
    except jsonschema.SchemaError as schema_error:
        raise ValueError(f"Invalid schema: {schema_error.message}") from schema_error
def validate(
    obj: dict[str, Any],
    schema: ObjectSchema,
    raise_on_error: bool = False,
    preprocess: bool = True,
    ignore_required: bool = False,
    allow_none_with_default: bool = False,
) -> list[JSONSchemaValidationError]:
    """
    Validate `obj` against `schema`.

    Args:
        obj: The object to validate.
        schema: The schema to validate against.
        raise_on_error: When true, raise on the first failure instead of
            returning the collected errors.
        preprocess: When true, run the schema through `preprocess_schema` first.
        ignore_required: When true, strip every nested `required` key from the
            schema before validating.
        allow_none_with_default: Forwarded to `preprocess_schema`.

    Returns:
        Every validation error found when `raise_on_error` is false; an empty
        list when `raise_on_error` is true and validation succeeds.

    Raises:
        CircularSchemaRefError: If validation hits a `RecursionError`.
        ValidationError: If `raise_on_error` is true and validation fails.
    """
    if preprocess:
        schema = preprocess_schema(schema, allow_none_with_default)

    if ignore_required:
        schema = remove_nested_keys(["required"], schema)

    if not raise_on_error:
        # Collecting mode: gather every error rather than stopping at the first.
        try:
            checker = _VALIDATOR(schema, format_checker=_VALIDATOR.FORMAT_CHECKER)
            return list(checker.iter_errors(obj))  # type: ignore
        except RecursionError:
            raise CircularSchemaRefError

    # Raising mode.
    # NOTE(review): unlike the collecting mode above, no `format_checker` is
    # passed here -- confirm whether that difference is intentional.
    try:
        jsonschema.validate(obj, schema, _VALIDATOR)
    except RecursionError:
        raise CircularSchemaRefError
    except JSONSchemaValidationError as exc:
        failed_at = exc.json_path
        if failed_at == "$":
            # Failure at the document root: there is no field name to report.
            error_message = "Validation failed."
        else:
            error_message = (
                f"Validation failed for field {failed_at.replace('$.', '')!r}."
            )
        error_message += f" Failure reason: {exc.message}"
        raise ValidationError(error_message) from exc

    return []
def is_valid(obj: dict[str, Any], schema: ObjectSchema) -> bool:
    """Return True when `validate(obj, schema)` reports no errors."""
    return not validate(obj, schema)
def prioritize_placeholder_errors(
    errors: list[JSONSchemaValidationError],
) -> list[JSONSchemaValidationError]:
    """
    Filter `errors` so that placeholder errors win over other errors that
    share the same relative path.

    Errors are grouped by their relative path. For each group, only the
    errors produced by the placeholder keyword are kept if there are any;
    otherwise the whole group is kept.
    """
    by_path: dict[str, list[JSONSchemaValidationError]] = defaultdict(list)
    for err in errors:
        by_path["->".join(map(str, err.relative_path))].append(err)

    kept: list[JSONSchemaValidationError] = []
    for bucket in by_path.values():
        from_placeholders = [
            candidate
            for candidate in bucket
            if candidate.validator == PLACEHOLDERS_VALIDATOR_NAME  # type: ignore # typing stubs are incomplete
        ]
        # An empty list is falsy, so groups without placeholder errors are
        # passed through untouched.
        kept.extend(from_placeholders or bucket)

    return kept
def _child_error_list(siblings: list[Any], label: str, part: Any) -> list[Any]:
    """
    Return the `errors` list of the entry in `siblings` whose `label` equals
    `part`, appending a fresh entry first when none exists.
    """
    for entry in siblings:
        # An errors list can hold plain string messages next to dict entries;
        # only dict entries can be matched and descended into.
        if isinstance(entry, dict) and entry.get(label) == part:
            return cast(list[Any], entry.setdefault("errors", []))

    new_entry: dict[str, Any] = {label: part, "errors": []}
    siblings.append(new_entry)
    return new_entry["errors"]


def build_error_obj(errors: list[JSONSchemaValidationError]) -> dict[str, Any]:
    """
    Turn a flat list of validation errors into a nested error report.

    Args:
        errors: The validation errors to report.

    Returns:
        A dict with two keys: `errors`, a list whose items are either plain
        messages or `{"property" | "index": ..., "errors": [...]}` entries
        nested along each error's relative path, and `valid`, which is true
        only when `errors` is empty.
    """
    error_response: dict[str, Any] = {"errors": []}

    # If multiple errors are present for the same path and one of them
    # is a placeholder error, we only want to use the placeholder error.
    errors = prioritize_placeholder_errors(errors)

    for error in errors:
        # If the Placeholder is not representing an error, we can skip it
        if isinstance(error.instance, Placeholder) and not error.instance.is_error:
            continue

        path = deque(error.relative_path)

        # Required errors should be moved one level down to the property
        # they're associated with, so we add an extra level to the path.
        if error.validator == "required":  # type: ignore
            required_field = error.message.partition(" ")[0].strip("'")
            path.append(required_field)

        current: list[Any] = error_response["errors"]

        # error at the root, just append the error message
        if not path:
            current.append(error.message)

        while path:
            part = path.popleft()
            # Integer path parts address array items, everything else a property.
            label = "index" if isinstance(part, int) else "property"
            if not path:
                # Last path part: attach the message here.
                current.append({label: part, "errors": [error.message]})
            else:
                current = _child_error_list(current, label, part)

    error_response["valid"] = not error_response["errors"]

    return error_response
def _fix_null_typing(
    key: str,
    schema: dict[str, Any],
    required_fields: list[str],
    allow_none_with_default: bool = False,
) -> None:
    """
    Pydantic V1 does not generate a valid Draft2020-12 schema for null types.

    Rewrites `schema` in place so that it also accepts null: the `type` is
    replaced by an `anyOf` of the original type and `null`. This applies only
    when `key` is not in `required_fields`, the schema has a `type` other than
    `"null"`, and the schema either has no `default` or
    `allow_none_with_default` is set.
    """
    if (
        key not in required_fields
        and "type" in schema
        and schema.get("type") != "null"
        and ("default" not in schema or allow_none_with_default)
    ):
        schema["anyOf"] = [{"type": schema["type"]}, {"type": "null"}]
        del schema["type"]


def _fix_tuple_items(schema: dict[str, Any]) -> None:
    """
    Pydantic V1 does not generate a valid Draft2020-12 schema for tuples.

    When `items` is a non-empty list and no `prefixItems` is present, a deep
    copy of the list is moved to `prefixItems` and `items` is removed. The
    schema is modified in place.
    """
    if (
        schema.get("items")
        and isinstance(schema["items"], list)
        and not schema.get("prefixItems")
    ):
        schema["prefixItems"] = deepcopy(cast(list[Any], schema["items"]))
        del schema["items"]


def process_properties(
    properties: dict[str, dict[str, Any]],
    required_fields: list[str],
    allow_none_with_default: bool = False,
) -> None:
    """
    Apply the null-typing and tuple-items fixes to every property schema in
    `properties`, recursing into nested `properties`. Schemas are modified in
    place.

    Args:
        properties: Mapping of property name to that property's schema.
        required_fields: Names required at this level of the schema.
        allow_none_with_default: Passed to `_fix_null_typing` at every level.
    """
    for key, schema in properties.items():
        _fix_null_typing(key, schema, required_fields, allow_none_with_default)
        _fix_tuple_items(schema)

        if "properties" in schema:
            # The nested `required` list gets its own name: rebinding
            # `required_fields` here would make the remaining siblings of this
            # loop be checked against the nested object's requirements.
            nested_required = schema.get("required", [])
            process_properties(
                schema["properties"], nested_required, allow_none_with_default
            )
def preprocess_schema(
    schema: ObjectSchema,
    allow_none_with_default: bool = False,
) -> ObjectSchema:
    """
    Return a preprocessed deep copy of `schema`; the input is not modified.

    The top-level `properties` and the `properties` of every entry in
    `definitions` are passed through `process_properties`. Definitions that
    carry a `block_type_slug` are additionally wrapped so that a reference
    object is accepted in place of the definition itself.

    Args:
        schema: The schema to preprocess.
        allow_none_with_default: Passed to `process_properties`.

    Returns:
        The preprocessed copy.
    """
    schema = deepcopy(schema)

    if "properties" in schema:
        process_properties(
            schema["properties"], schema.get("required", []), allow_none_with_default
        )

    if "definitions" in schema:  # Also process definitions for reused models
        definitions = cast(dict[str, Any], schema["definitions"])
        # Iterate over a snapshot and write back under the definition's own
        # key. Writing under `definition["title"]` while iterating the live
        # dict would add a key mid-iteration (or overwrite another
        # definition) whenever the title differs from the key.
        for name, definition in list(definitions.items()):
            if "properties" in definition:
                process_properties(
                    definition["properties"],
                    definition.get("required", []),
                    allow_none_with_default,
                )
            # Allow block types to be referenced by their id
            if "block_type_slug" in definition:
                block_reference = {
                    "type": "object",
                    "properties": {
                        "$ref": {
                            "oneOf": [
                                {
                                    "type": "string",
                                    "format": "uuid",
                                },
                                {
                                    "type": "object",
                                    "additionalProperties": {
                                        "type": "string",
                                    },
                                    "minProperties": 1,
                                },
                            ]
                        }
                    },
                    "required": [
                        "$ref",
                    ],
                }
                definitions[name] = {"oneOf": [definition, block_reference]}

    return schema