Coverage for /usr/local/lib/python3.12/site-packages/prefect/utilities/schema_tools/hydration.py: 37%

180 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1import json 1a

2from abc import ABC, abstractmethod 1a

3from collections.abc import Callable, Sequence 1a

4from typing import Any, Optional, cast 1a

5 

6import jinja2 1a

7from pydantic import BaseModel, Field 1a

8from sqlalchemy.ext.asyncio import AsyncSession 1a

9from typing_extensions import Self, TypeAlias, TypeIs 1a

10 

11from prefect.types import StrictVariableValue 1a

12 

13 

class HydrationContext(BaseModel):
    """Options and lookup data shared by every handler during hydration."""

    # Variable name -> value; consulted by the workspace-variable handler
    # only when `render_workspace_variables` is enabled.
    workspace_variables: dict[str, StrictVariableValue] = Field(default_factory=dict)
    # When False, workspace-variable references are returned as
    # `WorkspaceVariable` placeholders instead of being resolved.
    render_workspace_variables: bool = Field(default=False)
    # When True, `call_handler` raises a handler's `HydrationError` result
    # instead of returning it.
    raise_on_error: bool = Field(default=False)
    # When False, validated templates are returned as `ValidJinja`
    # placeholders instead of being rendered.
    render_jinja: bool = Field(default=False)
    # Context handed to the template renderer when `render_jinja` is on.
    jinja_context: dict[str, Any] = Field(default_factory=dict)

    @classmethod
    async def build(
        cls,
        session: AsyncSession,
        raise_on_error: bool = False,
        render_jinja: bool = False,
        render_workspace_variables: bool = False,
    ) -> Self:
        """Create a context, loading workspace variables only if requested.

        Args:
            session: Database session passed to `read_variables`.
            raise_on_error: Stored on the context unchanged.
            render_jinja: Stored on the context unchanged.
            render_workspace_variables: When true, the variables returned by
                `read_variables` are loaded into `workspace_variables`;
                otherwise that mapping is left empty.

        Returns:
            A new instance of `cls`. `jinja_context` keeps its default.
        """
        # Imported here rather than at module level, as in the original.
        from prefect.server.database.orm_models import Variable
        from prefect.server.models.variables import read_variables

        rows: Sequence[Variable] = (
            await read_variables(session=session) if render_workspace_variables else []
        )
        values_by_name = {row.name: row.value for row in rows}

        return cls(
            workspace_variables=values_by_name,
            raise_on_error=raise_on_error,
            render_jinja=render_jinja,
            render_workspace_variables=render_workspace_variables,
        )

51 

52 

# Type of a `__prefect_kind` discriminator value used as a registry key.
PrefectKind: TypeAlias = Optional[str]
# Signature of a kind handler: (tagged object, hydration context) -> result.
Handler: TypeAlias = Callable[[dict[str, Any], HydrationContext], Any]

# Registry of handlers keyed by kind; filled by the `handler` decorator and
# read by `call_handler`.
_handlers: dict[PrefectKind, Handler] = {}

57 

58 

class Placeholder:
    """Base marker for special values produced while hydrating.

    Equality is type-based: an instance equals any object that is an
    instance of its own class (subclasses included).
    """

    @property
    def is_error(self) -> bool:
        """Whether this placeholder represents a failure; False here."""
        return False

    def __eq__(self, other: Any) -> bool:
        return isinstance(other, self.__class__)

66 

67 

class RemoveValue(Placeholder):
    """Marker telling the caller to drop this entry from its parent container."""

70 

71 

def _remove_value(value: Any) -> TypeIs[RemoveValue]:
    """Return True when `value` is a `RemoveValue` marker (type-narrowing guard)."""
    is_marker = isinstance(value, RemoveValue)
    return is_marker

74 

75 

class HydrationError(Placeholder, Exception, ABC):
    """Base class for hydration failures.

    It is both a `Placeholder` (so it can be returned in place of a value)
    and an `Exception` (so `call_handler` can raise it). Subclasses supply
    the human-readable `message`.
    """

    def __init__(self, detail: Optional[str] = None):
        # Optional extra information that subclasses may fold into `message`.
        self.detail = detail

    @property
    @abstractmethod
    def message(self) -> str:
        """Human-readable description of the failure."""
        raise NotImplementedError("Must be implemented by subclass")

    @property
    def is_error(self) -> bool:
        """Always True for hydration errors."""
        return True

    def __str__(self) -> str:
        return self.message

    def __eq__(self, other: Any) -> bool:
        # Equal when `other` is an instance of this class with the same message.
        if not isinstance(other, self.__class__):
            return False
        return self.message == other.message

94 

95 

class KeyNotFound(HydrationError):
    """Error for a `__prefect` object that lacks a required key.

    Subclasses name the missing key through the `key` property.
    """

    @property
    @abstractmethod
    def key(self) -> str:
        """Name of the key that was expected but absent."""
        raise NotImplementedError("Must be implemented by subclass")

    @property
    def message(self) -> str:
        """Message naming the missing key."""
        return f"Missing '{self.key}' key in __prefect object"

105 

106 

class ValueNotFound(KeyNotFound):
    """The `value` key is missing from a `__prefect` object."""

    @property
    def key(self) -> str:
        """Name of the missing key."""
        return "value"

111 

112 

class TemplateNotFound(KeyNotFound):
    """The `template` key is missing from a `__prefect` object."""

    @property
    def key(self) -> str:
        """Name of the missing key."""
        return "template"

117 

118 

class VariableNameNotFound(KeyNotFound):
    """The `variable_name` key is missing from a `__prefect` object."""

    @property
    def key(self) -> str:
        """Name of the missing key."""
        return "variable_name"

123 

124 

class InvalidJSON(HydrationError):
    """A value tagged as JSON could not be decoded."""

    @property
    def message(self) -> str:
        """Fixed prefix, followed by the detail when one is set and non-empty."""
        text = "Invalid JSON"
        if not self.detail:
            return text
        return text + f": {self.detail}"

132 

133 

class InvalidJinja(HydrationError):
    """A jinja template failed validation."""

    @property
    def message(self) -> str:
        """Fixed prefix, followed by the detail when one is set and non-empty."""
        text = "Invalid jinja"
        if not self.detail:
            return text
        return text + f": {self.detail}"

141 

142 

class WorkspaceVariableNotFound(HydrationError):
    """A referenced workspace variable is absent from the context.

    The variable name is carried in `detail`.
    """

    @property
    def message(self) -> str:
        """Message naming the variable that could not be found."""
        return f"Variable '{self.detail}' not found in workspace."

    @property
    def variable_name(self) -> str:
        """The missing variable's name, taken from `detail`."""
        name = self.detail
        assert name is not None
        return name

152 

153 

class WorkspaceVariable(Placeholder):
    """Placeholder for a workspace-variable reference that was not resolved."""

    def __init__(self, variable_name: str) -> None:
        self.variable_name = variable_name

    def __eq__(self, other: Any) -> bool:
        # Equal when `other` is an instance of this class naming the same variable.
        if not isinstance(other, self.__class__):
            return False
        return self.variable_name == other.variable_name

162 

163 

class ValidJinja(Placeholder):
    """Placeholder for a template that passed validation but was not rendered."""

    def __init__(self, template: str) -> None:
        self.template = template

    def __eq__(self, other: Any) -> bool:
        # Equal when `other` is an instance of this class with the same template.
        if not isinstance(other, self.__class__):
            return False
        return self.template == other.template

170 

171 

def handler(kind: PrefectKind) -> Callable[[Handler], Handler]:
    """Decorator factory that registers a function as the handler for `kind`.

    The decorated function is stored in `_handlers[kind]`, replacing any
    previous entry for that kind, and is returned unchanged.
    """

    def register(func: Handler) -> Handler:
        _handlers[kind] = func
        return func

    return register

178 

179 

def call_handler(kind: PrefectKind, obj: dict[str, Any], ctx: HydrationContext) -> Any:
    """Dispatch a `__prefect_kind`-tagged object to its registered handler.

    Args:
        kind: The object's `__prefect_kind` value.
        obj: The tagged object itself.
        ctx: Hydration options; `raise_on_error` controls error propagation.

    Returns:
        The handler's result. When no handler matches `kind`, the object's
        `"value"` entry, or None if it has none.

    Raises:
        HydrationError: If the handler returns one and `ctx.raise_on_error`
            is set.
    """
    try:
        handle = _handlers[kind]
    except (KeyError, TypeError):
        # KeyError: no handler is registered for this kind.
        # TypeError: `kind` is unhashable (e.g. a list or dict taken from the
        # input), so it can never be a registry key. Treat it like any other
        # unknown kind instead of letting the lookup crash.
        return obj.get("value", None)

    res = handle(obj, ctx)
    if ctx.raise_on_error and isinstance(res, HydrationError):
        raise res
    return res

188 

189 

@handler("none")
def null_handler(obj: dict[str, Any], ctx: HydrationContext):
    """Handle kind "none": hydrate and return the object's `value`.

    Returns `ValueNotFound` when the object has no `value` key.
    """
    if "value" not in obj:
        return ValueNotFound()

    # The null handler is a pass-through, so keep hydrating what it wraps.
    return _hydrate(obj["value"], ctx)

197 

198 

@handler("json")
def json_handler(obj: dict[str, Any], ctx: HydrationContext):
    """Handle kind "json": decode the object's `value` with `json.loads`.

    A dict `value` is hydrated first. If that yields a `Placeholder`, the
    placeholder is returned as is. Decoding failures are reported as
    `InvalidJSON` rather than raised here.
    """
    if "value" not in obj:
        # If `value` is not in the object, we need special handling to help
        # the UI. For now, an object that looks like {"__prefect_kind": "json"}
        # is removed from its parent object, e.g.
        #   {"a": {"__prefect_kind": "json"}} -> {}
        # or
        #   [{"__prefect_kind": "json"}] -> []
        return RemoveValue()

    raw = obj["value"]
    candidate = _hydrate(raw, ctx) if isinstance(raw, dict) else raw

    # A Placeholder result is handed back untouched.
    if isinstance(candidate, Placeholder):
        return candidate

    try:
        return json.loads(candidate)
    except (json.decoder.JSONDecodeError, TypeError) as exc:
        return InvalidJSON(detail=str(exc))

223 

224 

@handler("jinja")
def jinja_handler(obj: dict[str, Any], ctx: HydrationContext) -> Any:
    """Handle kind "jinja": validate, and optionally render, a template.

    A dict `template` is hydrated first. If that yields a `Placeholder`, the
    placeholder is returned as is. A template rejected by
    `validate_user_template` with a syntax or security error yields
    `InvalidJinja`. Otherwise the template is rendered with
    `ctx.jinja_context` when `ctx.render_jinja` is set, or wrapped in
    `ValidJinja` when it is not. Returns `TemplateNotFound` when the object
    has no `template` key.
    """
    from prefect.server.utilities.user_templates import (
        TemplateSecurityError,
        render_user_template_sync,
        validate_user_template,
    )

    if "template" not in obj:
        return TemplateNotFound()

    raw = obj["template"]
    template = _hydrate(raw, ctx) if isinstance(raw, dict) else raw

    # A Placeholder result is handed back untouched.
    if isinstance(template, Placeholder):
        return template

    try:
        validate_user_template(template)
    except (jinja2.exceptions.TemplateSyntaxError, TemplateSecurityError) as exc:
        return InvalidJinja(detail=str(exc))

    if not ctx.render_jinja:
        return ValidJinja(template=template)
    return render_user_template_sync(template, ctx.jinja_context)

254 

255 

@handler("workspace_variable")
def workspace_variable_handler(obj: dict[str, Any], ctx: HydrationContext) -> Any:
    """Handle kind "workspace_variable": resolve a variable reference.

    A dict `variable_name` is hydrated first. If that yields a `Placeholder`,
    the placeholder is returned as is.

    Returns:
        - `RemoveValue` when the object has no `variable_name` key.
        - `WorkspaceVariable` when `ctx.render_workspace_variables` is off.
        - The value from `ctx.workspace_variables` when the name is present.
        - `WorkspaceVariableNotFound` when the name is absent, or cannot be
          a key at all because it is unhashable.
    """
    if "variable_name" not in obj:
        # Special handling if `variable_name` is not in the object.
        # If an object looks like {"__prefect_kind": "workspace_variable"}
        # we remove it from the parent object, e.g.
        #   {"a": {"__prefect_kind": "workspace_variable"}} -> {}
        # or
        #   [{"__prefect_kind": "workspace_variable"}] -> []
        # or
        #   {"__prefect_kind": "workspace_variable"} -> {}
        return RemoveValue()

    raw_name = obj["variable_name"]
    name = _hydrate(raw_name, ctx) if isinstance(raw_name, dict) else raw_name

    # A Placeholder result is handed back untouched.
    if isinstance(name, Placeholder):
        return name

    if not ctx.render_workspace_variables:
        return WorkspaceVariable(variable_name=name)

    try:
        return ctx.workspace_variables[name]
    except (KeyError, TypeError):
        # KeyError: the variable is not defined in the workspace.
        # TypeError: the name is unhashable (e.g. a list, or a plain dict that
        # was hydrated), so it cannot match any variable. Report it as not
        # found instead of crashing on the lookup.
        return WorkspaceVariableNotFound(detail=name)

285 

286 

def hydrate(
    obj: dict[str, Any], ctx: Optional[HydrationContext] = None
) -> dict[str, Any]:
    """Hydrate `obj`, resolving every `__prefect_kind`-tagged object inside it.

    Args:
        obj: The mapping to hydrate.
        ctx: Hydration options, passed through to `_hydrate`.

    Returns:
        The hydrated result, or an empty dict if the top-level object itself
        resolved to a `RemoveValue` marker.
    """
    hydrated: dict[str, Any] = _hydrate(obj, ctx)
    return {} if _remove_value(hydrated) else hydrated

296 

297 

def _hydrate(obj: Any, ctx: Optional[HydrationContext] = None) -> Any:
    """Recursively hydrate `obj`.

    - A dict containing `__prefect_kind` is dispatched through `call_handler`.
    - Any other dict or list is rebuilt with each member hydrated; members
      that hydrate to `RemoveValue` are omitted.
    - Every other value is returned unchanged.

    A default `HydrationContext` is created when `ctx` is None.
    """
    if ctx is None:
        ctx = HydrationContext()

    if isinstance(obj, dict):
        if "__prefect_kind" in obj:
            tagged: dict[str, Any] = obj
            return call_handler(tagged["__prefect_kind"], tagged, ctx)

        kept_items: dict[str, Any] = {}
        for key, value in cast(dict[str, Any], obj).items():
            hydrated_value = _hydrate(value, ctx)
            if not _remove_value(hydrated_value):
                kept_items[key] = hydrated_value
        return kept_items

    if isinstance(obj, list):
        kept_elements: list[Any] = []
        for element in cast(list[Any], obj):
            hydrated_element = _hydrate(element, ctx)
            if not _remove_value(hydrated_element):
                kept_elements.append(hydrated_element)
        return kept_elements

    return obj

320 return obj