Coverage for /usr/local/lib/python3.12/site-packages/prefect/types/__init__.py: 40%
105 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1from __future__ import annotations 1a
3from datetime import timedelta 1a
4from functools import partial 1a
5from typing import Annotated, Any, Dict, Optional, TypeVar, Union 1a
6from typing_extensions import Literal 1a
7import orjson 1a
8import pydantic 1a
10from ._datetime import DateTime, Date 1a
11from .names import ( 1a
12 Name,
13 NameOrEmpty,
14 NonEmptyishName,
15 BANNED_CHARACTERS,
16 WITHOUT_BANNED_CHARACTERS,
17 MAX_VARIABLE_NAME_LENGTH,
18 URILike,
19 ValidAssetKey,
20)
21from pydantic import ( 1a
22 AfterValidator,
23 BeforeValidator,
24 Field,
25 StrictBool,
26 StrictFloat,
27 StrictInt,
28 StrictStr,
29 TypeAdapter,
30)
31from zoneinfo import available_timezones 1a
33T = TypeVar("T") 1a
35MAX_VARIABLE_VALUE_LENGTH = 5000 1a
37NonNegativeInteger = Annotated[int, Field(ge=0)] 1a
38PositiveInteger = Annotated[int, Field(gt=0)] 1a
39NonNegativeFloat = Annotated[float, Field(ge=0.0)] 1a
42def _validate_non_negative_timedelta(v: timedelta) -> timedelta: 1a
43 """Validate that a timedelta is non-negative."""
44 if v.total_seconds() < 0:
45 raise ValueError("schedule_after must be non-negative")
46 return v
49NonNegativeTimeDelta = Annotated[ 1a
50 timedelta, AfterValidator(_validate_non_negative_timedelta)
51]
54def _convert_seconds_to_timedelta(value: Any) -> Any: 1a
55 """
56 Convert numeric values to timedelta for backward compatibility.
58 Accepts:
59 - int/float: interpreted as seconds
60 - str that can be parsed as float: interpreted as seconds
61 - timedelta: passed through unchanged
62 - str in ISO 8601 duration or time format: handled by Pydantic
63 """
64 if isinstance(value, timedelta): 64 ↛ 67line 64 didn't jump to line 67 because the condition on line 64 was always true1a
65 return value 1a
67 if isinstance(value, (int, float)):
68 return timedelta(seconds=value)
70 if isinstance(value, str):
71 # Try to parse as a numeric string first
72 try:
73 seconds = float(value)
74 return timedelta(seconds=seconds)
75 except ValueError:
76 # Not a numeric string, let Pydantic handle it
77 # (ISO 8601 duration format, time format, etc.)
78 pass
80 return value
83SecondsTimeDelta = Annotated[timedelta, BeforeValidator(_convert_seconds_to_timedelta)] 1a
85TimeZone = Annotated[ 1a
86 str,
87 Field(
88 pattern="|".join(
89 [z for z in sorted(available_timezones()) if "localtime" not in z]
90 ),
91 examples=["America/New_York"],
92 ),
93]
96VariableValue = Union[ 1a
97 StrictStr,
98 StrictInt,
99 StrictBool,
100 StrictFloat,
101 None,
102 dict[str, Any],
103 list[Any],
104]
107def check_variable_value(value: object) -> object: 1a
108 try:
109 json_string = orjson.dumps(value)
110 except orjson.JSONEncodeError:
111 raise ValueError("Variable value must be serializable to JSON")
113 if value is not None and len(json_string) > MAX_VARIABLE_VALUE_LENGTH:
114 raise ValueError(
115 f"Variable value must be less than {MAX_VARIABLE_VALUE_LENGTH} characters"
116 )
117 return value
120StrictVariableValue = Annotated[VariableValue, BeforeValidator(check_variable_value)] 1a
122LaxUrl = Annotated[str, BeforeValidator(lambda x: str(x).strip())] 1ab
124StatusCode = Annotated[int, Field(ge=100, le=599)] 1a
127def cast_none_to_empty_dict(value: Any) -> dict[str, Any]: 1a
128 if value is None:
129 return {}
130 return value
133KeyValueLabels = Annotated[ 1a
134 dict[str, Union[StrictBool, StrictInt, StrictFloat, str]],
135 BeforeValidator(cast_none_to_empty_dict),
136]
139ListOfNonEmptyStrings = Annotated[ 1a
140 list[str],
141 BeforeValidator(lambda x: [str(s) for s in x if str(s).strip()]),
142]
145class SecretDict(pydantic.Secret[dict[str, Any]]): 1a
146 pass 1a
149def validate_set_T_from_delim_string( 1a
150 value: Union[str, T, set[T], None], type_: Any, delim: str | None = None
151) -> set[T]:
152 """
153 "no-info" before validator useful in scooping env vars
155 e.g. `PREFECT_CLIENT_RETRY_EXTRA_CODES=429,502,503` -> `{429, 502, 503}`
156 e.g. `PREFECT_CLIENT_RETRY_EXTRA_CODES=429` -> `{429}`
157 """
158 if not value: 158 ↛ 161line 158 didn't jump to line 161 because the condition on line 158 was always true1a
159 return set() 1a
161 T_adapter = TypeAdapter(type_)
162 delim = delim or ","
163 if isinstance(value, str):
164 return {T_adapter.validate_strings(s.strip()) for s in value.split(delim)}
165 errors: list[pydantic.ValidationError] = []
166 try:
167 return {T_adapter.validate_python(value)}
168 except pydantic.ValidationError as e:
169 errors.append(e)
170 try:
171 return TypeAdapter(set[type_]).validate_python(value)
172 except pydantic.ValidationError as e:
173 errors.append(e)
174 raise ValueError(f"Invalid set[{type_}]: {errors}")
177ClientRetryExtraCodes = Annotated[ 1a
178 Union[str, StatusCode, set[StatusCode], None],
179 BeforeValidator(partial(validate_set_T_from_delim_string, type_=StatusCode)),
180]
183def parse_retry_delay_input(value: Any) -> Any: 1a
184 """
185 Parses various inputs (string, int, float, list) into a format suitable
186 for TaskRetryDelaySeconds (int, float, list[float], or None).
187 Handles comma-separated strings for lists of delays.
188 """
189 if isinstance(value, str): 189 ↛ 190line 189 didn't jump to line 190 because the condition on line 189 was never true1a
190 stripped_value = value.strip()
191 if not stripped_value:
192 return None # Treat empty or whitespace-only string as None
194 delim = ","
195 # Split and filter empty strings that result from multiple commas (e.g., "10,,20")
196 parts = [s.strip() for s in stripped_value.split(delim) if s.strip()]
198 if not parts: # e.g., value was just "," or " , "
199 return None
201 def _parse_num_part(part_str: str) -> Union[float, int]:
202 try:
203 # Prefer float to align with list[float] in TaskRetryDelaySeconds
204 return TypeAdapter(float).validate_strings(part_str)
205 except pydantic.ValidationError:
206 try:
207 return TypeAdapter(int).validate_strings(part_str)
208 except pydantic.ValidationError as e_int:
209 raise ValueError(
210 f"Invalid number format '{part_str}' for retry delay."
211 ) from e_int
213 if len(parts) == 1:
214 return _parse_num_part(parts[0])
215 else:
216 return [_parse_num_part(p) for p in parts]
218 # For non-string inputs (int, float, list, None, etc.), pass them through.
219 # Pydantic will then validate them against Union[int, float, list[float], None].
220 return value 1a
223TaskRetryDelaySeconds = Annotated[ 1a
224 Union[str, int, float, list[float], None],
225 BeforeValidator(parse_retry_delay_input),
226]
228LogLevel = Annotated[ 1a
229 Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
230 BeforeValidator(lambda x: x.upper()),
231]
234def convert_none_to_empty_dict(v: Optional[KeyValueLabels]) -> KeyValueLabels: 1a
235 return v or {}
238KeyValueLabelsField = Annotated[ 1a
239 KeyValueLabels,
240 Field(
241 default_factory=dict,
242 description="A dictionary of key-value labels. Values can be strings, numbers, or booleans.",
243 examples=[{"key": "value1", "key2": 42}],
244 ),
245 BeforeValidator(convert_none_to_empty_dict),
246]
249def _deserialize_dict_if_string(value: Any) -> dict[str, str]: 1a
250 """
251 Useful when a value is sometimes passed as a string or natively as a dict.
253 Example, setting the custom headers via the CLI involves passing a string format of a dict:
254 ```
255 prefect settings set PREFECT_CLIENT_CUSTOM_HEADERS='{"X-Test-Header": "test-value", "Authorization": "Bearer token123"}'
256 ```
257 """
258 if isinstance(value, str): 258 ↛ 259line 258 didn't jump to line 259 because the condition on line 258 was never true1a
259 return orjson.loads(value)
260 return value 1a
263# This is a Dict[str, str] and not a dict[str, str] because the latter causes errors with Pydantic
264# 2.10.6 and Python < 3.11
265JsonStringOrDict = Annotated[ 1a
266 Dict[str, str],
267 BeforeValidator(_deserialize_dict_if_string),
268]
271__all__ = [ 1a
272 "BANNED_CHARACTERS",
273 "WITHOUT_BANNED_CHARACTERS",
274 "ClientRetryExtraCodes",
275 "Date",
276 "DateTime",
277 "JsonStringOrDict",
278 "LogLevel",
279 "KeyValueLabelsField",
280 "MAX_VARIABLE_NAME_LENGTH",
281 "MAX_VARIABLE_VALUE_LENGTH",
282 "NonNegativeInteger",
283 "PositiveInteger",
284 "ListOfNonEmptyStrings",
285 "NonNegativeFloat",
286 "NonNegativeTimeDelta",
287 "Name",
288 "NameOrEmpty",
289 "NonEmptyishName",
290 "SecondsTimeDelta",
291 "ValidAssetKey",
292 "SecretDict",
293 "StatusCode",
294 "StrictVariableValue",
295 "TaskRetryDelaySeconds",
296 "URILike",
297]