Coverage for /usr/local/lib/python3.12/site-packages/prefect/input/run_input.py: 26%

268 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

"""
This module contains functions that allow sending type-checked `RunInput` data
to flows at runtime. Flows can send back responses, establishing two-way
channels with senders. These functions are particularly useful for systems that
require ongoing data transfer (continuous data input and output) or need to
react to input quickly. They support real-time interaction and efficient data
handling, and are designed to facilitate dynamic communication within
distributed or microservices-oriented systems, making them ideal for scenarios
requiring continuous data synchronization and processing.

The following is an example of two flows. One sends a random number to the
other and waits for a response. The other receives the number, squares it, and
sends the result back. The sender flow then prints the result.

Sender flow:

```python
import random
from uuid import UUID
from prefect import flow
from prefect.logging import get_run_logger
from prefect.input import RunInput

class NumberData(RunInput):
    number: int


@flow
async def sender_flow(receiver_flow_run_id: UUID):
    logger = get_run_logger()

    the_number = random.randint(1, 100)

    await NumberData(number=the_number).send_to(receiver_flow_run_id)

    receiver = NumberData.receive(flow_run_id=receiver_flow_run_id)
    squared = await receiver.next()

    logger.info(f"{the_number} squared is {squared.number}")
```

Receiver flow:

```python
import random
from uuid import UUID
from prefect import flow
from prefect.logging import get_run_logger
from prefect.input import RunInput

class NumberData(RunInput):
    number: int


@flow
async def receiver_flow():
    async for data in NumberData.receive():
        squared = data.number ** 2
        await data.respond(NumberData(number=squared))
```
"""

62 

63from __future__ import annotations 1a

64 

65import inspect 1a

66from inspect import isclass 1a

67from typing import ( 1a

68 TYPE_CHECKING, 

69 Any, 

70 ClassVar, 

71 Coroutine, 

72 Dict, 

73 Generic, 

74 Literal, 

75 Optional, 

76 Set, 

77 Type, 

78 TypeVar, 

79 Union, 

80 cast, 

81 overload, 

82) 

83from uuid import UUID, uuid4 1a

84 

85import anyio 1a

86import pydantic 1a

87from pydantic import ConfigDict 1a

88from typing_extensions import Self 1a

89 

90from prefect.input.actions import ( 1a

91 create_flow_run_input, 

92 create_flow_run_input_from_model, 

93 ensure_flow_run_id, 

94 filter_flow_run_input, 

95 read_flow_run_input, 

96) 

97from prefect.utilities.asyncutils import sync_compatible 1a

98 

99if TYPE_CHECKING: 99 ↛ 100line 99 didn't jump to line 100 because the condition on line 99 was never true1a

100 from prefect.client.schemas.objects import FlowRunInput 

101 from prefect.states import State 

102 

103from prefect._internal.pydantic.v2_schema import create_v2_schema, is_v2_model 1a

104 

# Type variable for concrete `RunInput` subclasses.
R = TypeVar("R", bound="RunInput")

# Type variable for the arbitrary value types wrapped by `AutomaticRunInput`.
T = TypeVar("T", bound="object")

# Maps each logical slot of a run input ("description", "response", "schema")
# to the key under which that piece is stored.
Keyset = Dict[
    Union[
        Literal["description"],
        Literal["response"],
        Literal["schema"],
    ],
    str,
]

111 

112 

def keyset_from_paused_state(state: "State") -> Keyset:
    """
    Build the keyset that belongs to a Paused state.

    The base key is the lower-cased state name (empty if the state has no
    name) joined by a dash to the pause key found in the state's details.

    Args:
        - state (State): the paused state to derive the keyset from

    Returns:
        - Keyset: the description/response/schema keys for the state

    Raises:
        - RuntimeError: if the given state is not paused
    """
    if state.is_paused():
        lowered_name = (state.name or "").lower()
        pause_key = str(state.state_details.pause_key)
        return keyset_from_base_key(f"{lowered_name}-{pause_key}")

    raise RuntimeError(f"{state.type.value!r} is unsupported.")

127 

128 

def keyset_from_base_key(base_key: str) -> Keyset:
    """
    Derive the three storage keys that share a common base key.

    Each key is the base key followed by a dash and the slot name.

    Args:
        - base_key (str): the prefix shared by all keys in the keyset

    Returns:
        - Keyset: a mapping with the "description", "response" and "schema"
            keys
    """
    description_key = f"{base_key}-description"
    response_key = f"{base_key}-response"
    schema_key = f"{base_key}-schema"

    return {
        "description": description_key,
        "response": response_key,
        "schema": schema_key,
    }

144 

145 

class RunInputMetadata(pydantic.BaseModel):
    """Bookkeeping attached to a loaded run input: its key, sender and receiver."""

    # Key the input was stored under.
    key: str
    # Identifier of whoever sent the input; None when no sender was recorded.
    sender: Optional[str] = None
    # ID of the flow run the input belongs to.
    receiver: UUID

150 

151 

class BaseRunInput(pydantic.BaseModel):
    """
    Common behaviour for run inputs.

    Covers storing the input's schema and description, loading stored
    responses, and sending inputs to (or responding to) flow runs. Extra
    fields are rejected by the model configuration.
    """

    model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")

    # Optional description; `with_initial_data` sets it on the generated class.
    _description: Optional[str] = pydantic.PrivateAttr(default=None)
    # Populated by `load` and `load_from_flow_run_input`.
    _metadata: RunInputMetadata = pydantic.PrivateAttr()

    @property
    def metadata(self) -> RunInputMetadata:
        """The metadata recorded when this instance was loaded."""
        return self._metadata

    @classmethod
    def keyset_from_type(cls) -> Keyset:
        """Return the keyset whose base key is the lower-cased class name."""
        return keyset_from_base_key(cls.__name__.lower())

    @classmethod
    @sync_compatible
    async def save(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None):
        """
        Store this input type's schema and, if one is set, its description.

        Args:
            - keyset (Keyset): supplies the "schema" and "description" keys to
                store under
            - flow_run_id (UUID, optional): the flow run ID to save the input for
        """
        schema = (
            create_v2_schema(cls.__name__, model_base=cls)
            if is_v2_model(cls)
            else cls.model_json_schema(by_alias=True)
        )

        schema_coro = create_flow_run_input(
            key=keyset["schema"], value=schema, flow_run_id=flow_run_id
        )
        if TYPE_CHECKING:
            assert inspect.iscoroutine(schema_coro)
        await schema_coro

        # Only a real, non-empty string counts as a description; on the class
        # the attribute may be something other than a string.
        class_description = cls._description
        if isinstance(class_description, str) and class_description:
            description_coro = create_flow_run_input(
                key=keyset["description"],
                value=class_description,
                flow_run_id=flow_run_id,
            )
            if TYPE_CHECKING:
                assert inspect.iscoroutine(description_coro)
            await description_coro

    @classmethod
    @sync_compatible
    async def load(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None) -> Self:
        """
        Build an instance from the value stored under the keyset's response key.

        When nothing truthy is stored, the instance is built with no arguments.

        Args:
            - keyset (Keyset): supplies the "response" key to read
            - flow_run_id (UUID, optional): the flow run ID to load the input for
        """
        run_id = ensure_flow_run_id(flow_run_id)
        response_key = keyset["response"]
        stored = await read_flow_run_input(response_key, flow_run_id=run_id)

        loaded = cls(**stored) if stored else cls()
        loaded._metadata = RunInputMetadata(
            key=response_key, sender=None, receiver=run_id
        )
        return loaded

    @classmethod
    def load_from_flow_run_input(cls, flow_run_input: "FlowRunInput") -> Self:
        """
        Build an instance from a FlowRunInput object.

        The decoded value provides the fields; key, sender and flow run ID are
        copied into the instance metadata.

        Args:
            - flow_run_input (FlowRunInput): the stored input to convert
        """
        loaded = cls(**flow_run_input.decoded_value)
        loaded._metadata = RunInputMetadata(
            key=flow_run_input.key,
            sender=flow_run_input.sender,
            receiver=flow_run_input.flow_run_id,
        )
        return loaded

    @classmethod
    def with_initial_data(
        cls: Type[R], description: Optional[str] = None, **kwargs: Any
    ) -> Type[R]:
        """
        Create a subclass that uses the given values as field defaults.

        Args:
            - description (str, optional): a description to show when resuming
                a flow run that requires input
            - kwargs (Any): the initial data to populate the subclass
        """
        declared_fields = cls.model_fields
        fields: Dict[str, Any] = {}
        for name, default in kwargs.items():
            declared = declared_fields.get(name)
            # Keep the declared annotation when the field already exists;
            # otherwise fall back to the runtime type of the default.
            if declared:
                annotation = declared.annotation
            else:
                annotation = type(default)
            fields[name] = (annotation, default)

        model = pydantic.create_model(cls.__name__, **fields, __base__=cls)

        if description is not None:
            model._description = description

        return model

    @sync_compatible
    async def respond(
        self,
        run_input: "RunInput",
        sender: Optional[str] = None,
        key_prefix: Optional[str] = None,
    ):
        """
        Send `run_input` back to the flow run that sent this input.

        The target flow run ID is the last dot-separated segment of the
        recorded sender, which must start with "prefect.flow-run".

        Args:
            - run_input (RunInput): the input to send back
            - sender (str, optional): sender to record on the response
            - key_prefix (str, optional): prefix for the response's key

        Raises:
            - RuntimeError: if this input was not sent by a flow run
        """
        origin = self.metadata.sender
        if not (origin and origin.startswith("prefect.flow-run")):
            raise RuntimeError(
                "Cannot respond to an input that was not sent by a flow run."
            )

        target_flow_run_id = UUID(origin.rpartition(".")[2])

        await _send_input(
            flow_run_id=target_flow_run_id,
            run_input=run_input,
            sender=sender,
            key_prefix=key_prefix,
        )

    @sync_compatible
    async def send_to(
        self,
        flow_run_id: UUID,
        sender: Optional[str] = None,
        key_prefix: Optional[str] = None,
    ):
        """
        Send this input to the given flow run.

        Args:
            - flow_run_id (UUID): the flow run to send the input to
            - sender (str, optional): sender to record on the input
            - key_prefix (str, optional): prefix for the input's key
        """
        await _send_input(
            flow_run_id=flow_run_id,
            run_input=self,
            sender=sender,
            key_prefix=key_prefix,
        )

300 

301 

class RunInput(BaseRunInput):
    """A run input whose fields are declared explicitly on a subclass."""

    @classmethod
    def receive(
        cls,
        timeout: Optional[float] = 3600,
        poll_interval: float = 10,
        raise_timeout_error: bool = False,
        exclude_keys: Optional[Set[str]] = None,
        key_prefix: Optional[str] = None,
        flow_run_id: Optional[UUID] = None,
    ) -> GetInputHandler[Self]:
        """
        Create a handler that yields inputs of this type sent to a flow run.

        When `key_prefix` is None it defaults to the lower-cased class name
        followed by "-auto". All other arguments are passed through to
        `GetInputHandler` unchanged.
        """
        prefix = f"{cls.__name__.lower()}-auto" if key_prefix is None else key_prefix

        return GetInputHandler(
            run_input_cls=cls,
            key_prefix=prefix,
            timeout=timeout,
            poll_interval=poll_interval,
            raise_timeout_error=raise_timeout_error,
            exclude_keys=exclude_keys,
            flow_run_id=flow_run_id,
        )

    @classmethod
    def subclass_from_base_model_type(
        cls, model_cls: Type[pydantic.BaseModel]
    ) -> Type["RunInput"]:
        """
        Create a new `RunInput` subclass from the given `pydantic.BaseModel`
        subclass.

        The new class is named after the model with a "RunInput" suffix and
        inherits from both `RunInput` and the model.

        Args:
            - model_cls (pydantic.BaseModel subclass): the class from which
                to create the new `RunInput` subclass
        """
        subclass_name = f"{model_cls.__name__}RunInput"
        bases = (RunInput, model_cls)
        return type(subclass_name, bases, {})  # type: ignore

339 

340 

class AutomaticRunInput(BaseRunInput, Generic[T]):
    """A run input that wraps a single value of an arbitrary type."""

    value: T

    @classmethod
    @sync_compatible
    async def load(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None) -> Self:
        """
        Load the stored response and return its wrapped value.

        Note that the `value` field of the loaded instance is returned, not
        the instance itself.

        Args:
            - keyset (Keyset): the keyset to load the input for
            - flow_run_id (UUID, optional): the flow run ID to load the input for
        """
        pending = super().load(keyset, flow_run_id=flow_run_id)
        if TYPE_CHECKING:
            assert inspect.iscoroutine(pending)
        loaded = await pending
        return loaded.value

    @classmethod
    def subclass_from_type(cls, _type: Type[T]) -> Type["AutomaticRunInput[T]"]:
        """
        Create a new `AutomaticRunInput` subclass from the given type.

        The lower-cased name of the type becomes part of the new class name,
        which `receive` and `_send_input` in turn use to build the default key
        prefix. Values saved under a type like List[int] are therefore
        retrievable under the generic type name (like "list").
        """
        # Name lookup order:
        # - "__name__" is the usual attribute on most types.
        # - "_name" is a fallback for type annotations (Python 3.9 and
        #   earlier) that may only expose that attribute.
        # - An empty string if neither exists; that prefix matches all
        #   automatic inputs sent to the flow run rather than one type.
        fallback_name = getattr(_type, "_name", "")
        type_prefix: str = getattr(_type, "__name__", fallback_name).lower()

        fields: Dict[str, Any] = {"value": (_type, ...)}

        # Build the model class dynamically, named after the type prefix.
        new_cls: Type["AutomaticRunInput[T]"] = pydantic.create_model(
            f"{type_prefix}AutomaticRunInput", **fields, __base__=AutomaticRunInput
        )
        return new_cls

    @classmethod
    def receive(
        cls,
        timeout: Optional[float] = 3600,
        poll_interval: float = 10,
        raise_timeout_error: bool = False,
        exclude_keys: Optional[Set[str]] = None,
        key_prefix: Optional[str] = None,
        flow_run_id: Optional[UUID] = None,
        with_metadata: bool = False,
    ) -> GetAutomaticInputHandler[T]:
        """
        Create a handler that yields automatic inputs sent to a flow run.

        A falsy `key_prefix` is replaced by the lower-cased class name followed
        by "-auto". With `with_metadata` set, the handler yields the wrapper
        instances instead of the bare values.
        """
        if not key_prefix:
            key_prefix = f"{cls.__name__.lower()}-auto"

        return GetAutomaticInputHandler(
            run_input_cls=cls,
            key_prefix=key_prefix,
            timeout=timeout,
            poll_interval=poll_interval,
            raise_timeout_error=raise_timeout_error,
            exclude_keys=exclude_keys,
            flow_run_id=flow_run_id,
            with_metadata=with_metadata,
        )

420 

421 

def run_input_subclass_from_type(
    _type: Union[Type[R], Type[T], pydantic.BaseModel],
) -> Union[Type[AutomaticRunInput[T]], Type[R]]:
    """
    Pick or build the run input class that corresponds to the given type.

    - A `RunInput` subclass is returned as-is.
    - Any other `pydantic.BaseModel` subclass is combined with `RunInput`.
    - Everything else is wrapped in a generated `AutomaticRunInput` subclass.
    """
    if isclass(_type):
        if issubclass(_type, RunInput):
            return cast(Type[R], _type)
        if issubclass(_type, pydantic.BaseModel):
            return cast(Type[R], RunInput.subclass_from_base_model_type(_type))

    # Could be something like a typing._GenericAlias or any other type that
    # isn't a `RunInput` subclass or `pydantic.BaseModel` subclass. Try passing
    # it to AutomaticRunInput to see if we can create a model from it.
    automatic_cls = AutomaticRunInput.subclass_from_type(cast(Type[T], _type))
    return cast(Type[AutomaticRunInput[T]], automatic_cls)

441 

442 

class GetInputHandler(Generic[R]):
    """
    Poll a flow run for inputs of a single `RunInput` type.

    The handler works as both a synchronous and an asynchronous iterator.
    Each item is the next stored input whose key matches `key_prefix` and
    that has not been returned (or explicitly excluded) before.
    """

    def __init__(
        self,
        run_input_cls: Type[R],
        key_prefix: str,
        timeout: float | None = 3600,
        poll_interval: float = 10,
        raise_timeout_error: bool = False,
        exclude_keys: Optional[Set[str]] = None,
        flow_run_id: Optional[UUID] = None,
    ):
        """
        Args:
            - run_input_cls (Type[R]): class used to build each returned input
            - key_prefix (str): prefix passed to the input filter
            - timeout (float, optional): passed to `anyio.fail_after` around
                the polling loop in `next`
            - poll_interval (float): passed to `anyio.sleep` between polls
            - raise_timeout_error (bool): when False, a timeout ends iteration
                instead of raising `TimeoutError`
            - exclude_keys (Set[str], optional): keys to skip; copied, so the
                caller's set is never mutated
            - flow_run_id (UUID, optional): resolved via `ensure_flow_run_id`
        """
        self.run_input_cls: Type[R] = run_input_cls
        self.key_prefix: str = key_prefix
        self.timeout: float | None = timeout
        self.poll_interval: float = poll_interval
        self.exclude_keys: set[str] = set()
        self.raise_timeout_error: bool = raise_timeout_error
        self.flow_run_id: UUID = ensure_flow_run_id(flow_run_id)

        if exclude_keys is not None:
            self.exclude_keys.update(exclude_keys)

    def __iter__(self) -> Self:
        return self

    def __next__(self) -> R:
        """Return the next input; a timeout ends iteration unless configured to raise."""
        try:
            return cast(R, self.next())
        except TimeoutError:
            if self.raise_timeout_error:
                raise
            raise StopIteration

    def __aiter__(self) -> Self:
        return self

    async def __anext__(self) -> R:
        """Async counterpart of `__next__`."""
        try:
            coro = self.next()
            if TYPE_CHECKING:
                assert inspect.iscoroutine(coro)
            return await coro
        except TimeoutError:
            if self.raise_timeout_error:
                raise
            raise StopAsyncIteration

    async def filter_for_inputs(self) -> list["FlowRunInput"]:
        """
        Fetch at most one matching input and mark it as seen.

        Returns:
            - list[FlowRunInput]: the inputs returned by the filter
        """
        flow_run_inputs_coro = filter_flow_run_input(
            key_prefix=self.key_prefix,
            limit=1,
            exclude_keys=self.exclude_keys,
            flow_run_id=self.flow_run_id,
        )
        if TYPE_CHECKING:
            assert inspect.iscoroutine(flow_run_inputs_coro)

        flow_run_inputs = await flow_run_inputs_coro

        # `set.add` accepts exactly one element, so unpacking the keys into it
        # only worked while the filter returned a single row. `update` records
        # every returned key regardless of how many come back.
        self.exclude_keys.update(
            flow_run_input.key for flow_run_input in flow_run_inputs
        )

        return flow_run_inputs

    def to_instance(self, flow_run_input: "FlowRunInput") -> R:
        """Convert a stored input into an instance of `run_input_cls`."""
        return self.run_input_cls.load_from_flow_run_input(flow_run_input)

    @sync_compatible
    async def next(self) -> R:
        """
        Return the next available input, polling until one arrives.

        The first check happens immediately and outside the timeout scope;
        only the subsequent sleep-and-poll loop is bounded by `timeout`.

        Raises:
            - TimeoutError: if no input arrives before the timeout expires
        """
        flow_run_inputs = await self.filter_for_inputs()
        if flow_run_inputs:
            return self.to_instance(flow_run_inputs[0])

        with anyio.fail_after(self.timeout):
            while True:
                await anyio.sleep(self.poll_interval)
                flow_run_inputs = await self.filter_for_inputs()
                if flow_run_inputs:
                    return self.to_instance(flow_run_inputs[0])

522 

523 

class GetAutomaticInputHandler(Generic[T]):
    """
    Poll a flow run for automatic inputs wrapping values of type `T`.

    The handler works as both a synchronous and an asynchronous iterator.
    Each item is either the bare wrapped value or, when `with_metadata` is
    set, the `AutomaticRunInput` instance itself.
    """

    def __init__(
        self,
        run_input_cls: Type[AutomaticRunInput[T]],
        key_prefix: str,
        timeout: float | None = 3600,
        poll_interval: float = 10,
        raise_timeout_error: bool = False,
        exclude_keys: Optional[Set[str]] = None,
        flow_run_id: Optional[UUID] = None,
        with_metadata: bool = False,
    ):
        """
        Args:
            - run_input_cls (Type[AutomaticRunInput[T]]): class used to build
                each returned input
            - key_prefix (str): prefix passed to the input filter
            - timeout (float, optional): passed to `anyio.fail_after` around
                the polling loop in `next`
            - poll_interval (float): passed to `anyio.sleep` between polls
            - raise_timeout_error (bool): when False, a timeout ends iteration
                instead of raising `TimeoutError`
            - exclude_keys (Set[str], optional): keys to skip; copied, so the
                caller's set is never mutated
            - flow_run_id (UUID, optional): resolved via `ensure_flow_run_id`
            - with_metadata (bool): return the wrapper instance instead of its
                `value`
        """
        self.run_input_cls: Type[AutomaticRunInput[T]] = run_input_cls
        self.key_prefix: str = key_prefix
        self.timeout: float | None = timeout
        self.poll_interval: float = poll_interval
        self.exclude_keys: set[str] = set()
        self.raise_timeout_error: bool = raise_timeout_error
        self.flow_run_id: UUID = ensure_flow_run_id(flow_run_id)
        self.with_metadata = with_metadata

        if exclude_keys is not None:
            self.exclude_keys.update(exclude_keys)

    def __iter__(self) -> Self:
        return self

    def __next__(self) -> T | AutomaticRunInput[T]:
        """Return the next input; a timeout ends iteration unless configured to raise."""
        try:
            not_coro = self.next()
            if TYPE_CHECKING:
                assert not isinstance(not_coro, Coroutine)
            return not_coro
        except TimeoutError:
            if self.raise_timeout_error:
                raise
            raise StopIteration

    def __aiter__(self) -> Self:
        return self

    async def __anext__(self) -> Union[T, AutomaticRunInput[T]]:
        """Async counterpart of `__next__`."""
        try:
            coro = self.next()
            if TYPE_CHECKING:
                assert inspect.iscoroutine(coro)
            return cast(Union[T, AutomaticRunInput[T]], await coro)
        except TimeoutError:
            if self.raise_timeout_error:
                raise
            raise StopAsyncIteration

    async def filter_for_inputs(self) -> list["FlowRunInput"]:
        """
        Fetch at most one matching input and mark it as seen.

        Returns:
            - list[FlowRunInput]: the inputs returned by the filter
        """
        flow_run_inputs_coro = filter_flow_run_input(
            key_prefix=self.key_prefix,
            limit=1,
            exclude_keys=self.exclude_keys,
            flow_run_id=self.flow_run_id,
        )
        if TYPE_CHECKING:
            assert inspect.iscoroutine(flow_run_inputs_coro)

        flow_run_inputs = await flow_run_inputs_coro

        # `set.add` accepts exactly one element, so unpacking the keys into it
        # only worked while the filter returned a single row. `update` records
        # every returned key regardless of how many come back.
        self.exclude_keys.update(
            flow_run_input.key for flow_run_input in flow_run_inputs
        )

        return flow_run_inputs

    @sync_compatible
    async def next(self) -> Union[T, AutomaticRunInput[T]]:
        """
        Return the next available input, polling until one arrives.

        The first check happens immediately and outside the timeout scope;
        only the subsequent sleep-and-poll loop is bounded by `timeout`.

        Raises:
            - TimeoutError: if no input arrives before the timeout expires
        """
        flow_run_inputs = await self.filter_for_inputs()
        if flow_run_inputs:
            return self.to_instance(flow_run_inputs[0])

        with anyio.fail_after(self.timeout):
            while True:
                await anyio.sleep(self.poll_interval)
                flow_run_inputs = await self.filter_for_inputs()
                if flow_run_inputs:
                    return self.to_instance(flow_run_inputs[0])

    def to_instance(
        self, flow_run_input: "FlowRunInput"
    ) -> Union[T, AutomaticRunInput[T]]:
        """
        Convert a stored input into the value handed back to the caller.

        Returns the wrapper instance when `with_metadata` is set, otherwise
        only its `value`.
        """
        run_input = self.run_input_cls.load_from_flow_run_input(flow_run_input)

        if self.with_metadata:
            return run_input
        return run_input.value

614 

615 

async def _send_input(
    flow_run_id: UUID,
    run_input: RunInput | pydantic.BaseModel,
    sender: Optional[str] = None,
    key_prefix: Optional[str] = None,
):
    """
    Store `run_input` as a new flow run input for the given flow run.

    A `RunInput` instance is stored as-is. Anything else is wrapped in the
    run input class derived from its type. The storage key is the key prefix
    followed by a dash and a fresh UUID; when no prefix is given it defaults
    to the lower-cased class name of the stored input plus "-auto".

    Args:
        - flow_run_id (UUID): the flow run to attach the input to
        - run_input (RunInput | pydantic.BaseModel): the input to store
        - sender (str, optional): sender to record on the input
        - key_prefix (str, optional): prefix for the generated key
    """
    _run_input: Union[RunInput, AutomaticRunInput[Any]]
    if not isinstance(run_input, RunInput):
        input_cls: Type[AutomaticRunInput[Any]] = run_input_subclass_from_type(
            type(run_input)
        )
        _run_input = input_cls(value=run_input)
    else:
        _run_input = run_input

    if key_prefix is None:
        class_name = _run_input.__class__.__name__
        key_prefix = f"{class_name.lower()}-auto"

    unique_key = f"{key_prefix}-{uuid4()}"

    pending = create_flow_run_input_from_model(
        key=unique_key,
        flow_run_id=flow_run_id,
        model_instance=_run_input,
        sender=sender,
    )
    if TYPE_CHECKING:
        assert inspect.iscoroutine(pending)

    await pending

643 

644 

@sync_compatible
async def send_input(
    run_input: Any,
    flow_run_id: UUID,
    sender: Optional[str] = None,
    key_prefix: Optional[str] = None,
):
    """
    Send an input to the given flow run.

    Public entry point that forwards all arguments to `_send_input`.

    Args:
        - run_input (Any): the input to send
        - flow_run_id (UUID): the flow run to send the input to
        - sender (str, optional): sender to record on the input
        - key_prefix (str, optional): prefix for the generated key
    """
    await _send_input(
        run_input=run_input,
        flow_run_id=flow_run_id,
        key_prefix=key_prefix,
        sender=sender,
    )

658 

659 

@overload
def receive_input(  # type: ignore[overload-overlap]
    input_type: Union[Type[R], pydantic.BaseModel],
    timeout: Optional[float] = 3600,
    poll_interval: float = 10,
    raise_timeout_error: bool = False,
    exclude_keys: Optional[Set[str]] = None,
    key_prefix: Optional[str] = None,
    flow_run_id: Optional[UUID] = None,
    with_metadata: bool = False,
) -> GetInputHandler[R]: ...


@overload
def receive_input(
    input_type: Type[T],
    timeout: Optional[float] = 3600,
    poll_interval: float = 10,
    raise_timeout_error: bool = False,
    exclude_keys: Optional[Set[str]] = None,
    key_prefix: Optional[str] = None,
    flow_run_id: Optional[UUID] = None,
    with_metadata: bool = False,
) -> GetAutomaticInputHandler[T]: ...


def receive_input(
    input_type: Union[Type[R], Type[T], pydantic.BaseModel],
    timeout: Optional[float] = 3600,
    poll_interval: float = 10,
    raise_timeout_error: bool = False,
    exclude_keys: Optional[Set[str]] = None,
    key_prefix: Optional[str] = None,
    flow_run_id: Optional[UUID] = None,
    with_metadata: bool = False,
) -> Union[GetAutomaticInputHandler[T], GetInputHandler[R]]:
    """
    Create a handler that yields inputs of `input_type` sent to a flow run.

    The type is first mapped to a run input class. `with_metadata` is
    forwarded only when that class is an `AutomaticRunInput` subclass; for
    `RunInput` classes it is ignored.
    """
    # The typing in this module is a bit complex, and at this point `mypy`
    # thinks that `run_input_subclass_from_type` accepts a `Type[Never]` but
    # the signature is the same as here:
    #   Union[Type[R], Type[T], pydantic.BaseModel],
    # Seems like a possible mypy bug, so we'll ignore the type check here.
    input_cls: Union[Type[AutomaticRunInput[T]], Type[R]] = (
        run_input_subclass_from_type(input_type)
    )  # type: ignore[arg-type]

    # Arguments accepted by both kinds of `receive`.
    shared_kwargs: Dict[str, Any] = {
        "timeout": timeout,
        "poll_interval": poll_interval,
        "raise_timeout_error": raise_timeout_error,
        "exclude_keys": exclude_keys,
        "key_prefix": key_prefix,
        "flow_run_id": flow_run_id,
    }

    if issubclass(input_cls, AutomaticRunInput):
        return input_cls.receive(**shared_kwargs, with_metadata=with_metadata)

    return input_cls.receive(**shared_kwargs)

723 )