Coverage for /usr/local/lib/python3.12/site-packages/prefect/utilities/annotations.py: 58%
46 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1import warnings 1a
2from operator import itemgetter 1a
3from typing import Any, cast 1a
5from pydantic import GetCoreSchemaHandler 1a
6from pydantic_core import core_schema, to_json 1a
7from typing_extensions import Self, TypeVar 1a
9T = TypeVar("T", infer_variance=True) 1a
12class BaseAnnotation(tuple[T]): 1a
13 """
14 Base class for Prefect annotation types.
16 Inherits from `tuple` for unpacking support in other tools.
17 """
19 __slots__ = () 1a
21 def __new__(cls, value: T) -> Self: 1a
22 return super().__new__(cls, (value,))
24 # use itemgetter to minimise overhead, just like namedtuple generated code would
25 value: T = cast(T, property(itemgetter(0))) 1a
27 def unwrap(self) -> T: 1a
28 return self[0]
30 def rewrap(self, value: T) -> Self: 1a
31 return type(self)(value)
33 def __eq__(self, other: Any) -> bool: 1a
34 if type(self) is not type(other):
35 return False
36 return super().__eq__(other)
38 def __repr__(self) -> str: 1a
39 return f"{type(self).__name__}({self[0]!r})"
42class unmapped(BaseAnnotation[T]): 1a
43 """
44 Wrapper for iterables.
46 Indicates that this input should be sent as-is to all runs created during a mapping
47 operation instead of being split.
48 """
50 def __getitem__(self, _: object) -> T: # type: ignore[override] # pyright: ignore[reportIncompatibleMethodOverride] 1a
51 # Internally, this acts as an infinite array where all items are the same value
52 return super().__getitem__(0)
55class allow_failure(BaseAnnotation[T]): 1a
56 """
57 Wrapper for states or futures.
59 Indicates that the upstream run for this input can be failed.
61 Generally, Prefect will not allow a downstream run to start if any of its inputs
62 are failed. This annotation allows you to opt into receiving a failed input
63 downstream.
65 If the input is from a failed run, the attached exception will be passed to your
66 function.
67 """
70class quote(BaseAnnotation[T]): 1a
71 """
72 Simple wrapper to mark an expression as a different type so it will not be coerced
73 by Prefect. For example, if you want to return a state from a flow without having
74 the flow assume that state.
76 quote will also instruct prefect to ignore introspection of the wrapped object
77 when passed as flow or task parameter. Parameter introspection can be a
78 significant performance hit when the object is a large collection,
79 e.g. a large dictionary or DataFrame, and each element needs to be visited. This
80 will disable task dependency tracking for the wrapped object, but likely will
81 increase performance.
83 ```
84 @task
85 def my_task(df):
86 ...
88 @flow
89 def my_flow():
90 my_task(quote(df))
91 ```
92 """
94 def unquote(self) -> T: 1a
95 return self.unwrap()
98# Backwards compatibility stub for `Quote` class
99class Quote(quote[T]): 1a
100 def __new__(cls, expr: T) -> Self: 1a
101 warnings.warn(
102 "Use of `Quote` is deprecated. Use `quote` instead.",
103 DeprecationWarning,
104 stacklevel=2,
105 )
106 return super().__new__(cls, expr)
109class NotSet: 1a
110 """
111 Singleton to distinguish `None` from a value that is not provided by the user.
112 """
115class freeze(BaseAnnotation[T]): 1a
116 """
117 Wrapper for parameters in deployments.
119 Indicates that this parameter should be frozen in the UI and not editable
120 when creating flow runs from this deployment.
122 Example:
123 ```python
124 @flow
125 def my_flow(customer_id: str):
126 # flow logic
128 deployment = my_flow.deploy(parameters={"customer_id": freeze("customer123")})
129 ```
130 """
132 def __new__(cls, value: T) -> Self: 1a
133 try:
134 to_json(value)
135 except Exception:
136 raise ValueError("Value must be JSON serializable")
137 return super().__new__(cls, value)
139 def unfreeze(self) -> T: 1a
140 """Return the unwrapped value."""
141 return self.unwrap()
143 @classmethod 1a
144 def __get_pydantic_core_schema__( 1a
145 cls, source: type[Any], handler: GetCoreSchemaHandler
146 ) -> core_schema.CoreSchema:
147 return core_schema.no_info_after_validator_function(
148 cls, # Use the class itself as the validator
149 core_schema.any_schema(),
150 serialization=core_schema.plain_serializer_function_ser_schema(
151 lambda x: x.unfreeze() # Serialize by unwrapping the value
152 ),
153 )