Coverage for opt/mealie/lib/python3.12/site-packages/mealie/repos/repository_recipes.py: 49%

269 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-11-25 15:32 +0000

1import re as re 1c

2from collections.abc import Iterable, Sequence 1c

3from random import randint 1c

4from typing import Self, cast 1c

5from uuid import UUID 1c

6 

7import sqlalchemy as sa 1c

8from fastapi import HTTPException 1c

9from pydantic import UUID4 1c

10from sqlalchemy import orm 1c

11from sqlalchemy.exc import IntegrityError 1c

12 

13from mealie.db.models.household import Household, HouseholdToRecipe 1c

14from mealie.db.models.recipe.category import Category 1c

15from mealie.db.models.recipe.ingredient import RecipeIngredientModel, households_to_ingredient_foods 1c

16from mealie.db.models.recipe.recipe import RecipeModel 1c

17from mealie.db.models.recipe.settings import RecipeSettings 1c

18from mealie.db.models.recipe.tag import Tag 1c

19from mealie.db.models.recipe.tool import Tool, households_to_tools, recipes_to_tools 1c

20from mealie.db.models.users.user_to_recipe import UserToRecipe 1c

21from mealie.db.models.users.users import User 1c

22from mealie.schema.cookbook.cookbook import ReadCookBook 1c

23from mealie.schema.recipe import Recipe 1c

24from mealie.schema.recipe.recipe import RecipeCategory, RecipePagination, RecipeSummary, create_recipe_slug 1c

25from mealie.schema.recipe.recipe_ingredient import IngredientFood 1c

26from mealie.schema.recipe.recipe_suggestion import RecipeSuggestionQuery, RecipeSuggestionResponseItem 1c

27from mealie.schema.recipe.recipe_tool import RecipeToolOut 1c

28from mealie.schema.response.pagination import PaginationQuery 1c

29from mealie.schema.response.query_filter import QueryFilterBuilder 1c

30 

31from ..db.models._model_base import SqlAlchemyBase 1c

32from .repository_generic import HouseholdRepositoryGeneric 1c

33 

34 

35class RepositoryRecipes(HouseholdRepositoryGeneric[Recipe, RecipeModel]): 1c

36 user_id: UUID4 | None = None 1c

37 

38 @property 1c

39 def column_aliases(self): 1c

40 if not self.user_id: 40 ↛ 41line 40 didn't jump to line 41 because the condition on line 40 was never true1adefgb

41 return {} 

42 

43 return { 1adefgb

44 "last_made": self._get_last_made_col_alias(), 

45 "rating": self._get_rating_col_alias(), 

46 } 

47 

48 def by_user(self: Self, user_id: UUID4) -> Self: 1c

49 """Add a user_id to the repo, which will be used to handle recipe ratings and other user-specific data""" 

50 self.user_id = user_id 1adefghijklb

51 return self 1adefghijklb

52 

53 def _get_last_made_col_alias(self) -> sa.ColumnElement | None: 1c

54 """Computed last_made which uses `HouseholdToRecipe.last_made` for the user's household, otherwise None""" 

55 

56 user_household_subquery = sa.select(User.household_id).where(User.id == self.user_id).scalar_subquery() 1adefgb

57 return ( 1adefgb

58 sa.select(HouseholdToRecipe.last_made) 

59 .where( 

60 HouseholdToRecipe.recipe_id == self.model.id, 

61 HouseholdToRecipe.household_id == user_household_subquery, 

62 ) 

63 .correlate(self.model) 

64 .scalar_subquery() 

65 ) 

66 

67 def _get_rating_col_alias(self) -> sa.ColumnElement | None: 1c

68 """Computed rating which uses the user's rating if it exists, otherwise falling back to the recipe's rating""" 

69 

70 effective_rating = sa.case( 1adefgb

71 ( 

72 sa.exists().where( 

73 UserToRecipe.recipe_id == self.model.id, 

74 UserToRecipe.user_id == self.user_id, 

75 UserToRecipe.rating != None, # noqa E711 

76 UserToRecipe.rating > 0, 

77 ), 

78 sa.select(sa.func.max(UserToRecipe.rating)) 

79 .where(UserToRecipe.recipe_id == self.model.id, UserToRecipe.user_id == self.user_id) 

80 .correlate(self.model) 

81 .scalar_subquery(), 

82 ), 

83 else_=sa.case( 

84 (self.model.rating == 0, None), 

85 else_=self.model.rating, 

86 ), 

87 ) 

88 return sa.cast(effective_rating, sa.Float) 1adefgb

89 

90 def create(self, document: Recipe) -> Recipe: # type: ignore 1c

91 max_retries = 10 

92 original_name: str = document.name # type: ignore 

93 

94 for i in range(1, 11): 94 ↛ exitline 94 didn't return from function 'create' because the loop on line 94 didn't complete

95 try: 

96 return super().create(document) 

97 except IntegrityError: 

98 self.session.rollback() 

99 document.name = f"{original_name} ({i})" 

100 document.slug = create_recipe_slug(document.name) 

101 

102 if i >= max_retries: 

103 raise 

104 

105 def _delete_recipe(self, recipe: RecipeModel) -> Recipe: 1c

106 recipe_as_model = self.schema.model_validate(recipe) 

107 

108 # first remove UserToRecipe entries so we don't run into stale data errors 

109 try: 

110 user_to_recipe_delete_query = sa.delete(UserToRecipe).where(UserToRecipe.recipe_id == recipe.id) 

111 self.session.execute(user_to_recipe_delete_query) 

112 self.session.commit() 

113 except Exception: 

114 self.session.rollback() 

115 raise 

116 

117 # remove the recipe 

118 try: 

119 self.session.delete(recipe) 

120 self.session.commit() 

121 except Exception: 

122 self.session.rollback() 

123 raise 

124 

125 return recipe_as_model 

126 

127 def delete(self, value, match_key: str | None = None) -> Recipe: 1c

128 match_key = match_key or self.primary_key 

129 recipe_in_db = self._query_one(value, match_key) 

130 return self._delete_recipe(recipe_in_db) 

131 

132 def delete_many(self, values: Iterable) -> list[Recipe]: 1c

133 query = self._query().filter(self.model.id.in_(values)) 

134 recipes_in_db = self.session.execute(query).unique().scalars().all() 

135 results: list[Recipe] = [] 

136 

137 # we create a delete statement for each row 

138 # we don't delete the whole query in one statement because postgres doesn't cascade correctly 

139 for recipe_in_db in recipes_in_db: 

140 results.append(self._delete_recipe(recipe_in_db)) 

141 

142 try: 

143 self.session.commit() 

144 except Exception as e: 

145 self.session.rollback() 

146 raise e 

147 

148 return results 

149 

150 def update_image(self, slug: str, _: str | None = None) -> int: 1c

151 entry: RecipeModel = self._query_one(match_value=slug) 

152 entry.image = randint(0, 255) 

153 self.session.commit() 

154 

155 return entry.image 

156 

157 def count_uncategorized(self, count=True, override_schema=None): 1c

158 return self._count_attribute( 

159 attribute_name=RecipeModel.recipe_category, 

160 attr_match=None, 

161 count=count, 

162 override_schema=override_schema, 

163 ) 

164 

165 def count_untagged(self, count=True, override_schema=None): 1c

166 return self._count_attribute( 

167 attribute_name=RecipeModel.tags, 

168 attr_match=None, 

169 count=count, 

170 override_schema=override_schema, 

171 ) 

172 

173 def _uuids_for_items(self, items: list[UUID | str] | None, model: type[SqlAlchemyBase]) -> list[UUID] | None: 1c

174 if not items: 1adefghijklb

175 return None 1adefghijklb

176 ids: list[UUID] = [] 1a

177 slugs: list[str] = [] 1a

178 

179 for i in items: 1a

180 if isinstance(i, UUID): 180 ↛ 181line 180 didn't jump to line 181 because the condition on line 180 was never true1a

181 ids.append(i) 

182 else: 

183 try: 1a

184 i_as_uuid = UUID(i) 1a

185 ids.append(i_as_uuid) 

186 except ValueError: 1a

187 slugs.append(i) 1a

188 

189 if not slugs: 189 ↛ 190line 189 didn't jump to line 190 because the condition on line 189 was never true1a

190 return ids 

191 additional_ids = self.session.execute(sa.select(model.id).filter(model.slug.in_(slugs))).scalars().all() 1a

192 return ids + additional_ids 1a

193 

194 def page_all( # type: ignore 1c

195 self, 

196 pagination: PaginationQuery, 

197 override=None, 

198 cookbook: ReadCookBook | None = None, 

199 categories: list[UUID4 | str] | None = None, 

200 tags: list[UUID4 | str] | None = None, 

201 tools: list[UUID4 | str] | None = None, 

202 foods: list[UUID4 | str] | None = None, 

203 households: list[UUID4 | str] | None = None, 

204 require_all_categories=True, 

205 require_all_tags=True, 

206 require_all_tools=True, 

207 require_all_foods=True, 

208 search: str | None = None, 

209 ) -> RecipePagination: 

210 # Copy this, because calling methods (e.g. tests) might rely on it not getting mutated 

211 pagination_result = pagination.model_copy() 1adefghijklb

212 q = sa.select(self.model) 1adefghijklb

213 

214 fltr = self._filter_builder() 1adefghijklb

215 q = q.filter_by(**fltr) 1adefghijklb

216 

217 if cookbook: 217 ↛ 218line 217 didn't jump to line 218 because the condition on line 217 was never true1adefghijklb

218 if pagination_result.query_filter and cookbook.query_filter_string: 

219 pagination_result.query_filter = ( 

220 f"({pagination_result.query_filter}) AND ({cookbook.query_filter_string})" 

221 ) 

222 else: 

223 pagination_result.query_filter = cookbook.query_filter_string 

224 else: 

225 category_ids = self._uuids_for_items(categories, Category) 1adefghijklb

226 tag_ids = self._uuids_for_items(tags, Tag) 1adefghijklb

227 tool_ids = self._uuids_for_items(tools, Tool) 1adefghijklb

228 household_ids = self._uuids_for_items(households, Household) 1adefghijklb

229 filters = self._build_recipe_filter( 1adefghijklb

230 categories=category_ids, 

231 tags=tag_ids, 

232 tools=tool_ids, 

233 foods=foods, 

234 households=household_ids, 

235 require_all_categories=require_all_categories, 

236 require_all_tags=require_all_tags, 

237 require_all_tools=require_all_tools, 

238 require_all_foods=require_all_foods, 

239 ) 

240 q = q.filter(*filters) 1adefghijklb

241 if search: 241 ↛ 242line 241 didn't jump to line 242 because the condition on line 241 was never true1adefghijklb

242 q = self.add_search_to_query(q, self.schema, search) 

243 

244 if not pagination_result.order_by and not search: 1adefghijklb

245 # default ordering if not searching 

246 pagination_result.order_by = "created_at" 1a

247 

248 q, count, total_pages = self.add_pagination_to_query(q, pagination_result) 1adefghijklb

249 

250 # Apply options late, so they do not get used for counting 

251 q = q.options(*RecipeSummary.loader_options()) 1adefgb

252 try: 1adefgb

253 self.logger.debug(f"Recipe Pagination Query: {pagination_result}") 1adefgb

254 data = self.session.execute(q).scalars().unique().all() 1adefgb

255 except Exception as e: 

256 self._log_exception(e) 

257 self.session.rollback() 

258 raise e 

259 

260 items = [RecipeSummary.model_validate(item) for item in data] 1adefgb

261 return RecipePagination( 1adefgb

262 page=pagination_result.page, 

263 per_page=pagination_result.per_page, 

264 total=count, 

265 total_pages=total_pages, 

266 items=items, 

267 ) 

268 

269 def get_by_categories(self, categories: list[RecipeCategory]) -> list[RecipeSummary]: 1c

270 """ 

271 get_by_categories returns all the Recipes that contain every category provided in the list 

272 """ 

273 

274 ids = [x.id for x in categories] 

275 stmt = ( 

276 sa.select(RecipeModel) 

277 .join(RecipeModel.recipe_category) 

278 .filter(RecipeModel.recipe_category.any(Category.id.in_(ids))) 

279 ) 

280 if self.group_id: 

281 stmt = stmt.filter(RecipeModel.group_id == self.group_id) 

282 if self.household_id: 

283 stmt = stmt.filter(RecipeModel.household_id == self.household_id) 

284 

285 return [RecipeSummary.model_validate(x) for x in self.session.execute(stmt).unique().scalars().all()] 

286 

287 def _build_recipe_filter( 1c

288 self, 

289 categories: list[UUID4] | None = None, 

290 tags: list[UUID4] | None = None, 

291 tools: list[UUID4] | None = None, 

292 foods: list[UUID4] | None = None, 

293 households: list[UUID4] | None = None, 

294 require_all_categories: bool = True, 

295 require_all_tags: bool = True, 

296 require_all_tools: bool = True, 

297 require_all_foods: bool = True, 

298 ) -> list: 

299 fltr: list[sa.ColumnElement] = [] 1adefghijklb

300 if self.group_id: 300 ↛ 302line 300 didn't jump to line 302 because the condition on line 300 was always true1adefghijklb

301 fltr.append(RecipeModel.group_id == self.group_id) 1adefghijklb

302 if self.household_id: 302 ↛ 303line 302 didn't jump to line 303 because the condition on line 302 was never true1adefghijklb

303 fltr.append(RecipeModel.household_id == self.household_id) 

304 

305 if categories: 305 ↛ 306line 305 didn't jump to line 306 because the condition on line 305 was never true1adefghijklb

306 if require_all_categories: 

307 fltr.extend(RecipeModel.recipe_category.any(Category.id == cat_id) for cat_id in categories) 

308 else: 

309 fltr.append(RecipeModel.recipe_category.any(Category.id.in_(categories))) 

310 

311 if tags: 311 ↛ 312line 311 didn't jump to line 312 because the condition on line 311 was never true1adefghijklb

312 if require_all_tags: 

313 fltr.extend(RecipeModel.tags.any(Tag.id == tag_id) for tag_id in tags) 

314 else: 

315 fltr.append(RecipeModel.tags.any(Tag.id.in_(tags))) 

316 

317 if tools: 317 ↛ 318line 317 didn't jump to line 318 because the condition on line 317 was never true1adefghijklb

318 if require_all_tools: 

319 fltr.extend(RecipeModel.tools.any(Tool.id == tool_id) for tool_id in tools) 

320 else: 

321 fltr.append(RecipeModel.tools.any(Tool.id.in_(tools))) 

322 if foods: 322 ↛ 323line 322 didn't jump to line 323 because the condition on line 322 was never true1adefghijklb

323 if require_all_foods: 

324 fltr.extend(RecipeModel.recipe_ingredient.any(RecipeIngredientModel.food_id == food) for food in foods) 

325 else: 

326 fltr.append(RecipeModel.recipe_ingredient.any(RecipeIngredientModel.food_id.in_(foods))) 

327 if households: 327 ↛ 328line 327 didn't jump to line 328 because the condition on line 327 was never true1adefghijklb

328 fltr.append(RecipeModel.household_id.in_(households)) 

329 return fltr 1adefghijklb

330 

331 def get_random(self, limit=1) -> list[Recipe]: 1c

332 stmt = sa.select(RecipeModel).order_by(sa.func.random()).limit(limit) # Postgres and SQLite specific 

333 if self.group_id: 

334 stmt = stmt.filter(RecipeModel.group_id == self.group_id) 

335 if self.household_id: 

336 stmt = stmt.filter(RecipeModel.household_id == self.household_id) 

337 

338 return [self.schema.model_validate(x) for x in self.session.execute(stmt).scalars().all()] 

339 

340 def get_by_slug(self, group_id: UUID4, slug: str) -> Recipe | None: 1c

341 stmt = sa.select(RecipeModel).filter(RecipeModel.group_id == group_id, RecipeModel.slug == slug) 

342 dbrecipe = self.session.execute(stmt).scalars().one_or_none() 

343 if dbrecipe is None: 

344 return None 

345 return self.schema.model_validate(dbrecipe) 

346 

347 def all_ids(self, group_id: UUID4) -> Sequence[UUID4]: 1c

348 stmt = sa.select(RecipeModel.id).filter(RecipeModel.group_id == group_id) 1a

349 return self.session.execute(stmt).scalars().all() 1a

350 

351 def find_suggested_recipes( 1c

352 self, 

353 params: RecipeSuggestionQuery, 

354 food_ids: list[UUID4] | None = None, 

355 tool_ids: list[UUID4] | None = None, 

356 ) -> list[RecipeSuggestionResponseItem]: 

357 """ 

358 Queries all recipes and returns the ones that are missing the least amount of foods and tools. 

359 

360 Results are ordered first by number of missing tools, then foods, and finally by the user-specified order. 

361 If foods are provided, the query will prefer recipes with more matches to user-provided foods. 

362 """ 

363 

364 if not params.order_by: 1ab

365 params.order_by = "created_at" 1ab

366 

367 user_food_ids = list(set(food_ids or [])) 1ab

368 user_tool_ids = list(set(tool_ids or [])) 1ab

369 

370 # preserve the original lists of ids before we add on_hand items 

371 food_ids_with_on_hand = user_food_ids.copy() 1ab

372 tool_ids_with_on_hand = user_tool_ids.copy() 1ab

373 

374 if params.include_foods_on_hand and self.user_id: 1ab

375 foods_on_hand_query = ( 1ab

376 sa.select(households_to_ingredient_foods.c.food_id) 

377 .join(User, households_to_ingredient_foods.c.household_id == User.household_id) 

378 .filter( 

379 sa.not_(households_to_ingredient_foods.c.food_id.in_(food_ids_with_on_hand)), 

380 User.id == self.user_id, 

381 ) 

382 ) 

383 foods_on_hand = self.session.execute(foods_on_hand_query).scalars().all() 1ab

384 food_ids_with_on_hand.extend(foods_on_hand) 1ab

385 

386 if params.include_tools_on_hand and self.user_id: 386 ↛ 399line 386 didn't jump to line 399 because the condition on line 386 was always true1ab

387 tools_on_hand_query = ( 1ab

388 sa.select(households_to_tools.c.tool_id) 

389 .join(User, households_to_tools.c.household_id == User.household_id) 

390 .filter( 

391 sa.not_(households_to_tools.c.tool_id.in_(tool_ids_with_on_hand)), 

392 User.id == self.user_id, 

393 ) 

394 ) 

395 tools_on_hand = self.session.execute(tools_on_hand_query).scalars().all() 1ab

396 tool_ids_with_on_hand.extend(tools_on_hand) 1ab

397 

398 ## Build suggestion query 

399 settings_alias = orm.aliased(RecipeSettings) 1ab

400 ingredients_alias = orm.aliased(RecipeIngredientModel) 1ab

401 tools_alias = orm.aliased(Tool) 1ab

402 

403 q = sa.select(self.model) 1ab

404 fltr = self._filter_builder() 1ab

405 q = q.filter_by(**fltr) 1ab

406 

407 # Tools goes first so we can order by missing tools count before foods 

408 if user_tool_ids: 408 ↛ 409line 408 didn't jump to line 409 because the condition on line 408 was never true1ab

409 unmatched_tools_query = ( 

410 sa.select(recipes_to_tools.c.recipe_id, sa.func.count().label("unmatched_tools_count")) 

411 .join(tools_alias, recipes_to_tools.c.tool_id == tools_alias.id) 

412 .filter(sa.not_(tools_alias.id.in_(tool_ids_with_on_hand))) 

413 .group_by(recipes_to_tools.c.recipe_id) 

414 .subquery() 

415 ) 

416 q = ( 

417 q.outerjoin(unmatched_tools_query, self.model.id == unmatched_tools_query.c.recipe_id) 

418 .filter( 

419 sa.or_( 

420 unmatched_tools_query.c.unmatched_tools_count.is_(None), 

421 unmatched_tools_query.c.unmatched_tools_count <= params.max_missing_tools, 

422 ) 

423 ) 

424 .order_by(unmatched_tools_query.c.unmatched_tools_count.asc().nulls_first()) 

425 ) 

426 

427 if user_food_ids: 427 ↛ 428line 427 didn't jump to line 428 because the condition on line 427 was never true1ab

428 unmatched_foods_query = ( 

429 sa.select(ingredients_alias.recipe_id, sa.func.count().label("unmatched_foods_count")) 

430 .filter(sa.not_(ingredients_alias.food_id.in_(food_ids_with_on_hand))) 

431 .filter(ingredients_alias.food_id.isnot(None)) 

432 .group_by(ingredients_alias.recipe_id) 

433 .subquery() 

434 ) 

435 total_user_foods_query = ( 

436 sa.select(ingredients_alias.recipe_id, sa.func.count().label("total_user_foods_count")) 

437 .filter(ingredients_alias.food_id.in_(user_food_ids)) 

438 .group_by(ingredients_alias.recipe_id) 

439 .subquery() 

440 ) 

441 q = ( 

442 q.join(settings_alias, self.model.settings) 

443 .outerjoin(unmatched_foods_query, self.model.id == unmatched_foods_query.c.recipe_id) 

444 .outerjoin(total_user_foods_query, self.model.id == total_user_foods_query.c.recipe_id) 

445 .filter( 

446 sa.or_( 

447 unmatched_foods_query.c.unmatched_foods_count.is_(None), 

448 unmatched_foods_query.c.unmatched_foods_count <= params.max_missing_foods, 

449 ), 

450 ) 

451 .order_by( 

452 unmatched_foods_query.c.unmatched_foods_count.asc().nulls_first(), 

453 # favor recipes with more matched foods, in case the user is looking for something specific 

454 total_user_foods_query.c.total_user_foods_count.desc().nulls_last(), 

455 ) 

456 ) 

457 

458 # only include recipes that have at least one food in the user's list 

459 if user_food_ids: 

460 q = q.filter(total_user_foods_query.c.total_user_foods_count > 0) 

461 

462 ## Add filters and loader options 

463 if self.group_id: 463 ↛ 465line 463 didn't jump to line 465 because the condition on line 463 was always true1ab

464 q = q.filter(self.model.group_id == self.group_id) 1ab

465 if self.household_id: 465 ↛ 466line 465 didn't jump to line 466 because the condition on line 465 was never true1ab

466 q = q.filter(self.model.household_id == self.household_id) 

467 if params.query_filter: 467 ↛ 468line 467 didn't jump to line 468 because the condition on line 467 was never true1ab

468 try: 

469 query_filter_builder = QueryFilterBuilder(params.query_filter) 

470 q = query_filter_builder.filter_query(q, model=self.model) 

471 

472 except ValueError as e: 

473 self.logger.error(e) 

474 raise HTTPException(status_code=400, detail=str(e)) from e 

475 

476 q = self.add_order_by_to_query(q, params) 1ab

477 q = q.limit(params.limit).options(*RecipeSummary.loader_options()) 1ab

478 

479 ## Execute query 

480 try: 1ab

481 data = self.session.execute(q).scalars().unique().all() 1ab

482 except Exception as e: 

483 self._log_exception(e) 

484 self.session.rollback() 

485 raise e 

486 

487 suggestions: list[RecipeSuggestionResponseItem] = [] 1ab

488 for result in data: 488 ↛ 489line 488 didn't jump to line 489 because the loop on line 488 never started1ab

489 recipe = cast(RecipeModel, result) 

490 

491 missing_foods: list[IngredientFood] = [] 

492 if user_food_ids: # only check for missing foods if the user has provided a list of foods 

493 seen_food_ids: set[UUID4] = set() 

494 seen_food_ids.update(food_ids_with_on_hand) 

495 for ingredient in recipe.recipe_ingredient: 

496 if not ingredient.food: 

497 continue 

498 if ingredient.food.id in seen_food_ids: 

499 continue 

500 

501 seen_food_ids.add(ingredient.food.id) 

502 missing_foods.append(IngredientFood.model_validate(ingredient.food)) 

503 

504 missing_tools: list[RecipeToolOut] = [] 

505 if user_tool_ids: # only check for missing tools if the user has provided a list of tools 

506 seen_tool_ids: set[UUID4] = set() 

507 seen_tool_ids.update(tool_ids_with_on_hand) 

508 for tool in recipe.tools: 

509 if tool.id in seen_tool_ids: 

510 continue 

511 

512 seen_tool_ids.add(tool.id) 

513 missing_tools.append(RecipeToolOut.model_validate(tool)) 

514 

515 suggestion = RecipeSuggestionResponseItem( 

516 recipe=RecipeSummary.model_validate(recipe), 

517 missing_foods=missing_foods, 

518 missing_tools=missing_tools, 

519 ) 

520 suggestions.append(suggestion) 

521 

522 return suggestions 1ab