Coverage for /usr/local/lib/python3.12/site-packages/prefect/utilities/annotations.py: 58%

46 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1import warnings 1a

2from operator import itemgetter 1a

3from typing import Any, cast 1a

4 

5from pydantic import GetCoreSchemaHandler 1a

6from pydantic_core import core_schema, to_json 1a

7from typing_extensions import Self, TypeVar 1a

8 

9T = TypeVar("T", infer_variance=True) 1a

10 

11 

12class BaseAnnotation(tuple[T]): 1a

13 """ 

14 Base class for Prefect annotation types. 

15 

16 Inherits from `tuple` for unpacking support in other tools. 

17 """ 

18 

19 __slots__ = () 1a

20 

21 def __new__(cls, value: T) -> Self: 1a

22 return super().__new__(cls, (value,)) 

23 

24 # use itemgetter to minimise overhead, just like namedtuple generated code would 

25 value: T = cast(T, property(itemgetter(0))) 1a

26 

27 def unwrap(self) -> T: 1a

28 return self[0] 

29 

30 def rewrap(self, value: T) -> Self: 1a

31 return type(self)(value) 

32 

33 def __eq__(self, other: Any) -> bool: 1a

34 if type(self) is not type(other): 

35 return False 

36 return super().__eq__(other) 

37 

38 def __repr__(self) -> str: 1a

39 return f"{type(self).__name__}({self[0]!r})" 

40 

41 

42class unmapped(BaseAnnotation[T]): 1a

43 """ 

44 Wrapper for iterables. 

45 

46 Indicates that this input should be sent as-is to all runs created during a mapping 

47 operation instead of being split. 

48 """ 

49 

50 def __getitem__(self, _: object) -> T: # type: ignore[override] # pyright: ignore[reportIncompatibleMethodOverride] 1a

51 # Internally, this acts as an infinite array where all items are the same value 

52 return super().__getitem__(0) 

53 

54 

55class allow_failure(BaseAnnotation[T]): 1a

56 """ 

57 Wrapper for states or futures. 

58 

59 Indicates that the upstream run for this input can be failed. 

60 

61 Generally, Prefect will not allow a downstream run to start if any of its inputs 

62 are failed. This annotation allows you to opt into receiving a failed input 

63 downstream. 

64 

65 If the input is from a failed run, the attached exception will be passed to your 

66 function. 

67 """ 

68 

69 

70class quote(BaseAnnotation[T]): 1a

71 """ 

72 Simple wrapper to mark an expression as a different type so it will not be coerced 

73 by Prefect. For example, if you want to return a state from a flow without having 

74 the flow assume that state. 

75 

76 quote will also instruct prefect to ignore introspection of the wrapped object 

77 when passed as flow or task parameter. Parameter introspection can be a 

78 significant performance hit when the object is a large collection, 

79 e.g. a large dictionary or DataFrame, and each element needs to be visited. This 

80 will disable task dependency tracking for the wrapped object, but likely will 

81 increase performance. 

82 

83 ``` 

84 @task 

85 def my_task(df): 

86 ... 

87 

88 @flow 

89 def my_flow(): 

90 my_task(quote(df)) 

91 ``` 

92 """ 

93 

94 def unquote(self) -> T: 1a

95 return self.unwrap() 

96 

97 

98# Backwards compatibility stub for `Quote` class 

99class Quote(quote[T]): 1a

100 def __new__(cls, expr: T) -> Self: 1a

101 warnings.warn( 

102 "Use of `Quote` is deprecated. Use `quote` instead.", 

103 DeprecationWarning, 

104 stacklevel=2, 

105 ) 

106 return super().__new__(cls, expr) 

107 

108 

109class NotSet: 1a

110 """ 

111 Singleton to distinguish `None` from a value that is not provided by the user. 

112 """ 

113 

114 

115class freeze(BaseAnnotation[T]): 1a

116 """ 

117 Wrapper for parameters in deployments. 

118 

119 Indicates that this parameter should be frozen in the UI and not editable 

120 when creating flow runs from this deployment. 

121 

122 Example: 

123 ```python 

124 @flow 

125 def my_flow(customer_id: str): 

126 # flow logic 

127 

128 deployment = my_flow.deploy(parameters={"customer_id": freeze("customer123")}) 

129 ``` 

130 """ 

131 

132 def __new__(cls, value: T) -> Self: 1a

133 try: 

134 to_json(value) 

135 except Exception: 

136 raise ValueError("Value must be JSON serializable") 

137 return super().__new__(cls, value) 

138 

139 def unfreeze(self) -> T: 1a

140 """Return the unwrapped value.""" 

141 return self.unwrap() 

142 

143 @classmethod 1a

144 def __get_pydantic_core_schema__( 1a

145 cls, source: type[Any], handler: GetCoreSchemaHandler 

146 ) -> core_schema.CoreSchema: 

147 return core_schema.no_info_after_validator_function( 

148 cls, # Use the class itself as the validator 

149 core_schema.any_schema(), 

150 serialization=core_schema.plain_serializer_function_ser_schema( 

151 lambda x: x.unfreeze() # Serialize by unwrapping the value 

152 ), 

153 )