Coverage for /usr/local/lib/python3.12/site-packages/prefect/utilities/callables.py: 23%

293 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1""" 

2Utilities for working with Python callables. 

3""" 

4 

5import ast 1a

6import importlib.util 1a

7import inspect 1a

8import warnings 1a

9from collections import OrderedDict 1a

10from collections.abc import Iterable 1a

11from functools import partial 1a

12from logging import Logger 1a

13from pathlib import Path 1a

14from typing import Any, Callable, Optional, Union, cast 1a

15 

16import cloudpickle # type: ignore # no stubs available 1a

17import pydantic 1a

18from griffe import Docstring, DocstringSectionKind, Parser, parse 1a

19from typing_extensions import Literal, TypeVar 1a

20 

21from prefect._internal.pydantic.v1_schema import has_v1_type_as_param 1a

22from prefect._internal.pydantic.v2_schema import ( 1a

23 create_v2_schema, 

24 process_v2_params, 

25) 

26from prefect.exceptions import ( 1a

27 MappingLengthMismatch, 

28 MappingMissingIterable, 

29 ParameterBindError, 

30 ReservedArgumentError, 

31 SignatureMismatchError, 

32) 

33from prefect.logging.loggers import disable_logger, get_logger 1a

34from prefect.utilities.annotations import allow_failure, quote, unmapped 1a

35from prefect.utilities.collections import isiterable 1a

36from prefect.utilities.importtools import safe_load_namespace 1a

37 

38logger: Logger = get_logger(__name__) 1a

39 

40R = TypeVar("R", infer_variance=True) 1a

41 

42 

def get_call_parameters(
    fn: Callable[..., Any],
    call_args: tuple[Any, ...],
    call_kwargs: dict[str, Any],
    apply_defaults: bool = True,
) -> dict[str, Any]:
    """
    Bind a call to a function to get parameter/value mapping. Default values on
    the signature will be included if not overridden.

    If the function has a `__prefect_self__` attribute, it will be included as
    the first parameter. This attribute is set when Prefect decorates a bound
    method, so this approach allows Prefect to work with bound methods in a way
    that is consistent with how Python handles them (i.e. users don't have to
    pass the instance argument to the method) while still making the implicit self
    argument visible to all of Prefect's parameter machinery (such as cache key
    functions).

    If the function has no `__prefect_self__` attribute but does have a
    `__prefect_cls__` attribute, that value is prepended to the positional
    arguments instead.

    Args:
        fn: The callable whose signature the call is bound against.
        call_args: Positional arguments of the call.
        call_kwargs: Keyword arguments of the call.
        apply_defaults: If true, parameters that were not passed are filled in
            from the defaults declared on the signature.

    Returns:
        A plain `dict` mapping parameter names to the bound values.

    Raises:
        ParameterBindError: If the arguments/kwargs are not valid for the
            function. The original `TypeError` is attached as the cause.
    """
    if hasattr(fn, "__prefect_self__"):
        call_args = (getattr(fn, "__prefect_self__"), *call_args)
    elif hasattr(fn, "__prefect_cls__"):
        call_args = (getattr(fn, "__prefect_cls__"), *call_args)

    try:
        bound_signature = inspect.signature(fn).bind(*call_args, **call_kwargs)
    except TypeError as exc:
        # Chain explicitly so the traceback shows the bind failure as the cause
        raise ParameterBindError.from_bind_failure(
            fn, exc, call_args, call_kwargs
        ) from exc

    if apply_defaults:
        bound_signature.apply_defaults()

    # We cast from `OrderedDict` to `dict` because Dask will not convert futures in an
    # ordered dictionary to values during execution; this is the default behavior in
    # Python 3.9 anyway.
    return dict(bound_signature.arguments)

81 

82 

def get_parameter_defaults(
    fn: Callable[..., Any],
) -> dict[str, Any]:
    """
    Collect the default values declared on a callable's signature.

    Args:
        fn: The callable to inspect.

    Returns:
        A mapping of parameter name to default value, in signature order.
        Parameters that declare no default are left out.
    """
    no_default = inspect.Parameter.empty
    return {
        param_name: spec.default
        for param_name, spec in inspect.signature(fn).parameters.items()
        if spec.default is not no_default
    }

98 

99 

def explode_variadic_parameter(
    fn: Callable[..., Any], parameters: dict[str, Any]
) -> dict[str, Any]:
    """
    Flatten the variadic keyword argument (i.e. **kwargs) of a parameter
    dictionary so that its entries live at the top level.

    Example:

    ```python
    def foo(a, b, **kwargs):
        pass

    parameters = {"a": 1, "b": 2, "kwargs": {"c": 3, "d": 4}}
    explode_variadic_parameter(foo, parameters)
    # {"a": 1, "b": 2, "c": 3, "d": 4}
    ```

    If `fn` declares no variadic keyword parameter, `parameters` is returned
    as-is; otherwise a shallow copy is returned and the input is not mutated.
    """
    kwargs_name = next(
        (
            name
            for name, spec in inspect.signature(fn).parameters.items()
            if spec.kind == spec.VAR_KEYWORD
        ),
        None,
    )
    if not kwargs_name:
        return parameters

    flattened = parameters.copy()
    nested = flattened.pop(kwargs_name, {})
    # Entries from the variadic mapping overwrite same-named top-level keys
    flattened.update(nested.items())
    return flattened

132 

133 

def collapse_variadic_parameters(
    fn: Callable[..., Any], parameters: dict[str, Any]
) -> dict[str, Any]:
    """
    Given a parameter dictionary, move any parameters not present in the
    signature into the variadic keyword argument.

    Example:

    ```python
    def foo(a, b, **kwargs):
        pass

    parameters = {"a": 1, "b": 2, "c": 3, "d": 4}
    collapse_variadic_parameters(foo, parameters)
    # {"a": 1, "b": 2, "kwargs": {"c": 3, "d": 4}}
    ```

    Args:
        fn: The callable whose signature decides which keys are "extra".
        parameters: The parameter dictionary to collapse. It is not mutated.

    Returns:
        `parameters` itself when every key is present in the signature,
        otherwise a shallow copy in which the extra keys have been moved
        under the variadic keyword parameter's name. The extra keys keep the
        order they had in `parameters`. If `parameters` already holds a dict
        under the variadic name, the extra keys are merged into it (extra keys
        win on conflict).

    Raises:
        ValueError: If there are extra keys but `fn` has no variadic keyword
            parameter.
    """
    signature_parameters = inspect.signature(fn).parameters
    variadic_key = next(
        (
            key
            for key, parameter in signature_parameters.items()
            if parameter.kind == parameter.VAR_KEYWORD
        ),
        None,
    )

    # A list (rather than a set difference) keeps the caller's key order, so the
    # resulting **kwargs ordering is deterministic.
    missing_parameters = [key for key in parameters if key not in signature_parameters]

    if not missing_parameters:
        # no missing parameters, return parameters unchanged
        return parameters

    if not variadic_key:
        raise ValueError(
            f"Signature for {fn} does not include any variadic keyword argument "
            "but parameters were given that are not present in the signature."
        )

    new_parameters = parameters.copy()
    collapsed = {key: new_parameters.pop(key) for key in missing_parameters}

    # Do not discard keyword arguments that were already collapsed by the caller
    existing = new_parameters.get(variadic_key)
    if isinstance(existing, dict):
        collapsed = {**existing, **collapsed}

    new_parameters[variadic_key] = collapsed
    return new_parameters

176 

177 

def parameters_to_args_kwargs(
    fn: Callable[..., Any],
    parameters: dict[str, Any],
) -> tuple[tuple[Any, ...], dict[str, Any]]:
    """
    Convert a `parameters` dictionary to positional and keyword arguments

    Every key in `parameters` must name a parameter in the signature of `fn`.

    Args:
        fn: The callable the arguments are intended for.
        parameters: Mapping of parameter name to value.

    Returns:
        A `(args, kwargs)` tuple suitable for `fn(*args, **kwargs)`.

    Raises:
        SignatureMismatchError: If `parameters` contains keys that are not
            present in the signature of `fn`.
    """
    signature = inspect.signature(fn)
    function_params = signature.parameters.keys()
    # Check for parameters that are not present in the function signature
    unknown_params = parameters.keys() - function_params
    if unknown_params:
        raise SignatureMismatchError.from_bad_params(
            list(function_params), list(parameters)
        )

    # Start from an empty binding and install the parameters directly; the
    # `args`/`kwargs` properties then split them according to the signature.
    bound_signature = signature.bind_partial()
    bound_signature.arguments = OrderedDict(parameters)

    return bound_signature.args, bound_signature.kwargs

199 

200 

def call_with_parameters(fn: Callable[..., R], parameters: dict[str, Any]) -> R:
    """
    Call a function with parameters extracted with `get_call_parameters`

    The function _must_ have an identical signature to the original function or this
    will fail. If you need to send to a function with a different signature, extract
    the args/kwargs using `parameters_to_args_kwargs` directly

    Args:
        fn: The callable to invoke.
        parameters: Mapping of parameter name to value.

    Returns:
        Whatever `fn` returns.
    """
    args, kwargs = parameters_to_args_kwargs(fn, parameters)
    return fn(*args, **kwargs)

211 

212 

def cloudpickle_wrapped_call(
    __fn: Callable[..., Any], *args: Any, **kwargs: Any
) -> Callable[[], bytes]:
    """
    Package a function call into a zero-argument callable using cloudpickle.

    The function and its arguments are serialized immediately. Invoking the
    returned callable executes the call and returns the cloudpickle-serialized
    return value.

    This is particularly useful for sending calls to libraries that only use the Python
    built-in pickler (e.g. `anyio.to_process` and `multiprocessing`) but may require
    a wider range of pickling support.
    """
    call_spec = (__fn, args, kwargs)
    serialized_call = cloudpickle.dumps(call_spec)  # type: ignore # no stubs available
    return partial(_run_serialized_call, serialized_call)

226 

227 

def _run_serialized_call(payload: bytes) -> bytes:
    """
    Execute a call serialized by `cloudpickle_wrapped_call`.

    Lives at module top-level so that the Python pickler can pickle it.

    Args:
        payload: A cloudpickle-serialized `(fn, args, kwargs)` tuple.

    Returns:
        The cloudpickle-serialized return value of the call.
    """
    target, positional, keywords = cloudpickle.loads(payload)
    return cloudpickle.dumps(target(*positional, **keywords))  # type: ignore # no stubs available

236 

237 

class ParameterSchema(pydantic.BaseModel):
    """Simple data model corresponding to an OpenAPI `Schema`."""

    # Fixed markers: the schema is always an object titled "Parameters"
    title: Literal["Parameters"] = "Parameters"
    type: Literal["object"] = "object"
    # Collections default to fresh, empty containers per instance
    properties: dict[str, Any] = pydantic.Field(default_factory=dict)
    required: list[str] = pydantic.Field(default_factory=list)
    definitions: dict[str, Any] = pydantic.Field(default_factory=dict)

    def model_dump_for_openapi(self) -> dict[str, Any]:
        """
        Dump the model to a dictionary, leaving out `None` values and
        dropping the `required` key when it is empty.
        """
        dumped = self.model_dump(mode="python", exclude_none=True)
        # `.get(..., True)` is falsy only when the key exists with an empty value
        if not dumped.get("required", True):
            dumped.pop("required")
        return dumped

252 

253 

def parameter_docstrings(docstring: Optional[str]) -> dict[str, str]:
    """
    Extract per-parameter descriptions from a Google-format docstring.

    Args:
        docstring: The function's docstring.

    Returns:
        Mapping from parameter names to docstrings. Empty when `docstring` is
        missing/empty or has no parameters section.
    """
    if not docstring:
        return {}

    descriptions: dict[str, str] = {}
    with disable_logger("griffe"):
        sections = parse(Docstring(docstring), Parser.google)
        for section in sections:
            if section.kind == DocstringSectionKind.parameters:
                # A later parameters section replaces an earlier one
                descriptions = {
                    entry.name: entry.description for entry in section.value
                }

    return descriptions

280 

281 

def process_v1_params(
    param: inspect.Parameter,
    *,
    position: int,
    docstrings: dict[str, str],
    aliases: dict[str, str],
) -> tuple[str, Any, Any]:
    """
    Build the pydantic v1 field definition for one signature parameter.

    Args:
        param: The signature parameter to describe.
        position: Index of the parameter, passed through to the field.
        docstrings: Mapping of parameter name to description.
        aliases: Mapping of renamed field name to original parameter name.
            Mutated in place when the parameter has to be renamed.

    Returns:
        A `(name, type, field)` tuple.
    """
    import pydantic.v1 as pydantic_v1

    # Pydantic model creation will fail if names collide with the BaseModel type,
    # so a colliding name gets a suffix and is exposed under its original alias.
    name = param.name
    if hasattr(pydantic_v1.BaseModel, name):
        name = f"{param.name}__"
        aliases[name] = param.name

    if param.annotation is inspect.Parameter.empty:
        type_ = Any
    else:
        type_ = param.annotation

    if param.default is param.empty:
        default = ...  # Ellipsis marks the field as required
    else:
        default = param.default

    with warnings.catch_warnings():
        # Ignore DeprecationWarnings for the duration of the Field call
        warnings.filterwarnings("ignore", category=DeprecationWarning)
        field: Any = pydantic_v1.Field(
            default=default,
            title=param.name,
            description=docstrings.get(param.name, None),
            alias=aliases.get(name),
            position=position,
        )
    return name, type_, field

311 

312 

def create_v1_schema(
    name_: str, model_cfg: type[Any], model_fields: Optional[dict[str, Any]] = None
) -> dict[str, Any]:
    """
    Create a pydantic v1 model from field definitions and return its schema.

    Args:
        name_: Name of the model to create.
        model_cfg: Config class handed to the model as `__config__`.
        model_fields: Field definitions passed as keyword arguments to
            `create_model`; treated as empty when not given.

    Returns:
        The schema of the created model, generated with `by_alias=True`.
    """
    import pydantic.v1 as pydantic_v1

    field_definitions = model_fields if model_fields else {}

    with warnings.catch_warnings():
        # Ignore DeprecationWarnings while the model and schema are built
        warnings.filterwarnings("ignore", category=DeprecationWarning)

        model: type[pydantic_v1.BaseModel] = pydantic_v1.create_model(
            name_,
            __config__=model_cfg,
            **field_definitions,
        )
        return model.schema(by_alias=True)

329 

330 

def parameter_schema(fn: Callable[..., Any]) -> ParameterSchema:
    """Given a function, generates an OpenAPI-compatible description
    of the function's arguments, including:
        - name
        - typing information
        - whether it is required
        - a default value
        - additional constraints (like possible enum values)

    Args:
        fn (Callable): The function whose arguments will be serialized

    Returns:
        ParameterSchema: the argument schema
    """
    try:
        resolved_signature = inspect.signature(fn, eval_str=True)  # novm
    except (NameError, TypeError):
        # `eval_str` is not available in Python < 3.10; a NameError or TypeError
        # from the call above also lands here. Fall back to the plain signature.
        resolved_signature = inspect.signature(fn)

    return generate_parameter_schema(
        resolved_signature, parameter_docstrings(inspect.getdoc(fn))
    )

355 

356 

def parameter_schema_from_entrypoint(entrypoint: str) -> ParameterSchema:
    """
    Generate a parameter schema from an entrypoint string.

    Will load the source code of the function and extract the signature and docstring
    to generate the schema.

    Useful for generating a schema for a function when instantiating the function may
    not be possible due to missing imports or other issues.

    Args:
        entrypoint: A string representing the entrypoint to a function. Either a
            file path and function name separated by a colon
            (`path/to/file.py:do_stuff`) or a dotted module path ending in the
            function name (`module.path.do_stuff`).

    Returns:
        ParameterSchema: The parameter schema for the function.

    Raises:
        ValueError: If the entrypoint has neither a colon nor a dot, or the
            module of a dotted entrypoint cannot be found.
    """
    filepath = None
    if ":" in entrypoint:
        # split by the last colon once to handle Windows paths with drive letters i.e C:\path\to\file.py:do_stuff
        path, func_name = entrypoint.rsplit(":", maxsplit=1)
        # Python source is UTF-8 by default; do not depend on the locale encoding
        source_code = Path(path).read_text(encoding="utf-8")
        filepath = path
    else:
        path, _, func_name = entrypoint.rpartition(".")
        if not path:
            raise ValueError(
                f"Entrypoint {entrypoint!r} must be in the format "
                "'path/to/file.py:function' or 'module.path.function'"
            )
        spec = importlib.util.find_spec(path)
        if not spec or not spec.origin:
            raise ValueError(f"Could not find module {path!r}")
        source_code = Path(spec.origin).read_text(encoding="utf-8")
    signature = _generate_signature_from_source(source_code, func_name, filepath)
    docstring = _get_docstring_from_source(source_code, func_name)
    return generate_parameter_schema(signature, parameter_docstrings(docstring))

389 

390 

def generate_parameter_schema(
    signature: inspect.Signature, docstrings: dict[str, str]
) -> ParameterSchema:
    """
    Build a parameter schema from a function signature and docstrings.

    To get a signature from a function, use `inspect.signature(fn)` or
    `_generate_signature_from_source(source_code, func_name)`.

    Args:
        signature: The function signature.
        docstrings: A dictionary mapping parameter names to docstrings.

    Returns:
        ParameterSchema: The parameter schema.
    """
    # Pick the pydantic v1 or v2 helpers depending on the signature's types
    if has_v1_type_as_param(signature):

        class ModelConfig:
            arbitrary_types_allowed = True

        create_schema = partial(create_v1_schema, model_cfg=ModelConfig)
        process_params = process_v1_params
    else:
        create_schema = partial(
            create_v2_schema,
            model_cfg=pydantic.ConfigDict(arbitrary_types_allowed=True),
        )
        process_params = process_v2_params

    aliases: dict[str, str] = {}
    model_fields: dict[str, Any] = {}

    for position, param in enumerate(signature.parameters.values()):
        name, type_, field = process_params(
            param, position=position, docstrings=docstrings, aliases=aliases
        )

        # 'cls'/'self' are implicitly passed and not real flow parameters
        if name in {"cls", "self"}:
            continue

        # Probe with a throwaway single-field model to learn whether this
        # parameter's type supports schema generation
        try:
            create_schema("CheckParameter", model_fields={name: (type_, field)})
        except (ValueError, TypeError):
            # Not valid for schema creation; degrade the type to `Any`
            type_ = Any

        model_fields[name] = (type_, field)

    # Generate the final model and schema
    return ParameterSchema(**create_schema("Parameters", model_fields=model_fields))

445 

446 

def raise_for_reserved_arguments(
    fn: Callable[..., Any], reserved_arguments: Iterable[str]
) -> None:
    """Raise a ReservedArgumentError if `fn` declares a parameter whose name
    appears in `reserved_arguments`. The first conflicting name, in the order
    given by `reserved_arguments`, is the one reported."""
    declared = inspect.signature(fn).parameters

    no_conflict = object()
    argument = next(
        (candidate for candidate in reserved_arguments if candidate in declared),
        no_conflict,
    )
    if argument is not no_conflict:
        raise ReservedArgumentError(
            f"{argument!r} is a reserved argument name and cannot be used."
        )

459 

460 

def _generate_signature_from_source(
    source_code: str, func_name: str, filepath: Optional[str] = None
) -> inspect.Signature:
    """
    Extract the signature of a function from its source code.

    Will ignore missing imports and exceptions while loading local class definitions.

    Annotations and default values are evaluated against the namespace loaded
    from `source_code`. An annotation that cannot be evaluated is treated as
    missing; a default that cannot be evaluated becomes `None`. Annotations on
    `*args` / `**kwargs` are not evaluated.

    NOTE(review): this evaluates expressions taken from `source_code` with
    `eval`; it is presumably only ever called with trusted source.

    Args:
        source_code: The source code where the function named `func_name` is declared.
        func_name: The name of the function.
        filepath: Optional path of the source file, forwarded to
            `safe_load_namespace`.

    Returns:
        The signature of the function.

    Raises:
        ValueError: If no function named `func_name` is found in `source_code`.
    """
    # Load the namespace from the source code. Missing imports and exceptions while
    # loading local class definitions are ignored.
    namespace = safe_load_namespace(source_code, filepath=filepath)
    # Parse the source code into an AST
    parsed_code = ast.parse(source_code)

    func_def = next(
        (
            node
            for node in ast.walk(parsed_code)
            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef))
            and node.name == func_name
        ),
        None,
    )
    if func_def is None:
        raise ValueError(f"Function {func_name} not found in source code")

    def evaluate(node: Any) -> Any:
        # Compile a single expression node and evaluate it in the loaded namespace
        return eval(compile(ast.Expression(node), "<string>", "eval"), namespace)

    def resolve_annotation(arg: ast.arg) -> Any:
        # Don't raise an error if the annotation evaluation fails. Use
        # `inspect.Parameter.empty` instead which is equivalent to not having
        # an annotation.
        if arg.annotation is None:
            return inspect.Parameter.empty
        try:
            return evaluate(arg.annotation)
        except Exception as e:
            logger.debug("Failed to evaluate annotation for %s: %s", arg.arg, e)
            return inspect.Parameter.empty

    def resolve_default(name: str, node: Any) -> Any:
        # A missing default node means the parameter has no default at all
        if node is None:
            return inspect.Parameter.empty
        try:
            return evaluate(node)
        except Exception as e:
            logger.debug("Failed to evaluate default value for %s: %s", name, e)
            return None  # Set to None if evaluation fails

    arguments = func_def.args
    parameters: list[inspect.Parameter] = []

    # Positional-only args (def func(a, /, b)) followed by regular args. The AST
    # stores one `defaults` list for both groups, aligned with the END of the
    # combined list, so pad from the front of the combined list.
    positional = [
        (arg, inspect.Parameter.POSITIONAL_ONLY) for arg in arguments.posonlyargs
    ] + [(arg, inspect.Parameter.POSITIONAL_OR_KEYWORD) for arg in arguments.args]
    positional_defaults = [None] * (len(positional) - len(arguments.defaults)) + list(
        arguments.defaults
    )
    for (arg, kind), default_node in zip(positional, positional_defaults):
        parameters.append(
            inspect.Parameter(
                arg.arg,
                kind,
                default=resolve_default(arg.arg, default_node),
                annotation=resolve_annotation(arg),
            )
        )

    # *args must precede keyword-only parameters or `inspect.Signature` rejects
    # the parameter order.
    if arguments.vararg:
        parameters.append(
            inspect.Parameter(arguments.vararg.arg, inspect.Parameter.VAR_POSITIONAL)
        )

    # Keyword-only args e.g. def func(*, a: int = 1, b: str). `kw_defaults` has one
    # entry per keyword-only arg, with `None` where no default is declared.
    for kwarg, default_node in zip(arguments.kwonlyargs, arguments.kw_defaults):
        parameters.append(
            inspect.Parameter(
                kwarg.arg,
                inspect.Parameter.KEYWORD_ONLY,
                default=resolve_default(kwarg.arg, default_node),
                annotation=resolve_annotation(kwarg),
            )
        )

    if arguments.kwarg:
        parameters.append(
            inspect.Parameter(arguments.kwarg.arg, inspect.Parameter.VAR_KEYWORD)
        )

    # Handle return annotation e.g. def func() -> int
    return_annotation: Any = inspect.Signature.empty
    if func_def.returns is not None:
        try:
            return_annotation = evaluate(func_def.returns)
        except Exception as e:
            logger.debug("Failed to evaluate return annotation: %s", e)
            return_annotation = inspect.Signature.empty

    return inspect.Signature(parameters, return_annotation=return_annotation)

619 

620 

621def _get_docstring_from_source(source_code: str, func_name: str) -> Optional[str]: 1a

622 """ 

623 Extract the docstring of a function from its source code. 

624 

625 Args: 

626 source_code (str): The source code of the function. 

627 func_name (str): The name of the function. 

628 

629 Returns: 

630 The docstring of the function. If the function has no docstring, returns None. 

631 """ 

632 parsed_code = ast.parse(source_code) 

633 

634 func_def = next( 

635 ( 

636 node 

637 for node in ast.walk(parsed_code) 

638 if isinstance( 

639 node, 

640 ( 

641 ast.FunctionDef, 

642 ast.AsyncFunctionDef, 

643 ), 

644 ) 

645 and node.name == func_name 

646 ), 

647 None, 

648 ) 

649 if func_def is None: 

650 raise ValueError(f"Function {func_name} not found in source code") 

651 

652 if ( 

653 func_def.body 

654 and isinstance(func_def.body[0], ast.Expr) 

655 and isinstance(func_def.body[0].value, ast.Constant) 

656 ): 

657 return str(func_def.body[0].value.value) 

658 return None 

659 

660 

def expand_mapping_parameters(
    func: Callable[..., Any], parameters: dict[str, Any]
) -> list[dict[str, Any]]:
    """
    Generates a list of call parameters to be used for individual calls in a mapping
    operation.

    Values wrapped in `unmapped` and non-iterable values are passed unchanged to
    every call; iterable values are split element-wise across the calls. Values
    wrapped in `allow_failure` or `quote` are unwrapped for this decision and
    re-wrapped in each generated call.

    Args:
        func: The function to be called
        parameters: A dictionary of parameters with iterables to be mapped over

    Returns:
        list: A list of dictionaries to be used as parameters for each
            call in the mapping operation

    Raises:
        MappingMissingIterable: If no parameter is iterable.
        MappingLengthMismatch: If the iterable parameters differ in length.
    """
    # Ensure that any parameters in kwargs are expanded before this check
    parameters = explode_variadic_parameter(func, parameters)

    iterable_parameters: dict[str, list[Any]] = {}
    static_parameters: dict[str, Any] = {}
    annotated_parameters: dict[str, Union[allow_failure[Any], quote[Any]]] = {}
    for key, val in parameters.items():
        unwrapped: Any = val
        if isinstance(val, (allow_failure, quote)):
            # Unwrap annotated parameters to determine if they are iterable
            annotated_parameters[key] = val
            unwrapped = val.unwrap()

        if isinstance(unwrapped, unmapped):
            static_parameters[key] = cast(unmapped[Any], unwrapped).value
        elif isiterable(unwrapped):
            iterable_parameters[key] = list(unwrapped)
        else:
            static_parameters[key] = unwrapped

    if not iterable_parameters:
        raise MappingMissingIterable(
            "No iterable parameters were received. Parameters for map must "
            f"include at least one iterable. Parameters: {parameters}"
        )

    iterable_parameter_lengths = {
        key: len(val) for key, val in iterable_parameters.items()
    }
    lengths = set(iterable_parameter_lengths.values())
    if len(lengths) > 1:
        raise MappingLengthMismatch(
            "Received iterable parameters with different lengths. Parameters for map"
            f" must all be the same length. Got lengths: {iterable_parameter_lengths}"
        )

    # Exactly one distinct length remains at this point
    map_length = next(iter(lengths))

    # The signature defaults do not change between calls; look them up once
    parameter_defaults = get_parameter_defaults(func)

    call_parameters_list: list[dict[str, Any]] = []
    for i in range(map_length):
        call_parameters = {key: value[i] for key, value in iterable_parameters.items()}
        call_parameters.update(static_parameters)

        # Add default values for parameters; these are skipped earlier since they should
        # not be mapped over
        for key, value in parameter_defaults.items():
            call_parameters.setdefault(key, value)

        # Re-apply annotations to each key again
        for key, annotation in annotated_parameters.items():
            call_parameters[key] = annotation.rewrap(call_parameters[key])

        # Collapse any previously exploded kwargs
        call_parameters_list.append(collapse_variadic_parameters(func, call_parameters))

    return call_parameters_list