Coverage for /usr/local/lib/python3.12/site-packages/prefect/input/run_input.py: 26%
268 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1"""
2This module contains functions that allow sending type-checked `RunInput` data
3to flows at runtime. Flows can send back responses, establishing two-way
4channels with senders. These functions are particularly useful for systems that
5require ongoing data transfer or need to react to input quickly. They enable
6real-time interaction and efficient data handling. The module is designed to
7facilitate dynamic communication within distributed or microservices-oriented
8systems, making it ideal for scenarios requiring continuous data
9synchronization and processing, and for systems that require ongoing data
10input and output.
12The following is an example of two flows. One sends a random number to the
13other and waits for a response. The other receives the number, squares it, and
14sends the result back. The sender flow then prints the result.
16Sender flow:
18```python
19import random
20from uuid import UUID
21from prefect import flow
22from prefect.logging import get_run_logger
23from prefect.input import RunInput
25class NumberData(RunInput):
26 number: int
29@flow
30async def sender_flow(receiver_flow_run_id: UUID):
31 logger = get_run_logger()
33 the_number = random.randint(1, 100)
35 await NumberData(number=the_number).send_to(receiver_flow_run_id)
37 receiver = NumberData.receive(flow_run_id=receiver_flow_run_id)
38 squared = await receiver.next()
40 logger.info(f"{the_number} squared is {squared.number}")
41```
43Receiver flow:
44```python
45import random
46from uuid import UUID
47from prefect import flow
48from prefect.logging import get_run_logger
49from prefect.input import RunInput
51class NumberData(RunInput):
52 number: int
55@flow
56async def receiver_flow():
57 async for data in NumberData.receive():
58 squared = data.number ** 2
59 await data.respond(NumberData(number=squared))
60```
61"""
63from __future__ import annotations 1a
65import inspect 1a
66from inspect import isclass 1a
67from typing import ( 1a
68 TYPE_CHECKING,
69 Any,
70 ClassVar,
71 Coroutine,
72 Dict,
73 Generic,
74 Literal,
75 Optional,
76 Set,
77 Type,
78 TypeVar,
79 Union,
80 cast,
81 overload,
82)
83from uuid import UUID, uuid4 1a
85import anyio 1a
86import pydantic 1a
87from pydantic import ConfigDict 1a
88from typing_extensions import Self 1a
90from prefect.input.actions import ( 1a
91 create_flow_run_input,
92 create_flow_run_input_from_model,
93 ensure_flow_run_id,
94 filter_flow_run_input,
95 read_flow_run_input,
96)
97from prefect.utilities.asyncutils import sync_compatible 1a
99if TYPE_CHECKING: 99 ↛ 100line 99 didn't jump to line 100 because the condition on line 99 was never true1a
100 from prefect.client.schemas.objects import FlowRunInput
101 from prefect.states import State
103from prefect._internal.pydantic.v2_schema import create_v2_schema, is_v2_model 1a
# Type variable standing for a concrete `RunInput` subclass.
R = TypeVar("R", bound="RunInput")
# Type variable standing for the value type wrapped by `AutomaticRunInput`.
T = TypeVar("T", bound="object")

# Maps each role a run input is stored under ("description", "response",
# "schema") to the key used for that role.
Keyset = Dict[Literal["description", "response", "schema"], str]
def keyset_from_paused_state(state: "State") -> Keyset:
    """
    Build the keyset that belongs to a Paused state.

    The base key is the lower-cased state name (an empty string when the
    state has no name) joined by "-" to the state's pause key.

    Args:
        - state (State): the paused state to derive the keyset from

    Raises:
        - RuntimeError: if `state` is not paused
    """
    if not state.is_paused():
        raise RuntimeError(f"{state.type.value!r} is unsupported.")

    name_part = (state.name or "").lower()
    pause_part = str(state.state_details.pause_key)
    return keyset_from_base_key("-".join((name_part, pause_part)))
def keyset_from_base_key(base_key: str) -> Keyset:
    """
    Derive the description, response and schema keys for a base key.

    Each key has the form "<base_key>-<role>".

    Args:
        - base_key (str): the prefix shared by every key in the keyset

    Returns:
        - Dict[str, str]: the keyset, keyed by role
    """
    roles = ("description", "response", "schema")
    return {role: f"{base_key}-{role}" for role in roles}
class RunInputMetadata(pydantic.BaseModel):
    # Delivery details attached to a run input instance after it is loaded.

    # Key the input was stored under.
    key: str
    # Identifier of whoever sent the input; None when no sender was recorded.
    sender: Optional[str] = None
    # ID of the flow run the input was addressed to.
    receiver: UUID
class BaseRunInput(pydantic.BaseModel):
    # Shared base for run inputs: saves the input schema/description for a flow
    # run, loads stored values back into model instances, and sends inputs to
    # other flow runs. Documented with comments rather than a class docstring
    # because pydantic copies a class docstring into the model's JSON schema.
    model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")

    _description: Optional[str] = pydantic.PrivateAttr(default=None)
    _metadata: RunInputMetadata = pydantic.PrivateAttr()

    @property
    def metadata(self) -> RunInputMetadata:
        """Metadata recorded on this instance when it was loaded."""
        return self._metadata

    @classmethod
    def keyset_from_type(cls) -> Keyset:
        """Return the keyset whose base key is the lower-cased class name."""
        base_key = cls.__name__.lower()
        return keyset_from_base_key(base_key)

    @classmethod
    @sync_compatible
    async def save(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None):
        """
        Store this input type's schema and, when one is set, its description.

        Args:
            - keyset (Keyset): the keyset to save the input for
            - flow_run_id (UUID, optional): the flow run ID to save the input for
        """
        schema = (
            create_v2_schema(cls.__name__, model_base=cls)
            if is_v2_model(cls)
            else cls.model_json_schema(by_alias=True)
        )

        schema_coro = create_flow_run_input(
            key=keyset["schema"], value=schema, flow_run_id=flow_run_id
        )
        if TYPE_CHECKING:
            assert inspect.iscoroutine(schema_coro)
        await schema_coro

        # Only a non-empty string counts as a description; anything else
        # found on the class attribute is skipped.
        description = cls._description
        if not isinstance(description, str) or not description:
            return

        description_coro = create_flow_run_input(
            key=keyset["description"],
            value=description,
            flow_run_id=flow_run_id,
        )
        if TYPE_CHECKING:
            assert inspect.iscoroutine(description_coro)
        await description_coro

    @classmethod
    @sync_compatible
    async def load(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None) -> Self:
        """
        Read the stored response for `keyset` and build an instance from it.

        A falsy stored value produces an instance built with no arguments.

        Args:
            - keyset (Keyset): the keyset to load the input for
            - flow_run_id (UUID, optional): the flow run ID to load the input for
        """
        resolved_id = ensure_flow_run_id(flow_run_id)
        response_key = keyset["response"]
        stored = await read_flow_run_input(response_key, flow_run_id=resolved_id)

        loaded = cls(**stored) if stored else cls()
        loaded._metadata = RunInputMetadata(
            key=keyset["response"], sender=None, receiver=resolved_id
        )
        return loaded

    @classmethod
    def load_from_flow_run_input(cls, flow_run_input: "FlowRunInput") -> Self:
        """
        Build an instance from a `FlowRunInput`, recording its key, sender and
        flow run ID as metadata.

        Args:
            - flow_run_input (FlowRunInput): the flow run input to load the input for
        """
        loaded = cls(**flow_run_input.decoded_value)
        loaded._metadata = RunInputMetadata(
            key=flow_run_input.key,
            sender=flow_run_input.sender,
            receiver=flow_run_input.flow_run_id,
        )
        return loaded

    @classmethod
    def with_initial_data(
        cls: Type[R], description: Optional[str] = None, **kwargs: Any
    ) -> Type[R]:
        """
        Create a new subclass of this class that uses the given keyword
        arguments as field defaults.

        Args:
            - description (str, optional): a description to show when resuming
                a flow run that requires input
            - kwargs (Any): the initial data to populate the subclass
        """
        declared_fields = cls.model_fields
        overrides: Dict[str, Any] = {}
        for name, default in kwargs.items():
            declared = declared_fields.get(name)
            # Keep the declared annotation when the field already exists;
            # otherwise infer the type from the supplied value.
            if declared:
                annotation = declared.annotation
            else:
                annotation = type(default)
            overrides[name] = (annotation, default)

        subclass = pydantic.create_model(cls.__name__, **overrides, __base__=cls)

        if description is not None:
            subclass._description = description

        return subclass

    @sync_compatible
    async def respond(
        self,
        run_input: "RunInput",
        sender: Optional[str] = None,
        key_prefix: Optional[str] = None,
    ):
        """
        Send `run_input` to the flow run that sent this input.

        The target flow run ID is the text after the last "." of
        `metadata.sender`, which must start with "prefect.flow-run".

        Args:
            - run_input (RunInput): the input to send back
            - sender (str, optional): sender identifier to record on the response
            - key_prefix (str, optional): key prefix to store the response under

        Raises:
            - RuntimeError: if this input has no sender or the sender is not
                a flow run
        """
        origin = self.metadata.sender
        if not (origin and origin.startswith("prefect.flow-run")):
            raise RuntimeError(
                "Cannot respond to an input that was not sent by a flow run."
            )

        target_flow_run_id = UUID(origin.rpartition(".")[2])

        await _send_input(
            flow_run_id=target_flow_run_id,
            run_input=run_input,
            sender=sender,
            key_prefix=key_prefix,
        )

    @sync_compatible
    async def send_to(
        self,
        flow_run_id: UUID,
        sender: Optional[str] = None,
        key_prefix: Optional[str] = None,
    ):
        """
        Send this input to the given flow run.

        Args:
            - flow_run_id (UUID): the flow run to send this input to
            - sender (str, optional): sender identifier to record on the input
            - key_prefix (str, optional): key prefix to store the input under
        """
        await _send_input(
            run_input=self,
            flow_run_id=flow_run_id,
            sender=sender,
            key_prefix=key_prefix,
        )
class RunInput(BaseRunInput):
    # Run input whose fields are declared directly on the model class.

    @classmethod
    def receive(
        cls,
        timeout: Optional[float] = 3600,
        poll_interval: float = 10,
        raise_timeout_error: bool = False,
        exclude_keys: Optional[Set[str]] = None,
        key_prefix: Optional[str] = None,
        flow_run_id: Optional[UUID] = None,
    ) -> GetInputHandler[Self]:
        """
        Create a handler that yields inputs of this type sent to a flow run.

        When `key_prefix` is None it defaults to the lower-cased class name
        followed by "-auto". All other arguments are passed to
        `GetInputHandler` unchanged.
        """
        prefix = f"{cls.__name__.lower()}-auto" if key_prefix is None else key_prefix

        return GetInputHandler(
            run_input_cls=cls,
            key_prefix=prefix,
            timeout=timeout,
            poll_interval=poll_interval,
            raise_timeout_error=raise_timeout_error,
            exclude_keys=exclude_keys,
            flow_run_id=flow_run_id,
        )

    @classmethod
    def subclass_from_base_model_type(
        cls, model_cls: Type[pydantic.BaseModel]
    ) -> Type["RunInput"]:
        """
        Create a new `RunInput` subclass from the given `pydantic.BaseModel`
        subclass.

        The new class is named "<model name>RunInput" and inherits from both
        `RunInput` and `model_cls`.

        Args:
            - model_cls (pydantic.BaseModel subclass): the class from which
                to create the new `RunInput` subclass
        """
        subclass_name = f"{model_cls.__name__}RunInput"
        bases = (RunInput, model_cls)
        return type(subclass_name, bases, {})  # type: ignore
class AutomaticRunInput(BaseRunInput, Generic[T]):
    # Run input that wraps a single arbitrary value in a `value` field.
    value: T

    @classmethod
    @sync_compatible
    async def load(cls, keyset: Keyset, flow_run_id: Optional[UUID] = None) -> Self:
        """
        Load the run input response from the given key and return its value.

        NOTE(review): despite the `Self` annotation, this returns the
        unwrapped `value` of the loaded instance, not the instance itself.

        Args:
            - keyset (Keyset): the keyset to load the input for
            - flow_run_id (UUID, optional): the flow run ID to load the input for
        """
        instance_coro = super().load(keyset, flow_run_id=flow_run_id)
        if TYPE_CHECKING:
            assert inspect.iscoroutine(instance_coro)
        instance = await instance_coro
        return instance.value

    @classmethod
    def subclass_from_type(cls, _type: Type[T]) -> Type["AutomaticRunInput[T]"]:
        """
        Create a new `AutomaticRunInput` subclass from the given type.

        This method uses the type's name as a key prefix to identify related
        flow run inputs. This helps in ensuring that values saved under a type
        (like List[int]) are retrievable under the generic type name (like "list").
        """
        fields: Dict[str, Any] = {"value": (_type, ...)}

        # Name lookup order:
        # - "__name__": the usual attribute for most types.
        # - "_name": fallback for typing constructs that lack "__name__".
        # - "": used when neither attribute yields a name. This also covers
        #   constructs whose "_name" is None, which would otherwise fail on
        #   `.lower()`. An empty prefix matches all automatic inputs sent to
        #   the flow run rather than a specific type.
        type_name = (
            getattr(_type, "__name__", None) or getattr(_type, "_name", None) or ""
        )
        type_prefix: str = type_name.lower()

        class_name = f"{type_prefix}AutomaticRunInput"

        # Create the model class dynamically, named after the type prefix.
        new_cls: Type["AutomaticRunInput[T]"] = pydantic.create_model(
            class_name, **fields, __base__=AutomaticRunInput
        )
        return new_cls

    @classmethod
    def receive(
        cls,
        timeout: Optional[float] = 3600,
        poll_interval: float = 10,
        raise_timeout_error: bool = False,
        exclude_keys: Optional[Set[str]] = None,
        key_prefix: Optional[str] = None,
        flow_run_id: Optional[UUID] = None,
        with_metadata: bool = False,
    ) -> GetAutomaticInputHandler[T]:
        """
        Create a handler that yields values of this type sent to a flow run.

        A falsy `key_prefix` defaults to the lower-cased class name followed
        by "-auto". When `with_metadata` is true the handler yields the
        wrapping `AutomaticRunInput` instances instead of the bare values.
        """
        key_prefix = key_prefix or f"{cls.__name__.lower()}-auto"

        return GetAutomaticInputHandler(
            run_input_cls=cls,
            key_prefix=key_prefix,
            timeout=timeout,
            poll_interval=poll_interval,
            raise_timeout_error=raise_timeout_error,
            exclude_keys=exclude_keys,
            flow_run_id=flow_run_id,
            with_metadata=with_metadata,
        )
def run_input_subclass_from_type(
    _type: Union[Type[R], Type[T], pydantic.BaseModel],
) -> Union[Type[AutomaticRunInput[T]], Type[R]]:
    """
    Create a new `RunInput` subclass from the given type.

    - A `RunInput` subclass is returned unchanged.
    - Any other `pydantic.BaseModel` subclass is combined with `RunInput`.
    - Everything else (for example a typing generic alias) is wrapped in an
      `AutomaticRunInput` subclass.
    """
    if isclass(_type) and issubclass(_type, RunInput):
        return cast(Type[R], _type)

    if isclass(_type) and issubclass(_type, pydantic.BaseModel):
        combined = RunInput.subclass_from_base_model_type(_type)
        return cast(Type[R], combined)

    # Not a model class at all: let AutomaticRunInput try to build a model
    # around it.
    automatic = AutomaticRunInput.subclass_from_type(cast(Type[T], _type))
    return cast(Type[AutomaticRunInput[T]], automatic)
class GetInputHandler(Generic[R]):
    """
    Sync and async iterator that polls a flow run for inputs of one
    `RunInput` type and yields them as model instances.

    Keys of inputs that have been returned are added to `exclude_keys`, so
    each input is yielded at most once per handler.
    """

    def __init__(
        self,
        run_input_cls: Type[R],
        key_prefix: str,
        timeout: float | None = 3600,
        poll_interval: float = 10,
        raise_timeout_error: bool = False,
        exclude_keys: Optional[Set[str]] = None,
        flow_run_id: Optional[UUID] = None,
    ):
        """
        Args:
            - run_input_cls (Type[R]): class used to build returned instances
            - key_prefix (str): prefix of the input keys to look for
            - timeout (float, optional): passed to `anyio.fail_after` while
                polling
            - poll_interval (float): seconds to sleep between polls
            - raise_timeout_error (bool): re-raise `TimeoutError` during
                iteration instead of ending the iteration
            - exclude_keys (Set[str], optional): keys to ignore; copied, so the
                caller's set is never mutated
            - flow_run_id (UUID, optional): flow run to poll, resolved through
                `ensure_flow_run_id`
        """
        self.run_input_cls: Type[R] = run_input_cls
        self.key_prefix: str = key_prefix
        self.timeout: float | None = timeout
        self.poll_interval: float = poll_interval
        self.exclude_keys: set[str] = set()
        self.raise_timeout_error: bool = raise_timeout_error
        self.flow_run_id: UUID = ensure_flow_run_id(flow_run_id)

        if exclude_keys is not None:
            self.exclude_keys.update(exclude_keys)

    def __iter__(self) -> Self:
        return self

    def __next__(self) -> R:
        """Return the next input; a timeout ends iteration unless configured to raise."""
        try:
            return cast(R, self.next())
        except TimeoutError:
            if self.raise_timeout_error:
                raise
            raise StopIteration

    def __aiter__(self) -> Self:
        return self

    async def __anext__(self) -> R:
        """Async counterpart of `__next__`."""
        try:
            coro = self.next()
            if TYPE_CHECKING:
                assert inspect.iscoroutine(coro)
            return await coro
        except TimeoutError:
            if self.raise_timeout_error:
                raise
            raise StopAsyncIteration

    async def filter_for_inputs(self) -> list["FlowRunInput"]:
        """
        Fetch at most one matching, not-yet-seen flow run input and mark the
        returned keys as seen.
        """
        flow_run_inputs_coro = filter_flow_run_input(
            key_prefix=self.key_prefix,
            limit=1,
            exclude_keys=self.exclude_keys,
            flow_run_id=self.flow_run_id,
        )
        if TYPE_CHECKING:
            assert inspect.iscoroutine(flow_run_inputs_coro)

        flow_run_inputs = await flow_run_inputs_coro

        if flow_run_inputs:
            # `set.add` accepts exactly one element, so unpacking the keys
            # into it only works for a single result; `update` records every
            # returned key regardless of how many come back.
            self.exclude_keys.update(i.key for i in flow_run_inputs)

        return flow_run_inputs

    def to_instance(self, flow_run_input: "FlowRunInput") -> R:
        """Convert a `FlowRunInput` into an instance of `run_input_cls`."""
        return self.run_input_cls.load_from_flow_run_input(flow_run_input)

    @sync_compatible
    async def next(self) -> R:
        """
        Return the next available input, polling until one arrives.

        One check is made immediately; after that the handler sleeps
        `poll_interval` seconds between checks inside
        `anyio.fail_after(self.timeout)`.
        """
        flow_run_inputs = await self.filter_for_inputs()
        if flow_run_inputs:
            return self.to_instance(flow_run_inputs[0])

        with anyio.fail_after(self.timeout):
            while True:
                await anyio.sleep(self.poll_interval)
                flow_run_inputs = await self.filter_for_inputs()
                if flow_run_inputs:
                    return self.to_instance(flow_run_inputs[0])
class GetAutomaticInputHandler(Generic[T]):
    """
    Sync and async iterator that polls a flow run for inputs wrapped in an
    `AutomaticRunInput` subclass.

    It yields either the bare values or, when `with_metadata` is true, the
    wrapping instances. Keys of inputs that have been returned are added to
    `exclude_keys`, so each input is yielded at most once per handler.
    """

    def __init__(
        self,
        run_input_cls: Type[AutomaticRunInput[T]],
        key_prefix: str,
        timeout: float | None = 3600,
        poll_interval: float = 10,
        raise_timeout_error: bool = False,
        exclude_keys: Optional[Set[str]] = None,
        flow_run_id: Optional[UUID] = None,
        with_metadata: bool = False,
    ):
        """
        Args:
            - run_input_cls (Type[AutomaticRunInput[T]]): class used to build
                returned instances
            - key_prefix (str): prefix of the input keys to look for
            - timeout (float, optional): passed to `anyio.fail_after` while
                polling
            - poll_interval (float): seconds to sleep between polls
            - raise_timeout_error (bool): re-raise `TimeoutError` during
                iteration instead of ending the iteration
            - exclude_keys (Set[str], optional): keys to ignore; copied, so the
                caller's set is never mutated
            - flow_run_id (UUID, optional): flow run to poll, resolved through
                `ensure_flow_run_id`
            - with_metadata (bool): yield the wrapping instance instead of its
                `value`
        """
        self.run_input_cls: Type[AutomaticRunInput[T]] = run_input_cls
        self.key_prefix: str = key_prefix
        self.timeout: float | None = timeout
        self.poll_interval: float = poll_interval
        self.exclude_keys: set[str] = set()
        self.raise_timeout_error: bool = raise_timeout_error
        self.flow_run_id: UUID = ensure_flow_run_id(flow_run_id)
        self.with_metadata = with_metadata

        if exclude_keys is not None:
            self.exclude_keys.update(exclude_keys)

    def __iter__(self) -> Self:
        return self

    def __next__(self) -> T | AutomaticRunInput[T]:
        """Return the next input; a timeout ends iteration unless configured to raise."""
        try:
            not_coro = self.next()
            if TYPE_CHECKING:
                assert not isinstance(not_coro, Coroutine)
            return not_coro
        except TimeoutError:
            if self.raise_timeout_error:
                raise
            raise StopIteration

    def __aiter__(self) -> Self:
        return self

    async def __anext__(self) -> Union[T, AutomaticRunInput[T]]:
        """Async counterpart of `__next__`."""
        try:
            coro = self.next()
            if TYPE_CHECKING:
                assert inspect.iscoroutine(coro)
            return cast(Union[T, AutomaticRunInput[T]], await coro)
        except TimeoutError:
            if self.raise_timeout_error:
                raise
            raise StopAsyncIteration

    async def filter_for_inputs(self) -> list["FlowRunInput"]:
        """
        Fetch at most one matching, not-yet-seen flow run input and mark the
        returned keys as seen.
        """
        flow_run_inputs_coro = filter_flow_run_input(
            key_prefix=self.key_prefix,
            limit=1,
            exclude_keys=self.exclude_keys,
            flow_run_id=self.flow_run_id,
        )
        if TYPE_CHECKING:
            assert inspect.iscoroutine(flow_run_inputs_coro)

        flow_run_inputs = await flow_run_inputs_coro

        if flow_run_inputs:
            # `set.add` accepts exactly one element, so unpacking the keys
            # into it only works for a single result; `update` records every
            # returned key regardless of how many come back.
            self.exclude_keys.update(i.key for i in flow_run_inputs)

        return flow_run_inputs

    @sync_compatible
    async def next(self) -> Union[T, AutomaticRunInput[T]]:
        """
        Return the next available input, polling until one arrives.

        One check is made immediately; after that the handler sleeps
        `poll_interval` seconds between checks inside
        `anyio.fail_after(self.timeout)`.
        """
        flow_run_inputs = await self.filter_for_inputs()
        if flow_run_inputs:
            return self.to_instance(flow_run_inputs[0])

        with anyio.fail_after(self.timeout):
            while True:
                await anyio.sleep(self.poll_interval)
                flow_run_inputs = await self.filter_for_inputs()
                if flow_run_inputs:
                    return self.to_instance(flow_run_inputs[0])

    def to_instance(
        self, flow_run_input: "FlowRunInput"
    ) -> Union[T, AutomaticRunInput[T]]:
        """
        Convert a `FlowRunInput` into the wrapping instance when
        `with_metadata` is true, otherwise into its `value`.
        """
        run_input = self.run_input_cls.load_from_flow_run_input(flow_run_input)

        if self.with_metadata:
            return run_input
        return run_input.value
async def _send_input(
    flow_run_id: UUID,
    run_input: RunInput | pydantic.BaseModel,
    sender: Optional[str] = None,
    key_prefix: Optional[str] = None,
):
    """
    Store `run_input` as a flow run input for `flow_run_id`.

    A `RunInput` instance is sent as is. Any other value is first converted
    with `run_input_subclass_from_type`:

    - if that yields an `AutomaticRunInput` subclass, the value is wrapped in
      its `value` field;
    - otherwise (a plain pydantic model) the resulting `RunInput` subclass is
      built from the model's own field values.

    The stored key is "<key_prefix>-<uuid4>". When `key_prefix` is None it
    defaults to the lower-cased class name of the sent model plus "-auto".

    Args:
        - flow_run_id (UUID): the flow run to send the input to
        - run_input: the value to send
        - sender (str, optional): sender identifier to record on the input
        - key_prefix (str, optional): prefix for the generated key
    """
    _run_input: Union[RunInput, AutomaticRunInput[Any]]
    if isinstance(run_input, RunInput):
        _run_input = run_input
    else:
        input_cls = run_input_subclass_from_type(type(run_input))
        if issubclass(input_cls, AutomaticRunInput):
            _run_input = input_cls(value=run_input)
        else:
            # For a plain pydantic model the helper returns a RunInput
            # subclass that carries the model's own fields and has no `value`
            # field, so rebuild it from the model's data instead of wrapping.
            _run_input = input_cls(**run_input.model_dump())

    if key_prefix is None:
        key_prefix = f"{_run_input.__class__.__name__.lower()}-auto"

    key = f"{key_prefix}-{uuid4()}"

    coro = create_flow_run_input_from_model(
        key=key, flow_run_id=flow_run_id, model_instance=_run_input, sender=sender
    )
    if TYPE_CHECKING:
        assert inspect.iscoroutine(coro)

    await coro
@sync_compatible
async def send_input(
    run_input: Any,
    flow_run_id: UUID,
    sender: Optional[str] = None,
    key_prefix: Optional[str] = None,
):
    """
    Send `run_input` to the flow run identified by `flow_run_id`.

    Public entry point that forwards every argument to `_send_input`.

    Args:
        - run_input (Any): the value to send
        - flow_run_id (UUID): the flow run to send the input to
        - sender (str, optional): sender identifier to record on the input
        - key_prefix (str, optional): prefix for the generated key
    """
    await _send_input(
        run_input=run_input,
        flow_run_id=flow_run_id,
        key_prefix=key_prefix,
        sender=sender,
    )
@overload
def receive_input(  # type: ignore[overload-overlap]
    input_type: Union[Type[R], pydantic.BaseModel],
    timeout: Optional[float] = 3600,
    poll_interval: float = 10,
    raise_timeout_error: bool = False,
    exclude_keys: Optional[Set[str]] = None,
    key_prefix: Optional[str] = None,
    flow_run_id: Optional[UUID] = None,
    with_metadata: bool = False,
) -> GetInputHandler[R]: ...


@overload
def receive_input(
    input_type: Type[T],
    timeout: Optional[float] = 3600,
    poll_interval: float = 10,
    raise_timeout_error: bool = False,
    exclude_keys: Optional[Set[str]] = None,
    key_prefix: Optional[str] = None,
    flow_run_id: Optional[UUID] = None,
    with_metadata: bool = False,
) -> GetAutomaticInputHandler[T]: ...


def receive_input(
    input_type: Union[Type[R], Type[T], pydantic.BaseModel],
    timeout: Optional[float] = 3600,
    poll_interval: float = 10,
    raise_timeout_error: bool = False,
    exclude_keys: Optional[Set[str]] = None,
    key_prefix: Optional[str] = None,
    flow_run_id: Optional[UUID] = None,
    with_metadata: bool = False,
) -> Union[GetAutomaticInputHandler[T], GetInputHandler[R]]:
    """
    Create a handler that yields inputs of `input_type` sent to a flow run.

    `input_type` is first converted with `run_input_subclass_from_type`, and
    the resulting class's `receive` is called with the remaining arguments.
    `with_metadata` is forwarded only when the resulting class is an
    `AutomaticRunInput` subclass.
    """
    # mypy believes `run_input_subclass_from_type` accepts a `Type[Never]`
    # even though its parameter has the same
    # Union[Type[R], Type[T], pydantic.BaseModel] annotation as `input_type`.
    # This looks like a mypy bug, so the argument check is ignored.
    input_cls: Union[Type[AutomaticRunInput[T]], Type[R]] = (
        run_input_subclass_from_type(input_type)
    )  # type: ignore[arg-type]

    shared_options: Dict[str, Any] = {
        "timeout": timeout,
        "poll_interval": poll_interval,
        "raise_timeout_error": raise_timeout_error,
        "exclude_keys": exclude_keys,
        "key_prefix": key_prefix,
        "flow_run_id": flow_run_id,
    }

    if issubclass(input_cls, AutomaticRunInput):
        return input_cls.receive(with_metadata=with_metadata, **shared_options)
    return input_cls.receive(**shared_options)