Coverage for opt/mealie/lib/python3.12/site-packages/mealie/schema/response/query_filter.py: 73%

404 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-11-25 17:29 +0000

1from __future__ import annotations 1P

2 

3import re 1P

4from collections import deque 1P

5from enum import Enum 1P

6from typing import Any, cast 1P

7from uuid import UUID 1P

8 

9import sqlalchemy as sa 1P

10from dateutil import parser as date_parser 1P

11from dateutil.parser import ParserError 1P

12from humps import decamelize 1P

13from sqlalchemy.ext.associationproxy import AssociationProxyInstance 1P

14from sqlalchemy.orm import InstrumentedAttribute, Mapper 1P

15from sqlalchemy.sql import sqltypes 1P

16 

17from mealie.db.models._model_base import SqlAlchemyBase 1P

18from mealie.db.models._model_utils.datetime import NaiveDateTime 1P

19from mealie.db.models._model_utils.guid import GUID 1P

20from mealie.schema._mealie.mealie_model import MealieModel 1P

21 

22 

# Word-based relational keywords that may appear between an attribute name and
# a value in a filter string. Matching against user input is case-insensitive;
# the canonical (upper-case) value is what `parse_component` returns.
class RelationalKeyword(Enum):
    IS = "IS"
    IS_NOT = "IS NOT"
    IN = "IN"
    NOT_IN = "NOT IN"
    CONTAINS_ALL = "CONTAINS ALL"
    LIKE = "LIKE"
    NOT_LIKE = "NOT LIKE"

    @classmethod
    def _match_keyword(cls, candidate: str) -> str | None:
        """
        Return the canonical keyword value that equals `candidate`, or None

        The comparison ignores case, surrounding whitespace, and the amount of
        whitespace between the words of a multi-word keyword, so "is   not"
        matches "IS NOT".
        """

        normalized = " ".join(candidate.split()).lower()
        for keyword in cls:
            if keyword.value.lower() == normalized:
                return keyword.value

        return None

    @classmethod
    def parse_component(cls, component: str) -> list[str] | None:
        """
        Try to parse a component using a relational keyword

        Returns `[attribute, keyword]` when nothing follows the keyword, or
        `[attribute, keyword, value]` when the last whitespace-separated token
        is a value. The keyword is always returned in its canonical form.

        If no matching keyword is found, returns None
        """

        # extract the attribute name from the component
        parsed_component = component.split(maxsplit=1)
        if len(parsed_component) < 2:
            return None

        attribute_name, remainder = parsed_component

        # assume the component has already filtered out the value and try to match a keyword
        # if we try to filter out the value without checking first, keywords with spaces won't parse correctly
        keyword = cls._match_keyword(remainder)
        if keyword is not None:
            return [attribute_name, keyword]

        # there was no match, so the component may still have the value in it
        keyword_and_value = remainder.rsplit(maxsplit=1)
        if len(keyword_and_value) < 2:
            # the component has no value to filter out
            return None

        possible_keyword, value = keyword_and_value
        keyword = cls._match_keyword(possible_keyword)
        if keyword is None:
            return None

        return [attribute_name, keyword, value]

72 

73 

# Symbolic comparison operators that may appear between an attribute name and
# a value in a filter string.
class RelationalOperator(Enum):
    EQ = "="
    NOTEQ = "<>"
    GT = ">"
    LT = "<"
    GTE = ">="
    LTE = "<="

    @classmethod
    def parse_component(cls, component: str) -> list[str] | None:
        """
        Try to parse a component using a relational operator

        The component is split on the first operator found (longest operators
        are tried first so ">=" wins over ">" and "="), empty pieces are
        dropped, the remaining pieces are stripped, and the operator is placed
        at index 1 of the result.

        If no matching operator is found, returns None
        """

        # longest symbols first; the sort is stable so declaration order breaks ties
        symbols = [member.value for member in cls]
        symbols.sort(key=len, reverse=True)

        matched = next((symbol for symbol in symbols if symbol in component), None)
        if matched is None:
            return None

        # empty pieces are discarded *before* stripping, so a whitespace-only piece survives as ""
        pieces = [piece.strip() for piece in component.split(matched) if piece]
        return [*pieces[:1], matched, *pieces[1:]]

99 

100 

# Logical connectives that join relational statements in a filter string.
# The member values are interpolated into the word-boundary regex built in
# `QueryFilterBuilder._break_components_into_base_components`.
class LogicalOperator(Enum):
    AND = "AND"
    OR = "OR"

104 

105 

# One element of a filter expressed as JSON. Every field is optional and
# defaults to None; `QueryFilterBuilderComponent.as_json_model` fills in only
# the attribute name, relational operator and value and leaves the rest None.
class QueryFilterJSONPart(MealieModel):
    # grouping / joining information for this part
    left_parenthesis: str | None = None
    right_parenthesis: str | None = None
    logical_operator: LogicalOperator | None = None

    # the relational statement itself; `value` is a list for list-valued statements
    attribute_name: str | None = None
    relational_operator: RelationalKeyword | RelationalOperator | None = None
    value: str | list[str] | None = None

114 

115 

# A filter expressed as an ordered list of JSON parts.
# NOTE(review): the list default is presumably copied per instance by the
# MealieModel (pydantic) base rather than shared - confirm against the base class.
class QueryFilterJSON(MealieModel):
    parts: list[QueryFilterJSONPart] = []

118 

119 

class QueryFilterBuilderComponent:
    """A single relational statement"""

    @staticmethod
    def strip_quotes_from_string(val: str) -> str:
        """
        Remove one pair of enclosing double quotes from `val`, if present

        An empty quoted string (`""`) becomes the empty string. A lone quote
        character or a string quoted on one side only is returned unchanged.
        """
        if len(val) >= 2 and val[0] == '"' and val[-1] == '"':
            return val[1:-1]

        return val

    def __init__(
        self, attribute_name: str, relationship: RelationalKeyword | RelationalOperator, value: str | list[str]
    ) -> None:
        """
        Build a statement from its three parts

        The attribute name is decamelized and enclosing quotes are removed from
        the value (or from each item of a list value).

        Raises a `ValueError` if IN / NOT IN / CONTAINS ALL is not given a list,
        or if IS / IS NOT is given anything other than "null" / "none"
        (case-insensitive). For IS / IS NOT the stored value is `None`.
        """
        self.attribute_name = decamelize(attribute_name)
        self.relationship = relationship

        # remove encasing quotes
        if isinstance(value, str):
            value = self.strip_quotes_from_string(value)

        elif isinstance(value, list):
            value = [self.strip_quotes_from_string(v) for v in value]

        # validate relationship/value pairs
        if relationship in [
            RelationalKeyword.IN,
            RelationalKeyword.NOT_IN,
            RelationalKeyword.CONTAINS_ALL,
        ] and not isinstance(value, list):
            raise ValueError(
                f"invalid query string: {relationship.value} must be given a list of values "
                f"enclosed by {QueryFilterBuilder.l_list_sep} and {QueryFilterBuilder.r_list_sep}"
            )

        if relationship is RelationalKeyword.IS or relationship is RelationalKeyword.IS_NOT:
            if not isinstance(value, str) or value.lower() not in ["null", "none"]:
                raise ValueError(
                    f'invalid query string: "{relationship.value}" can only be used with "NULL", not "{value}"'
                )

            self.value = None
        else:
            self.value = value

    def __repr__(self) -> str:
        return f"[{self.attribute_name} {self.relationship.value} {self.value}]"

    def validate(self, model_attr_type: Any) -> Any:
        """
        Validate value against an model attribute's type and return a validated value, or raise a ValueError

        Returns a list when `self.value` is a list, otherwise a single value.
        `None` values are passed through untouched. String columns get
        lower-cased values, date/datetime columns get parsed values, and
        boolean columns get a `bool`; GUID values are only checked, not converted.
        """

        # work on a list in both cases so single values and lists share one code path
        sanitized_values: list[Any]
        if not isinstance(self.value, list):
            sanitized_values = [self.value]
        else:
            sanitized_values = list(self.value)

        for i, v in enumerate(sanitized_values):
            # always allow querying for null values
            if v is None:
                continue

            if isinstance(model_attr_type, sqltypes.String):
                sanitized_values[i] = v.lower()

            if self.relationship is RelationalKeyword.LIKE or self.relationship is RelationalKeyword.NOT_LIKE:
                if not isinstance(model_attr_type, sqltypes.String):
                    raise ValueError(
                        f'invalid query string: "{self.relationship.value}" can only be used with string columns'
                    )

            if isinstance(model_attr_type, GUID):
                try:
                    # we don't set value since a UUID is functionally identical to a string here
                    UUID(v)
                except ValueError as e:
                    raise ValueError(f"invalid query string: invalid UUID '{v}'") from e

            if isinstance(model_attr_type, sqltypes.Date | sqltypes.DateTime | NaiveDateTime):
                try:
                    dt = date_parser.parse(v)
                    sanitized_values[i] = dt.date() if isinstance(model_attr_type, sqltypes.Date) else dt
                except ParserError as e:
                    raise ValueError(f"invalid query string: unknown date or datetime format '{v}'") from e

            if isinstance(model_attr_type, sqltypes.Boolean):
                try:
                    # truthy when the value starts with "t"/"y" (any case) or is exactly "1"
                    sanitized_values[i] = v.lower()[0] in ["t", "y"] or v == "1"
                except IndexError as e:
                    # an empty string has no first character
                    raise ValueError("invalid query string") from e

        return sanitized_values if isinstance(self.value, list) else sanitized_values[0]

    def as_json_model(self) -> QueryFilterJSONPart:
        """Return this statement as a JSON part with no grouping or logical-operator information"""
        return QueryFilterJSONPart(
            left_parenthesis=None,
            right_parenthesis=None,
            logical_operator=None,
            attribute_name=self.attribute_name,
            relational_operator=self.relationship,
            value=self.value,
        )

221 

222 

223class QueryFilterBuilder: 1P

224 l_group_sep: str = "(" 1P

225 r_group_sep: str = ")" 1P

226 group_seps: set[str] = {l_group_sep, r_group_sep} 1P

227 

228 l_list_sep: str = "[" 1P

229 r_list_sep: str = "]" 1P

230 list_item_sep: str = "," 1P

231 

232 def __init__(self, filter_string: str) -> None: 1P

233 # parse filter string 

234 components = QueryFilterBuilder._break_filter_string_into_components(filter_string) 1LqGJIKSMRdcfFrgshkwCtuAijHxlnzvebyQNmoEOBDpa

235 base_components = QueryFilterBuilder._break_components_into_base_components(components) 1LqGJIKSMRdcfFrgshkwCtuAijHxlnzvebyQNmoEOBDpa

236 if base_components.count(QueryFilterBuilder.l_group_sep) != base_components.count( 1LqGJIKSMRdcfFrgshkwCtuAijHxlnzvebyQNmoEOBDpa

237 QueryFilterBuilder.r_group_sep 

238 ): 

239 raise ValueError("invalid query string: parenthesis are unbalanced") 1qMdcfFrgshkwtuijxlvebyma

240 

241 # parse base components into a filter group 

242 self.filter_components = QueryFilterBuilder._parse_base_components_into_filter_components(base_components) 1LqGJIKSMRdcfFrgshkwCtuAijHxlnzvebyQNmoEOBDpa

243 

244 def __repr__(self) -> str: 1P

245 joined = " ".join( 

246 [ 

247 str(component.value if isinstance(component, LogicalOperator) else component) 

248 for component in self.filter_components 

249 ], 

250 ) 

251 

252 return f"<<{joined}>>" 

253 

254 @classmethod 1P

255 def _consolidate_group( 1P

256 cls, group: list[sa.ColumnElement], logical_operators: deque[LogicalOperator] 

257 ) -> sa.ColumnElement: 

258 consolidated_group_builder: sa.ColumnElement | None = None 1LqGJIKMRdcfFrgshkwCtuAijHxlnzvebyQNmoEOBDpa

259 for i, element in enumerate(reversed(group)): 1LqGJIKMRdcfFrgshkwCtuAijHxlnzvebyQNmoEOBDpa

260 if not i: 1qdcfrgshkwtuAijxlnzvebymoBDpa

261 consolidated_group_builder = element 1qdcfrgshkwtuAijxlnzvebymoBDpa

262 else: 

263 operator = logical_operators.pop() 1qdcjea

264 if operator is LogicalOperator.AND: 264 ↛ 266line 264 didn't jump to line 266 because the condition on line 264 was always true1qdcjea

265 consolidated_group_builder = sa.and_(consolidated_group_builder, element) 1qdcjea

266 elif operator is LogicalOperator.OR: 

267 consolidated_group_builder = sa.or_(consolidated_group_builder, element) 

268 else: 

269 raise ValueError(f"invalid logical operator {operator}") 

270 

271 if i == len(group) - 1: 1qdcfrgshkwtuAijxlnzvebymoBDpa

272 return consolidated_group_builder.self_group() 1qdcfrgshkwtuAijxlnzvebymoBDpa

273 

274 @classmethod 1P

275 def get_model_and_model_attr_from_attr_string[Model: SqlAlchemyBase]( 1P

276 cls, attr_string: str, model: type[Model], *, query: sa.Select | None = None 

277 ) -> tuple[SqlAlchemyBase, InstrumentedAttribute, sa.Select | None]: 

278 """ 

279 Take an attribute string and traverse a database model and its relationships to get the desired 

280 model and model attribute. Optionally provide a query to apply the necessary table joins. 

281 

282 If the attribute string is invalid, raises a `ValueError`. 

283 

284 For instance, the attribute string "user.name" on `RecipeModel` 

285 will return the `User` model's `name` attribute. 

286 

287 Works with shallow attributes (e.g. "slug" from `RecipeModel`) 

288 and arbitrarily deep ones (e.g. "recipe.group.preferences" on `RecipeTimelineEvent`). 

289 """ 

290 mapper: Mapper 

291 model_attr: InstrumentedAttribute | None = None 1PLqGJITUVWXYZ0K1234567dcfFrgshkwCtuAijHxlnzveby8QNmoEOBDp9a

292 

293 attribute_chain = decamelize(attr_string).split(".") 1PLqGJITUVWXYZ0K1234567dcfFrgshkwCtuAijHxlnzveby8QNmoEOBDp9a

294 if not attribute_chain: 294 ↛ 295line 294 didn't jump to line 295 because the condition on line 294 was never true1PLqGJITUVWXYZ0K1234567dcfFrgshkwCtuAijHxlnzveby8QNmoEOBDp9a

295 raise ValueError("invalid query string: attribute name cannot be empty") 

296 

297 current_model: SqlAlchemyBase = model # type: ignore 1PLqGJITUVWXYZ0K1234567dcfFrgshkwCtuAijHxlnzveby8QNmoEOBDp9a

298 for i, attribute_link in enumerate(attribute_chain): 298 ↛ 331line 298 didn't jump to line 331 because the loop on line 298 didn't complete1PLqGJITUVWXYZ0K1234567dcfFrgshkwCtuAijHxlnzveby8QNmoEOBDp9a

299 try: 1PLqGJITUVWXYZ0K1234567dcfFrgshkwCtuAijHxlnzveby8QNmoEOBDp9a

300 model_attr = getattr(current_model, attribute_link) 1PLqGJITUVWXYZ0K1234567dcfFrgshkwCtuAijHxlnzveby8QNmoEOBDp9a

301 

302 # proxied attributes can't be joined to the query directly, so we need to inspect the proxy 

303 # and get the actual model and its attribute 

304 if isinstance(model_attr, AssociationProxyInstance): 304 ↛ 305line 304 didn't jump to line 305 because the condition on line 304 was never true1PLqGJITUVWXYZ01234567dcfFrgshkwCtuAijHxlnzveby8QNmoEOBDp9a

305 proxied_attribute_link = model_attr.target_collection 

306 next_attribute_link = model_attr.value_attr 

307 model_attr = getattr(current_model, proxied_attribute_link) 

308 

309 if query is not None: 

310 query = query.join(model_attr, isouter=True) 

311 

312 mapper = sa.inspect(current_model) 

313 relationship = mapper.relationships[proxied_attribute_link] 

314 current_model = relationship.mapper.class_ 

315 model_attr = getattr(current_model, next_attribute_link) 

316 

317 # at the end of the chain there are no more relationships to inspect 

318 if i == len(attribute_chain) - 1: 318 ↛ 321line 318 didn't jump to line 321 because the condition on line 318 was always true1PLqGJITUVWXYZ01234567dcfFrgshkwCtuAijHxlnzveby8QNmoEOBDp9a

319 break 1PLqGJITUVWXYZ01234567dcfFrgshkwCtuAijHxlnzveby8QNmoEOBDp9a

320 

321 if query is not None: 

322 query = query.join(model_attr, isouter=True) 

323 

324 mapper = sa.inspect(current_model) 

325 relationship = mapper.relationships[attribute_link] 

326 current_model = relationship.mapper.class_ 

327 

328 except (AttributeError, KeyError) as e: 1LqGJIKdcfFrgshkwCtuAijHxlnzvebyQNmoEOBDpa

329 raise ValueError(f"invalid attribute string: '{attr_string}' does not exist on this schema") from e 1LqGJIKdcfFrgshkwCtuAijHxlnzvebyQNmoEOBDpa

330 

331 if model_attr is None: 331 ↛ 332line 331 didn't jump to line 332 because the condition on line 331 was never true1PLqGJITUVWXYZ01234567dcfFrgshkwCtuAijHxlnzveby8QNmoEOBDp9a

332 raise ValueError(f"invalid attribute string: '{attr_string}'") 

333 

334 return current_model, model_attr, query 1PLqGJITUVWXYZ01234567dcfFrgshkwCtuAijHxlnzveby8QNmoEOBDp9a

335 

336 @classmethod 1P

337 def _transform_model_attr(cls, model_attr: InstrumentedAttribute, model_attr_type: Any) -> InstrumentedAttribute: 1P

338 if isinstance(model_attr_type, sqltypes.String): 338 ↛ 339line 338 didn't jump to line 339 because the condition on line 338 was never true1qdcfrgshkwtuAijxlnzvebymoBDpa

339 model_attr = sa.func.lower(model_attr) 

340 

341 return model_attr 1qdcfrgshkwtuAijxlnzvebymoBDpa

342 

343 @classmethod 1P

344 def _get_filter_element[Model: SqlAlchemyBase]( 1P

345 cls, 

346 query: sa.Select, 

347 component: QueryFilterBuilderComponent, 

348 model: type[Model], 

349 model_attr: InstrumentedAttribute, 

350 model_attr_type: Any, 

351 ) -> sa.ColumnElement: 

352 original_model_attr = model_attr 1qdcfrgshkwtuAijxlnzvebymoBDpa

353 model_attr = cls._transform_model_attr(model_attr, model_attr_type) 1qdcfrgshkwtuAijxlnzvebymoBDpa

354 value = component.validate(model_attr_type) 1qdcfrgshkwtuAijxlnzvebymoBDpa

355 

356 # Keywords 

357 if component.relationship is RelationalKeyword.IS: 357 ↛ 358line 357 didn't jump to line 358 because the condition on line 357 was never true1qdcfrgshkwtuAijxlnzvebymoBDpa

358 element = model_attr.is_(value) 

359 elif component.relationship is RelationalKeyword.IS_NOT: 359 ↛ 360line 359 didn't jump to line 360 because the condition on line 359 was never true1qdcfrgshkwtuAijxlnzvebymoBDpa

360 element = model_attr.is_not(value) 

361 elif component.relationship is RelationalKeyword.IN: 361 ↛ 362line 361 didn't jump to line 362 because the condition on line 361 was never true1qdcfrgshkwtuAijxlnzvebymoBDpa

362 element = model_attr.in_(value) 

363 elif component.relationship is RelationalKeyword.NOT_IN: 363 ↛ 364line 363 didn't jump to line 364 because the condition on line 363 was never true1qdcfrgshkwtuAijxlnzvebymoBDpa

364 if original_model_attr.parent.entity != model: 

365 subq = query.with_only_columns(model.id).where(model_attr.in_(value)) 

366 element = sa.not_(model.id.in_(subq)) 

367 else: 

368 element = sa.not_(model_attr.in_(value)) 

369 

370 elif component.relationship is RelationalKeyword.CONTAINS_ALL: 370 ↛ 371line 370 didn't jump to line 371 because the condition on line 370 was never true1qdcfrgshkwtuAijxlnzvebymoBDpa

371 if len(value) == 1: 

372 element = model_attr.in_(value) 

373 else: 

374 primary_model_attr: InstrumentedAttribute = getattr(model, component.attribute_name.split(".")[0]) 

375 element = sa.and_(*(primary_model_attr.any(model_attr == v) for v in value)) 

376 elif component.relationship is RelationalKeyword.LIKE: 376 ↛ 377line 376 didn't jump to line 377 because the condition on line 376 was never true1qdcfrgshkwtuAijxlnzvebymoBDpa

377 element = model_attr.ilike(value) 

378 elif component.relationship is RelationalKeyword.NOT_LIKE: 378 ↛ 379line 378 didn't jump to line 379 because the condition on line 378 was never true1qdcfrgshkwtuAijxlnzvebymoBDpa

379 element = model_attr.not_ilike(value) 

380 

381 # Operators 

382 elif component.relationship is RelationalOperator.EQ: 1qdcfrgshkwtuAijxlnzvebymoBDpa

383 element = model_attr == value 1dcfrgshkwtuAijxlnzvebymoDpa

384 elif component.relationship is RelationalOperator.NOTEQ: 384 ↛ 385line 384 didn't jump to line 385 because the condition on line 384 was never true1qdcfghijebBa

385 element = model_attr != value 

386 elif component.relationship is RelationalOperator.GT: 386 ↛ 387line 386 didn't jump to line 387 because the condition on line 386 was never true1qdcfghijebBa

387 element = model_attr > value 

388 elif component.relationship is RelationalOperator.LT: 388 ↛ 389line 388 didn't jump to line 389 because the condition on line 388 was never true1qdcfghijebBa

389 element = model_attr < value 

390 elif component.relationship is RelationalOperator.GTE: 1qdcfghijebBa

391 element = model_attr >= value 1qdcfgjeba

392 elif component.relationship is RelationalOperator.LTE: 392 ↛ 395line 392 didn't jump to line 395 because the condition on line 392 was always true1qdchijebBa

393 element = model_attr <= value 1qdchijebBa

394 else: 

395 raise ValueError(f"invalid relationship {component.relationship}") 

396 

397 return element 1qdcfrgshkwtuAijxlnzvebymoBDpa

398 

    def filter_query[Model: SqlAlchemyBase](
        self, query: sa.Select, model: type[Model], column_aliases: dict[str, sa.ColumnElement] | None = None
    ) -> sa.Select:
        """
        Filters a query based on the parsed filter string.
        If you need to filter on a custom column expression (e.g. a computed property), you can supply column aliases

        `column_aliases` is keyed by the last segment of a statement's attribute name; when a key matches,
        the mapped expression replaces the model attribute for that statement.

        Returns `query` with the joins for every statement applied and the combined expression passed to `.filter()`.
        Errors raised while resolving attributes or validating values (`ValueError`) propagate to the caller.
        """
        column_aliases = column_aliases or {}

        # join tables and build model chain
        # maps the index of each statement in `self.filter_components` to the model that owns its final attribute
        attr_model_map: dict[int, Any] = {}
        model_attr: InstrumentedAttribute
        for i, component in enumerate(self.filter_components):
            # separators and logical operators need no joins
            if not isinstance(component, QueryFilterBuilderComponent):
                continue

            nested_model, model_attr, query = self.get_model_and_model_attr_from_attr_string(
                component.attribute_name, model, query=query
            )
            attr_model_map[i] = nested_model

        # build query filter
        # `partial_group` collects the expressions of the group currently being read; enclosing groups wait on
        # `partial_group_stack`. Logical operators from all groups share a single stack.
        partial_group: list[sa.ColumnElement] = []
        partial_group_stack: deque[list[sa.ColumnElement]] = deque()
        logical_operator_stack: deque[LogicalOperator] = deque()
        for i, component in enumerate(self.filter_components):
            if component == self.l_group_sep:
                # a group opens: park the enclosing group and start a fresh one
                partial_group_stack.append(partial_group)
                partial_group = []

            elif component == self.r_group_sep:
                if partial_group:
                    # a group closes: fold it into one expression and append it to the enclosing group
                    complete_group = self._consolidate_group(partial_group, logical_operator_stack)
                    partial_group = partial_group_stack.pop()
                    partial_group.append(complete_group)
                else:
                    # the closed group was empty, so there is nothing to fold
                    partial_group = partial_group_stack.pop()

            elif isinstance(component, LogicalOperator):
                logical_operator_stack.append(component)

            else:
                component = cast(QueryFilterBuilderComponent, component)
                # only the last segment names a column; the owning model was resolved in the first pass
                base_attribute_name = component.attribute_name.split(".")[-1]
                model_attr = getattr(attr_model_map[i], base_attribute_name)

                if (column_alias := column_aliases.get(base_attribute_name)) is not None:
                    model_attr = column_alias

                element = self._get_filter_element(query, component, model, model_attr, model_attr.type)
                partial_group.append(element)

        # combine the completed groups into one filter
        while True:
            consolidated_group = self._consolidate_group(partial_group, logical_operator_stack)
            if not partial_group_stack:
                return query.filter(consolidated_group)
            else:
                # a group was left on the stack: fold the current result into it and consolidate again
                partial_group = partial_group_stack.pop()
                partial_group.append(consolidated_group)

459 

460 @staticmethod 1P

461 def _break_filter_string_into_components(filter_string: str) -> list[str]: 1P

462 """Recursively break filter string into components based on parenthesis groupings""" 

463 components = [filter_string] 1LqGJIKSMRdcfFrgshkwCtuAijHxlnzvebyQNmoEOBDpa

464 in_quotes = False 1LqGJIKSMRdcfFrgshkwCtuAijHxlnzvebyQNmoEOBDpa

465 while True: 1LqGJIKSMRdcfFrgshkwCtuAijHxlnzvebyQNmoEOBDpa

466 subcomponents = [] 1LqGJIKSMRdcfFrgshkwCtuAijHxlnzvebyQNmoEOBDpa

467 for component in components: 1LqGJIKSMRdcfFrgshkwCtuAijHxlnzvebyQNmoEOBDpa

468 # don't parse components comprised of only a separator 

469 if component in QueryFilterBuilder.group_seps: 1LqGJIKSMRdcfFrgshkwCtuAijHxlnzvebyQNmoEOBDpa

470 subcomponents.append(component) 1qMdcfFrgshkwCtuijxlnvebymopa

471 continue 1qMdcfFrgshkwCtuijxlnvebymopa

472 

473 # construct a component until it hits the right separator 

474 new_component = "" 1LqGJIKSMRdcfFrgshkwCtuAijHxlnzvebyQNmoEOBDpa

475 for c in component: 1LqGJIKSMRdcfFrgshkwCtuAijHxlnzvebyQNmoEOBDpa

476 # ignore characters in-between quotes 

477 if c == '"': 1LqGJIKMRdcfFrgshkwCtuAijHxlnzvebyQNmoEOBDpa

478 in_quotes = not in_quotes 1qGIdcfFrgshkwCtuAijHxlnzvebmoEpa

479 

480 if c in QueryFilterBuilder.group_seps and not in_quotes: 1LqGJIKMRdcfFrgshkwCtuAijHxlnzvebyQNmoEOBDpa

481 if new_component: 1qMdcfFrgshkwCtuijxlnvebymopa

482 subcomponents.append(new_component) 1qMdcfFrgshkwCtuijxlnvebymopa

483 

484 subcomponents.append(c) 1qMdcfFrgshkwCtuijxlnvebymopa

485 new_component = "" 1qMdcfFrgshkwCtuijxlnvebymopa

486 continue 1qMdcfFrgshkwCtuijxlnvebymopa

487 

488 new_component += c 1LqGJIKMRdcfFrgshkwCtuAijHxlnzvebyQNmoEOBDpa

489 

490 if new_component: 1LqGJIKSMRdcfFrgshkwCtuAijHxlnzvebyQNmoEOBDpa

491 subcomponents.append(new_component.strip()) 1LqGJIKMRdcfFrgshkwCtuAijHxlnzvebyQNmoEOBDpa

492 

493 if components == subcomponents: 1LqGJIKSMRdcfFrgshkwCtuAijHxlnzvebyQNmoEOBDpa

494 break 1LqGJIKSMRdcfFrgshkwCtuAijHxlnzvebyQNmoEOBDpa

495 

496 components = subcomponents 1qGJIKSMRdcfFrgshkwCtuAijHxlnzvebyNmoEBpa

497 

498 return components 1LqGJIKSMRdcfFrgshkwCtuAijHxlnzvebyQNmoEOBDpa

499 

    @staticmethod
    def _break_components_into_base_components(components: list[str]) -> list[str | list[str]]:
        """
        Further break down components by splitting at relational and logical operators

        The result is a flat list in which group separators, quoted values (quotes kept), logical operators,
        attribute names, relational operators/keywords and plain values are separate strings, and each
        bracketed list is a single `list[str]` entry of its stripped items.
        """
        # matches any logical operator as a whole word, case-insensitively; the capture group makes
        # `re.split` keep the operators in its output
        pattern = "|".join([f"\\b{operator.value}\\b" for operator in LogicalOperator])
        logical_operators = re.compile(f"({pattern})", flags=re.IGNORECASE)

        # `in_list` is initialized once, so it carries over from one component to the next
        in_list = False
        base_components: list[str | list] = []
        # parsed list values, queued in the order they are found and consumed from the front below
        list_value_components = []
        for component in components:
            # parse out lists as their own singular sub component
            subcomponents = component.split(QueryFilterBuilder.l_list_sep)
            for i, subcomponent in enumerate(subcomponents):
                # everything before the first list opener is not part of a list
                if not i:
                    continue

                for j, list_value_string in enumerate(subcomponent.split(QueryFilterBuilder.r_list_sep)):
                    # only even-indexed pieces are treated as list contents
                    if j % 2:
                        continue

                    list_value_components.append(
                        [val.strip() for val in list_value_string.split(QueryFilterBuilder.list_item_sep)]
                    )

            # splitting on quotes puts quoted text at odd positions; `quote_offset` shifts that parity
            # whenever the bookkeeping below adds to it
            quote_offset = 0
            subcomponents = component.split('"')
            for i, subcomponent in enumerate(subcomponents):
                # we are in a list subcomponent, which is already handled
                if in_list:
                    if QueryFilterBuilder.r_list_sep in subcomponent:
                        # filter out the remainder of the list subcomponent and continue parsing
                        base_components.append(list_value_components.pop(0))
                        subcomponent = subcomponent.split(QueryFilterBuilder.r_list_sep, maxsplit=1)[-1].strip()
                        in_list = False
                    else:
                        continue

                # don't parse components comprised of only a separator
                if subcomponent in QueryFilterBuilder.group_seps:
                    quote_offset += 1
                    base_components.append(subcomponent)
                    continue

                # this subcomponent was surrounded in quotes, so we keep it as-is
                if (i + quote_offset) % 2:
                    base_components.append(f'"{subcomponent.strip()}"')
                    continue

                # if the final subcomponent has quotes, it creates an extra empty subcomponent at the end
                if not subcomponent:
                    continue

                # continue parsing this subcomponent up to the list, then skip over subsequent subcomponents
                if not in_list and QueryFilterBuilder.l_list_sep in subcomponent:
                    subcomponent, _new_sub_component = subcomponent.split(QueryFilterBuilder.l_list_sep, maxsplit=1)
                    subcomponent = subcomponent.strip()
                    # the list being enumerated is extended in place so the text after the opener
                    # is visited on the next iteration
                    subcomponents.insert(i + 1, _new_sub_component)
                    quote_offset += 1
                    in_list = True

                # parse out logical operators
                new_components = [
                    base_component.strip() for base_component in logical_operators.split(subcomponent) if base_component
                ]

                # parse out relational keywords and operators
                # each base_subcomponent has exactly zero or one keyword or operator
                # NOTE: this loop rebinds `component`, the outer loop variable; the outer value
                # is not read again before the next outer iteration reassigns it
                for component in new_components:
                    if not component:
                        continue

                    # we try relational operators first since they aren't required to be surrounded by spaces
                    parsed_component = RelationalOperator.parse_component(component)
                    if parsed_component is not None:
                        base_components.extend(parsed_component)
                        continue

                    parsed_component = RelationalKeyword.parse_component(component)
                    if parsed_component is not None:
                        base_components.extend(parsed_component)
                        continue

                    # this component does not have any keywords or operators, so we just add it as-is
                    base_components.append(component)

        return base_components

586 

@staticmethod
def _parse_base_components_into_filter_components(
    base_components: list[str | list[str]],
) -> list[str | QueryFilterBuilderComponent | LogicalOperator]:
    """
    Turn a flat sequence of base components into filter components.

    The input is scanned left to right and only three kinds of entries emit output:

    - group separators, which are passed through unchanged as strings
    - relational keywords/operators, which become a QueryFilterBuilderComponent built from
      the entry immediately before (attribute name) and immediately after (value)
    - logical operators (matched case-insensitively), which become LogicalOperator members

    Every other entry (attribute names, scalar values, and list values) emits nothing by itself;
    it is only read as the neighbour of a relational keyword or operator.
    """
    keyword_values = [keyword.value for keyword in RelationalKeyword]
    operator_values = [operator.value for operator in RelationalOperator]
    logical_values = [logical.value for logical in LogicalOperator]

    filter_components: list[str | QueryFilterBuilderComponent | LogicalOperator] = []
    for index, token in enumerate(base_components):
        # list entries are never matched directly; they are only picked up as a relationship's value
        if isinstance(token, list):
            continue

        if token in QueryFilterBuilder.group_seps:
            filter_components.append(token)
            continue

        matches_keyword = token in keyword_values
        if matches_keyword or token in operator_values:
            relationship: RelationalKeyword | RelationalOperator
            if matches_keyword:
                relationship = RelationalKeyword(token)
            else:
                relationship = RelationalOperator(token)

            # the attribute name sits directly to the left, the value directly to the right
            attribute_name = base_components[index - 1]
            value = base_components[index + 1]
            filter_components.append(
                QueryFilterBuilderComponent(
                    attribute_name=attribute_name,  # type: ignore
                    relationship=relationship,
                    value=value,
                )
            )
            continue

        upper_token = token.upper()
        if upper_token in logical_values:
            filter_components.append(LogicalOperator(upper_token))

    return filter_components

624 

def as_json_model(self) -> QueryFilterJSON:
    """
    Serialize this builder's filter components into a QueryFilterJSON.

    Each QueryFilterBuilderComponent becomes one part. A part is finalized when the next
    component or logical operator is reached (or when the components run out); at that point
    it receives the parentheses collected since the previous part was finalized and the most
    recent logical operator seen before it.
    """
    parts: list[QueryFilterJSONPart] = []

    pending_part: QueryFilterJSONPart | None = None
    pending_operator: LogicalOperator | None = None
    opening_parens: list[str] = []
    closing_parens: list[str] = []

    def flush_pending_part() -> None:
        """Finalize the pending part (if any) and reset all accumulated state."""
        nonlocal pending_part, pending_operator
        if not pending_part:
            return

        pending_part.left_parenthesis = "".join(opening_parens) or None
        pending_part.right_parenthesis = "".join(closing_parens) or None
        pending_part.logical_operator = pending_operator
        parts.append(pending_part)

        pending_part = None
        opening_parens.clear()
        closing_parens.clear()
        pending_operator = None

    for component in self.filter_components:
        if isinstance(component, QueryFilterBuilderComponent):
            # a new component closes out whichever part came before it
            flush_pending_part()
            pending_part = component.as_json_model()

        elif isinstance(component, LogicalOperator):
            # flush first so the operator is attached to the part that follows it
            flush_pending_part()
            pending_operator = component

        elif isinstance(component, str):
            if component == QueryFilterBuilder.l_group_sep:
                opening_parens.append(component)
            elif component == QueryFilterBuilder.r_group_sep:
                closing_parens.append(component)

    # add last part, if any
    flush_pending_part()
    return QueryFilterJSON(parts=parts)