Coverage for opt/mealie/lib/python3.12/site-packages/mealie/schema/response/query_filter.py: 74%
404 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 14:03 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 14:03 +0000
1from __future__ import annotations 1t
3import re 1t
4from collections import deque 1t
5from enum import Enum 1t
6from typing import Any, cast 1t
7from uuid import UUID 1t
9import sqlalchemy as sa 1t
10from dateutil import parser as date_parser 1t
11from dateutil.parser import ParserError 1t
12from humps import decamelize 1t
13from sqlalchemy.ext.associationproxy import AssociationProxyInstance 1t
14from sqlalchemy.orm import InstrumentedAttribute, Mapper 1t
15from sqlalchemy.sql import sqltypes 1t
17from mealie.db.models._model_base import SqlAlchemyBase 1t
18from mealie.db.models._model_utils.datetime import NaiveDateTime 1t
19from mealie.db.models._model_utils.guid import GUID 1t
20from mealie.schema._mealie.mealie_model import MealieModel 1t
23class RelationalKeyword(Enum): 1t
24 IS = "IS" 1t
25 IS_NOT = "IS NOT" 1t
26 IN = "IN" 1t
27 NOT_IN = "NOT IN" 1t
28 CONTAINS_ALL = "CONTAINS ALL" 1t
29 LIKE = "LIKE" 1t
30 NOT_LIKE = "NOT LIKE" 1t
32 @classmethod 1t
33 def parse_component(cls, component: str) -> list[str] | None: 1t
34 """
35 Try to parse a component using a relational keyword
37 If no matching keyword is found, returns None
38 """
40 # extract the attribute name from the component
41 parsed_component = component.split(maxsplit=1) 1cdnoqkjpgraiehlfmsb
42 if len(parsed_component) < 2: 1cdnoqkjpgraiehlfmsb
43 return None 1cdnoqkjpgraiehlfmsb
45 # assume the component has already filtered out the value and try to match a keyword
46 # if we try to filter out the value without checking first, keywords with spaces won't parse correctly
47 possible_keyword = parsed_component[1].strip().lower() 1cdnoqkjpgraiehlfmsb
48 for rel_kw in sorted([keyword.value for keyword in cls], key=len, reverse=True): 1cdnoqkjpgraiehlfmsb
49 if rel_kw.lower() != possible_keyword: 49 ↛ 52line 49 didn't jump to line 52 because the condition on line 49 was always true1cdnoqkjpgraiehlfmsb
50 continue 1cdnoqkjpgraiehlfmsb
52 parsed_component[1] = rel_kw
53 return parsed_component
55 # there was no match, so the component may still have the value in it
56 try: 1cdnoqkjpgraiehlfmsb
57 _possible_keyword, _value = parsed_component[-1].rsplit(maxsplit=1) 1cdnoqkjpgraiehlfmsb
58 parsed_component = [parsed_component[0], _possible_keyword, _value] 1dqjaiehfmsb
59 except ValueError: 1cdnoqkjpgraiehlfmb
60 # the component has no value to filter out
61 return None 1cdnoqkjpgraiehlfmb
63 possible_keyword = parsed_component[1].strip().lower() 1dqjaiehfmsb
64 for rel_kw in sorted([keyword.value for keyword in cls], key=len, reverse=True): 1dqjaiehfmsb
65 if rel_kw.lower() != possible_keyword: 65 ↛ 68line 65 didn't jump to line 68 because the condition on line 65 was always true1dqjaiehfmsb
66 continue 1dqjaiehfmsb
68 parsed_component[1] = rel_kw
69 return parsed_component
71 return None 1dqjaiehfmsb
74class RelationalOperator(Enum): 1t
75 EQ = "=" 1t
76 NOTEQ = "<>" 1t
77 GT = ">" 1t
78 LT = "<" 1t
79 GTE = ">=" 1t
80 LTE = "<=" 1t
82 @classmethod 1t
83 def parse_component(cls, component: str) -> list[str] | None: 1t
84 """
85 Try to parse a component using a relational operator
87 If no matching operator is found, returns None
88 """
90 for rel_op in sorted([operator.value for operator in cls], key=len, reverse=True): 1cdnoqkjpgraiehlfmsb
91 if rel_op not in component: 1cdnoqkjpgraiehlfmsb
92 continue 1cdnoqkjpgraiehlfmsb
94 parsed_component = [base_component.strip() for base_component in component.split(rel_op) if base_component] 1cdnoqkjpgraiehlfmsb
95 parsed_component.insert(1, rel_op) 1cdnoqkjpgraiehlfmsb
96 return parsed_component 1cdnoqkjpgraiehlfmsb
98 return None 1cdnoqkjpgraiehlfmsb
class LogicalOperator(Enum):
    """Boolean connectives used to combine relational statements of a filter."""

    AND = "AND"
    OR = "OR"
class QueryFilterJSONPart(MealieModel):
    """One relational statement of a query filter, with its surrounding grouping, as a model."""

    # group separators collected around the statement, concatenated into one string (e.g. "((")
    left_parenthesis: str | None = None
    right_parenthesis: str | None = None
    # logical operator that was seen before this statement, if any
    logical_operator: LogicalOperator | None = None

    # the statement itself: attribute, relationship, value
    attribute_name: str | None = None
    relational_operator: RelationalKeyword | RelationalOperator | None = None
    value: str | list[str] | None = None
class QueryFilterJSON(MealieModel):
    """A query filter expressed as an ordered list of `QueryFilterJSONPart` items."""

    parts: list[QueryFilterJSONPart] = []
class QueryFilterBuilderComponent:
    """A single relational statement"""

    @staticmethod
    def strip_quotes_from_string(val: str) -> str:
        """Return `val` without one pair of enclosing double quotes, or unchanged if it has none."""
        # `>= 2` (not `> 2`) so that an empty quoted string ('""') is unwrapped to "" as well
        if len(val) >= 2 and val[0] == '"' and val[-1] == '"':
            return val[1:-1]

        return val

    def __init__(
        self, attribute_name: str, relationship: RelationalKeyword | RelationalOperator, value: str | list[str]
    ) -> None:
        """
        Store the decamelized attribute name, the relationship, and the unquoted value.

        Raises a `ValueError` if IN / NOT IN / CONTAINS ALL is not given a list, or if
        IS / IS NOT is given anything other than "null" or "none" (case-insensitive).
        For IS / IS NOT the stored value is None.
        """
        self.attribute_name = decamelize(attribute_name)
        self.relationship = relationship

        # remove encasing quotes
        if isinstance(value, str):
            value = self.strip_quotes_from_string(value)

        elif isinstance(value, list):
            value = [self.strip_quotes_from_string(v) for v in value]

        # validate relationship/value pairs
        if relationship in [
            RelationalKeyword.IN,
            RelationalKeyword.NOT_IN,
            RelationalKeyword.CONTAINS_ALL,
        ] and not isinstance(value, list):
            raise ValueError(
                f"invalid query string: {relationship.value} must be given a list of values "
                f"enclosed by {QueryFilterBuilder.l_list_sep} and {QueryFilterBuilder.r_list_sep}"
            )

        if relationship is RelationalKeyword.IS or relationship is RelationalKeyword.IS_NOT:
            if not isinstance(value, str) or value.lower() not in ["null", "none"]:
                raise ValueError(
                    f'invalid query string: "{relationship.value}" can only be used with "NULL", not "{value}"'
                )

            self.value = None
        else:
            self.value = value

    def __repr__(self) -> str:
        return f"[{self.attribute_name} {self.relationship.value} {self.value}]"

    def validate(self, model_attr_type: Any) -> Any:
        """
        Validate value against a model attribute's type and return a validated value, or raise a ValueError

        A list value yields a list of validated values; a single value yields a single validated value.
        """

        sanitized_values: list[Any]
        if not isinstance(self.value, list):
            sanitized_values = [self.value]
        else:
            sanitized_values = list(self.value)

        for i, v in enumerate(sanitized_values):
            # always allow querying for null values
            if v is None:
                continue

            if isinstance(model_attr_type, sqltypes.String):
                sanitized_values[i] = v.lower()

            if self.relationship is RelationalKeyword.LIKE or self.relationship is RelationalKeyword.NOT_LIKE:
                if not isinstance(model_attr_type, sqltypes.String):
                    raise ValueError(
                        f'invalid query string: "{self.relationship.value}" can only be used with string columns'
                    )

            if isinstance(model_attr_type, GUID):
                try:
                    # we don't set value since a UUID is functionally identical to a string here
                    UUID(v)
                except ValueError as e:
                    raise ValueError(f"invalid query string: invalid UUID '{v}'") from e

            if isinstance(model_attr_type, sqltypes.Date | sqltypes.DateTime | NaiveDateTime):
                try:
                    dt = date_parser.parse(v)
                    sanitized_values[i] = dt.date() if isinstance(model_attr_type, sqltypes.Date) else dt
                except (ParserError, OverflowError) as e:
                    # dateutil documents OverflowError for values that exceed the platform's integer range
                    raise ValueError(f"invalid query string: unknown date or datetime format '{v}'") from e

            if isinstance(model_attr_type, sqltypes.Boolean):
                try:
                    sanitized_values[i] = v.lower()[0] in ["t", "y"] or v == "1"
                except IndexError as e:
                    # an empty string has no first character to inspect
                    raise ValueError("invalid query string") from e

        return sanitized_values if isinstance(self.value, list) else sanitized_values[0]

    def as_json_model(self) -> QueryFilterJSONPart:
        """Return this statement as a `QueryFilterJSONPart` with no grouping or logical operator set."""
        return QueryFilterJSONPart(
            left_parenthesis=None,
            right_parenthesis=None,
            logical_operator=None,
            attribute_name=self.attribute_name,
            relational_operator=self.relationship,
            value=self.value,
        )
223class QueryFilterBuilder: 1t
224 l_group_sep: str = "(" 1t
225 r_group_sep: str = ")" 1t
226 group_seps: set[str] = {l_group_sep, r_group_sep} 1t
228 l_list_sep: str = "[" 1t
229 r_list_sep: str = "]" 1t
230 list_item_sep: str = "," 1t
232 def __init__(self, filter_string: str) -> None: 1t
233 # parse filter string
234 components = QueryFilterBuilder._break_filter_string_into_components(filter_string) 1cdnoqkjpgraiehlfmsb
235 base_components = QueryFilterBuilder._break_components_into_base_components(components) 1cdnoqkjpgraiehlfmsb
236 if base_components.count(QueryFilterBuilder.l_group_sep) != base_components.count( 1cdnoqkjpgraiehlfmsb
237 QueryFilterBuilder.r_group_sep
238 ):
239 raise ValueError("invalid query string: parenthesis are unbalanced") 1noqjpgahlmb
241 # parse base components into a filter group
242 self.filter_components = QueryFilterBuilder._parse_base_components_into_filter_components(base_components) 1cdnoqkjpgraiehlfmsb
244 def __repr__(self) -> str: 1t
245 joined = " ".join(
246 [
247 str(component.value if isinstance(component, LogicalOperator) else component)
248 for component in self.filter_components
249 ],
250 )
252 return f"<<{joined}>>"
254 @classmethod 1t
255 def _consolidate_group( 1t
256 cls, group: list[sa.ColumnElement], logical_operators: deque[LogicalOperator]
257 ) -> sa.ColumnElement:
258 consolidated_group_builder: sa.ColumnElement | None = None 1cdnoqkjpgraiehlfmsb
259 for i, element in enumerate(reversed(group)): 1cdnoqkjpgraiehlfmsb
260 if not i: 1cdnoqkjpgraiehlfmb
261 consolidated_group_builder = element 1cdnoqkjpgraiehlfmb
262 else:
263 operator = logical_operators.pop() 1cdkgaehfmb
264 if operator is LogicalOperator.AND: 264 ↛ 266line 264 didn't jump to line 266 because the condition on line 264 was always true1cdkgaehfmb
265 consolidated_group_builder = sa.and_(consolidated_group_builder, element) 1cdkgaehfmb
266 elif operator is LogicalOperator.OR:
267 consolidated_group_builder = sa.or_(consolidated_group_builder, element)
268 else:
269 raise ValueError(f"invalid logical operator {operator}")
271 if i == len(group) - 1: 1cdnoqkjpgraiehlfmb
272 return consolidated_group_builder.self_group() 1cdnoqkjpgraiehlfmb
274 @classmethod 1t
275 def get_model_and_model_attr_from_attr_string[Model: SqlAlchemyBase]( 1t
276 cls, attr_string: str, model: type[Model], *, query: sa.Select | None = None
277 ) -> tuple[SqlAlchemyBase, InstrumentedAttribute, sa.Select | None]:
278 """
279 Take an attribute string and traverse a database model and its relationships to get the desired
280 model and model attribute. Optionally provide a query to apply the necessary table joins.
282 If the attribute string is invalid, raises a `ValueError`.
284 For instance, the attribute string "user.name" on `RecipeModel`
285 will return the `User` model's `name` attribute.
287 Works with shallow attributes (e.g. "slug" from `RecipeModel`)
288 and arbitrarily deep ones (e.g. "recipe.group.preferences" on `RecipeTimelineEvent`).
289 """
290 mapper: Mapper
291 model_attr: InstrumentedAttribute | None = None 1tucdnoqkjpgraiehlfmsb
293 attribute_chain = decamelize(attr_string).split(".") 1tucdnoqkjpgraiehlfmsb
294 if not attribute_chain: 294 ↛ 295line 294 didn't jump to line 295 because the condition on line 294 was never true1tucdnoqkjpgraiehlfmsb
295 raise ValueError("invalid query string: attribute name cannot be empty")
297 current_model: SqlAlchemyBase = model # type: ignore 1tucdnoqkjpgraiehlfmsb
298 for i, attribute_link in enumerate(attribute_chain): 298 ↛ 331line 298 didn't jump to line 331 because the loop on line 298 didn't complete1tucdnoqkjpgraiehlfmsb
299 try: 1tucdnoqkjpgraiehlfmsb
300 model_attr = getattr(current_model, attribute_link) 1tucdnoqkjpgraiehlfmsb
302 # proxied attributes can't be joined to the query directly, so we need to inspect the proxy
303 # and get the actual model and its attribute
304 if isinstance(model_attr, AssociationProxyInstance): 304 ↛ 305line 304 didn't jump to line 305 because the condition on line 304 was never true1tucdnoqkjpgraiehlfmsb
305 proxied_attribute_link = model_attr.target_collection
306 next_attribute_link = model_attr.value_attr
307 model_attr = getattr(current_model, proxied_attribute_link)
309 if query is not None:
310 query = query.join(model_attr, isouter=True)
312 mapper = sa.inspect(current_model)
313 relationship = mapper.relationships[proxied_attribute_link]
314 current_model = relationship.mapper.class_
315 model_attr = getattr(current_model, next_attribute_link)
317 # at the end of the chain there are no more relationships to inspect
318 if i == len(attribute_chain) - 1: 318 ↛ 321line 318 didn't jump to line 321 because the condition on line 318 was always true1tucdnoqkjpgraiehlfmsb
319 break 1tucdnoqkjpgraiehlfmsb
321 if query is not None:
322 query = query.join(model_attr, isouter=True)
324 mapper = sa.inspect(current_model)
325 relationship = mapper.relationships[attribute_link]
326 current_model = relationship.mapper.class_
328 except (AttributeError, KeyError) as e: 1cdnoqkjpgraiehlfmsb
329 raise ValueError(f"invalid attribute string: '{attr_string}' does not exist on this schema") from e 1cdnoqkjpgraiehlfmsb
331 if model_attr is None: 331 ↛ 332line 331 didn't jump to line 332 because the condition on line 331 was never true1tucdnoqkjpgraiehlfmsb
332 raise ValueError(f"invalid attribute string: '{attr_string}'")
334 return current_model, model_attr, query 1tucdnoqkjpgraiehlfmsb
336 @classmethod 1t
337 def _transform_model_attr(cls, model_attr: InstrumentedAttribute, model_attr_type: Any) -> InstrumentedAttribute: 1t
338 if isinstance(model_attr_type, sqltypes.String): 338 ↛ 339line 338 didn't jump to line 339 because the condition on line 338 was never true1cdnoqkjpgraiehlfmb
339 model_attr = sa.func.lower(model_attr)
341 return model_attr 1cdnoqkjpgraiehlfmb
343 @classmethod 1t
344 def _get_filter_element[Model: SqlAlchemyBase]( 1t
345 cls,
346 query: sa.Select,
347 component: QueryFilterBuilderComponent,
348 model: type[Model],
349 model_attr: InstrumentedAttribute,
350 model_attr_type: Any,
351 ) -> sa.ColumnElement:
352 original_model_attr = model_attr 1cdnoqkjpgraiehlfmb
353 model_attr = cls._transform_model_attr(model_attr, model_attr_type) 1cdnoqkjpgraiehlfmb
354 value = component.validate(model_attr_type) 1cdnoqkjpgraiehlfmb
356 # Keywords
357 if component.relationship is RelationalKeyword.IS: 357 ↛ 358line 357 didn't jump to line 358 because the condition on line 357 was never true1cdnoqkjpgraiehlfmb
358 element = model_attr.is_(value)
359 elif component.relationship is RelationalKeyword.IS_NOT: 359 ↛ 360line 359 didn't jump to line 360 because the condition on line 359 was never true1cdnoqkjpgraiehlfmb
360 element = model_attr.is_not(value)
361 elif component.relationship is RelationalKeyword.IN: 361 ↛ 362line 361 didn't jump to line 362 because the condition on line 361 was never true1cdnoqkjpgraiehlfmb
362 element = model_attr.in_(value)
363 elif component.relationship is RelationalKeyword.NOT_IN: 363 ↛ 364line 363 didn't jump to line 364 because the condition on line 363 was never true1cdnoqkjpgraiehlfmb
364 if original_model_attr.parent.entity != model:
365 subq = query.with_only_columns(model.id).where(model_attr.in_(value))
366 element = sa.not_(model.id.in_(subq))
367 else:
368 element = sa.not_(model_attr.in_(value))
370 elif component.relationship is RelationalKeyword.CONTAINS_ALL: 370 ↛ 371line 370 didn't jump to line 371 because the condition on line 370 was never true1cdnoqkjpgraiehlfmb
371 if len(value) == 1:
372 element = model_attr.in_(value)
373 else:
374 primary_model_attr: InstrumentedAttribute = getattr(model, component.attribute_name.split(".")[0])
375 element = sa.and_(*(primary_model_attr.any(model_attr == v) for v in value))
376 elif component.relationship is RelationalKeyword.LIKE: 376 ↛ 377line 376 didn't jump to line 377 because the condition on line 376 was never true1cdnoqkjpgraiehlfmb
377 element = model_attr.ilike(value)
378 elif component.relationship is RelationalKeyword.NOT_LIKE: 378 ↛ 379line 378 didn't jump to line 379 because the condition on line 378 was never true1cdnoqkjpgraiehlfmb
379 element = model_attr.not_ilike(value)
381 # Operators
382 elif component.relationship is RelationalOperator.EQ: 1cdnoqkjpgraiehlfmb
383 element = model_attr == value 1cdnoqkjpgraiehlfmb
384 elif component.relationship is RelationalOperator.NOTEQ: 384 ↛ 385line 384 didn't jump to line 385 because the condition on line 384 was never true1cdkgaiehfb
385 element = model_attr != value
386 elif component.relationship is RelationalOperator.GT: 386 ↛ 387line 386 didn't jump to line 387 because the condition on line 386 was never true1cdkgaiehfb
387 element = model_attr > value
388 elif component.relationship is RelationalOperator.LT: 388 ↛ 389line 388 didn't jump to line 389 because the condition on line 388 was never true1cdkgaiehfb
389 element = model_attr < value
390 elif component.relationship is RelationalOperator.GTE: 1cdkgaiehfb
391 element = model_attr >= value 1cdkgaiehfb
392 elif component.relationship is RelationalOperator.LTE: 392 ↛ 395line 392 didn't jump to line 395 because the condition on line 392 was always true1cdkgaiehfb
393 element = model_attr <= value 1cdkgaiehfb
394 else:
395 raise ValueError(f"invalid relationship {component.relationship}")
397 return element 1cdnoqkjpgraiehlfmb
399 def filter_query[Model: SqlAlchemyBase]( 1t
400 self, query: sa.Select, model: type[Model], column_aliases: dict[str, sa.ColumnElement] | None = None
401 ) -> sa.Select:
402 """
403 Filters a query based on the parsed filter string.
404 If you need to filter on a custom column expression (e.g. a computed property), you can supply column aliases
405 """
406 column_aliases = column_aliases or {} 1cdnoqkjpgraiehlfmsb
408 # join tables and build model chain
409 attr_model_map: dict[int, Any] = {} 1cdnoqkjpgraiehlfmsb
410 model_attr: InstrumentedAttribute
411 for i, component in enumerate(self.filter_components): 1cdnoqkjpgraiehlfmsb
412 if not isinstance(component, QueryFilterBuilderComponent): 1cdnoqkjpgraiehlfmsb
413 continue 1cdnoqkjpgaiehlfmsb
415 nested_model, model_attr, query = self.get_model_and_model_attr_from_attr_string( 1cdnoqkjpgraiehlfmsb
416 component.attribute_name, model, query=query
417 )
418 attr_model_map[i] = nested_model 1cdnoqkjpgraiehlfmb
420 # build query filter
421 partial_group: list[sa.ColumnElement] = [] 1cdnoqkjpgraiehlfmsb
422 partial_group_stack: deque[list[sa.ColumnElement]] = deque() 1cdnoqkjpgraiehlfmsb
423 logical_operator_stack: deque[LogicalOperator] = deque() 1cdnoqkjpgraiehlfmsb
424 for i, component in enumerate(self.filter_components): 1cdnoqkjpgraiehlfmsb
425 if component == self.l_group_sep: 1cdnoqkjpgraiehlfmb
426 partial_group_stack.append(partial_group) 1cdnoqkjpgaiehlfmb
427 partial_group = [] 1cdnoqkjpgaiehlfmb
429 elif component == self.r_group_sep: 1cdnoqkjpgraiehlfmb
430 if partial_group: 1cdnoqkjpgaiehlfmb
431 complete_group = self._consolidate_group(partial_group, logical_operator_stack) 1cdkaiefb
432 partial_group = partial_group_stack.pop() 1cdkaiefb
433 partial_group.append(complete_group) 1cdkaiefb
434 else:
435 partial_group = partial_group_stack.pop() 1cdnoqkjpgaiehlfmb
437 elif isinstance(component, LogicalOperator): 1cdnoqkjpgraiehlfmb
438 logical_operator_stack.append(component) 1cdnoqkjpgaiehlfmb
440 else:
441 component = cast(QueryFilterBuilderComponent, component) 1cdnoqkjpgraiehlfmb
442 base_attribute_name = component.attribute_name.split(".")[-1] 1cdnoqkjpgraiehlfmb
443 model_attr = getattr(attr_model_map[i], base_attribute_name) 1cdnoqkjpgraiehlfmb
445 if (column_alias := column_aliases.get(base_attribute_name)) is not None: 445 ↛ 446line 445 didn't jump to line 446 because the condition on line 445 was never true1cdnoqkjpgraiehlfmb
446 model_attr = column_alias
448 element = self._get_filter_element(query, component, model, model_attr, model_attr.type) 1cdnoqkjpgraiehlfmb
449 partial_group.append(element) 1cdnoqkjpgraiehlfmb
451 # combine the completed groups into one filter
452 while True: 1cdnoqkjpgraiehlfmsb
453 consolidated_group = self._consolidate_group(partial_group, logical_operator_stack) 1cdnoqkjpgraiehlfmsb
454 if not partial_group_stack: 454 ↛ 457line 454 didn't jump to line 457 because the condition on line 454 was always true1cdnoqkjpgraiehlfmsb
455 return query.filter(consolidated_group) 1cdnoqkjpgraiehlfmsb
456 else:
457 partial_group = partial_group_stack.pop()
458 partial_group.append(consolidated_group)
460 @staticmethod 1t
461 def _break_filter_string_into_components(filter_string: str) -> list[str]: 1t
462 """Recursively break filter string into components based on parenthesis groupings"""
463 components = [filter_string] 1cdnoqkjpgraiehlfmsb
464 in_quotes = False 1cdnoqkjpgraiehlfmsb
465 while True: 1cdnoqkjpgraiehlfmsb
466 subcomponents = [] 1cdnoqkjpgraiehlfmsb
467 for component in components: 1cdnoqkjpgraiehlfmsb
468 # don't parse components comprised of only a separator
469 if component in QueryFilterBuilder.group_seps: 1cdnoqkjpgraiehlfmsb
470 subcomponents.append(component) 1cdnoqkjpgaiehlfmsb
471 continue 1cdnoqkjpgaiehlfmsb
473 # construct a component until it hits the right separator
474 new_component = "" 1cdnoqkjpgraiehlfmsb
475 for c in component: 1cdnoqkjpgraiehlfmsb
476 # ignore characters in-between quotes
477 if c == '"': 1cdnoqkjpgraiehlfmsb
478 in_quotes = not in_quotes 1calfb
480 if c in QueryFilterBuilder.group_seps and not in_quotes: 1cdnoqkjpgraiehlfmsb
481 if new_component: 1cdnoqkjpgaiehlfmsb
482 subcomponents.append(new_component) 1cdnoqkjpgaiehlfmsb
484 subcomponents.append(c) 1cdnoqkjpgaiehlfmsb
485 new_component = "" 1cdnoqkjpgaiehlfmsb
486 continue 1cdnoqkjpgaiehlfmsb
488 new_component += c 1cdnoqkjpgraiehlfmsb
490 if new_component: 1cdnoqkjpgraiehlfmsb
491 subcomponents.append(new_component.strip()) 1cdnoqkjpgraiehlfmsb
493 if components == subcomponents: 1cdnoqkjpgraiehlfmsb
494 break 1cdnoqkjpgraiehlfmsb
496 components = subcomponents 1cdnoqkjpgraiehlfmsb
498 return components 1cdnoqkjpgraiehlfmsb
500 @staticmethod 1t
501 def _break_components_into_base_components(components: list[str]) -> list[str | list[str]]: 1t
502 """Further break down components by splitting at relational and logical operators"""
503 pattern = "|".join([f"\\b{operator.value}\\b" for operator in LogicalOperator]) 1cdnoqkjpgraiehlfmsb
504 logical_operators = re.compile(f"({pattern})", flags=re.IGNORECASE) 1cdnoqkjpgraiehlfmsb
506 in_list = False 1cdnoqkjpgraiehlfmsb
507 base_components: list[str | list] = [] 1cdnoqkjpgraiehlfmsb
508 list_value_components = [] 1cdnoqkjpgraiehlfmsb
509 for component in components: 1cdnoqkjpgraiehlfmsb
510 # parse out lists as their own singular sub component
511 subcomponents = component.split(QueryFilterBuilder.l_list_sep) 1cdnoqkjpgraiehlfmsb
512 for i, subcomponent in enumerate(subcomponents): 1cdnoqkjpgraiehlfmsb
513 if not i: 1cdnoqkjpgraiehlfmsb
514 continue 1cdnoqkjpgraiehlfmsb
516 for j, list_value_string in enumerate(subcomponent.split(QueryFilterBuilder.r_list_sep)): 1cdnojpgaelb
517 if j % 2: 1cdnojpgaelb
518 continue 1c
520 list_value_components.append( 1cdnojpgaelb
521 [val.strip() for val in list_value_string.split(QueryFilterBuilder.list_item_sep)]
522 )
524 quote_offset = 0 1cdnoqkjpgraiehlfmsb
525 subcomponents = component.split('"') 1cdnoqkjpgraiehlfmsb
526 for i, subcomponent in enumerate(subcomponents): 1cdnoqkjpgraiehlfmsb
527 # we are in a list subcomponent, which is already handled
528 if in_list: 1cdnoqkjpgraiehlfmsb
529 if QueryFilterBuilder.r_list_sep in subcomponent: 1cdnojpgaelb
530 # filter out the remainder of the list subcomponent and continue parsing
531 base_components.append(list_value_components.pop(0)) 1c
532 subcomponent = subcomponent.split(QueryFilterBuilder.r_list_sep, maxsplit=1)[-1].strip() 1c
533 in_list = False 1c
534 else:
535 continue 1cdnojpgaelb
537 # don't parse components comprised of only a separator
538 if subcomponent in QueryFilterBuilder.group_seps: 1cdnoqkjpgraiehlfmsb
539 quote_offset += 1 1cdnoqkjpgaiehlfmsb
540 base_components.append(subcomponent) 1cdnoqkjpgaiehlfmsb
541 continue 1cdnoqkjpgaiehlfmsb
543 # this subcomponent was surrounded in quotes, so we keep it as-is
544 if (i + quote_offset) % 2: 1cdnoqkjpgraiehlfmsb
545 base_components.append(f'"{subcomponent.strip()}"') 1calfb
546 continue 1calfb
548 # if the final subcomponent has quotes, it creates an extra empty subcomponent at the end
549 if not subcomponent: 1cdnoqkjpgraiehlfmsb
550 continue 1ab
552 # continue parsing this subcomponent up to the list, then skip over subsequent subcomponents
553 if not in_list and QueryFilterBuilder.l_list_sep in subcomponent: 1cdnoqkjpgraiehlfmsb
554 subcomponent, _new_sub_component = subcomponent.split(QueryFilterBuilder.l_list_sep, maxsplit=1) 1cdnojpgaelb
555 subcomponent = subcomponent.strip() 1cdnojpgaelb
556 subcomponents.insert(i + 1, _new_sub_component) 1cdnojpgaelb
557 quote_offset += 1 1cdnojpgaelb
558 in_list = True 1cdnojpgaelb
560 # parse out logical operators
561 new_components = [ 1cdnoqkjpgraiehlfmsb
562 base_component.strip() for base_component in logical_operators.split(subcomponent) if base_component
563 ]
565 # parse out relational keywords and operators
566 # each base_subcomponent has exactly zero or one keyword or operator
567 for component in new_components: 1cdnoqkjpgraiehlfmsb
568 if not component: 568 ↛ 569line 568 didn't jump to line 569 because the condition on line 568 was never true1cdnoqkjpgraiehlfmsb
569 continue
571 # we try relational operators first since they aren't required to be surrounded by spaces
572 parsed_component = RelationalOperator.parse_component(component) 1cdnoqkjpgraiehlfmsb
573 if parsed_component is not None: 1cdnoqkjpgraiehlfmsb
574 base_components.extend(parsed_component) 1cdnoqkjpgraiehlfmsb
575 continue 1cdnoqkjpgraiehlfmsb
577 parsed_component = RelationalKeyword.parse_component(component) 1cdnoqkjpgraiehlfmsb
578 if parsed_component is not None: 578 ↛ 579line 578 didn't jump to line 579 because the condition on line 578 was never true1cdnoqkjpgraiehlfmsb
579 base_components.extend(parsed_component)
580 continue
582 # this component does not have any keywords or operators, so we just add it as-is
583 base_components.append(component) 1cdnoqkjpgraiehlfmsb
585 return base_components 1cdnoqkjpgraiehlfmsb
587 @staticmethod 1t
588 def _parse_base_components_into_filter_components( 1t
589 base_components: list[str | list[str]],
590 ) -> list[str | QueryFilterBuilderComponent | LogicalOperator]:
591 """Walk through base components and construct filter collections"""
592 relational_keywords = [kw.value for kw in RelationalKeyword] 1cdnoqkjpgraiehlfmsb
593 relational_operators = [op.value for op in RelationalOperator] 1cdnoqkjpgraiehlfmsb
594 logical_operators = [op.value for op in LogicalOperator] 1cdnoqkjpgraiehlfmsb
596 # parse QueryFilterComponents and logical operators
597 components: list[str | QueryFilterBuilderComponent | LogicalOperator] = [] 1cdnoqkjpgraiehlfmsb
598 for i, base_component in enumerate(base_components): 1cdnoqkjpgraiehlfmsb
599 if isinstance(base_component, list): 1cdnoqkjpgraiehlfmsb
600 continue 1c
602 if base_component in QueryFilterBuilder.group_seps: 1cdnoqkjpgraiehlfmsb
603 components.append(base_component) 1cdnoqkjpgaiehlfmsb
605 elif base_component in relational_keywords or base_component in relational_operators: 1cdnoqkjpgraiehlfmsb
606 relationship: RelationalKeyword | RelationalOperator
607 if base_component in relational_keywords: 607 ↛ 608line 607 didn't jump to line 608 because the condition on line 607 was never true1cdnoqkjpgraiehlfmsb
608 relationship = RelationalKeyword(base_components[i])
609 else:
610 relationship = RelationalOperator(base_components[i]) 1cdnoqkjpgraiehlfmsb
612 components.append( 1cdnoqkjpgraiehlfmsb
613 QueryFilterBuilderComponent(
614 attribute_name=base_components[i - 1], # type: ignore
615 relationship=relationship,
616 value=base_components[i + 1],
617 )
618 )
620 elif base_component.upper() in logical_operators: 1cdnoqkjpgraiehlfmsb
621 components.append(LogicalOperator(base_component.upper())) 1cdnoqkjpgaiehlfmsb
623 return components 1cdnoqkjpgraiehlfmsb
625 def as_json_model(self) -> QueryFilterJSON: 1t
626 parts: list[QueryFilterJSONPart] = [] 1cdnoqkjpgraiehlfmsb
628 current_part: QueryFilterJSONPart | None = None 1cdnoqkjpgraiehlfmsb
629 left_parens: list[str] = [] 1cdnoqkjpgraiehlfmsb
630 right_parens: list[str] = [] 1cdnoqkjpgraiehlfmsb
631 last_logical_operator: LogicalOperator | None = None 1cdnoqkjpgraiehlfmsb
633 def add_part(): 1cdnoqkjpgraiehlfmsb
634 nonlocal current_part, left_parens, right_parens, last_logical_operator
635 if not current_part: 635 ↛ 638line 635 didn't jump to line 638 because the condition on line 635 was always true1cdnoqkjpgraiehlfmsb
636 return 1cdnoqkjpgraiehlfmsb
638 current_part.left_parenthesis = "".join(left_parens) or None
639 current_part.right_parenthesis = "".join(right_parens) or None
640 current_part.logical_operator = last_logical_operator
642 parts.append(current_part)
643 current_part = None
644 left_parens.clear()
645 right_parens.clear()
646 last_logical_operator = None
648 for component in self.filter_components: 648 ↛ 649line 648 didn't jump to line 649 because the loop on line 648 never started1cdnoqkjpgraiehlfmsb
649 if isinstance(component, QueryFilterBuilderComponent):
650 if current_part:
651 add_part()
652 current_part = component.as_json_model()
654 elif isinstance(component, LogicalOperator):
655 if current_part:
656 add_part()
657 last_logical_operator = component
659 elif isinstance(component, str):
660 if component == QueryFilterBuilder.l_group_sep:
661 left_parens.append(component)
662 elif component == QueryFilterBuilder.r_group_sep:
663 right_parens.append(component)
665 # add last part, if any
666 add_part() 1cdnoqkjpgraiehlfmsb
667 return QueryFilterJSON(parts=parts) 1cdnoqkjpgraiehlfmsb