Coverage for opt/mealie/lib/python3.12/site-packages/mealie/repos/repository_generic.py: 83%

275 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 15:32 +0000

1from __future__ import annotations 1b

2 

3import random 1b

4from collections.abc import Iterable 1b

5from datetime import UTC, datetime 1b

6from math import ceil 1b

7from typing import Any 1b

8 

9from fastapi import HTTPException 1b

10from pydantic import UUID4, BaseModel 1b

11from sqlalchemy import ColumnElement, Select, case, delete, func, nulls_first, nulls_last, select 1b

12from sqlalchemy.orm import InstrumentedAttribute 1b

13from sqlalchemy.orm.session import Session 1b

14from sqlalchemy.sql import sqltypes 1b

15 

16from mealie.core.root_logger import get_logger 1b

17from mealie.db.models._model_base import SqlAlchemyBase 1b

18from mealie.schema._mealie import MealieModel 1b

19from mealie.schema.response.pagination import ( 1b

20 OrderByNullPosition, 

21 OrderDirection, 

22 PaginationBase, 

23 PaginationQuery, 

24 RequestQuery, 

25) 

26from mealie.schema.response.query_filter import QueryFilterBuilder 1b

27from mealie.schema.response.query_search import SearchFilter 1b

28 

29from ._utils import NOT_SET, NotSet 1b

30 

31 

32class RepositoryGeneric[Schema: MealieModel, Model: SqlAlchemyBase]: 1b

33 """A Generic BaseAccess Model method to perform common operations on the database 

34 

35 Args: 

36 Schema: Represents the Pydantic Model 

37 Model: Represents the SqlAlchemyModel Model 

38 """ 

39 

40 session: Session 1b

41 

42 _group_id: UUID4 | None = None 1b

43 _household_id: UUID4 | None = None 1b

44 

45 def __init__( 1b

46 self, 

47 session: Session, 

48 primary_key: str, 

49 sql_model: type[Model], 

50 schema: type[Schema], 

51 ) -> None: 

52 self.session = session 1btuyLcmqejkIJHKwroghdnlpfCzvDEFiGxsABa

53 self.primary_key = primary_key 1btuyLcmqejkIJHKwroghdnlpfCzvDEFiGxsABa

54 self.model = sql_model 1btuyLcmqejkIJHKwroghdnlpfCzvDEFiGxsABa

55 self.schema = schema 1btuyLcmqejkIJHKwroghdnlpfCzvDEFiGxsABa

56 

57 self.logger = get_logger() 1btuyLcmqejkIJHKwroghdnlpfCzvDEFiGxsABa

58 

59 @property 1b

60 def group_id(self) -> UUID4 | None: 1b

61 return self._group_id 1btuyLcmqejkIJHKwroghdnlpfCzvDEFiGxsABa

62 

63 @property 1b

64 def household_id(self) -> UUID4 | None: 1b

65 return self._household_id 1btuyLcmqejkIJHKwroghdnlpfCzvDEFiGxsABa

66 

67 @property 1b

68 def column_aliases(self) -> dict[str, ColumnElement]: 1b

69 return {} 1bcmejkoghdnlpfia

70 

71 def _random_seed(self) -> str: 1b

72 return str(datetime.now(tz=UTC)) 1ra

73 

74 def _log_exception(self, e: Exception) -> None: 1b

75 self.logger.error(f"Error processing query for Repo model={self.model.__name__} schema={self.schema.__name__}") 1cmejka

76 self.logger.error(e) 1cmejka

77 

78 def _query(self, override_schema: type[MealieModel] | None = None, with_options=True): 1b

79 q = select(self.model) 1btuyLcmqejkIJHKwroghdnlpfCzvDEFiGxsABa

80 if with_options: 1btuyLcmqejkIJHKwroghdnlpfCzvDEFiGxsABa

81 schema = override_schema or self.schema 1tuyLcmqejkIJHKwroghdnlpfCzvDEFiGxsABa

82 return q.options(*schema.loader_options()) 1tuyLcmqejkIJHKwroghdnlpfCzvDEFiGxsABa

83 else: 

84 return q 1bcmqejkoghdnlpfia

85 

86 def _filter_builder(self, **kwargs) -> dict[str, Any]: 1b

87 dct = {} 1btuyLcmqejkIJHKwroghdnlpfCzvDEFiGxsABa

88 if self.group_id: 1btuyLcmqejkIJHKwroghdnlpfCzvDEFiGxsABa

89 dct["group_id"] = self.group_id 1tucmqejkwroghdnlpfCzvDEFiGxsBa

90 if self.household_id: 1btuyLcmqejkIJHKwroghdnlpfCzvDEFiGxsABa

91 dct["household_id"] = self.household_id 1tucmwoghdnlfCzvDEFisa

92 

93 return {**dct, **kwargs} 1btuyLcmqejkIJHKwroghdnlpfCzvDEFiGxsABa

94 

95 def get_all( 1b

96 self, 

97 limit: int | None = None, 

98 order_by: str | None = None, 

99 order_descending: bool = True, 

100 override=None, 

101 ) -> list[Schema]: 

102 pq = PaginationQuery( 1b

103 per_page=limit or -1, 

104 order_by=order_by, 

105 order_direction=OrderDirection.desc if order_descending else OrderDirection.asc, 

106 page=1, 

107 ) 

108 

109 results = self.page_all(pq, override=override) 1b

110 

111 return results.items 1b

112 

113 def multi_query( 1b

114 self, 

115 query_by: dict[str, str | bool | int | UUID4], 

116 start=0, 

117 limit: int | None = None, 

118 override_schema=None, 

119 order_by: str | None = None, 

120 ) -> list[Schema]: 

121 # sourcery skip: remove-unnecessary-cast 

122 eff_schema = override_schema or self.schema 1tcghdlfvisa

123 

124 fltr = self._filter_builder(**query_by) 1tcghdlfvisa

125 q = self._query(override_schema=eff_schema).filter_by(**fltr) 1tcghdlfvisa

126 

127 if order_by: 127 ↛ 128line 127 didn't jump to line 128 because the condition on line 127 was never true1tcghdlfvisa

128 if order_attr := getattr(self.model, str(order_by)): 

129 order_attr = order_attr.desc() 

130 q = q.order_by(order_attr) 

131 

132 q = q.offset(start).limit(limit) 1tcghdlfvisa

133 result = self.session.execute(q).unique().scalars().all() 1tcghdlfvisa

134 return [eff_schema.model_validate(x) for x in result] 1tcghdlfvisa

135 

136 def _query_one(self, match_value: str | int | UUID4, match_key: str | None = None) -> Model: 1b

137 """ 

138 Query the sql database for one item an return the sql alchemy model 

139 object. If no match key is provided the primary_key attribute will be used. 

140 """ 

141 if match_key is None: 1ucefzixBa

142 match_key = self.primary_key 1ucefixBa

143 

144 fltr = self._filter_builder(**{match_key: match_value}) 1ucefzixBa

145 return self.session.execute(self._query().filter_by(**fltr)).unique().scalars().one() 1ucefzixBa

146 

147 def get_one( 1b

148 self, 

149 value: str | int | UUID4, 

150 key: str | None = None, 

151 any_case=False, 

152 override_schema=None, 

153 ) -> Schema | None: 

154 key = key or self.primary_key 1tuyLcmqejkIJHKwroghdnlpfCzvDEFiGxsABa

155 eff_schema = override_schema or self.schema 1tuyLcmqejkIJHKwroghdnlpfCzvDEFiGxsABa

156 

157 q = self._query(override_schema=eff_schema) 1tuyLcmqejkIJHKwroghdnlpfCzvDEFiGxsABa

158 

159 if any_case: 1tuyLcmqejkIJHKwroghdnlpfCzvDEFiGxsABa

160 search_attr = getattr(self.model, key) 1uceHfiAa

161 q = q.where(func.lower(search_attr) == str(value).lower()).filter_by(**self._filter_builder()) 1uceHfiAa

162 else: 

163 q = q.filter_by(**self._filter_builder(**{key: value})) 1tuyLcmqejkIJKwroghdnlpfCzvDEFiGxsABa

164 

165 result = self.session.execute(q).unique().scalars().one_or_none() 1tuyLcmqejkIJHKwroghdnlpfCzvDEFiGxsABa

166 

167 if not result: 1tuyLcmqejkIJHKwroghdnlpfCzvDEFiGxsABa

168 return None 1tuLcejHwdnlfCDEFxsAa

169 

170 return eff_schema.model_validate(result) 1uycmqejkIJKwroghdnlpfCzvDEFiGxsABa

171 

172 def create(self, data: Schema | BaseModel | dict) -> Schema: 1b

173 try: 1btwghdlfvisAa

174 data = data if isinstance(data, dict) else data.model_dump() 1btwghdlfvisAa

175 new_document = self.model(session=self.session, **data) 1btwghdlfvisAa

176 self.session.add(new_document) 1btwghdlfvisAa

177 self.session.commit() 1btwghdlfvisAa

178 except Exception: 1twghdlfsa

179 self.session.rollback() 1twghdlfsa

180 raise 1twghdlfsa

181 

182 self.session.refresh(new_document) 1btwghdlfvisAa

183 

184 return self.schema.model_validate(new_document) 1btwghdlfvisAa

185 

186 def create_many(self, data: Iterable[Schema | dict]) -> list[Schema]: 1b

187 new_documents = [] 1oghdna

188 for document in data: 1oghdna

189 document = document if isinstance(document, dict) else document.model_dump() 1oghdna

190 new_document = self.model(session=self.session, **document) 1oghdna

191 new_documents.append(new_document) 1oghdna

192 

193 self.session.add_all(new_documents) 1oghdna

194 self.session.commit() 1oghdna

195 

196 for created_document in new_documents: 1oghdna

197 self.session.refresh(created_document) 1oghdna

198 

199 return [self.schema.model_validate(x) for x in new_documents] 1oghdna

200 

201 def update(self, match_value: str | int | UUID4, new_data: dict | BaseModel) -> Schema: 1b

202 """Update a database entry. 

203 Args: 

204 session (Session): Database Session 

205 match_value (str): Match "key" 

206 new_data (str): Match "value" 

207 

208 Returns: 

209 dict: Returns a dictionary representation of the database entry 

210 """ 

211 new_data = new_data if isinstance(new_data, dict) else new_data.model_dump() 1ucefixBa

212 

213 entry = self._query_one(match_value=match_value) 1ucefixBa

214 entry.update(session=self.session, **new_data) 1ucefixBa

215 

216 self.session.commit() 1ucefixBa

217 return self.schema.model_validate(entry) 1ucefixBa

218 

219 def update_many(self, data: Iterable[Schema | dict]) -> list[Schema]: 1b

220 document_data_by_id: dict[str, dict] = {} 

221 for document in data: 

222 document_data = document if isinstance(document, dict) else document.model_dump() 

223 document_data_by_id[document_data["id"]] = document_data 

224 

225 documents_to_update_query = self._query().filter(self.model.id.in_(list(document_data_by_id.keys()))) 

226 documents_to_update = self.session.execute(documents_to_update_query).unique().scalars().all() 

227 

228 updated_documents = [] 

229 for document_to_update in documents_to_update: 229 ↛ 230line 229 didn't jump to line 230 because the loop on line 229 never started

230 data = document_data_by_id[document_to_update.id] # type: ignore 

231 document_to_update.update(session=self.session, **data) # type: ignore 

232 updated_documents.append(document_to_update) 

233 

234 self.session.commit() 

235 return [self.schema.model_validate(x) for x in updated_documents] 

236 

237 def patch(self, match_value: str | int | UUID4, new_data: dict | BaseModel) -> Schema: 1b

238 new_data = new_data if isinstance(new_data, dict) else new_data.model_dump() 

239 

240 entry = self._query_one(match_value=match_value) 

241 

242 entry_as_dict = self.schema.model_validate(entry).model_dump() 

243 entry_as_dict.update(new_data) 

244 

245 return self.update(match_value, entry_as_dict) 

246 

247 def delete(self, value, match_key: str | None = None) -> Schema: 1b

248 match_key = match_key or self.primary_key 1ua

249 

250 result = self._query_one(value, match_key) 1ua

251 result_as_model = self.schema.model_validate(result) 

252 

253 try: 

254 self.session.delete(result) 

255 self.session.commit() 

256 except Exception as e: 

257 self.session.rollback() 

258 raise e 

259 

260 return result_as_model 

261 

262 def delete_many(self, values: Iterable) -> list[Schema]: 1b

263 query = self._query().filter(self.model.id.in_(values)) 1ya

264 results = self.session.execute(query).unique().scalars().all() 1ya

265 results_as_model = [self.schema.model_validate(result) for result in results] 1ya

266 

267 try: 1ya

268 # we create a delete statement for each row 

269 # we don't delete the whole query in one statement because postgres doesn't cascade correctly 

270 for result in results: 270 ↛ 271line 270 didn't jump to line 271 because the loop on line 270 never started1ya

271 self.session.delete(result) 

272 

273 self.session.commit() 1ya

274 except Exception as e: 

275 self.session.rollback() 

276 raise e 

277 

278 return results_as_model 1ya

279 

280 def delete_all(self) -> None: 1b

281 delete(self.model) 

282 self.session.commit() 

283 

284 def count_all(self, match_key=None, match_value=None) -> int: 1b

285 q = select(func.count(self.model.id)) 

286 if None not in [match_key, match_value]: 

287 q = q.filter_by(**{match_key: match_value}) 

288 return self.session.scalar(q) 

289 

290 def _count_attribute( 1b

291 self, 

292 attribute_name: str, 

293 attr_match: str | None = None, 

294 count=True, 

295 override_schema=None, 

296 ) -> int | list[Schema]: # sourcery skip: assign-if-exp 

297 eff_schema = override_schema or self.schema 

298 

299 if count: 

300 q = select(func.count(self.model.id)).filter(attribute_name == attr_match) 

301 return self.session.scalar(q) 

302 else: 

303 q = self._query(override_schema=eff_schema).filter(attribute_name == attr_match) 

304 return [eff_schema.model_validate(x) for x in self.session.execute(q).scalars().all()] 

305 

306 def page_all(self, pagination: PaginationQuery, override=None, search: str | None = None) -> PaginationBase[Schema]: 1b

307 """ 

308 pagination is a method to interact with the filtered database table and return a paginated result 

309 using the PaginationBase that provides several data points that are needed to manage pagination 

310 on the client side. This method does utilize the _filter_build method to ensure that the results 

311 are filtered by the group id when applicable. 

312 

313 NOTE: When you provide an override you'll need to manually type the result of this method 

314 as the override, as the type system is not able to infer the result of this method. 

315 """ 

316 eff_schema = override or self.schema 1bcmqejkoghdnlpfia

317 # Copy this, because calling methods (e.g. tests) might rely on it not getting mutated 

318 pagination_result = pagination.model_copy() 1bcmqejkoghdnlpfia

319 q = self._query(override_schema=eff_schema, with_options=False) 1bcmqejkoghdnlpfia

320 

321 fltr = self._filter_builder() 1bcmqejkoghdnlpfia

322 q = q.filter_by(**fltr) 1bcmqejkoghdnlpfia

323 if search: 1bcmqejkoghdnlpfia

324 q = self.add_search_to_query(q, eff_schema, search) 1qeka

325 

326 if not pagination_result.order_by and not search: 1bcmqejkoghdnlpfia

327 # default ordering if not searching 

328 pagination_result.order_by = "created_at" 1bcmejkodnlpfia

329 

330 q, count, total_pages = self.add_pagination_to_query(q, pagination_result) 1bcmqejkoghdnlpfia

331 

332 # Apply options late, so they do not get used for counting 

333 q = q.options(*eff_schema.loader_options()) 1bcmqejkoghdnlpfia

334 try: 1bcmqejkoghdnlpfia

335 data = self.session.execute(q).unique().scalars().all() 1bcmqejkoghdnlpfia

336 except Exception as e: 1cmejka

337 self._log_exception(e) 1cmejka

338 self.session.rollback() 1cmejka

339 raise e 1cmejka

340 return PaginationBase( 1bcmqejkoghdnlpfia

341 page=pagination_result.page, 

342 per_page=pagination_result.per_page, 

343 total=count, 

344 total_pages=total_pages, 

345 items=[eff_schema.model_validate(s) for s in data], 

346 ) 

347 

348 def add_pagination_to_query(self, query: Select, pagination: PaginationQuery) -> tuple[Select, int, int]: 1b

349 """ 

350 Adds pagination data to an existing query. 

351 

352 :returns: 

353 - query - modified query with pagination data 

354 - count - total number of records (without pagination) 

355 - total_pages - the total number of pages in the query 

356 """ 

357 

358 if pagination.query_filter: 1bcmqejkroghdnlpfia

359 try: 1cmejkodna

360 query_filter_builder = QueryFilterBuilder(pagination.query_filter) 1cmejkodna

361 query = query_filter_builder.filter_query(query, model=self.model, column_aliases=self.column_aliases) 1cmejkodna

362 

363 except ValueError as e: 1cejka

364 self.logger.error(e) 1cjka

365 raise HTTPException(status_code=400, detail=str(e)) from e 1cjka

366 

367 count_query = select(func.count()).select_from(query.subquery()) 1bcmqejkroghdnlpfia

368 count = self.session.scalar(count_query) 1bcmqejkroghdnlpfia

369 if not count: 1bcmqejkroghdnlpfia

370 count = 0 1bcmqejkrodnpfa

371 

372 # interpret -1 as "get_all" 

373 if pagination.per_page == -1: 1bcmqejkroghdnlpfia

374 pagination.per_page = count 1bcoghdnlpfia

375 

376 try: 1bcmqejkroghdnlpfia

377 total_pages = ceil(count / pagination.per_page) 1bcmqejkroghdnlpfia

378 except ZeroDivisionError: 1bcmjkodnpfa

379 total_pages = 0 1bcmjkodnpfa

380 

381 # interpret -1 as "last page" 

382 if pagination.page == -1: 382 ↛ 383line 382 didn't jump to line 383 because the condition on line 382 was never true1bcmqejkroghdnlpfia

383 pagination.page = total_pages 

384 

385 # failsafe for user input error 

386 if pagination.page < 1: 1bcmqejkroghdnlpfia

387 pagination.page = 1 1cmejka

388 

389 query = self.add_order_by_to_query(query, pagination) 1bcmqejkroghdnlpfia

390 return query.limit(pagination.per_page).offset((pagination.page - 1) * pagination.per_page), count, total_pages 1bcmqejkroghdnlpfia

391 

392 def add_order_attr_to_query( 1b

393 self, 

394 query: Select, 

395 order_attr: InstrumentedAttribute, 

396 order_dir: OrderDirection, 

397 order_by_null: OrderByNullPosition | None, 

398 ) -> Select: 

399 order_attr = self.column_aliases.get(order_attr.key, order_attr) 1bcmejkoghdnlpfia

400 

401 # queries handle uppercase and lowercase differently, which is undesirable 

402 if isinstance(order_attr.type, sqltypes.String): 1bcmejkoghdnlpfia

403 order_attr = func.lower(order_attr) 1ghda

404 

405 if order_dir is OrderDirection.asc: 1bcmejkoghdnlpfia

406 order_attr = order_attr.asc() 1cmejkghda

407 elif order_dir is OrderDirection.desc: 407 ↛ 410line 407 didn't jump to line 410 because the condition on line 407 was always true1bcmejkodnlpfia

408 order_attr = order_attr.desc() 1bcmejkodnlpfia

409 

410 if order_by_null is OrderByNullPosition.first: 1bcmejkoghdnlpfia

411 order_attr = nulls_first(order_attr) 1cmjka

412 elif order_by_null is OrderByNullPosition.last: 1bcmejkoghdnlpfia

413 order_attr = nulls_last(order_attr) 1cmejka

414 

415 return query.order_by(order_attr) 1bcmejkoghdnlpfia

416 

417 def add_order_by_to_query(self, query: Select, request_query: RequestQuery) -> Select: 1b

418 if not request_query.order_by: 1bcmqejkroghdnlpfia

419 return query 1qeka

420 

421 elif request_query.order_by == "random": 1bcmejkroghdnlpfia

422 # randomize outside of database, since not all db's can set random seeds 

423 # this solution is db-independent & stable to paging 

424 temp_query = query.with_only_columns(self.model.id) 1ra

425 allids = self.session.execute(temp_query).scalars().all() # fast because id is indexed 1ra

426 if not allids: 426 ↛ 429line 426 didn't jump to line 429 because the condition on line 426 was always true1ra

427 return query 1ra

428 

429 order = list(range(len(allids))) 

430 random.seed(request_query.pagination_seed) 

431 random.shuffle(order) 

432 random_dict = dict(zip(allids, order, strict=True)) 

433 case_stmt = case(random_dict, value=self.model.id) 

434 return query.order_by(case_stmt) 

435 

436 else: 

437 for order_by_val in request_query.order_by.split(","): 1bcmejkoghdnlpfia

438 try: 1bcmejkoghdnlpfia

439 order_by_val = order_by_val.strip() 1bcmejkoghdnlpfia

440 if ":" in order_by_val: 1bcmejkoghdnlpfia

441 order_by, order_dir_val = order_by_val.split(":") 1ceja

442 order_dir = OrderDirection(order_dir_val) 1ceja

443 else: 

444 order_by = order_by_val 1bcmejkoghdnlpfia

445 order_dir = request_query.order_direction 1bcmejkoghdnlpfia

446 

447 _, order_attr, query = QueryFilterBuilder.get_model_and_model_attr_from_attr_string( 1bcmejkoghdnlpfia

448 order_by, self.model, query=query 

449 ) 

450 

451 query = self.add_order_attr_to_query( 1bcmejkoghdnlpfia

452 query, order_attr, order_dir, request_query.order_by_null_position 

453 ) 

454 

455 except ValueError as e: 1cmejka

456 raise HTTPException( 1cmejka

457 status_code=400, 

458 detail=f'Invalid order_by statement "{request_query.order_by}": "{order_by_val}" is invalid', 

459 ) from e 

460 

461 return query 1bcmejkoghdnlpfia

462 

463 def add_search_to_query(self, query: Select, schema: type[Schema], search: str) -> Select: 1b

464 search_filter = SearchFilter(self.session, search, schema._normalize_search) 1qeka

465 return search_filter.filter_query_by_search(query, schema, self.model) 1qeka

466 

467 

468class GroupRepositoryGeneric[Schema: MealieModel, Model: SqlAlchemyBase](RepositoryGeneric[Schema, Model]): 1b

469 def __init__( 1b

470 self, 

471 session: Session, 

472 primary_key: str, 

473 sql_model: type[Model], 

474 schema: type[Schema], 

475 *, 

476 group_id: UUID4 | None | NotSet, 

477 ) -> None: 

478 super().__init__(session, primary_key, sql_model, schema) 1btuycmqejkIJHKwroghdnlpfCzvDEFiGxsABa

479 if group_id is NOT_SET: 479 ↛ 480line 479 didn't jump to line 480 because the condition on line 479 was never true1btuycmqejkIJHKwroghdnlpfCzvDEFiGxsABa

480 raise ValueError("group_id must be set") 

481 self._group_id = group_id if group_id else None 1btuycmqejkIJHKwroghdnlpfCzvDEFiGxsABa

482 

483 

484class HouseholdRepositoryGeneric[Schema: MealieModel, Model: SqlAlchemyBase](RepositoryGeneric[Schema, Model]): 1b

485 def __init__( 1b

486 self, 

487 session: Session, 

488 primary_key: str, 

489 sql_model: type[Model], 

490 schema: type[Schema], 

491 *, 

492 group_id: UUID4 | None | NotSet, 

493 household_id: UUID4 | None | NotSet, 

494 ) -> None: 

495 super().__init__(session, primary_key, sql_model, schema) 1btuycmejwroghdnlfCzvDEFiGxsa

496 if group_id is NOT_SET: 496 ↛ 497line 496 didn't jump to line 497 because the condition on line 496 was never true1btuycmejwroghdnlfCzvDEFiGxsa

497 raise ValueError("group_id must be set") 

498 self._group_id = group_id if group_id else None 1btuycmejwroghdnlfCzvDEFiGxsa

499 

500 if household_id is NOT_SET: 500 ↛ 501line 500 didn't jump to line 501 because the condition on line 500 was never true1btuycmejwroghdnlfCzvDEFiGxsa

501 raise ValueError("household_id must be set") 

502 self._household_id = household_id if household_id else None 1btuycmejwroghdnlfCzvDEFiGxsa