Coverage for /usr/local/lib/python3.12/site-packages/prefect/utilities/schema_tools/hydration.py: 37%
180 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1import json 1a
2from abc import ABC, abstractmethod 1a
3from collections.abc import Callable, Sequence 1a
4from typing import Any, Optional, cast 1a
6import jinja2 1a
7from pydantic import BaseModel, Field 1a
8from sqlalchemy.ext.asyncio import AsyncSession 1a
9from typing_extensions import Self, TypeAlias, TypeIs 1a
11from prefect.types import StrictVariableValue 1a
class HydrationContext(BaseModel):
    """Options and lookup data that steer one hydration pass.

    The handlers in this module read these fields to decide whether to
    resolve a `__prefect_kind` object into a concrete value or to leave a
    placeholder behind.
    """

    # Variable values keyed by variable name; consulted by the
    # `workspace_variable` handler when rendering is enabled.
    workspace_variables: dict[str, StrictVariableValue] = Field(default_factory=dict)
    # When False, `workspace_variable` objects become `WorkspaceVariable`
    # placeholders instead of being looked up.
    render_workspace_variables: bool = Field(default=False)
    # When True, `call_handler` raises a handler's `HydrationError` result
    # instead of returning it.
    raise_on_error: bool = Field(default=False)
    # When False, valid `jinja` templates become `ValidJinja` placeholders
    # instead of being rendered.
    render_jinja: bool = Field(default=False)
    # Values handed to the template renderer when `render_jinja` is True.
    jinja_context: dict[str, Any] = Field(default_factory=dict)

    @classmethod
    async def build(
        cls,
        session: AsyncSession,
        raise_on_error: bool = False,
        render_jinja: bool = False,
        render_workspace_variables: bool = False,
    ) -> Self:
        """Create a context, loading variables from the database on demand.

        Args:
            session: Database session passed to `read_variables`.
            raise_on_error: Stored on the context unchanged.
            render_jinja: Stored on the context unchanged.
            render_workspace_variables: Stored on the context; variables are
                only read from the database when this is True.

        Returns:
            A new context whose `workspace_variables` maps each loaded
            variable's name to its value (empty when nothing was loaded).
            `jinja_context` is left at its default.
        """
        # Imported here rather than at module level, as in the original.
        from prefect.server.database.orm_models import Variable
        from prefect.server.models.variables import read_variables

        loaded: Sequence[Variable] = (
            await read_variables(session=session)
            if render_workspace_variables
            else []
        )
        values_by_name = {row.name: row.value for row in loaded}

        return cls(
            workspace_variables=values_by_name,
            raise_on_error=raise_on_error,
            render_jinja=render_jinja,
            render_workspace_variables=render_workspace_variables,
        )
# Tag read from an object's "__prefect_kind" entry.
PrefectKind: TypeAlias = Optional[str]
# A hydration handler receives the tagged object and the active context and
# returns the hydrated value (or a placeholder).
Handler: TypeAlias = Callable[[dict[str, Any], HydrationContext], Any]

# Registry of handlers keyed by kind; populated by the `handler` decorator.
_handlers: dict[PrefectKind, Handler] = {}
class Placeholder:
    """Base class for marker objects that stand in for a hydrated value.

    NOTE: the class defines `__eq__` without `__hash__`, so instances are
    not hashable.
    """

    def __eq__(self, other: Any) -> bool:
        """Return True when `other` is an instance of this object's class."""
        own_type = type(self)
        return isinstance(other, own_type)

    @property
    def is_error(self) -> bool:
        """Plain placeholders are never errors."""
        return False
class RemoveValue(Placeholder):
    """Marker telling the caller to drop this entry from its parent container."""
def _remove_value(value: Any) -> TypeIs[RemoveValue]:
    """Tell whether `value` is a `RemoveValue` marker (narrows the type)."""
    is_removal_marker = isinstance(value, RemoveValue)
    return is_removal_marker
class HydrationError(Placeholder, Exception, ABC):
    """Base for hydration failures.

    It is both a `Placeholder` and an `Exception`, so a handler can return it
    as a value and `call_handler` can raise it when the context asks for
    errors to be raised.
    """

    def __init__(self, detail: Optional[str] = None):
        # Optional extra information that subclasses may fold into `message`.
        self.detail = detail

    @property
    def is_error(self) -> bool:
        """Every hydration error reports itself as an error."""
        return True

    @property
    @abstractmethod
    def message(self) -> str:
        """Human-readable description; subclasses must provide it."""
        raise NotImplementedError("Must be implemented by subclass")

    def __eq__(self, other: Any) -> bool:
        """Equal when `other` is an instance of this class with the same message."""
        if not isinstance(other, type(self)):
            return False
        return self.message == other.message

    def __str__(self) -> str:
        return self.message
class KeyNotFound(HydrationError):
    """Error for a `__prefect_kind` object that lacks a required key."""

    @property
    @abstractmethod
    def key(self) -> str:
        """Name of the missing key; subclasses must provide it."""
        raise NotImplementedError("Must be implemented by subclass")

    @property
    def message(self) -> str:
        """Describe which key is missing."""
        missing_key = self.key
        return f"Missing '{missing_key}' key in __prefect object"
class ValueNotFound(KeyNotFound):
    """Raised or returned when the object has no `value` entry."""

    @property
    def key(self) -> str:
        """The missing key is always `value`."""
        return "value"
class TemplateNotFound(KeyNotFound):
    """Raised or returned when the object has no `template` entry."""

    @property
    def key(self) -> str:
        """The missing key is always `template`."""
        return "template"
class VariableNameNotFound(KeyNotFound):
    """Raised or returned when the object has no `variable_name` entry."""

    @property
    def key(self) -> str:
        """The missing key is always `variable_name`."""
        return "variable_name"
class InvalidJSON(HydrationError):
    """Error produced when a `json` value cannot be decoded."""

    @property
    def message(self) -> str:
        """Return "Invalid JSON", followed by the detail when one is set."""
        prefix = "Invalid JSON"
        return f"{prefix}: {self.detail}" if self.detail else prefix
class InvalidJinja(HydrationError):
    """Error produced when a `jinja` template fails validation."""

    @property
    def message(self) -> str:
        """Return "Invalid jinja", followed by the detail when one is set."""
        prefix = "Invalid jinja"
        return f"{prefix}: {self.detail}" if self.detail else prefix
class WorkspaceVariableNotFound(HydrationError):
    """Error for a variable name that is absent from the context's variables.

    The variable name is carried in `detail`.
    """

    @property
    def message(self) -> str:
        """Describe which variable could not be found."""
        return f"Variable '{self.detail}' not found in workspace."

    @property
    def variable_name(self) -> str:
        """Return the name of the missing variable (asserts it was provided)."""
        name = self.detail
        assert name is not None
        return name
class WorkspaceVariable(Placeholder):
    """Placeholder for a workspace variable reference that was not rendered."""

    def __init__(self, variable_name: str) -> None:
        self.variable_name = variable_name

    def __eq__(self, other: Any) -> bool:
        """Equal when `other` is an instance of this class naming the same variable."""
        if not isinstance(other, type(self)):
            return False
        return self.variable_name == other.variable_name
class ValidJinja(Placeholder):
    """Placeholder for a template that passed validation but was not rendered."""

    def __init__(self, template: str) -> None:
        self.template = template

    def __eq__(self, other: Any) -> bool:
        """Equal when `other` is an instance of this class with the same template."""
        if not isinstance(other, type(self)):
            return False
        return self.template == other.template
def handler(kind: PrefectKind) -> Callable[[Handler], Handler]:
    """Build a decorator that registers a function as the handler for `kind`.

    The decorated function is stored in the module-level `_handlers`
    registry (replacing any earlier entry for the same kind) and returned
    unchanged.
    """

    def _register(fn: Handler) -> Handler:
        _handlers[kind] = fn
        return fn

    return _register
def call_handler(kind: PrefectKind, obj: dict[str, Any], ctx: HydrationContext) -> Any:
    """Dispatch `obj` to the handler registered for `kind`.

    Args:
        kind: Value of the object's `__prefect_kind` entry.
        obj: The tagged object being hydrated.
        ctx: Active hydration context.

    Returns:
        The handler's result. When no handler is registered for `kind`,
        `obj.get("value")` is returned instead (None if there is no value).

    Raises:
        HydrationError: When the handler returns an error and
            `ctx.raise_on_error` is set.
    """
    try:
        is_registered = kind in _handlers
    except TypeError:
        # `kind` is read straight from the payload, so it may be an
        # unhashable list or dict. Such a value can never be a registry key;
        # treat it like any other unregistered kind instead of crashing.
        is_registered = False

    if not is_registered:
        return obj.get("value", None)

    res = _handlers[kind](obj, ctx)
    if ctx.raise_on_error and isinstance(res, HydrationError):
        raise res
    return res
@handler("none")
def null_handler(obj: dict[str, Any], ctx: HydrationContext):
    """Handle `none` objects by hydrating and returning their `value`.

    Returns `ValueNotFound` when the object has no `value` entry.
    """
    if "value" not in obj:
        return ValueNotFound()

    # The null handler is a pass-through, so keep hydrating the wrapped value.
    return _hydrate(obj["value"], ctx)
@handler("json")
def json_handler(obj: dict[str, Any], ctx: HydrationContext):
    """Handle `json` objects by decoding their `value` with `json.loads`.

    A dict `value` is hydrated first; any other value is decoded as-is.
    Returns the decoded data, the placeholder produced by hydration, an
    `InvalidJSON` error when decoding fails, or `RemoveValue` when the object
    has no `value` entry.
    """
    if "value" not in obj:
        # If `value` is not in the object, we need special handling to help
        # the UI. For now, an object that looks like {"__prefect_kind": "json"}
        # is removed from its parent, e.g.
        #   {"a": {"__prefect_kind": "json"}} -> {}
        # or
        #   [{"__prefect_kind": "json"}] -> []
        return RemoveValue()

    raw_value = obj["value"]
    candidate = _hydrate(raw_value, ctx) if isinstance(raw_value, dict) else raw_value

    # A placeholder coming out of hydration is passed through untouched.
    if isinstance(candidate, Placeholder):
        return candidate

    try:
        return json.loads(candidate)
    except (json.decoder.JSONDecodeError, TypeError) as exc:
        return InvalidJSON(detail=str(exc))
@handler("jinja")
def jinja_handler(obj: dict[str, Any], ctx: HydrationContext) -> Any:
    """Handle `jinja` objects by validating and optionally rendering `template`.

    A dict `template` is hydrated first. Returns the rendered template when
    `ctx.render_jinja` is set, a `ValidJinja` placeholder when it is not, the
    placeholder produced by hydration, an `InvalidJinja` error when
    validation raises a syntax or security error, or `TemplateNotFound` when
    the object has no `template` entry.
    """
    # Imported here rather than at module level, as in the original.
    from prefect.server.utilities.user_templates import (
        TemplateSecurityError,
        render_user_template_sync,
        validate_user_template,
    )

    if "template" not in obj:
        return TemplateNotFound()

    raw_template = obj["template"]
    template = (
        _hydrate(raw_template, ctx) if isinstance(raw_template, dict) else raw_template
    )

    # A placeholder coming out of hydration is passed through untouched.
    if isinstance(template, Placeholder):
        return template

    try:
        validate_user_template(template)
    except (jinja2.exceptions.TemplateSyntaxError, TemplateSecurityError) as exc:
        return InvalidJinja(detail=str(exc))

    if not ctx.render_jinja:
        return ValidJinja(template=template)
    return render_user_template_sync(template, ctx.jinja_context)
@handler("workspace_variable")
def workspace_variable_handler(obj: dict[str, Any], ctx: HydrationContext) -> Any:
    """Handle `workspace_variable` objects by resolving `variable_name`.

    A dict `variable_name` is hydrated first.

    Returns:
        - `RemoveValue` when the object has no `variable_name` entry.
        - The placeholder produced by hydrating `variable_name`, if any.
        - A `WorkspaceVariable` placeholder when
          `ctx.render_workspace_variables` is False.
        - The value stored in `ctx.workspace_variables` for that name.
        - `WorkspaceVariableNotFound` when the name is not in
          `ctx.workspace_variables`, including names that cannot be dict
          keys at all (e.g. a list or dict).
    """
    if "variable_name" not in obj:
        # Special handling if `variable_name` is not in the object.
        # An object that looks like {"__prefect_kind": "workspace_variable"}
        # is removed from its parent, e.g.
        #   {"a": {"__prefect_kind": "workspace_variable"}} -> {}
        # or
        #   [{"__prefect_kind": "workspace_variable"}] -> []
        # or
        #   {"__prefect_kind": "workspace_variable"} -> {}
        return RemoveValue()

    raw_name = obj["variable_name"]
    name = _hydrate(raw_name, ctx) if isinstance(raw_name, dict) else raw_name

    # A placeholder coming out of hydration is passed through untouched.
    if isinstance(name, Placeholder):
        return name

    if not ctx.render_workspace_variables:
        return WorkspaceVariable(variable_name=name)

    try:
        return ctx.workspace_variables[name]
    except (KeyError, TypeError):
        # KeyError: the name is unknown. TypeError: the name is unhashable
        # (a list or dict from the payload) and so can never be a key.
        # Either way the variable does not exist, so report it as not found
        # rather than letting the lookup crash.
        return WorkspaceVariableNotFound(detail=name)
def hydrate(
    obj: dict[str, Any], ctx: Optional[HydrationContext] = None
) -> dict[str, Any]:
    """Hydrate `obj`, resolving every `__prefect_kind` object inside it.

    Args:
        obj: The object to hydrate.
        ctx: Hydration context; a default one is created further down when
            omitted.

    Returns:
        The hydrated result, or an empty dict when the top-level object
        itself resolves to a `RemoveValue` marker.
    """
    hydrated: dict[str, Any] = _hydrate(obj, ctx)
    return {} if _remove_value(hydrated) else hydrated
def _hydrate(obj: Any, ctx: Optional[HydrationContext] = None) -> Any:
    """Recursively hydrate `obj`.

    - A dict containing `__prefect_kind` is dispatched through `call_handler`.
    - Any other dict or list is rebuilt with each member hydrated; members
      that hydrate to a `RemoveValue` marker are left out.
    - Everything else is returned unchanged.

    A default `HydrationContext` is created when `ctx` is None.
    """
    if ctx is None:
        ctx = HydrationContext()

    if isinstance(obj, dict):
        if "__prefect_kind" in obj:
            tagged: dict[str, Any] = obj
            return call_handler(tagged["__prefect_kind"], tagged, ctx)

        hydrated_items: dict[str, Any] = {}
        for key, value in cast(dict[str, Any], obj).items():
            hydrated_value = _hydrate(value, ctx)
            if _remove_value(hydrated_value):
                continue
            hydrated_items[key] = hydrated_value
        return hydrated_items

    if isinstance(obj, list):
        hydrated_elements: list[Any] = []
        for element in cast(list[Any], obj):
            hydrated_element = _hydrate(element, ctx)
            if _remove_value(hydrated_element):
                continue
            hydrated_elements.append(hydrated_element)
        return hydrated_elements

    return obj