Coverage for /usr/local/lib/python3.12/site-packages/prefect/_internal/schemas/validators.py: 60%

255 statements  


1""" 

2This module contains a collection of functions that are used to validate the 

3values of fields in Pydantic models. These functions are used as validators in 

4Pydantic models to ensure that the values of fields conform to the expected 

5format. 

6This will be subject to consolidation and refactoring over the next few months. 

7""" 

8 

9from __future__ import annotations 1a

10 

11import datetime 1a

12import os 1a

13import urllib.parse 1a

14import warnings 1a

15from collections.abc import Iterable, Mapping, MutableMapping 1a

16from copy import copy 1a

17from pathlib import Path 1a

18from typing import TYPE_CHECKING, Any, Optional, TypeVar, Union, overload 1a

19from uuid import UUID 1a

20from zoneinfo import ZoneInfo 1a

21 

22import jsonschema 1a

23 

24from prefect.types._datetime import DateTime, create_datetime_instance, get_timezones 1a

25from prefect.utilities.collections import isiterable 1a

26from prefect.utilities.filesystem import relative_path_to_current_platform 1a

27from prefect.utilities.importtools import from_qualified_name 1a

28from prefect.utilities.names import generate_slug 1a

29 

30if TYPE_CHECKING: 30 ↛ 31line 30 didn't jump to line 31 because the condition on line 30 was never true1a

31 from prefect.serializers import Serializer 

32 

33T = TypeVar("T") 1a

34M = TypeVar("M", bound=Mapping[str, Any]) 1a

35MM = TypeVar("MM", bound=MutableMapping[str, Any]) 1a

36 

37 
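
# Usage sketch (illustrative, not part of this module): these validators are plain
# functions that Pydantic models wire in, e.g. via `field_validator`. A minimal
# sketch, assuming a hypothetical model with a `timezone` field:
#
#     from pydantic import BaseModel, field_validator
#
#     class ExampleSchedule(BaseModel):
#         timezone: Optional[str] = None
#
#         @field_validator("timezone")
#         @classmethod
#         def _default_timezone(cls, v: Optional[str]) -> Optional[str]:
#             return default_timezone(v)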

def validate_values_conform_to_schema(
    values: Optional[Mapping[str, Any]],
    schema: Optional[Mapping[str, Any]],
    ignore_required: bool = False,
) -> None:
    """
    Validate that the provided values conform to the provided json schema.

    TODO: This schema validation is outdated. The latest version is
    prefect.utilities.schema_tools.validate, which handles fixes to Pydantic v1
    schemas for null values and tuples.

    Args:
        values: The values to validate.
        schema: The schema to validate against.
        ignore_required: Whether to ignore the required fields in the schema. Should be
            used when a partial set of values is acceptable.

    Raises:
        ValueError: If the parameters do not conform to the schema.

    """
    from prefect.utilities.collections import remove_nested_keys

    if ignore_required:
        schema = remove_nested_keys(["required"], schema)

    try:
        if schema is not None and values is not None:
            jsonschema.validate(values, schema)
    except jsonschema.ValidationError as exc:
        if exc.json_path == "$":
            error_message = "Validation failed."
        else:
            error_message = (
                f"Validation failed for field {exc.json_path.replace('$.', '')!r}."
            )
        error_message += f" Failure reason: {exc.message}"
        raise ValueError(error_message) from exc
    except jsonschema.SchemaError as exc:
        raise ValueError(
            "The provided schema is not a valid json schema. Schema error:"
            f" {exc.message}"
        ) from exc
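
# Illustrative sketch (assumed example): calling the validator directly with a value
# that does not match the schema raises ValueError naming the offending field.
#
#     schema = {"type": "object", "properties": {"retries": {"type": "integer"}}}
#     validate_values_conform_to_schema({"retries": "three"}, schema)
#     # -> ValueError: Validation failed for field 'retries'. Failure reason: ...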



### DEPLOYMENT SCHEMA VALIDATORS ###


def validate_parameters_conform_to_schema(
    parameters: M, values: Mapping[str, Any]
) -> M:
    """Validate that the parameters conform to the parameter schema."""
    if values.get("enforce_parameter_schema"):
        validate_values_conform_to_schema(
            parameters, values.get("parameter_openapi_schema"), ignore_required=True
        )
    return parameters


@overload
def validate_parameter_openapi_schema(schema: M, values: Mapping[str, Any]) -> M: ...


@overload
def validate_parameter_openapi_schema(
    schema: None, values: Mapping[str, Any]
) -> None: ...


def validate_parameter_openapi_schema(
    schema: Optional[M], values: Mapping[str, Any]
) -> Optional[M]:
    """Validate that the parameter_openapi_schema is a valid json schema."""
    if values.get("enforce_parameter_schema"):
        try:
            if schema is not None:
                # Most closely matches the schemas generated by pydantic
                jsonschema.Draft202012Validator.check_schema(schema)
        except jsonschema.SchemaError as exc:
            raise ValueError(
                "The provided schema is not a valid json schema. Schema error:"
                f" {exc.message}"
            ) from exc

    return schema
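
# Illustrative sketch (assumed example): validation only runs when
# `enforce_parameter_schema` is truthy, and an invalid schema raises ValueError.
#
#     validate_parameter_openapi_schema({"type": 123}, {"enforce_parameter_schema": True})
#     # -> ValueError: The provided schema is not a valid json schema. ...
#     validate_parameter_openapi_schema({"type": 123}, {"enforce_parameter_schema": False})
#     # -> returns the schema unchanged; no validation is performed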


def convert_to_strings(value: Union[Any, Iterable[Any]]) -> Union[str, list[str]]:
    if isiterable(value):
        return [str(item) for item in value]
    return str(value)


### SCHEDULE SCHEMA VALIDATORS ###


def reconcile_schedules_runner(values: MM) -> MM:
    from prefect.deployments.schedules import (
        normalize_to_deployment_schedule,
    )

    schedules = values.get("schedules")
    if schedules is not None and len(schedules) > 0:
        values["schedules"] = normalize_to_deployment_schedule(schedules)

    return values


@overload
def validate_schedule_max_scheduled_runs(v: int, limit: int) -> int: ...


@overload
def validate_schedule_max_scheduled_runs(v: None, limit: int) -> None: ...


def validate_schedule_max_scheduled_runs(v: Optional[int], limit: int) -> Optional[int]:
    if v is not None and v > limit:
        raise ValueError(f"`max_scheduled_runs` must be less than or equal to {limit}.")
    return v


def remove_old_deployment_fields(values: MM) -> MM:
    # 2.7.7 removed worker_pool_queue_id in lieu of worker_pool_name and
    # worker_pool_queue_name. Those fields were later renamed to work_pool_name
    # and work_queue_name. This validator removes old fields provided
    # by older clients to avoid 422 errors.
    values_copy = copy(values)
    worker_pool_queue_id = values_copy.pop("worker_pool_queue_id", None)
    worker_pool_name = values_copy.pop("worker_pool_name", None)
    worker_pool_queue_name = values_copy.pop("worker_pool_queue_name", None)
    work_pool_queue_name = values_copy.pop("work_pool_queue_name", None)
    if worker_pool_queue_id:
        warnings.warn(
            (
                "`worker_pool_queue_id` is no longer supported for creating or updating "
                "deployments. Please use `work_pool_name` and "
                "`work_queue_name` instead."
            ),
            UserWarning,
        )
    if worker_pool_name or worker_pool_queue_name or work_pool_queue_name:
        warnings.warn(
            (
                "`worker_pool_name`, `worker_pool_queue_name`, and "
                "`work_pool_queue_name` are "
                "no longer supported for creating or updating "
                "deployments. Please use `work_pool_name` and "
                "`work_queue_name` instead."
            ),
            UserWarning,
        )
    return values_copy
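
# Illustrative sketch (assumed example): legacy fields from pre-2.7.7 clients are
# dropped (with a UserWarning) so the payload no longer triggers 422 errors.
#
#     payload = {"name": "my-deployment", "worker_pool_queue_id": "abc123"}
#     remove_old_deployment_fields(payload)
#     # UserWarning: `worker_pool_queue_id` is no longer supported ...
#     # -> {"name": "my-deployment"}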


def reconcile_paused_deployment(values: MM) -> MM:
    paused = values.get("paused")

    if paused is None:
        values["paused"] = False

    return values


def default_anchor_date(v: DateTime) -> DateTime:
    return create_datetime_instance(v)


@overload
def default_timezone(v: str, values: Optional[Mapping[str, Any]] = ...) -> str: ...


@overload
def default_timezone(
    v: None, values: Optional[Mapping[str, Any]] = ...
) -> Optional[str]: ...


def default_timezone(
    v: Optional[str], values: Optional[Mapping[str, Any]] = None
) -> Optional[str]:
    values = values or {}
    timezones = get_timezones()

    if v is not None:
        if v and v not in timezones:
            raise ValueError(
                f'Invalid timezone: "{v}" (specify in IANA tzdata format, for example,'
                " America/New_York)"
            )
        return v

    # anchor schedules
    elif "anchor_date" in values:
        anchor_date: datetime.datetime = values["anchor_date"]
        if isinstance(anchor_date.tzinfo, ZoneInfo):
            tz = anchor_date.tzinfo.key
        elif hasattr(anchor_date.tzinfo, "name"):
            tz = anchor_date.tzinfo.name
        else:
            tz = "UTC"
        # sometimes anchor dates have "timezones" that are UTC offsets
        # like "-04:00". This happens when parsing ISO8601 strings.
        # In this case, the correct inferred localization is "UTC".
        return tz if tz in timezones else "UTC"

    # cron schedules
    return v
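
# Illustrative sketch (assumed example): an explicit IANA name is returned as-is; when
# `v` is None, the timezone is inferred from a timezone-aware `anchor_date`.
#
#     default_timezone("America/New_York")
#     # -> "America/New_York"
#     anchor = datetime.datetime(2024, 1, 1, tzinfo=ZoneInfo("Europe/London"))
#     default_timezone(None, {"anchor_date": anchor})
#     # -> "Europe/London"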


def validate_cron_string(v: str) -> str:
    from prefect._vendor.croniter import croniter

    # croniter allows "random" and "hashed" expressions
    # which we do not support https://github.com/kiorky/croniter
    if not croniter.is_valid(v):
        raise ValueError(f'Invalid cron string: "{v}"')
    elif any(c for c in v.split() if c.casefold() in ["R", "H", "r", "h"]):
        raise ValueError(
            f'Random and Hashed expressions are unsupported, received: "{v}"'
        )
    return v
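
# Illustrative sketch (assumed example): standard five-field cron strings pass while
# malformed strings are rejected.
#
#     validate_cron_string("0 9 * * MON-FRI")   # -> "0 9 * * MON-FRI"
#     validate_cron_string("every monday")      # -> ValueError: Invalid cron string: "every monday"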


# approx. 1 year's worth of RDATEs + buffer
MAX_RRULE_LENGTH = 6500


def validate_rrule_string(v: str) -> str:
    import dateutil.rrule

    # attempt to parse the rrule string as an rrule object
    # this will error if the string is invalid
    try:
        dateutil.rrule.rrulestr(v, cache=True)
    except ValueError as exc:
        # rrule errors are a mix of cryptic and informative
        # so reraise to be clear that the string was invalid
        raise ValueError(f'Invalid RRule string "{v}": {exc}')
    if len(v) > MAX_RRULE_LENGTH:
        raise ValueError(
            f'Invalid RRule string "{v[:40]}..."\n'
            f"Max length is {MAX_RRULE_LENGTH}, got {len(v)}"
        )
    return v
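
# Illustrative sketch (assumed example): strings dateutil cannot parse as an RRULE
# are reraised as ValueError with the original input included.
#
#     validate_rrule_string("FREQ=DAILY;INTERVAL=2")   # -> "FREQ=DAILY;INTERVAL=2"
#     validate_rrule_string("FREQ=SOMETIMES")          # -> ValueError: Invalid RRule string ...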


### STATE SCHEMA VALIDATORS ###


def get_or_create_run_name(name: Optional[str]) -> str:
    return name or generate_slug(2)


### FILESYSTEM SCHEMA VALIDATORS ###


def stringify_path(value: Union[str, os.PathLike[str]]) -> str:
    return os.fspath(value)


def validate_basepath(value: str) -> str:
    scheme, netloc, _, _, _ = urllib.parse.urlsplit(value)

    if not scheme:
        raise ValueError(f"Base path must start with a scheme. Got {value!r}.")

    if not netloc:
        raise ValueError(
            f"Base path must include a location after the scheme. Got {value!r}."
        )

    if scheme == "file":
        raise ValueError(
            "Base path scheme cannot be 'file'. Use `LocalFileSystem` instead for"
            " local file access."
        )

    return value
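
# Illustrative sketch (assumed example): remote URLs with a scheme and a location
# pass, while bare local paths and file URLs are rejected.
#
#     validate_basepath("s3://my-bucket/flows")    # -> "s3://my-bucket/flows"
#     validate_basepath("/tmp/flows")              # -> ValueError: Base path must start with a scheme. ...
#     validate_basepath("file://localhost/tmp")    # -> ValueError: Base path scheme cannot be 'file'. ...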


### SERIALIZER SCHEMA VALIDATORS ###


def validate_picklelib(value: str) -> str:
    """
    Check that the given pickle library is importable and has dumps/loads methods.
    """
    try:
        pickler = from_qualified_name(value)
    except (ImportError, AttributeError) as exc:
        raise ValueError(
            f"Failed to import requested pickle library: {value!r}."
        ) from exc

    if not callable(getattr(pickler, "dumps", None)):
        raise ValueError(f"Pickle library at {value!r} does not have a 'dumps' method.")

    if not callable(getattr(pickler, "loads", None)):
        raise ValueError(f"Pickle library at {value!r} does not have a 'loads' method.")

    return value


def validate_dump_kwargs(value: M) -> M:
    # `default` is set by `object_encoder`. A user provided callable would make this
    # class unserializable anyway.
    if "default" in value:
        raise ValueError("`default` cannot be provided. Use `object_encoder` instead.")
    return value


def validate_load_kwargs(value: M) -> M:
    # `object_hook` is set by `object_decoder`. A user provided callable would make
    # this class unserializable anyway.
    if "object_hook" in value:
        raise ValueError(
            "`object_hook` cannot be provided. Use `object_decoder` instead."
        )
    return value


@overload
def cast_type_names_to_serializers(value: str) -> "Serializer[Any]": ...


@overload
def cast_type_names_to_serializers(value: "Serializer[T]") -> "Serializer[T]": ...


def cast_type_names_to_serializers(
    value: Union[str, "Serializer[Any]"],
) -> "Serializer[Any]":
    from prefect.serializers import Serializer

    if isinstance(value, str):
        return Serializer(type=value)
    return value


def validate_compressionlib(value: str) -> str:
    """
    Check that the given compression library is importable and has compress/decompress
    methods.
    """
    try:
        compressor = from_qualified_name(value)
    except (ImportError, AttributeError) as exc:
        raise ValueError(
            f"Failed to import requested compression library: {value!r}."
        ) from exc

    if not callable(getattr(compressor, "compress", None)):
        raise ValueError(
            f"Compression library at {value!r} does not have a 'compress' method."
        )

    if not callable(getattr(compressor, "decompress", None)):
        raise ValueError(
            f"Compression library at {value!r} does not have a 'decompress' method."
        )

    return value


# TODO: if we use this elsewhere we can change the error message to be more generic
@overload
def list_length_50_or_less(v: int) -> int: ...


@overload
def list_length_50_or_less(v: float) -> float: ...


@overload
def list_length_50_or_less(v: list[int]) -> list[int]: ...


@overload
def list_length_50_or_less(v: list[float]) -> list[float]: ...


@overload
def list_length_50_or_less(v: None) -> None: ...


def list_length_50_or_less(
    v: Optional[int | float | list[int] | list[float]],
) -> Optional[int | float | list[int] | list[float]]:
    if isinstance(v, list) and (len(v) > 50):
        raise ValueError("Can not configure more than 50 retry delays per task.")
    return v


# TODO: if we use this elsewhere we can change the error message to be more generic
@overload
def validate_not_negative(v: float) -> float: ...


@overload
def validate_not_negative(v: None) -> None: ...


def validate_not_negative(v: Optional[float]) -> Optional[float]:
    if v is not None and v < 0:
        raise ValueError("`retry_jitter_factor` must be >= 0.")
    return v


def validate_default_queue_id_not_none(v: Optional[UUID]) -> UUID:
    if v is None:
        raise ValueError(
            "`default_queue_id` is a required field. If you are "
            "creating a new WorkPool and don't have a queue "
            "ID yet, use the `actions.WorkPoolCreate` model instead."
        )
    return v


@overload
def validate_max_metadata_length(v: MM) -> MM: ...


@overload
def validate_max_metadata_length(v: None) -> None: ...


def validate_max_metadata_length(v: Optional[MM]) -> Optional[MM]:
    max_metadata_length = 500
    if v is None:
        return v
    for key in v.keys():
        if len(str(v[key])) > max_metadata_length:
            v[key] = str(v[key])[:max_metadata_length] + "..."
    return v
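
# Illustrative sketch (assumed example): metadata values longer than 500 characters
# are truncated in place and suffixed with "...".
#
#     meta = {"description": "x" * 600, "owner": "team-data"}
#     validate_max_metadata_length(meta)
#     # -> meta["description"] is now the first 500 characters followed by "..."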


### TASK RUN SCHEMA VALIDATORS ###


@overload
def validate_cache_key_length(cache_key: str) -> str: ...


@overload
def validate_cache_key_length(cache_key: None) -> None: ...


def validate_cache_key_length(cache_key: Optional[str]) -> Optional[str]:
    from prefect.settings import (
        PREFECT_API_TASK_CACHE_KEY_MAX_LENGTH,
    )

    if cache_key and len(cache_key) > PREFECT_API_TASK_CACHE_KEY_MAX_LENGTH.value():
        raise ValueError(
            "Cache key exceeded maximum allowed length of"
            f" {PREFECT_API_TASK_CACHE_KEY_MAX_LENGTH.value()} characters."
        )
    return cache_key


def set_run_policy_deprecated_fields(values: MM) -> MM:
    """
    If deprecated fields are provided, populate the corresponding new fields
    to preserve orchestration behavior.
    """
    if not values.get("retries", None) and values.get("max_retries", 0) != 0:
        values["retries"] = values["max_retries"]

    if (
        not values.get("retry_delay", None)
        and values.get("retry_delay_seconds", 0) != 0
    ):
        values["retry_delay"] = values["retry_delay_seconds"]

    return values
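
# Illustrative sketch (assumed example): deprecated retry fields are mirrored onto the
# newer field names so orchestration behavior is preserved.
#
#     set_run_policy_deprecated_fields({"max_retries": 3, "retry_delay_seconds": 10})
#     # -> {"max_retries": 3, "retry_delay_seconds": 10, "retries": 3, "retry_delay": 10}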


### PYTHON ENVIRONMENT SCHEMA VALIDATORS ###


@overload
def return_v_or_none(v: str) -> str: ...


@overload
def return_v_or_none(v: None) -> None: ...


def return_v_or_none(v: Optional[str]) -> Optional[str]:
    """Make sure that empty strings are treated as None"""
    if not v:
        return None
    return v


### BLOCK SCHEMA VALIDATORS ###


def validate_parent_and_ref_diff(values: M) -> M:
    parent_id = values.get("parent_block_document_id")
    ref_id = values.get("reference_block_document_id")
    if parent_id and ref_id and parent_id == ref_id:
        raise ValueError(
            "`parent_block_document_id` and `reference_block_document_id` cannot be"
            " the same"
        )
    return values


def validate_name_present_on_nonanonymous_blocks(values: M) -> M:
    # anonymous blocks may have no name prior to actually being
    # stored in the database
    if not values.get("is_anonymous") and not values.get("name"):
        raise ValueError("Names must be provided for block documents.")
    return values


### PROCESS JOB CONFIGURATION VALIDATORS ###


@overload
def validate_working_dir(v: str) -> Path: ...


@overload
def validate_working_dir(v: None) -> None: ...


def validate_working_dir(v: Optional[Path | str]) -> Optional[Path]:
    """Make sure that the working directory is formatted for the current platform."""
    if isinstance(v, str):
        return relative_path_to_current_platform(v)
    return v