# prefect/_internal/schemas/validators.py
1"""
2This module contains a collection of functions that are used to validate the
3values of fields in Pydantic models. These functions are used as validators in
4Pydantic models to ensure that the values of fields conform to the expected
5format.
6This will be subject to consolidation and refactoring over the next few months.
7"""
from __future__ import annotations

import datetime
import os
import urllib.parse
import warnings
from collections.abc import Iterable, Mapping, MutableMapping
from copy import copy
from pathlib import Path
from typing import TYPE_CHECKING, Any, Optional, TypeVar, Union, overload
from uuid import UUID
from zoneinfo import ZoneInfo

import jsonschema

from prefect.types._datetime import DateTime, create_datetime_instance, get_timezones
from prefect.utilities.collections import isiterable
from prefect.utilities.filesystem import relative_path_to_current_platform
from prefect.utilities.importtools import from_qualified_name
from prefect.utilities.names import generate_slug

if TYPE_CHECKING:
    from prefect.serializers import Serializer

T = TypeVar("T")
M = TypeVar("M", bound=Mapping[str, Any])
MM = TypeVar("MM", bound=MutableMapping[str, Any])
def validate_values_conform_to_schema(
    values: Optional[Mapping[str, Any]],
    schema: Optional[Mapping[str, Any]],
    ignore_required: bool = False,
) -> None:
    """
    Validate that the provided values conform to the provided json schema.

    TODO: This schema validation is outdated. The latest version is
    prefect.utilities.schema_tools.validate, which handles fixes to Pydantic v1
    schemas for null values and tuples.

    Args:
        values: The values to validate.
        schema: The schema to validate against.
        ignore_required: Whether to ignore the required fields in the schema. Should be
            used when a partial set of values is acceptable.

    Raises:
        ValueError: If the parameters do not conform to the schema.
    """
    from prefect.utilities.collections import remove_nested_keys

    if ignore_required:
        schema = remove_nested_keys(["required"], schema)

    try:
        if schema is not None and values is not None:
            jsonschema.validate(values, schema)
    except jsonschema.ValidationError as exc:
        if exc.json_path == "$":
            error_message = "Validation failed."
        else:
            error_message = (
                f"Validation failed for field {exc.json_path.replace('$.', '')!r}."
            )
        error_message += f" Failure reason: {exc.message}"
        raise ValueError(error_message) from exc
    except jsonschema.SchemaError as exc:
        raise ValueError(
            "The provided schema is not a valid json schema. Schema error:"
            f" {exc.message}"
        ) from exc
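
# Illustrative usage sketch (not part of the original module; the payload and
# schema below are hypothetical):
#
#     schema = {
#         "type": "object",
#         "properties": {"retries": {"type": "integer"}},
#         "required": ["retries"],
#     }
#     validate_values_conform_to_schema({"retries": 3}, schema)  # passes, returns None
#     validate_values_conform_to_schema({}, schema, ignore_required=True)  # also passes
#     validate_values_conform_to_schema({"retries": "three"}, schema)
#     # ValueError: Validation failed for field 'retries'. Failure reason: ...
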
### DEPLOYMENT SCHEMA VALIDATORS ###


def validate_parameters_conform_to_schema(
    parameters: M, values: Mapping[str, Any]
) -> M:
    """Validate that the parameters conform to the parameter schema."""
    if values.get("enforce_parameter_schema"):
        validate_values_conform_to_schema(
            parameters, values.get("parameter_openapi_schema"), ignore_required=True
        )
    return parameters
@overload
def validate_parameter_openapi_schema(schema: M, values: Mapping[str, Any]) -> M: ...


@overload
def validate_parameter_openapi_schema(
    schema: None, values: Mapping[str, Any]
) -> None: ...


def validate_parameter_openapi_schema(
    schema: Optional[M], values: Mapping[str, Any]
) -> Optional[M]:
    """Validate that the parameter_openapi_schema is a valid json schema."""
    if values.get("enforce_parameter_schema"):
        try:
            if schema is not None:
                # Most closely matches the schemas generated by pydantic
                jsonschema.Draft202012Validator.check_schema(schema)
        except jsonschema.SchemaError as exc:
            raise ValueError(
                "The provided schema is not a valid json schema. Schema error:"
                f" {exc.message}"
            ) from exc

    return schema
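
# Illustrative sketch (hypothetical values): both deployment validators are
# no-ops unless `enforce_parameter_schema` is truthy in `values`.
#
#     values = {
#         "enforce_parameter_schema": True,
#         "parameter_openapi_schema": {"type": "object", "properties": {}},
#     }
#     validate_parameter_openapi_schema(values["parameter_openapi_schema"], values)
#     validate_parameters_conform_to_schema({"x": 1}, values)  # returns {"x": 1}
#     # A malformed schema such as {"type": 42} instead raises
#     # ValueError: The provided schema is not a valid json schema. ...
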
def convert_to_strings(value: Union[Any, Iterable[Any]]) -> Union[str, list[str]]:
    if isiterable(value):
        return [str(item) for item in value]
    return str(value)


### SCHEDULE SCHEMA VALIDATORS ###
def reconcile_schedules_runner(values: MM) -> MM:
    from prefect.deployments.schedules import (
        normalize_to_deployment_schedule,
    )

    schedules = values.get("schedules")
    if schedules is not None and len(schedules) > 0:
        values["schedules"] = normalize_to_deployment_schedule(schedules)

    return values
@overload
def validate_schedule_max_scheduled_runs(v: int, limit: int) -> int: ...


@overload
def validate_schedule_max_scheduled_runs(v: None, limit: int) -> None: ...


def validate_schedule_max_scheduled_runs(v: Optional[int], limit: int) -> Optional[int]:
    if v is not None and v > limit:
        raise ValueError(f"`max_scheduled_runs` must be less than or equal to {limit}.")
    return v
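
# Illustrative sketch: the limit is supplied by the caller (100 here is a
# hypothetical value, not a Prefect default):
#
#     validate_schedule_max_scheduled_runs(50, limit=100)    # returns 50
#     validate_schedule_max_scheduled_runs(None, limit=100)  # returns None
#     validate_schedule_max_scheduled_runs(500, limit=100)
#     # ValueError: `max_scheduled_runs` must be less than or equal to 100.
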
def remove_old_deployment_fields(values: MM) -> MM:
    # 2.7.7 removed worker_pool_queue_id in lieu of worker_pool_name and
    # worker_pool_queue_name. Those fields were later renamed to work_pool_name
    # and work_queue_name. This validator removes old fields provided
    # by older clients to avoid 422 errors.
    values_copy = copy(values)
    worker_pool_queue_id = values_copy.pop("worker_pool_queue_id", None)
    worker_pool_name = values_copy.pop("worker_pool_name", None)
    worker_pool_queue_name = values_copy.pop("worker_pool_queue_name", None)
    work_pool_queue_name = values_copy.pop("work_pool_queue_name", None)
    if worker_pool_queue_id:
        warnings.warn(
            (
                "`worker_pool_queue_id` is no longer supported for creating or "
                "updating deployments. Please use `work_pool_name` and "
                "`work_queue_name` instead."
            ),
            UserWarning,
        )
    if worker_pool_name or worker_pool_queue_name or work_pool_queue_name:
        warnings.warn(
            (
                "`worker_pool_name`, `worker_pool_queue_name`, and "
                "`work_pool_queue_name` are no longer supported for creating or "
                "updating deployments. Please use `work_pool_name` and "
                "`work_queue_name` instead."
            ),
            UserWarning,
        )
    return values_copy
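
# Illustrative sketch (hypothetical payload): legacy keys are stripped from a
# copy of the payload and a UserWarning is emitted.
#
#     values = {"work_pool_name": "default", "worker_pool_queue_id": "abc123"}
#     cleaned = remove_old_deployment_fields(values)
#     # UserWarning: `worker_pool_queue_id` is no longer supported ...
#     # cleaned == {"work_pool_name": "default"}
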
def reconcile_paused_deployment(values: MM) -> MM:
    paused = values.get("paused")

    if paused is None:
        values["paused"] = False

    return values


def default_anchor_date(v: DateTime) -> DateTime:
    return create_datetime_instance(v)
@overload
def default_timezone(v: str, values: Optional[Mapping[str, Any]] = ...) -> str: ...


@overload
def default_timezone(
    v: None, values: Optional[Mapping[str, Any]] = ...
) -> Optional[str]: ...


def default_timezone(
    v: Optional[str], values: Optional[Mapping[str, Any]] = None
) -> Optional[str]:
    values = values or {}
    timezones = get_timezones()

    if v is not None:
        if v and v not in timezones:
            raise ValueError(
                f'Invalid timezone: "{v}" (specify in IANA tzdata format, for example,'
                " America/New_York)"
            )
        return v

    # anchor schedules
    elif "anchor_date" in values:
        anchor_date: datetime.datetime = values["anchor_date"]
        if isinstance(anchor_date.tzinfo, ZoneInfo):
            tz = anchor_date.tzinfo.key
        elif hasattr(anchor_date.tzinfo, "name"):
            tz = anchor_date.tzinfo.name
        else:
            tz = "UTC"
        # Sometimes anchor dates have "timezones" that are fixed UTC offsets
        # like "-04:00", e.g. when parsing ISO 8601 strings. In that case the
        # correct inferred timezone is "UTC".
        return tz if tz in timezones else "UTC"

    # cron schedules
    return v
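
# Illustrative sketch of the timezone inference. A fixed UTC offset (as
# produced by parsing an ISO 8601 string) is not an IANA name, so it falls
# back to "UTC":
#
#     from datetime import datetime, timedelta, timezone
#     from zoneinfo import ZoneInfo
#
#     ny = datetime(2024, 1, 1, tzinfo=ZoneInfo("America/New_York"))
#     default_timezone(None, {"anchor_date": ny})      # "America/New_York"
#
#     offset = datetime(2024, 1, 1, tzinfo=timezone(timedelta(hours=-4)))
#     default_timezone(None, {"anchor_date": offset})  # "UTC"
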
def validate_cron_string(v: str) -> str:
    from prefect._vendor.croniter import croniter

    # croniter allows "random" and "hashed" expressions,
    # which we do not support: https://github.com/kiorky/croniter
    if not croniter.is_valid(v):
        raise ValueError(f'Invalid cron string: "{v}"')
    elif any(c.casefold() in ("r", "h") for c in v.split()):
        raise ValueError(
            f'Random and Hashed expressions are unsupported, received: "{v}"'
        )
    return v
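
# Illustrative sketch:
#
#     validate_cron_string("0 12 * * 1-5")  # returns the string unchanged
#     validate_cron_string("not a cron")
#     # ValueError: Invalid cron string: "not a cron"
#
# Fields consisting solely of "R"/"r" or "H"/"h" (croniter's random and
# hashed expressions) are rejected with the dedicated error above.
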
# approx. one year's worth of RDATEs + buffer
MAX_RRULE_LENGTH = 6500
def validate_rrule_string(v: str) -> str:
    import dateutil.rrule

    # Attempt to parse the rrule string as an rrule object;
    # this will error if the string is invalid.
    try:
        dateutil.rrule.rrulestr(v, cache=True)
    except ValueError as exc:
        # rrule errors are a mix of cryptic and informative,
        # so reraise to be clear that the string was invalid
        raise ValueError(f'Invalid RRule string "{v}": {exc}') from exc
    if len(v) > MAX_RRULE_LENGTH:
        raise ValueError(
            f'Invalid RRule string "{v[:40]}..."\n'
            f"Max length is {MAX_RRULE_LENGTH}, got {len(v)}"
        )
    return v
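
# Illustrative sketch:
#
#     validate_rrule_string("FREQ=DAILY;INTERVAL=1")  # returned unchanged
#     validate_rrule_string("FREQ=YEARLY;BYFOO=1")
#     # ValueError: Invalid RRule string "FREQ=YEARLY;BYFOO=1": ...
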
### STATE SCHEMA VALIDATORS ###


def get_or_create_run_name(name: Optional[str]) -> str:
    return name or generate_slug(2)


### FILESYSTEM SCHEMA VALIDATORS ###


def stringify_path(value: Union[str, os.PathLike[str]]) -> str:
    return os.fspath(value)
def validate_basepath(value: str) -> str:
    scheme, netloc, _, _, _ = urllib.parse.urlsplit(value)

    if not scheme:
        raise ValueError(f"Base path must start with a scheme. Got {value!r}.")

    if not netloc:
        raise ValueError(
            f"Base path must include a location after the scheme. Got {value!r}."
        )

    if scheme == "file":
        raise ValueError(
            "Base path scheme cannot be 'file'. Use `LocalFileSystem` instead for"
            " local file access."
        )

    return value
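
# Illustrative sketch:
#
#     validate_basepath("s3://my-bucket/prefix")  # returned unchanged
#     validate_basepath("my-bucket/prefix")
#     # ValueError: Base path must start with a scheme. ...
#     validate_basepath("file://localhost/tmp")
#     # ValueError: Base path scheme cannot be 'file'. ...
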
### SERIALIZER SCHEMA VALIDATORS ###


def validate_picklelib(value: str) -> str:
    """
    Check that the given pickle library is importable and has dumps/loads methods.
    """
    try:
        pickler = from_qualified_name(value)
    except (ImportError, AttributeError) as exc:
        raise ValueError(
            f"Failed to import requested pickle library: {value!r}."
        ) from exc

    if not callable(getattr(pickler, "dumps", None)):
        raise ValueError(f"Pickle library at {value!r} does not have a 'dumps' method.")

    if not callable(getattr(pickler, "loads", None)):
        raise ValueError(f"Pickle library at {value!r} does not have a 'loads' method.")

    return value
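
# Illustrative sketch: the stdlib `pickle` module qualifies, while a module
# without dumps/loads (`math`, here) does not.
#
#     validate_picklelib("pickle")  # returns "pickle"
#     validate_picklelib("math")
#     # ValueError: Pickle library at 'math' does not have a 'dumps' method.
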
def validate_dump_kwargs(value: M) -> M:
    # `default` is set by `object_encoder`. A user provided callable would make this
    # class unserializable anyway.
    if "default" in value:
        raise ValueError("`default` cannot be provided. Use `object_encoder` instead.")
    return value


def validate_load_kwargs(value: M) -> M:
    # `object_hook` is set by `object_decoder`. A user provided callable would make
    # this class unserializable anyway.
    if "object_hook" in value:
        raise ValueError(
            "`object_hook` cannot be provided. Use `object_decoder` instead."
        )
    return value
@overload
def cast_type_names_to_serializers(value: str) -> "Serializer[Any]": ...


@overload
def cast_type_names_to_serializers(value: "Serializer[T]") -> "Serializer[T]": ...


def cast_type_names_to_serializers(
    value: Union[str, "Serializer[Any]"],
) -> "Serializer[Any]":
    from prefect.serializers import Serializer

    if isinstance(value, str):
        return Serializer(type=value)
    return value
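
# Illustrative sketch: strings are resolved through `Serializer(type=...)`,
# which dispatches to the registered serializer for that type slug (e.g. a
# built-in such as "json" or "pickle"); Serializer instances pass through.
#
#     s = cast_type_names_to_serializers("json")
#     cast_type_names_to_serializers(s) is s  # True
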
def validate_compressionlib(value: str) -> str:
    """
    Check that the given compression library is importable and has
    compress/decompress methods.
    """
    try:
        compressor = from_qualified_name(value)
    except (ImportError, AttributeError) as exc:
        raise ValueError(
            f"Failed to import requested compression library: {value!r}."
        ) from exc

    if not callable(getattr(compressor, "compress", None)):
        raise ValueError(
            f"Compression library at {value!r} does not have a 'compress' method."
        )

    if not callable(getattr(compressor, "decompress", None)):
        raise ValueError(
            f"Compression library at {value!r} does not have a 'decompress' method."
        )

    return value
# TODO: if we use this elsewhere we can change the error message to be more generic
@overload
def list_length_50_or_less(v: int) -> int: ...


@overload
def list_length_50_or_less(v: float) -> float: ...


@overload
def list_length_50_or_less(v: list[int]) -> list[int]: ...


@overload
def list_length_50_or_less(v: list[float]) -> list[float]: ...


@overload
def list_length_50_or_less(v: None) -> None: ...


def list_length_50_or_less(
    v: Optional[int | float | list[int] | list[float]],
) -> Optional[int | float | list[int] | list[float]]:
    if isinstance(v, list) and (len(v) > 50):
        raise ValueError("Cannot configure more than 50 retry delays per task.")
    return v
# TODO: if we use this elsewhere we can change the error message to be more generic
@overload
def validate_not_negative(v: float) -> float: ...


@overload
def validate_not_negative(v: None) -> None: ...


def validate_not_negative(v: Optional[float]) -> Optional[float]:
    if v is not None and v < 0:
        raise ValueError("`retry_jitter_factor` must be >= 0.")
    return v
def validate_default_queue_id_not_none(v: Optional[UUID]) -> UUID:
    if v is None:
        raise ValueError(
            "`default_queue_id` is a required field. If you are "
            "creating a new WorkPool and don't have a queue "
            "ID yet, use the `actions.WorkPoolCreate` model instead."
        )
    return v
@overload
def validate_max_metadata_length(v: MM) -> MM: ...


@overload
def validate_max_metadata_length(v: None) -> None: ...


def validate_max_metadata_length(v: Optional[MM]) -> Optional[MM]:
    max_metadata_length = 500
    if v is None:
        return v
    for key in v.keys():
        if len(str(v[key])) > max_metadata_length:
            v[key] = str(v[key])[:max_metadata_length] + "..."
    return v
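
# Illustrative sketch: oversized values are truncated in place with a trailing
# ellipsis rather than rejected.
#
#     meta = {"short": "ok", "long": "x" * 600}
#     validate_max_metadata_length(meta)
#     # meta["long"] == "x" * 500 + "..."
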
### TASK RUN SCHEMA VALIDATORS ###


@overload
def validate_cache_key_length(cache_key: str) -> str: ...


@overload
def validate_cache_key_length(cache_key: None) -> None: ...


def validate_cache_key_length(cache_key: Optional[str]) -> Optional[str]:
    from prefect.settings import (
        PREFECT_API_TASK_CACHE_KEY_MAX_LENGTH,
    )

    if cache_key and len(cache_key) > PREFECT_API_TASK_CACHE_KEY_MAX_LENGTH.value():
        raise ValueError(
            "Cache key exceeded maximum allowed length of"
            f" {PREFECT_API_TASK_CACHE_KEY_MAX_LENGTH.value()} characters."
        )
    return cache_key
def set_run_policy_deprecated_fields(values: MM) -> MM:
    """
    If deprecated fields are provided, populate the corresponding new fields
    to preserve orchestration behavior.
    """
    if not values.get("retries", None) and values.get("max_retries", 0) != 0:
        values["retries"] = values["max_retries"]

    if (
        not values.get("retry_delay", None)
        and values.get("retry_delay_seconds", 0) != 0
    ):
        values["retry_delay"] = values["retry_delay_seconds"]

    return values
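
# Illustrative sketch (hypothetical payload from an old client): deprecated
# fields populate their replacements when the new fields are unset.
#
#     policy = {"max_retries": 3, "retry_delay_seconds": 10.0}
#     set_run_policy_deprecated_fields(policy)
#     # policy["retries"] == 3 and policy["retry_delay"] == 10.0
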
### PYTHON ENVIRONMENT SCHEMA VALIDATORS ###


@overload
def return_v_or_none(v: str) -> str: ...


@overload
def return_v_or_none(v: None) -> None: ...


def return_v_or_none(v: Optional[str]) -> Optional[str]:
    """Make sure that empty strings are treated as None"""
    if not v:
        return None
    return v
### BLOCK SCHEMA VALIDATORS ###


def validate_parent_and_ref_diff(values: M) -> M:
    parent_id = values.get("parent_block_document_id")
    ref_id = values.get("reference_block_document_id")
    if parent_id and ref_id and parent_id == ref_id:
        raise ValueError(
            "`parent_block_document_id` and `reference_block_document_id` cannot be"
            " the same"
        )
    return values


def validate_name_present_on_nonanonymous_blocks(values: M) -> M:
    # anonymous blocks may have no name prior to actually being
    # stored in the database
    if not values.get("is_anonymous") and not values.get("name"):
        raise ValueError("Names must be provided for block documents.")
    return values
### PROCESS JOB CONFIGURATION VALIDATORS ###


@overload
def validate_working_dir(v: str) -> Path: ...


@overload
def validate_working_dir(v: Path) -> Path: ...


@overload
def validate_working_dir(v: None) -> None: ...


def validate_working_dir(v: Optional[Path | str]) -> Optional[Path]:
    """Make sure that the working directory is formatted for the current platform."""
    if isinstance(v, str):
        return relative_path_to_current_platform(v)
    return v
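
# Illustrative sketch: string paths have their separators normalized for the
# current platform; Path objects and None pass through unchanged.
#
#     validate_working_dir("flows\\subdir")  # Path("flows/subdir") on POSIX
#     validate_working_dir(None)             # None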