Coverage for /usr/local/lib/python3.12/site-packages/prefect/types/__init__.py: 40%

105 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1from __future__ import annotations 1a

2 

3from datetime import timedelta 1a

4from functools import partial 1a

5from typing import Annotated, Any, Dict, Optional, TypeVar, Union 1a

6from typing_extensions import Literal 1a

7import orjson 1a

8import pydantic 1a

9 

10from ._datetime import DateTime, Date 1a

11from .names import ( 1a

12 Name, 

13 NameOrEmpty, 

14 NonEmptyishName, 

15 BANNED_CHARACTERS, 

16 WITHOUT_BANNED_CHARACTERS, 

17 MAX_VARIABLE_NAME_LENGTH, 

18 URILike, 

19 ValidAssetKey, 

20) 

21from pydantic import ( 1a

22 AfterValidator, 

23 BeforeValidator, 

24 Field, 

25 StrictBool, 

26 StrictFloat, 

27 StrictInt, 

28 StrictStr, 

29 TypeAdapter, 

30) 

31from zoneinfo import available_timezones 1a

32 

33T = TypeVar("T") 1a

34 

35MAX_VARIABLE_VALUE_LENGTH = 5000 1a

36 

37NonNegativeInteger = Annotated[int, Field(ge=0)] 1a

38PositiveInteger = Annotated[int, Field(gt=0)] 1a

39NonNegativeFloat = Annotated[float, Field(ge=0.0)] 1a

40 

41 

42def _validate_non_negative_timedelta(v: timedelta) -> timedelta: 1a

43 """Validate that a timedelta is non-negative.""" 

44 if v.total_seconds() < 0: 

45 raise ValueError("schedule_after must be non-negative") 

46 return v 

47 

48 

49NonNegativeTimeDelta = Annotated[ 1a

50 timedelta, AfterValidator(_validate_non_negative_timedelta) 

51] 

52 

53 

54def _convert_seconds_to_timedelta(value: Any) -> Any: 1a

55 """ 

56 Convert numeric values to timedelta for backward compatibility. 

57 

58 Accepts: 

59 - int/float: interpreted as seconds 

60 - str that can be parsed as float: interpreted as seconds 

61 - timedelta: passed through unchanged 

62 - str in ISO 8601 duration or time format: handled by Pydantic 

63 """ 

64 if isinstance(value, timedelta): 64 ↛ 67line 64 didn't jump to line 67 because the condition on line 64 was always true1a

65 return value 1a

66 

67 if isinstance(value, (int, float)): 

68 return timedelta(seconds=value) 

69 

70 if isinstance(value, str): 

71 # Try to parse as a numeric string first 

72 try: 

73 seconds = float(value) 

74 return timedelta(seconds=seconds) 

75 except ValueError: 

76 # Not a numeric string, let Pydantic handle it 

77 # (ISO 8601 duration format, time format, etc.) 

78 pass 

79 

80 return value 

81 

82 

83SecondsTimeDelta = Annotated[timedelta, BeforeValidator(_convert_seconds_to_timedelta)] 1a

84 

85TimeZone = Annotated[ 1a

86 str, 

87 Field( 

88 pattern="|".join( 

89 [z for z in sorted(available_timezones()) if "localtime" not in z] 

90 ), 

91 examples=["America/New_York"], 

92 ), 

93] 

94 

95 

96VariableValue = Union[ 1a

97 StrictStr, 

98 StrictInt, 

99 StrictBool, 

100 StrictFloat, 

101 None, 

102 dict[str, Any], 

103 list[Any], 

104] 

105 

106 

107def check_variable_value(value: object) -> object: 1a

108 try: 

109 json_string = orjson.dumps(value) 

110 except orjson.JSONEncodeError: 

111 raise ValueError("Variable value must be serializable to JSON") 

112 

113 if value is not None and len(json_string) > MAX_VARIABLE_VALUE_LENGTH: 

114 raise ValueError( 

115 f"Variable value must be less than {MAX_VARIABLE_VALUE_LENGTH} characters" 

116 ) 

117 return value 

118 

119 

120StrictVariableValue = Annotated[VariableValue, BeforeValidator(check_variable_value)] 1a

121 

122LaxUrl = Annotated[str, BeforeValidator(lambda x: str(x).strip())] 1ab

123 

124StatusCode = Annotated[int, Field(ge=100, le=599)] 1a

125 

126 

127def cast_none_to_empty_dict(value: Any) -> dict[str, Any]: 1a

128 if value is None: 

129 return {} 

130 return value 

131 

132 

133KeyValueLabels = Annotated[ 1a

134 dict[str, Union[StrictBool, StrictInt, StrictFloat, str]], 

135 BeforeValidator(cast_none_to_empty_dict), 

136] 

137 

138 

139ListOfNonEmptyStrings = Annotated[ 1a

140 list[str], 

141 BeforeValidator(lambda x: [str(s) for s in x if str(s).strip()]), 

142] 

143 

144 

145class SecretDict(pydantic.Secret[dict[str, Any]]): 1a

146 pass 1a

147 

148 

149def validate_set_T_from_delim_string( 1a

150 value: Union[str, T, set[T], None], type_: Any, delim: str | None = None 

151) -> set[T]: 

152 """ 

153 "no-info" before validator useful in scooping env vars 

154 

155 e.g. `PREFECT_CLIENT_RETRY_EXTRA_CODES=429,502,503` -> `{429, 502, 503}` 

156 e.g. `PREFECT_CLIENT_RETRY_EXTRA_CODES=429` -> `{429}` 

157 """ 

158 if not value: 158 ↛ 161line 158 didn't jump to line 161 because the condition on line 158 was always true1a

159 return set() 1a

160 

161 T_adapter = TypeAdapter(type_) 

162 delim = delim or "," 

163 if isinstance(value, str): 

164 return {T_adapter.validate_strings(s.strip()) for s in value.split(delim)} 

165 errors: list[pydantic.ValidationError] = [] 

166 try: 

167 return {T_adapter.validate_python(value)} 

168 except pydantic.ValidationError as e: 

169 errors.append(e) 

170 try: 

171 return TypeAdapter(set[type_]).validate_python(value) 

172 except pydantic.ValidationError as e: 

173 errors.append(e) 

174 raise ValueError(f"Invalid set[{type_}]: {errors}") 

175 

176 

177ClientRetryExtraCodes = Annotated[ 1a

178 Union[str, StatusCode, set[StatusCode], None], 

179 BeforeValidator(partial(validate_set_T_from_delim_string, type_=StatusCode)), 

180] 

181 

182 

183def parse_retry_delay_input(value: Any) -> Any: 1a

184 """ 

185 Parses various inputs (string, int, float, list) into a format suitable 

186 for TaskRetryDelaySeconds (int, float, list[float], or None). 

187 Handles comma-separated strings for lists of delays. 

188 """ 

189 if isinstance(value, str): 189 ↛ 190line 189 didn't jump to line 190 because the condition on line 189 was never true1a

190 stripped_value = value.strip() 

191 if not stripped_value: 

192 return None # Treat empty or whitespace-only string as None 

193 

194 delim = "," 

195 # Split and filter empty strings that result from multiple commas (e.g., "10,,20") 

196 parts = [s.strip() for s in stripped_value.split(delim) if s.strip()] 

197 

198 if not parts: # e.g., value was just "," or " , " 

199 return None 

200 

201 def _parse_num_part(part_str: str) -> Union[float, int]: 

202 try: 

203 # Prefer float to align with list[float] in TaskRetryDelaySeconds 

204 return TypeAdapter(float).validate_strings(part_str) 

205 except pydantic.ValidationError: 

206 try: 

207 return TypeAdapter(int).validate_strings(part_str) 

208 except pydantic.ValidationError as e_int: 

209 raise ValueError( 

210 f"Invalid number format '{part_str}' for retry delay." 

211 ) from e_int 

212 

213 if len(parts) == 1: 

214 return _parse_num_part(parts[0]) 

215 else: 

216 return [_parse_num_part(p) for p in parts] 

217 

218 # For non-string inputs (int, float, list, None, etc.), pass them through. 

219 # Pydantic will then validate them against Union[int, float, list[float], None]. 

220 return value 1a

221 

222 

223TaskRetryDelaySeconds = Annotated[ 1a

224 Union[str, int, float, list[float], None], 

225 BeforeValidator(parse_retry_delay_input), 

226] 

227 

228LogLevel = Annotated[ 1a

229 Literal["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"], 

230 BeforeValidator(lambda x: x.upper()), 

231] 

232 

233 

234def convert_none_to_empty_dict(v: Optional[KeyValueLabels]) -> KeyValueLabels: 1a

235 return v or {} 

236 

237 

238KeyValueLabelsField = Annotated[ 1a

239 KeyValueLabels, 

240 Field( 

241 default_factory=dict, 

242 description="A dictionary of key-value labels. Values can be strings, numbers, or booleans.", 

243 examples=[{"key": "value1", "key2": 42}], 

244 ), 

245 BeforeValidator(convert_none_to_empty_dict), 

246] 

247 

248 

249def _deserialize_dict_if_string(value: Any) -> dict[str, str]: 1a

250 """ 

251 Useful when a value is sometimes passed as a string or natively as a dict. 

252 

253 Example, setting the custom headers via the CLI involves passing a string format of a dict: 

254 ``` 

255 prefect settings set PREFECT_CLIENT_CUSTOM_HEADERS='{"X-Test-Header": "test-value", "Authorization": "Bearer token123"}' 

256 ``` 

257 """ 

258 if isinstance(value, str): 258 ↛ 259line 258 didn't jump to line 259 because the condition on line 258 was never true1a

259 return orjson.loads(value) 

260 return value 1a

261 

262 

263# This is a Dict[str, str] and not a dict[str, str] because the latter causes errors with Pydantic 

264# 2.10.6 and Python < 3.11 

265JsonStringOrDict = Annotated[ 1a

266 Dict[str, str], 

267 BeforeValidator(_deserialize_dict_if_string), 

268] 

269 

270 

271__all__ = [ 1a

272 "BANNED_CHARACTERS", 

273 "WITHOUT_BANNED_CHARACTERS", 

274 "ClientRetryExtraCodes", 

275 "Date", 

276 "DateTime", 

277 "JsonStringOrDict", 

278 "LogLevel", 

279 "KeyValueLabelsField", 

280 "MAX_VARIABLE_NAME_LENGTH", 

281 "MAX_VARIABLE_VALUE_LENGTH", 

282 "NonNegativeInteger", 

283 "PositiveInteger", 

284 "ListOfNonEmptyStrings", 

285 "NonNegativeFloat", 

286 "NonNegativeTimeDelta", 

287 "Name", 

288 "NameOrEmpty", 

289 "NonEmptyishName", 

290 "SecondsTimeDelta", 

291 "ValidAssetKey", 

292 "SecretDict", 

293 "StatusCode", 

294 "StrictVariableValue", 

295 "TaskRetryDelaySeconds", 

296 "URILike", 

297]