Coverage for opt/mealie/lib/python3.12/site-packages/mealie/repos/repository_recipes.py: 58%
269 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-25 17:29 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-25 17:29 +0000
1import re as re 1b
2from collections.abc import Iterable, Sequence 1b
3from random import randint 1b
4from typing import Self, cast 1b
5from uuid import UUID 1b
7import sqlalchemy as sa 1b
8from fastapi import HTTPException 1b
9from pydantic import UUID4 1b
10from sqlalchemy import orm 1b
11from sqlalchemy.exc import IntegrityError 1b
13from mealie.db.models.household import Household, HouseholdToRecipe 1b
14from mealie.db.models.recipe.category import Category 1b
15from mealie.db.models.recipe.ingredient import RecipeIngredientModel, households_to_ingredient_foods 1b
16from mealie.db.models.recipe.recipe import RecipeModel 1b
17from mealie.db.models.recipe.settings import RecipeSettings 1b
18from mealie.db.models.recipe.tag import Tag 1b
19from mealie.db.models.recipe.tool import Tool, households_to_tools, recipes_to_tools 1b
20from mealie.db.models.users.user_to_recipe import UserToRecipe 1b
21from mealie.db.models.users.users import User 1b
22from mealie.schema.cookbook.cookbook import ReadCookBook 1b
23from mealie.schema.recipe import Recipe 1b
24from mealie.schema.recipe.recipe import RecipeCategory, RecipePagination, RecipeSummary, create_recipe_slug 1b
25from mealie.schema.recipe.recipe_ingredient import IngredientFood 1b
26from mealie.schema.recipe.recipe_suggestion import RecipeSuggestionQuery, RecipeSuggestionResponseItem 1b
27from mealie.schema.recipe.recipe_tool import RecipeToolOut 1b
28from mealie.schema.response.pagination import PaginationQuery 1b
29from mealie.schema.response.query_filter import QueryFilterBuilder 1b
31from ..db.models._model_base import SqlAlchemyBase 1b
32from .repository_generic import HouseholdRepositoryGeneric 1b
35class RepositoryRecipes(HouseholdRepositoryGeneric[Recipe, RecipeModel]): 1b
36 user_id: UUID4 | None = None 1b
38 @property 1b
39 def column_aliases(self): 1b
40 if not self.user_id: 40 ↛ 41line 40 didn't jump to line 41 because the condition on line 40 was never true1cdefghija
41 return {}
43 return { 1cdefghija
44 "last_made": self._get_last_made_col_alias(),
45 "rating": self._get_rating_col_alias(),
46 }
48 def by_user(self: Self, user_id: UUID4) -> Self: 1b
49 """Add a user_id to the repo, which will be used to handle recipe ratings and other user-specific data"""
50 self.user_id = user_id 1klmnopqrsctduvwxyefzAghija
51 return self 1klmnopqrsctduvwxyefzAghija
53 def _get_last_made_col_alias(self) -> sa.ColumnElement | None: 1b
54 """Computed last_made which uses `HouseholdToRecipe.last_made` for the user's household, otherwise None"""
56 user_household_subquery = sa.select(User.household_id).where(User.id == self.user_id).scalar_subquery() 1cdefghija
57 return ( 1cdefghija
58 sa.select(HouseholdToRecipe.last_made)
59 .where(
60 HouseholdToRecipe.recipe_id == self.model.id,
61 HouseholdToRecipe.household_id == user_household_subquery,
62 )
63 .correlate(self.model)
64 .scalar_subquery()
65 )
67 def _get_rating_col_alias(self) -> sa.ColumnElement | None: 1b
68 """Computed rating which uses the user's rating if it exists, otherwise falling back to the recipe's rating"""
70 effective_rating = sa.case( 1cdefghija
71 (
72 sa.exists().where(
73 UserToRecipe.recipe_id == self.model.id,
74 UserToRecipe.user_id == self.user_id,
75 UserToRecipe.rating != None, # noqa E711
76 UserToRecipe.rating > 0,
77 ),
78 sa.select(sa.func.max(UserToRecipe.rating))
79 .where(UserToRecipe.recipe_id == self.model.id, UserToRecipe.user_id == self.user_id)
80 .correlate(self.model)
81 .scalar_subquery(),
82 ),
83 else_=sa.case(
84 (self.model.rating == 0, None),
85 else_=self.model.rating,
86 ),
87 )
88 return sa.cast(effective_rating, sa.Float) 1cdefghija
90 def create(self, document: Recipe) -> Recipe: # type: ignore 1b
91 max_retries = 10 1Ca
92 original_name: str = document.name # type: ignore 1Ca
94 for i in range(1, 11): 94 ↛ exitline 94 didn't return from function 'create' because the loop on line 94 didn't complete1Ca
95 try: 1Ca
96 return super().create(document) 1Ca
97 except IntegrityError: 1Ca
98 self.session.rollback() 1C
99 document.name = f"{original_name} ({i})" 1C
100 document.slug = create_recipe_slug(document.name) 1C
102 if i >= max_retries: 1C
103 raise 1C
105 def _delete_recipe(self, recipe: RecipeModel) -> Recipe: 1b
106 recipe_as_model = self.schema.model_validate(recipe) 1Ba
108 # first remove UserToRecipe entries so we don't run into stale data errors
109 try: 1Ba
110 user_to_recipe_delete_query = sa.delete(UserToRecipe).where(UserToRecipe.recipe_id == recipe.id) 1Ba
111 self.session.execute(user_to_recipe_delete_query) 1Ba
112 self.session.commit() 1Ba
113 except Exception:
114 self.session.rollback()
115 raise
117 # remove the recipe
118 try: 1Ba
119 self.session.delete(recipe) 1Ba
120 self.session.commit() 1Ba
121 except Exception:
122 self.session.rollback()
123 raise
125 return recipe_as_model 1Ba
127 def delete(self, value, match_key: str | None = None) -> Recipe: 1b
128 match_key = match_key or self.primary_key 1Ba
129 recipe_in_db = self._query_one(value, match_key) 1Ba
130 return self._delete_recipe(recipe_in_db) 1Ba
132 def delete_many(self, values: Iterable) -> list[Recipe]: 1b
133 query = self._query().filter(self.model.id.in_(values))
134 recipes_in_db = self.session.execute(query).unique().scalars().all()
135 results: list[Recipe] = []
137 # we create a delete statement for each row
138 # we don't delete the whole query in one statement because postgres doesn't cascade correctly
139 for recipe_in_db in recipes_in_db:
140 results.append(self._delete_recipe(recipe_in_db))
142 try:
143 self.session.commit()
144 except Exception as e:
145 self.session.rollback()
146 raise e
148 return results
150 def update_image(self, slug: str, _: str | None = None) -> int: 1b
151 entry: RecipeModel = self._query_one(match_value=slug)
152 entry.image = randint(0, 255)
153 self.session.commit()
155 return entry.image
157 def count_uncategorized(self, count=True, override_schema=None): 1b
158 return self._count_attribute(
159 attribute_name=RecipeModel.recipe_category,
160 attr_match=None,
161 count=count,
162 override_schema=override_schema,
163 )
165 def count_untagged(self, count=True, override_schema=None): 1b
166 return self._count_attribute(
167 attribute_name=RecipeModel.tags,
168 attr_match=None,
169 count=count,
170 override_schema=override_schema,
171 )
173 def _uuids_for_items(self, items: list[UUID | str] | None, model: type[SqlAlchemyBase]) -> list[UUID] | None: 1b
174 if not items: 1klmnopqrsctduvwxyefzAghija
175 return None 1klmnopqrsctduvwxyefzAghija
176 ids: list[UUID] = []
177 slugs: list[str] = []
179 for i in items:
180 if isinstance(i, UUID): 180 ↛ 181line 180 didn't jump to line 181 because the condition on line 180 was never true
181 ids.append(i)
182 else:
183 try:
184 i_as_uuid = UUID(i)
185 ids.append(i_as_uuid)
186 except ValueError:
187 slugs.append(i)
189 if not slugs: 189 ↛ 190line 189 didn't jump to line 190 because the condition on line 189 was never true
190 return ids
191 additional_ids = self.session.execute(sa.select(model.id).filter(model.slug.in_(slugs))).scalars().all()
192 return ids + additional_ids
194 def page_all( # type: ignore 1b
195 self,
196 pagination: PaginationQuery,
197 override=None,
198 cookbook: ReadCookBook | None = None,
199 categories: list[UUID4 | str] | None = None,
200 tags: list[UUID4 | str] | None = None,
201 tools: list[UUID4 | str] | None = None,
202 foods: list[UUID4 | str] | None = None,
203 households: list[UUID4 | str] | None = None,
204 require_all_categories=True,
205 require_all_tags=True,
206 require_all_tools=True,
207 require_all_foods=True,
208 search: str | None = None,
209 ) -> RecipePagination:
210 # Copy this, because calling methods (e.g. tests) might rely on it not getting mutated
211 pagination_result = pagination.model_copy() 1klmnopqrsctduvwxyefzAghija
212 q = sa.select(self.model) 1klmnopqrsctduvwxyefzAghija
214 fltr = self._filter_builder() 1klmnopqrsctduvwxyefzAghija
215 q = q.filter_by(**fltr) 1klmnopqrsctduvwxyefzAghija
217 if cookbook: 217 ↛ 218line 217 didn't jump to line 218 because the condition on line 217 was never true1klmnopqrsctduvwxyefzAghija
218 if pagination_result.query_filter and cookbook.query_filter_string:
219 pagination_result.query_filter = (
220 f"({pagination_result.query_filter}) AND ({cookbook.query_filter_string})"
221 )
222 else:
223 pagination_result.query_filter = cookbook.query_filter_string
224 else:
225 category_ids = self._uuids_for_items(categories, Category) 1klmnopqrsctduvwxyefzAghija
226 tag_ids = self._uuids_for_items(tags, Tag) 1klmnopqrsctduvwxyefzAghija
227 tool_ids = self._uuids_for_items(tools, Tool) 1klmnopqrsctduvwxyefzAghija
228 household_ids = self._uuids_for_items(households, Household) 1klmnopqrsctduvwxyefzAghija
229 filters = self._build_recipe_filter( 1klmnopqrsctduvwxyefzAghija
230 categories=category_ids,
231 tags=tag_ids,
232 tools=tool_ids,
233 foods=foods,
234 households=household_ids,
235 require_all_categories=require_all_categories,
236 require_all_tags=require_all_tags,
237 require_all_tools=require_all_tools,
238 require_all_foods=require_all_foods,
239 )
240 q = q.filter(*filters) 1klmnopqrsctduvwxyefzAghija
241 if search: 1klmnopqrsctduvwxyefzAghija
242 q = self.add_search_to_query(q, self.schema, search)
244 if not pagination_result.order_by and not search: 1klmnopqrsctduvwxyefzAghija
245 # default ordering if not searching
246 pagination_result.order_by = "created_at"
248 q, count, total_pages = self.add_pagination_to_query(q, pagination_result) 1klmnopqrsctduvwxyefzAghija
250 # Apply options late, so they do not get used for counting
251 q = q.options(*RecipeSummary.loader_options()) 1kcdefghija
252 try: 1kcdefghija
253 self.logger.debug(f"Recipe Pagination Query: {pagination_result}") 1kcdefghija
254 data = self.session.execute(q).scalars().unique().all() 1kcdefghija
255 except Exception as e:
256 self._log_exception(e)
257 self.session.rollback()
258 raise e
260 items = [RecipeSummary.model_validate(item) for item in data] 1kcdefghija
261 return RecipePagination( 1kcdefghija
262 page=pagination_result.page,
263 per_page=pagination_result.per_page,
264 total=count,
265 total_pages=total_pages,
266 items=items,
267 )
269 def get_by_categories(self, categories: list[RecipeCategory]) -> list[RecipeSummary]: 1b
270 """
271 get_by_categories returns all the Recipes that contain every category provided in the list
272 """
274 ids = [x.id for x in categories]
275 stmt = (
276 sa.select(RecipeModel)
277 .join(RecipeModel.recipe_category)
278 .filter(RecipeModel.recipe_category.any(Category.id.in_(ids)))
279 )
280 if self.group_id:
281 stmt = stmt.filter(RecipeModel.group_id == self.group_id)
282 if self.household_id:
283 stmt = stmt.filter(RecipeModel.household_id == self.household_id)
285 return [RecipeSummary.model_validate(x) for x in self.session.execute(stmt).unique().scalars().all()]
287 def _build_recipe_filter( 1b
288 self,
289 categories: list[UUID4] | None = None,
290 tags: list[UUID4] | None = None,
291 tools: list[UUID4] | None = None,
292 foods: list[UUID4] | None = None,
293 households: list[UUID4] | None = None,
294 require_all_categories: bool = True,
295 require_all_tags: bool = True,
296 require_all_tools: bool = True,
297 require_all_foods: bool = True,
298 ) -> list:
299 fltr: list[sa.ColumnElement] = [] 1klmnopqrsctduvwxyefzAghija
300 if self.group_id: 300 ↛ 302line 300 didn't jump to line 302 because the condition on line 300 was always true1klmnopqrsctduvwxyefzAghija
301 fltr.append(RecipeModel.group_id == self.group_id) 1klmnopqrsctduvwxyefzAghija
302 if self.household_id: 302 ↛ 303line 302 didn't jump to line 303 because the condition on line 302 was never true1klmnopqrsctduvwxyefzAghija
303 fltr.append(RecipeModel.household_id == self.household_id)
305 if categories: 305 ↛ 306line 305 didn't jump to line 306 because the condition on line 305 was never true1klmnopqrsctduvwxyefzAghija
306 if require_all_categories:
307 fltr.extend(RecipeModel.recipe_category.any(Category.id == cat_id) for cat_id in categories)
308 else:
309 fltr.append(RecipeModel.recipe_category.any(Category.id.in_(categories)))
311 if tags: 311 ↛ 312line 311 didn't jump to line 312 because the condition on line 311 was never true1klmnopqrsctduvwxyefzAghija
312 if require_all_tags:
313 fltr.extend(RecipeModel.tags.any(Tag.id == tag_id) for tag_id in tags)
314 else:
315 fltr.append(RecipeModel.tags.any(Tag.id.in_(tags)))
317 if tools: 317 ↛ 318line 317 didn't jump to line 318 because the condition on line 317 was never true1klmnopqrsctduvwxyefzAghija
318 if require_all_tools:
319 fltr.extend(RecipeModel.tools.any(Tool.id == tool_id) for tool_id in tools)
320 else:
321 fltr.append(RecipeModel.tools.any(Tool.id.in_(tools)))
322 if foods: 1klmnopqrsctduvwxyefzAghija
323 if require_all_foods:
324 fltr.extend(RecipeModel.recipe_ingredient.any(RecipeIngredientModel.food_id == food) for food in foods)
325 else:
326 fltr.append(RecipeModel.recipe_ingredient.any(RecipeIngredientModel.food_id.in_(foods)))
327 if households: 327 ↛ 328line 327 didn't jump to line 328 because the condition on line 327 was never true1klmnopqrsctduvwxyefzAghija
328 fltr.append(RecipeModel.household_id.in_(households))
329 return fltr 1klmnopqrsctduvwxyefzAghija
331 def get_random(self, limit=1) -> list[Recipe]: 1b
332 stmt = sa.select(RecipeModel).order_by(sa.func.random()).limit(limit) # Postgres and SQLite specific
333 if self.group_id:
334 stmt = stmt.filter(RecipeModel.group_id == self.group_id)
335 if self.household_id:
336 stmt = stmt.filter(RecipeModel.household_id == self.household_id)
338 return [self.schema.model_validate(x) for x in self.session.execute(stmt).scalars().all()]
340 def get_by_slug(self, group_id: UUID4, slug: str) -> Recipe | None: 1b
341 stmt = sa.select(RecipeModel).filter(RecipeModel.group_id == group_id, RecipeModel.slug == slug)
342 dbrecipe = self.session.execute(stmt).scalars().one_or_none()
343 if dbrecipe is None:
344 return None
345 return self.schema.model_validate(dbrecipe)
347 def all_ids(self, group_id: UUID4) -> Sequence[UUID4]: 1b
348 stmt = sa.select(RecipeModel.id).filter(RecipeModel.group_id == group_id)
349 return self.session.execute(stmt).scalars().all()
351 def find_suggested_recipes( 1b
352 self,
353 params: RecipeSuggestionQuery,
354 food_ids: list[UUID4] | None = None,
355 tool_ids: list[UUID4] | None = None,
356 ) -> list[RecipeSuggestionResponseItem]:
357 """
358 Queries all recipes and returns the ones that are missing the least amount of foods and tools.
360 Results are ordered first by number of missing tools, then foods, and finally by the user-specified order.
361 If foods are provided, the query will prefer recipes with more matches to user-provided foods.
362 """
364 if not params.order_by:
365 params.order_by = "created_at"
367 user_food_ids = list(set(food_ids or []))
368 user_tool_ids = list(set(tool_ids or []))
370 # preserve the original lists of ids before we add on_hand items
371 food_ids_with_on_hand = user_food_ids.copy()
372 tool_ids_with_on_hand = user_tool_ids.copy()
374 if params.include_foods_on_hand and self.user_id:
375 foods_on_hand_query = (
376 sa.select(households_to_ingredient_foods.c.food_id)
377 .join(User, households_to_ingredient_foods.c.household_id == User.household_id)
378 .filter(
379 sa.not_(households_to_ingredient_foods.c.food_id.in_(food_ids_with_on_hand)),
380 User.id == self.user_id,
381 )
382 )
383 foods_on_hand = self.session.execute(foods_on_hand_query).scalars().all()
384 food_ids_with_on_hand.extend(foods_on_hand)
386 if params.include_tools_on_hand and self.user_id:
387 tools_on_hand_query = (
388 sa.select(households_to_tools.c.tool_id)
389 .join(User, households_to_tools.c.household_id == User.household_id)
390 .filter(
391 sa.not_(households_to_tools.c.tool_id.in_(tool_ids_with_on_hand)),
392 User.id == self.user_id,
393 )
394 )
395 tools_on_hand = self.session.execute(tools_on_hand_query).scalars().all()
396 tool_ids_with_on_hand.extend(tools_on_hand)
398 ## Build suggestion query
399 settings_alias = orm.aliased(RecipeSettings)
400 ingredients_alias = orm.aliased(RecipeIngredientModel)
401 tools_alias = orm.aliased(Tool)
403 q = sa.select(self.model)
404 fltr = self._filter_builder()
405 q = q.filter_by(**fltr)
407 # Tools goes first so we can order by missing tools count before foods
408 if user_tool_ids: 408 ↛ 409line 408 didn't jump to line 409 because the condition on line 408 was never true
409 unmatched_tools_query = (
410 sa.select(recipes_to_tools.c.recipe_id, sa.func.count().label("unmatched_tools_count"))
411 .join(tools_alias, recipes_to_tools.c.tool_id == tools_alias.id)
412 .filter(sa.not_(tools_alias.id.in_(tool_ids_with_on_hand)))
413 .group_by(recipes_to_tools.c.recipe_id)
414 .subquery()
415 )
416 q = (
417 q.outerjoin(unmatched_tools_query, self.model.id == unmatched_tools_query.c.recipe_id)
418 .filter(
419 sa.or_(
420 unmatched_tools_query.c.unmatched_tools_count.is_(None),
421 unmatched_tools_query.c.unmatched_tools_count <= params.max_missing_tools,
422 )
423 )
424 .order_by(unmatched_tools_query.c.unmatched_tools_count.asc().nulls_first())
425 )
427 if user_food_ids: 427 ↛ 428line 427 didn't jump to line 428 because the condition on line 427 was never true
428 unmatched_foods_query = (
429 sa.select(ingredients_alias.recipe_id, sa.func.count().label("unmatched_foods_count"))
430 .filter(sa.not_(ingredients_alias.food_id.in_(food_ids_with_on_hand)))
431 .filter(ingredients_alias.food_id.isnot(None))
432 .group_by(ingredients_alias.recipe_id)
433 .subquery()
434 )
435 total_user_foods_query = (
436 sa.select(ingredients_alias.recipe_id, sa.func.count().label("total_user_foods_count"))
437 .filter(ingredients_alias.food_id.in_(user_food_ids))
438 .group_by(ingredients_alias.recipe_id)
439 .subquery()
440 )
441 q = (
442 q.join(settings_alias, self.model.settings)
443 .outerjoin(unmatched_foods_query, self.model.id == unmatched_foods_query.c.recipe_id)
444 .outerjoin(total_user_foods_query, self.model.id == total_user_foods_query.c.recipe_id)
445 .filter(
446 sa.or_(
447 unmatched_foods_query.c.unmatched_foods_count.is_(None),
448 unmatched_foods_query.c.unmatched_foods_count <= params.max_missing_foods,
449 ),
450 )
451 .order_by(
452 unmatched_foods_query.c.unmatched_foods_count.asc().nulls_first(),
453 # favor recipes with more matched foods, in case the user is looking for something specific
454 total_user_foods_query.c.total_user_foods_count.desc().nulls_last(),
455 )
456 )
458 # only include recipes that have at least one food in the user's list
459 if user_food_ids:
460 q = q.filter(total_user_foods_query.c.total_user_foods_count > 0)
462 ## Add filters and loader options
463 if self.group_id: 463 ↛ 465line 463 didn't jump to line 465 because the condition on line 463 was always true
464 q = q.filter(self.model.group_id == self.group_id)
465 if self.household_id: 465 ↛ 466line 465 didn't jump to line 466 because the condition on line 465 was never true
466 q = q.filter(self.model.household_id == self.household_id)
467 if params.query_filter:
468 try:
469 query_filter_builder = QueryFilterBuilder(params.query_filter)
470 q = query_filter_builder.filter_query(q, model=self.model)
472 except ValueError as e:
473 self.logger.error(e)
474 raise HTTPException(status_code=400, detail=str(e)) from e
476 q = self.add_order_by_to_query(q, params)
477 q = q.limit(params.limit).options(*RecipeSummary.loader_options())
479 ## Execute query
480 try:
481 data = self.session.execute(q).scalars().unique().all()
482 except Exception as e:
483 self._log_exception(e)
484 self.session.rollback()
485 raise e
487 suggestions: list[RecipeSuggestionResponseItem] = []
488 for result in data: 488 ↛ 489line 488 didn't jump to line 489 because the loop on line 488 never started
489 recipe = cast(RecipeModel, result)
491 missing_foods: list[IngredientFood] = []
492 if user_food_ids: # only check for missing foods if the user has provided a list of foods
493 seen_food_ids: set[UUID4] = set()
494 seen_food_ids.update(food_ids_with_on_hand)
495 for ingredient in recipe.recipe_ingredient:
496 if not ingredient.food:
497 continue
498 if ingredient.food.id in seen_food_ids:
499 continue
501 seen_food_ids.add(ingredient.food.id)
502 missing_foods.append(IngredientFood.model_validate(ingredient.food))
504 missing_tools: list[RecipeToolOut] = []
505 if user_tool_ids: # only check for missing tools if the user has provided a list of tools
506 seen_tool_ids: set[UUID4] = set()
507 seen_tool_ids.update(tool_ids_with_on_hand)
508 for tool in recipe.tools:
509 if tool.id in seen_tool_ids:
510 continue
512 seen_tool_ids.add(tool.id)
513 missing_tools.append(RecipeToolOut.model_validate(tool))
515 suggestion = RecipeSuggestionResponseItem(
516 recipe=RecipeSummary.model_validate(recipe),
517 missing_foods=missing_foods,
518 missing_tools=missing_tools,
519 )
520 suggestions.append(suggestion)
522 return suggestions