Coverage for /usr/local/lib/python3.12/site-packages/prefect/variables.py: 35%

71 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1from typing import Any, Callable, Optional 1a

2 

3from pydantic import BaseModel, Field 1a

4 

5from prefect._internal.compatibility.async_dispatch import async_dispatch 1a

6from prefect._internal.compatibility.migration import getattr_migration 1a

7from prefect.client.orchestration import get_client 1a

8from prefect.client.schemas.actions import VariableCreate, VariableUpdate 1a

9from prefect.client.utilities import get_or_create_client 1a

10from prefect.exceptions import ObjectNotFound 1a

11from prefect.types import MAX_VARIABLE_NAME_LENGTH, StrictVariableValue 1a

12 

13 

class Variable(BaseModel):
    """
    Variables are named, mutable JSON values that can be shared across tasks and flows.

    Arguments:
        name: A string identifying the variable.
        value: The value of the variable (a `StrictVariableValue`).
        tags: An optional list of strings to associate with the variable.
    """

    name: str = Field(
        default=...,
        description="The name of the variable",
        examples=["my_variable"],
        max_length=MAX_VARIABLE_NAME_LENGTH,
    )
    value: StrictVariableValue = Field(
        default=...,
        description="The value of the variable",
        examples=["my-value"],
    )
    tags: Optional[list[str]] = Field(default=None)

    @classmethod
    async def aset(
        cls,
        name: str,
        value: StrictVariableValue,
        tags: Optional[list[str]] = None,
        overwrite: bool = False,
    ) -> "Variable":
        """
        Asynchronously sets a new variable. If one exists with the same name, must pass `overwrite=True`

        Returns the newly set variable object.

        Args:
            - name: The name of the variable to set.
            - value: The value of the variable to set.
            - tags: An optional list of strings to associate with the variable.
                `None` is sent as an empty list.
            - overwrite: Whether to overwrite the variable if it already exists.

        Raises:
            - ValueError: If a variable named `name` already exists and
                `overwrite` is `False`.

        Example:
            Set a new variable and overwrite it if it already exists.

            ```python
            from prefect import flow
            from prefect.variables import Variable

            @flow
            async def my_flow():
                await Variable.aset(name="my_var",value="test_value", tags=["hi", "there"], overwrite=True)
            ```
        """
        # Only the client is needed; the second element of the tuple is unused.
        client, _ = get_or_create_client()
        variable_exists = await client.read_variable_by_name(name)
        var_dict: dict[str, Any] = {"name": name, "value": value, "tags": tags or []}

        if variable_exists:
            if not overwrite:
                raise ValueError(
                    f"Variable {name!r} already exists. Use `overwrite=True` to update it."
                )
            await client.update_variable(
                variable=VariableUpdate.model_validate(var_dict)
            )
            # Re-read after the update so the returned object is built from
            # the values the read returns rather than from the arguments.
            variable = await client.read_variable_by_name(name)
            var_dict = {key: getattr(variable, key) for key in var_dict}
        else:
            await client.create_variable(
                variable=VariableCreate.model_validate(var_dict)
            )

        return cls.model_validate(var_dict)

    @classmethod
    @async_dispatch(aset)
    def set(
        cls,
        name: str,
        value: StrictVariableValue,
        tags: Optional[list[str]] = None,
        overwrite: bool = False,
    ) -> "Variable":
        """
        Sets a new variable. If one exists with the same name, must pass `overwrite=True`

        Returns the newly set variable object.

        Args:
            - name: The name of the variable to set.
            - value: The value of the variable to set.
            - tags: An optional list of strings to associate with the variable.
                `None` is sent as an empty list.
            - overwrite: Whether to overwrite the variable if it already exists.

        Raises:
            - ValueError: If a variable named `name` already exists and
                `overwrite` is `False`.

        Example:
            Set a new variable and overwrite it if it already exists.

            ```python
            from prefect import flow
            from prefect.variables import Variable

            @flow
            def my_flow():
                Variable.set(name="my_var",value="test_value", tags=["hi", "there"], overwrite=True)
            ```
        """
        with get_client(sync_client=True) as client:
            variable_exists = client.read_variable_by_name(name)
            var_dict: dict[str, Any] = {
                "name": name,
                "value": value,
                "tags": tags or [],
            }

            if variable_exists:
                if not overwrite:
                    raise ValueError(
                        f"Variable {name!r} already exists. Use `overwrite=True` to update it."
                    )
                client.update_variable(variable=VariableUpdate.model_validate(var_dict))
                # Re-read after the update so the returned object is built from
                # the values the read returns rather than from the arguments.
                variable = client.read_variable_by_name(name)
                var_dict = {key: getattr(variable, key) for key in var_dict}
            else:
                client.create_variable(variable=VariableCreate.model_validate(var_dict))

            return cls.model_validate(var_dict)

    @classmethod
    async def aget(
        cls,
        name: str,
        default: StrictVariableValue = None,
    ) -> StrictVariableValue:
        """
        Asynchronously get a variable's value by name.

        If the variable does not exist, return the default value.

        Args:
            - name: The name of the variable value to get.
            - default: The default value to return if the variable does not exist.

        Example:
            Get a variable's value by name.
            ```python
            from prefect import flow
            from prefect.variables import Variable

            @flow
            async def my_flow():
                var = await Variable.aget("my_var")
            ```
        """
        # Only the client is needed; the second element of the tuple is unused.
        client, _ = get_or_create_client()
        variable = await client.read_variable_by_name(name)

        # A falsy read result means there is nothing to return but the default.
        return variable.value if variable else default

    @classmethod
    @async_dispatch(aget)
    def get(
        cls,
        name: str,
        default: StrictVariableValue = None,
    ) -> StrictVariableValue:
        """
        Get a variable's value by name.

        If the variable does not exist, return the default value.

        Args:
            - name: The name of the variable value to get.
            - default: The default value to return if the variable does not exist.

        Example:
            Get a variable's value by name.
            ```python
            from prefect import flow
            from prefect.variables import Variable

            @flow
            def my_flow():
                var = Variable.get("my_var")
            ```
        """
        with get_client(sync_client=True) as client:
            variable = client.read_variable_by_name(name)

            # A falsy read result means there is nothing to return but the default.
            return variable.value if variable else default

    @classmethod
    async def aunset(cls, name: str) -> bool:
        """
        Asynchronously unset a variable by name.

        Args:
            - name: The name of the variable to unset.

        Returns `True` if the variable was deleted, `False` if the variable did not exist.

        Example:
            Unset a variable by name.
            ```python
            from prefect import flow
            from prefect.variables import Variable

            @flow
            async def my_flow():
                await Variable.aunset("my_var")
            ```
        """
        # Only the client is needed; the second element of the tuple is unused.
        client, _ = get_or_create_client()
        try:
            await client.delete_variable_by_name(name=name)
            return True
        except ObjectNotFound:
            # A missing variable is reported through the return value, not raised.
            return False

    @classmethod
    @async_dispatch(aunset)
    def unset(cls, name: str) -> bool:
        """
        Unset a variable by name.

        Args:
            - name: The name of the variable to unset.

        Returns `True` if the variable was deleted, `False` if the variable did not exist.

        Example:
            Unset a variable by name.
            ```python
            from prefect import flow
            from prefect.variables import Variable

            @flow
            def my_flow():
                Variable.unset("my_var")
            ```
        """
        with get_client(sync_client=True) as client:
            try:
                client.delete_variable_by_name(name=name)
                return True
            except ObjectNotFound:
                # A missing variable is reported through the return value, not raised.
                return False

257 

258 

# Module-level `__getattr__` (PEP 562): Python calls it for attribute lookups on
# this module that are not found the normal way. The handler comes from
# `getattr_migration` — presumably it resolves names that have moved to other
# modules; confirm against `prefect._internal.compatibility.migration`.
__getattr__: Callable[[str], Any] = getattr_migration(__name__)