Coverage for /usr/local/lib/python3.12/site-packages/prefect/_internal/schemas/validators.py: 32%

255 statements  

coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1""" 

2This module contains a collection of functions that are used to validate the 

3values of fields in Pydantic models. These functions are used as validators in 

4Pydantic models to ensure that the values of fields conform to the expected 

5format. 

6This will be subject to consolidation and refactoring over the next few months. 

7""" 

8 

9from __future__ import annotations 1a

10 

11import datetime 1a

12import os 1a

13import urllib.parse 1a

14import warnings 1a

15from collections.abc import Iterable, Mapping, MutableMapping 1a

16from copy import copy 1a

17from pathlib import Path 1a

18from typing import TYPE_CHECKING, Any, Optional, TypeVar, Union, overload 1a

19from uuid import UUID 1a

20from zoneinfo import ZoneInfo 1a

21 

22import jsonschema 1a

23 

24from prefect.types._datetime import DateTime, create_datetime_instance, get_timezones 1a

25from prefect.utilities.collections import isiterable 1a

26from prefect.utilities.filesystem import relative_path_to_current_platform 1a

27from prefect.utilities.importtools import from_qualified_name 1a

28from prefect.utilities.names import generate_slug 1a

29 

30if TYPE_CHECKING: 30 ↛ 31line 30 didn't jump to line 31 because the condition on line 30 was never true1a

31 from prefect.serializers import Serializer 

32 

33T = TypeVar("T") 1a

34M = TypeVar("M", bound=Mapping[str, Any]) 1a

35MM = TypeVar("MM", bound=MutableMapping[str, Any]) 1a

36 

37 

def validate_values_conform_to_schema(
    values: Optional[Mapping[str, Any]],
    schema: Optional[Mapping[str, Any]],
    ignore_required: bool = False,
) -> None:
    """
    Validate that the provided values conform to the provided json schema.

    TODO: This schema validation is outdated. The latest version is
    prefect.utilities.schema_tools.validate, which handles fixes to Pydantic v1
    schemas for null values and tuples.

    Args:
        values: The values to validate.
        schema: The schema to validate against.
        ignore_required: Whether to ignore the required fields in the schema. Should be
            used when a partial set of values is acceptable.

    Raises:
        ValueError: If the parameters do not conform to the schema.

    """
    from prefect.utilities.collections import remove_nested_keys

    if ignore_required:
        schema = remove_nested_keys(["required"], schema)

    try:
        if schema is not None and values is not None:
            jsonschema.validate(values, schema)
    except jsonschema.ValidationError as exc:
        if exc.json_path == "$":
            error_message = "Validation failed."
        else:
            error_message = (
                f"Validation failed for field {exc.json_path.replace('$.', '')!r}."
            )
        error_message += f" Failure reason: {exc.message}"
        raise ValueError(error_message) from exc
    except jsonschema.SchemaError as exc:
        raise ValueError(
            "The provided schema is not a valid json schema. Schema error:"
            f" {exc.message}"
        ) from exc
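
# Example usage (illustrative sketch; the schema below is hypothetical):
#
#     schema = {
#         "type": "object",
#         "properties": {"x": {"type": "integer"}},
#         "required": ["x"],
#     }
#     validate_values_conform_to_schema({"x": 1}, schema)                  # passes
#     validate_values_conform_to_schema({}, schema, ignore_required=True)  # passes
#     validate_values_conform_to_schema({"x": "one"}, schema)              # ValueError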

### DEPLOYMENT SCHEMA VALIDATORS ###

def validate_parameters_conform_to_schema(
    parameters: M, values: Mapping[str, Any]
) -> M:
    """Validate that the parameters conform to the parameter schema."""
    if values.get("enforce_parameter_schema"):
        validate_values_conform_to_schema(
            parameters, values.get("parameter_openapi_schema"), ignore_required=True
        )
    return parameters

@overload
def validate_parameter_openapi_schema(schema: M, values: Mapping[str, Any]) -> M: ...


@overload
def validate_parameter_openapi_schema(
    schema: None, values: Mapping[str, Any]
) -> None: ...


def validate_parameter_openapi_schema(
    schema: Optional[M], values: Mapping[str, Any]
) -> Optional[M]:
    """Validate that the parameter_openapi_schema is a valid json schema."""
    if values.get("enforce_parameter_schema"):
        try:
            if schema is not None:
                # Most closely matches the schemas generated by pydantic
                jsonschema.Draft202012Validator.check_schema(schema)
        except jsonschema.SchemaError as exc:
            raise ValueError(
                "The provided schema is not a valid json schema. Schema error:"
                f" {exc.message}"
            ) from exc

    return schema

def convert_to_strings(value: Union[Any, Iterable[Any]]) -> Union[str, list[str]]:
    if isiterable(value):
        return [str(item) for item in value]
    return str(value)
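
# Example usage (illustrative sketch):
#
#     convert_to_strings(5)          # "5"
#     convert_to_strings([1, None])  # ["1", "None"]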

### SCHEDULE SCHEMA VALIDATORS ###

def reconcile_schedules_runner(values: MM) -> MM:
    from prefect.deployments.schedules import (
        normalize_to_deployment_schedule,
    )

    schedules = values.get("schedules")
    if schedules is not None and len(schedules) > 0:
        values["schedules"] = normalize_to_deployment_schedule(schedules)

    return values

@overload
def validate_schedule_max_scheduled_runs(v: int, limit: int) -> int: ...


@overload
def validate_schedule_max_scheduled_runs(v: None, limit: int) -> None: ...


def validate_schedule_max_scheduled_runs(v: Optional[int], limit: int) -> Optional[int]:
    if v is not None and v > limit:
        raise ValueError(f"`max_scheduled_runs` must be less than or equal to {limit}.")
    return v
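
# Example usage (illustrative sketch; the limit typically comes from a setting):
#
#     validate_schedule_max_scheduled_runs(5, limit=10)     # 5
#     validate_schedule_max_scheduled_runs(None, limit=10)  # None
#     validate_schedule_max_scheduled_runs(11, limit=10)    # ValueError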

def remove_old_deployment_fields(values: MM) -> MM:
    # 2.7.7 removed worker_pool_queue_id in lieu of worker_pool_name and
    # worker_pool_queue_name. Those fields were later renamed to work_pool_name
    # and work_queue_name. This validator removes old fields provided
    # by older clients to avoid 422 errors.
    values_copy = copy(values)
    worker_pool_queue_id = values_copy.pop("worker_pool_queue_id", None)
    worker_pool_name = values_copy.pop("worker_pool_name", None)
    worker_pool_queue_name = values_copy.pop("worker_pool_queue_name", None)
    work_pool_queue_name = values_copy.pop("work_pool_queue_name", None)
    if worker_pool_queue_id:
        warnings.warn(
            (
                "`worker_pool_queue_id` is no longer supported for creating or updating "
                "deployments. Please use `work_pool_name` and "
                "`work_queue_name` instead."
            ),
            UserWarning,
        )
    if worker_pool_name or worker_pool_queue_name or work_pool_queue_name:
        warnings.warn(
            (
                "`worker_pool_name`, `worker_pool_queue_name`, and "
                "`work_pool_queue_name` are "
                "no longer supported for creating or updating "
                "deployments. Please use `work_pool_name` and "
                "`work_queue_name` instead."
            ),
            UserWarning,
        )
    return values_copy
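
# Example usage (illustrative sketch; field values are hypothetical):
#
#     payload = {"name": "my-deployment", "worker_pool_queue_id": "some-id"}
#     remove_old_deployment_fields(payload)
#     # emits a UserWarning and returns {"name": "my-deployment"}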

def reconcile_paused_deployment(values: MM) -> MM:
    paused = values.get("paused")

    if paused is None:
        values["paused"] = False

    return values

def default_anchor_date(v: DateTime) -> DateTime:
    return create_datetime_instance(v)

@overload
def default_timezone(v: str, values: Optional[Mapping[str, Any]] = ...) -> str: ...


@overload
def default_timezone(
    v: None, values: Optional[Mapping[str, Any]] = ...
) -> Optional[str]: ...


def default_timezone(
    v: Optional[str], values: Optional[Mapping[str, Any]] = None
) -> Optional[str]:
    values = values or {}
    timezones = get_timezones()

    if v is not None:
        if v and v not in timezones:
            raise ValueError(
                f'Invalid timezone: "{v}" (specify in IANA tzdata format, for example,'
                " America/New_York)"
            )
        return v

    # anchor schedules
    elif "anchor_date" in values:
        anchor_date: datetime.datetime = values["anchor_date"]
        if isinstance(anchor_date.tzinfo, ZoneInfo):
            tz = anchor_date.tzinfo.key
        elif hasattr(anchor_date.tzinfo, "name"):
            tz = anchor_date.tzinfo.name
        else:
            tz = "UTC"
        # sometimes anchor dates have "timezones" that are UTC offsets
        # like "-04:00". This happens when parsing ISO8601 strings.
        # In that case, the correct inferred localization is "UTC".
        return tz if tz in timezones else "UTC"

    # cron schedules
    return v
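
# Example usage (illustrative sketch; assumes these IANA names are in get_timezones()):
#
#     import datetime
#     from zoneinfo import ZoneInfo
#
#     default_timezone("America/New_York")             # "America/New_York"
#     default_timezone("Not/A_Zone")                   # ValueError
#     anchor = datetime.datetime(2024, 1, 1, tzinfo=ZoneInfo("Europe/Paris"))
#     default_timezone(None, {"anchor_date": anchor})  # "Europe/Paris"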

def validate_cron_string(v: str) -> str:
    from prefect._vendor.croniter import croniter

    # croniter allows "random" and "hashed" expressions
    # which we do not support https://github.com/kiorky/croniter
    if not croniter.is_valid(v):
        raise ValueError(f'Invalid cron string: "{v}"')
    elif any(c for c in v.split() if c.casefold() in ["R", "H", "r", "h"]):
        raise ValueError(
            f'Random and Hashed expressions are unsupported, received: "{v}"'
        )
    return v
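
# Example usage (illustrative sketch; the third case assumes the vendored croniter
# parses "R" random expressions, as the comment above indicates):
#
#     validate_cron_string("0 4 * * *")   # valid: every day at 04:00
#     validate_cron_string("not a cron")  # ValueError: invalid cron string
#     validate_cron_string("R R * * *")   # ValueError: random expressions unsupported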

# approx. 1 year's worth of RDATEs + buffer
MAX_RRULE_LENGTH = 6500

def validate_rrule_string(v: str) -> str:
    import dateutil.rrule

    # attempt to parse the rrule string as an rrule object
    # this will error if the string is invalid
    try:
        dateutil.rrule.rrulestr(v, cache=True)
    except ValueError as exc:
        # rrule errors are a mix of cryptic and informative
        # so reraise to be clear that the string was invalid
        raise ValueError(f'Invalid RRule string "{v}": {exc}')
    if len(v) > MAX_RRULE_LENGTH:
        raise ValueError(
            f'Invalid RRule string "{v[:40]}..."\n'
            f"Max length is {MAX_RRULE_LENGTH}, got {len(v)}"
        )
    return v
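
# Example usage (illustrative sketch):
#
#     validate_rrule_string("DTSTART:20240101T000000\nRRULE:FREQ=DAILY;COUNT=5")  # OK
#     validate_rrule_string("RRULE:FREQ=NEVER")  # ValueError: not a valid frequency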

### STATE SCHEMA VALIDATORS ###


def get_or_create_run_name(name: Optional[str]) -> str:
    return name or generate_slug(2)

### FILESYSTEM SCHEMA VALIDATORS ###


def stringify_path(value: Union[str, os.PathLike[str]]) -> str:
    return os.fspath(value)

def validate_basepath(value: str) -> str:
    scheme, netloc, _, _, _ = urllib.parse.urlsplit(value)

    if not scheme:
        raise ValueError(f"Base path must start with a scheme. Got {value!r}.")

    if not netloc:
        raise ValueError(
            f"Base path must include a location after the scheme. Got {value!r}."
        )

    if scheme == "file":
        raise ValueError(
            "Base path scheme cannot be 'file'. Use `LocalFileSystem` instead for"
            " local file access."
        )

    return value
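
# Example usage (illustrative sketch; the bucket name is hypothetical):
#
#     validate_basepath("s3://my-bucket/path")   # OK
#     validate_basepath("my-bucket/path")        # ValueError: missing scheme
#     validate_basepath("file://localhost/tmp")  # ValueError: use LocalFileSystem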

### SERIALIZER SCHEMA VALIDATORS ###

def validate_picklelib(value: str) -> str:
    """
    Check that the given pickle library is importable and has dumps/loads methods.
    """
    try:
        pickler = from_qualified_name(value)
    except (ImportError, AttributeError) as exc:
        raise ValueError(
            f"Failed to import requested pickle library: {value!r}."
        ) from exc

    if not callable(getattr(pickler, "dumps", None)):
        raise ValueError(f"Pickle library at {value!r} does not have a 'dumps' method.")

    if not callable(getattr(pickler, "loads", None)):
        raise ValueError(f"Pickle library at {value!r} does not have a 'loads' method.")

    return value
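
# Example usage (illustrative sketch):
#
#     validate_picklelib("pickle")       # OK: stdlib pickle has dumps/loads
#     validate_picklelib("cloudpickle")  # OK, if cloudpickle is installed
#     validate_picklelib("math")         # ValueError: no 'dumps' method
#     validate_picklelib("no.such.mod")  # ValueError: import failed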

def validate_dump_kwargs(value: M) -> M:
    # `default` is set by `object_encoder`. A user-provided callable would make this
    # class unserializable anyway.
    if "default" in value:
        raise ValueError("`default` cannot be provided. Use `object_encoder` instead.")
    return value


def validate_load_kwargs(value: M) -> M:
    # `object_hook` is set by `object_decoder`. A user-provided callable would make
    # this class unserializable anyway.
    if "object_hook" in value:
        raise ValueError(
            "`object_hook` cannot be provided. Use `object_decoder` instead."
        )
    return value

@overload
def cast_type_names_to_serializers(value: str) -> "Serializer[Any]": ...


@overload
def cast_type_names_to_serializers(value: "Serializer[T]") -> "Serializer[T]": ...


def cast_type_names_to_serializers(
    value: Union[str, "Serializer[Any]"],
) -> "Serializer[Any]":
    from prefect.serializers import Serializer

    if isinstance(value, str):
        return Serializer(type=value)
    return value
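
# Example usage (illustrative sketch; assumes "json" is a registered Serializer type
# that Serializer(type=...) can dispatch to):
#
#     cast_type_names_to_serializers("json")               # Serializer instance for JSON
#     cast_type_names_to_serializers(existing_serializer)  # returned unchanged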

def validate_compressionlib(value: str) -> str:
    """
    Check that the given compression library is importable and has compress/decompress
    methods.
    """
    try:
        compressor = from_qualified_name(value)
    except (ImportError, AttributeError) as exc:
        raise ValueError(
            f"Failed to import requested compression library: {value!r}."
        ) from exc

    if not callable(getattr(compressor, "compress", None)):
        raise ValueError(
            f"Compression library at {value!r} does not have a 'compress' method."
        )

    if not callable(getattr(compressor, "decompress", None)):
        raise ValueError(
            f"Compression library at {value!r} does not have a 'decompress' method."
        )

    return value
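
# Example usage (illustrative sketch):
#
#     validate_compressionlib("zlib")       # OK: zlib has compress/decompress
#     validate_compressionlib("lz4.frame")  # OK, if lz4 is installed
#     validate_compressionlib("json")       # ValueError: no 'compress' method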

# TODO: if we use this elsewhere we can change the error message to be more generic
@overload
def list_length_50_or_less(v: int) -> int: ...


@overload
def list_length_50_or_less(v: float) -> float: ...


@overload
def list_length_50_or_less(v: list[int]) -> list[int]: ...


@overload
def list_length_50_or_less(v: list[float]) -> list[float]: ...


@overload
def list_length_50_or_less(v: None) -> None: ...


def list_length_50_or_less(
    v: Optional[int | float | list[int] | list[float]],
) -> Optional[int | float | list[int] | list[float]]:
    if isinstance(v, list) and (len(v) > 50):
        raise ValueError("Cannot configure more than 50 retry delays per task.")
    return v

# TODO: if we use this elsewhere we can change the error message to be more generic
@overload
def validate_not_negative(v: float) -> float: ...


@overload
def validate_not_negative(v: None) -> None: ...


def validate_not_negative(v: Optional[float]) -> Optional[float]:
    if v is not None and v < 0:
        raise ValueError("`retry_jitter_factor` must be >= 0.")
    return v

def validate_default_queue_id_not_none(v: Optional[UUID]) -> UUID:
    if v is None:
        raise ValueError(
            "`default_queue_id` is a required field. If you are "
            "creating a new WorkPool and don't have a queue "
            "ID yet, use the `actions.WorkPoolCreate` model instead."
        )
    return v

@overload
def validate_max_metadata_length(v: MM) -> MM: ...


@overload
def validate_max_metadata_length(v: None) -> None: ...


def validate_max_metadata_length(v: Optional[MM]) -> Optional[MM]:
    max_metadata_length = 500
    if v is None:
        return v
    for key in v.keys():
        if len(str(v[key])) > max_metadata_length:
            v[key] = str(v[key])[:max_metadata_length] + "..."
    return v
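
# Example usage (illustrative sketch):
#
#     meta = {"short": "ok", "long": "x" * 600}
#     validate_max_metadata_length(meta)
#     # "short" is untouched; "long" is truncated to 500 characters plus "..."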

### TASK RUN SCHEMA VALIDATORS ###

@overload
def validate_cache_key_length(cache_key: str) -> str: ...


@overload
def validate_cache_key_length(cache_key: None) -> None: ...


def validate_cache_key_length(cache_key: Optional[str]) -> Optional[str]:
    from prefect.settings import (
        PREFECT_API_TASK_CACHE_KEY_MAX_LENGTH,
    )

    if cache_key and len(cache_key) > PREFECT_API_TASK_CACHE_KEY_MAX_LENGTH.value():
        raise ValueError(
            "Cache key exceeded maximum allowed length of"
            f" {PREFECT_API_TASK_CACHE_KEY_MAX_LENGTH.value()} characters."
        )
    return cache_key

def set_run_policy_deprecated_fields(values: MM) -> MM:
    """
    If deprecated fields are provided, populate the corresponding new fields
    to preserve orchestration behavior.
    """
    if not values.get("retries", None) and values.get("max_retries", 0) != 0:
        values["retries"] = values["max_retries"]

    if (
        not values.get("retry_delay", None)
        and values.get("retry_delay_seconds", 0) != 0
    ):
        values["retry_delay"] = values["retry_delay_seconds"]

    return values
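
# Example usage (illustrative sketch):
#
#     policy = {"max_retries": 3, "retry_delay_seconds": 10.0}
#     set_run_policy_deprecated_fields(policy)
#     # populates the new fields: {"retries": 3, "retry_delay": 10.0, ...}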

### PYTHON ENVIRONMENT SCHEMA VALIDATORS ###

@overload
def return_v_or_none(v: str) -> str: ...


@overload
def return_v_or_none(v: None) -> None: ...


def return_v_or_none(v: Optional[str]) -> Optional[str]:
    """Make sure that empty strings are treated as None."""
    if not v:
        return None
    return v

### BLOCK SCHEMA VALIDATORS ###

def validate_parent_and_ref_diff(values: M) -> M:
    parent_id = values.get("parent_block_document_id")
    ref_id = values.get("reference_block_document_id")
    if parent_id and ref_id and parent_id == ref_id:
        raise ValueError(
            "`parent_block_document_id` and `reference_block_document_id` cannot be"
            " the same"
        )
    return values

def validate_name_present_on_nonanonymous_blocks(values: M) -> M:
    # anonymous blocks may have no name prior to actually being
    # stored in the database
    if not values.get("is_anonymous") and not values.get("name"):
        raise ValueError("Names must be provided for block documents.")
    return values

### PROCESS JOB CONFIGURATION VALIDATORS ###

@overload
def validate_working_dir(v: str) -> Path: ...


@overload
def validate_working_dir(v: None) -> None: ...


def validate_working_dir(v: Optional[Path | str]) -> Optional[Path]:
    """Make sure that the working directory is formatted for the current platform."""
    if isinstance(v, str):
        return relative_path_to_current_platform(v)
    return v
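
# Example usage (illustrative sketch; the path is hypothetical):
#
#     validate_working_dir("flows/etl")  # a Path normalized for the current platform
#     validate_working_dir(None)         # None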