Coverage for opt/mealie/lib/python3.12/site-packages/mealie/repos/repository_recipes.py: 55%
269 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-25 15:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-25 15:48 +0000
1import re as re 1c
2from collections.abc import Iterable, Sequence 1c
3from random import randint 1c
4from typing import Self, cast 1c
5from uuid import UUID 1c
7import sqlalchemy as sa 1c
8from fastapi import HTTPException 1c
9from pydantic import UUID4 1c
10from sqlalchemy import orm 1c
11from sqlalchemy.exc import IntegrityError 1c
13from mealie.db.models.household import Household, HouseholdToRecipe 1c
14from mealie.db.models.recipe.category import Category 1c
15from mealie.db.models.recipe.ingredient import RecipeIngredientModel, households_to_ingredient_foods 1c
16from mealie.db.models.recipe.recipe import RecipeModel 1c
17from mealie.db.models.recipe.settings import RecipeSettings 1c
18from mealie.db.models.recipe.tag import Tag 1c
19from mealie.db.models.recipe.tool import Tool, households_to_tools, recipes_to_tools 1c
20from mealie.db.models.users.user_to_recipe import UserToRecipe 1c
21from mealie.db.models.users.users import User 1c
22from mealie.schema.cookbook.cookbook import ReadCookBook 1c
23from mealie.schema.recipe import Recipe 1c
24from mealie.schema.recipe.recipe import RecipeCategory, RecipePagination, RecipeSummary, create_recipe_slug 1c
25from mealie.schema.recipe.recipe_ingredient import IngredientFood 1c
26from mealie.schema.recipe.recipe_suggestion import RecipeSuggestionQuery, RecipeSuggestionResponseItem 1c
27from mealie.schema.recipe.recipe_tool import RecipeToolOut 1c
28from mealie.schema.response.pagination import PaginationQuery 1c
29from mealie.schema.response.query_filter import QueryFilterBuilder 1c
31from ..db.models._model_base import SqlAlchemyBase 1c
32from .repository_generic import HouseholdRepositoryGeneric 1c
35class RepositoryRecipes(HouseholdRepositoryGeneric[Recipe, RecipeModel]): 1c
36 user_id: UUID4 | None = None 1c
38 @property 1c
39 def column_aliases(self): 1c
40 if not self.user_id: 40 ↛ 41line 40 didn't jump to line 41 because the condition on line 40 was never true1ab
41 return {}
43 return { 1ab
44 "last_made": self._get_last_made_col_alias(),
45 "rating": self._get_rating_col_alias(),
46 }
48 def by_user(self: Self, user_id: UUID4) -> Self: 1c
49 """Add a user_id to the repo, which will be used to handle recipe ratings and other user-specific data"""
50 self.user_id = user_id 1adefghijklmnopb
51 return self 1adefghijklmnopb
53 def _get_last_made_col_alias(self) -> sa.ColumnElement | None: 1c
54 """Computed last_made which uses `HouseholdToRecipe.last_made` for the user's household, otherwise None"""
56 user_household_subquery = sa.select(User.household_id).where(User.id == self.user_id).scalar_subquery() 1ab
57 return ( 1ab
58 sa.select(HouseholdToRecipe.last_made)
59 .where(
60 HouseholdToRecipe.recipe_id == self.model.id,
61 HouseholdToRecipe.household_id == user_household_subquery,
62 )
63 .correlate(self.model)
64 .scalar_subquery()
65 )
67 def _get_rating_col_alias(self) -> sa.ColumnElement | None: 1c
68 """Computed rating which uses the user's rating if it exists, otherwise falling back to the recipe's rating"""
70 effective_rating = sa.case( 1ab
71 (
72 sa.exists().where(
73 UserToRecipe.recipe_id == self.model.id,
74 UserToRecipe.user_id == self.user_id,
75 UserToRecipe.rating != None, # noqa E711
76 UserToRecipe.rating > 0,
77 ),
78 sa.select(sa.func.max(UserToRecipe.rating))
79 .where(UserToRecipe.recipe_id == self.model.id, UserToRecipe.user_id == self.user_id)
80 .correlate(self.model)
81 .scalar_subquery(),
82 ),
83 else_=sa.case(
84 (self.model.rating == 0, None),
85 else_=self.model.rating,
86 ),
87 )
88 return sa.cast(effective_rating, sa.Float) 1ab
90 def create(self, document: Recipe) -> Recipe: # type: ignore 1c
91 max_retries = 10
92 original_name: str = document.name # type: ignore
94 for i in range(1, 11): 94 ↛ exitline 94 didn't return from function 'create' because the loop on line 94 didn't complete
95 try:
96 return super().create(document)
97 except IntegrityError:
98 self.session.rollback()
99 document.name = f"{original_name} ({i})"
100 document.slug = create_recipe_slug(document.name)
102 if i >= max_retries:
103 raise
105 def _delete_recipe(self, recipe: RecipeModel) -> Recipe: 1c
106 recipe_as_model = self.schema.model_validate(recipe)
108 # first remove UserToRecipe entries so we don't run into stale data errors
109 try:
110 user_to_recipe_delete_query = sa.delete(UserToRecipe).where(UserToRecipe.recipe_id == recipe.id)
111 self.session.execute(user_to_recipe_delete_query)
112 self.session.commit()
113 except Exception:
114 self.session.rollback()
115 raise
117 # remove the recipe
118 try:
119 self.session.delete(recipe)
120 self.session.commit()
121 except Exception:
122 self.session.rollback()
123 raise
125 return recipe_as_model
127 def delete(self, value, match_key: str | None = None) -> Recipe: 1c
128 match_key = match_key or self.primary_key
129 recipe_in_db = self._query_one(value, match_key)
130 return self._delete_recipe(recipe_in_db)
132 def delete_many(self, values: Iterable) -> list[Recipe]: 1c
133 query = self._query().filter(self.model.id.in_(values))
134 recipes_in_db = self.session.execute(query).unique().scalars().all()
135 results: list[Recipe] = []
137 # we create a delete statement for each row
138 # we don't delete the whole query in one statement because postgres doesn't cascade correctly
139 for recipe_in_db in recipes_in_db:
140 results.append(self._delete_recipe(recipe_in_db))
142 try:
143 self.session.commit()
144 except Exception as e:
145 self.session.rollback()
146 raise e
148 return results
150 def update_image(self, slug: str, _: str | None = None) -> int: 1c
151 entry: RecipeModel = self._query_one(match_value=slug)
152 entry.image = randint(0, 255)
153 self.session.commit()
155 return entry.image
157 def count_uncategorized(self, count=True, override_schema=None): 1c
158 return self._count_attribute(
159 attribute_name=RecipeModel.recipe_category,
160 attr_match=None,
161 count=count,
162 override_schema=override_schema,
163 )
165 def count_untagged(self, count=True, override_schema=None): 1c
166 return self._count_attribute(
167 attribute_name=RecipeModel.tags,
168 attr_match=None,
169 count=count,
170 override_schema=override_schema,
171 )
173 def _uuids_for_items(self, items: list[UUID | str] | None, model: type[SqlAlchemyBase]) -> list[UUID] | None: 1c
174 if not items: 1adefghijklmnopb
175 return None 1adefghijklmnopb
176 ids: list[UUID] = [] 1a
177 slugs: list[str] = [] 1a
179 for i in items: 1a
180 if isinstance(i, UUID): 180 ↛ 181line 180 didn't jump to line 181 because the condition on line 180 was never true1a
181 ids.append(i)
182 else:
183 try: 1a
184 i_as_uuid = UUID(i) 1a
185 ids.append(i_as_uuid)
186 except ValueError: 1a
187 slugs.append(i) 1a
189 if not slugs: 189 ↛ 190line 189 didn't jump to line 190 because the condition on line 189 was never true1a
190 return ids
191 additional_ids = self.session.execute(sa.select(model.id).filter(model.slug.in_(slugs))).scalars().all() 1a
192 return ids + additional_ids 1a
194 def page_all( # type: ignore 1c
195 self,
196 pagination: PaginationQuery,
197 override=None,
198 cookbook: ReadCookBook | None = None,
199 categories: list[UUID4 | str] | None = None,
200 tags: list[UUID4 | str] | None = None,
201 tools: list[UUID4 | str] | None = None,
202 foods: list[UUID4 | str] | None = None,
203 households: list[UUID4 | str] | None = None,
204 require_all_categories=True,
205 require_all_tags=True,
206 require_all_tools=True,
207 require_all_foods=True,
208 search: str | None = None,
209 ) -> RecipePagination:
210 # Copy this, because calling methods (e.g. tests) might rely on it not getting mutated
211 pagination_result = pagination.model_copy() 1adefghijklmnopb
212 q = sa.select(self.model) 1adefghijklmnopb
214 fltr = self._filter_builder() 1adefghijklmnopb
215 q = q.filter_by(**fltr) 1adefghijklmnopb
217 if cookbook: 217 ↛ 218line 217 didn't jump to line 218 because the condition on line 217 was never true1adefghijklmnopb
218 if pagination_result.query_filter and cookbook.query_filter_string:
219 pagination_result.query_filter = (
220 f"({pagination_result.query_filter}) AND ({cookbook.query_filter_string})"
221 )
222 else:
223 pagination_result.query_filter = cookbook.query_filter_string
224 else:
225 category_ids = self._uuids_for_items(categories, Category) 1adefghijklmnopb
226 tag_ids = self._uuids_for_items(tags, Tag) 1adefghijklmnopb
227 tool_ids = self._uuids_for_items(tools, Tool) 1adefghijklmnopb
228 household_ids = self._uuids_for_items(households, Household) 1adefghijklmnopb
229 filters = self._build_recipe_filter( 1adefghijklmnopb
230 categories=category_ids,
231 tags=tag_ids,
232 tools=tool_ids,
233 foods=foods,
234 households=household_ids,
235 require_all_categories=require_all_categories,
236 require_all_tags=require_all_tags,
237 require_all_tools=require_all_tools,
238 require_all_foods=require_all_foods,
239 )
240 q = q.filter(*filters) 1adefghijklmnopb
241 if search: 1adefghijklmnopb
242 q = self.add_search_to_query(q, self.schema, search) 1a
244 if not pagination_result.order_by and not search: 1adefghijklmnopb
245 # default ordering if not searching
246 pagination_result.order_by = "created_at" 1a
248 q, count, total_pages = self.add_pagination_to_query(q, pagination_result) 1adefghijklmnopb
250 # Apply options late, so they do not get used for counting
251 q = q.options(*RecipeSummary.loader_options()) 1ab
252 try: 1ab
253 self.logger.debug(f"Recipe Pagination Query: {pagination_result}") 1ab
254 data = self.session.execute(q).scalars().unique().all() 1ab
255 except Exception as e:
256 self._log_exception(e)
257 self.session.rollback()
258 raise e
260 items = [RecipeSummary.model_validate(item) for item in data] 1ab
261 return RecipePagination( 1ab
262 page=pagination_result.page,
263 per_page=pagination_result.per_page,
264 total=count,
265 total_pages=total_pages,
266 items=items,
267 )
269 def get_by_categories(self, categories: list[RecipeCategory]) -> list[RecipeSummary]: 1c
270 """
271 get_by_categories returns all the Recipes that contain every category provided in the list
272 """
274 ids = [x.id for x in categories]
275 stmt = (
276 sa.select(RecipeModel)
277 .join(RecipeModel.recipe_category)
278 .filter(RecipeModel.recipe_category.any(Category.id.in_(ids)))
279 )
280 if self.group_id:
281 stmt = stmt.filter(RecipeModel.group_id == self.group_id)
282 if self.household_id:
283 stmt = stmt.filter(RecipeModel.household_id == self.household_id)
285 return [RecipeSummary.model_validate(x) for x in self.session.execute(stmt).unique().scalars().all()]
287 def _build_recipe_filter( 1c
288 self,
289 categories: list[UUID4] | None = None,
290 tags: list[UUID4] | None = None,
291 tools: list[UUID4] | None = None,
292 foods: list[UUID4] | None = None,
293 households: list[UUID4] | None = None,
294 require_all_categories: bool = True,
295 require_all_tags: bool = True,
296 require_all_tools: bool = True,
297 require_all_foods: bool = True,
298 ) -> list:
299 fltr: list[sa.ColumnElement] = [] 1adefghijklmnopb
300 if self.group_id: 300 ↛ 302line 300 didn't jump to line 302 because the condition on line 300 was always true1adefghijklmnopb
301 fltr.append(RecipeModel.group_id == self.group_id) 1adefghijklmnopb
302 if self.household_id: 302 ↛ 303line 302 didn't jump to line 303 because the condition on line 302 was never true1adefghijklmnopb
303 fltr.append(RecipeModel.household_id == self.household_id)
305 if categories: 305 ↛ 306line 305 didn't jump to line 306 because the condition on line 305 was never true1adefghijklmnopb
306 if require_all_categories:
307 fltr.extend(RecipeModel.recipe_category.any(Category.id == cat_id) for cat_id in categories)
308 else:
309 fltr.append(RecipeModel.recipe_category.any(Category.id.in_(categories)))
311 if tags: 311 ↛ 312line 311 didn't jump to line 312 because the condition on line 311 was never true1adefghijklmnopb
312 if require_all_tags:
313 fltr.extend(RecipeModel.tags.any(Tag.id == tag_id) for tag_id in tags)
314 else:
315 fltr.append(RecipeModel.tags.any(Tag.id.in_(tags)))
317 if tools: 317 ↛ 318line 317 didn't jump to line 318 because the condition on line 317 was never true1adefghijklmnopb
318 if require_all_tools:
319 fltr.extend(RecipeModel.tools.any(Tool.id == tool_id) for tool_id in tools)
320 else:
321 fltr.append(RecipeModel.tools.any(Tool.id.in_(tools)))
322 if foods: 1adefghijklmnopb
323 if require_all_foods: 1a
324 fltr.extend(RecipeModel.recipe_ingredient.any(RecipeIngredientModel.food_id == food) for food in foods) 1a
325 else:
326 fltr.append(RecipeModel.recipe_ingredient.any(RecipeIngredientModel.food_id.in_(foods))) 1a
327 if households: 327 ↛ 328line 327 didn't jump to line 328 because the condition on line 327 was never true1adefghijklmnopb
328 fltr.append(RecipeModel.household_id.in_(households))
329 return fltr 1adefghijklmnopb
331 def get_random(self, limit=1) -> list[Recipe]: 1c
332 stmt = sa.select(RecipeModel).order_by(sa.func.random()).limit(limit) # Postgres and SQLite specific
333 if self.group_id:
334 stmt = stmt.filter(RecipeModel.group_id == self.group_id)
335 if self.household_id:
336 stmt = stmt.filter(RecipeModel.household_id == self.household_id)
338 return [self.schema.model_validate(x) for x in self.session.execute(stmt).scalars().all()]
340 def get_by_slug(self, group_id: UUID4, slug: str) -> Recipe | None: 1c
341 stmt = sa.select(RecipeModel).filter(RecipeModel.group_id == group_id, RecipeModel.slug == slug)
342 dbrecipe = self.session.execute(stmt).scalars().one_or_none()
343 if dbrecipe is None:
344 return None
345 return self.schema.model_validate(dbrecipe)
347 def all_ids(self, group_id: UUID4) -> Sequence[UUID4]: 1c
348 stmt = sa.select(RecipeModel.id).filter(RecipeModel.group_id == group_id)
349 return self.session.execute(stmt).scalars().all()
351 def find_suggested_recipes( 1c
352 self,
353 params: RecipeSuggestionQuery,
354 food_ids: list[UUID4] | None = None,
355 tool_ids: list[UUID4] | None = None,
356 ) -> list[RecipeSuggestionResponseItem]:
357 """
358 Queries all recipes and returns the ones that are missing the least amount of foods and tools.
360 Results are ordered first by number of missing tools, then foods, and finally by the user-specified order.
361 If foods are provided, the query will prefer recipes with more matches to user-provided foods.
362 """
364 if not params.order_by: 1ab
365 params.order_by = "created_at" 1ab
367 user_food_ids = list(set(food_ids or [])) 1ab
368 user_tool_ids = list(set(tool_ids or [])) 1ab
370 # preserve the original lists of ids before we add on_hand items
371 food_ids_with_on_hand = user_food_ids.copy() 1ab
372 tool_ids_with_on_hand = user_tool_ids.copy() 1ab
374 if params.include_foods_on_hand and self.user_id: 1ab
375 foods_on_hand_query = ( 1ab
376 sa.select(households_to_ingredient_foods.c.food_id)
377 .join(User, households_to_ingredient_foods.c.household_id == User.household_id)
378 .filter(
379 sa.not_(households_to_ingredient_foods.c.food_id.in_(food_ids_with_on_hand)),
380 User.id == self.user_id,
381 )
382 )
383 foods_on_hand = self.session.execute(foods_on_hand_query).scalars().all() 1ab
384 food_ids_with_on_hand.extend(foods_on_hand) 1ab
386 if params.include_tools_on_hand and self.user_id: 1ab
387 tools_on_hand_query = ( 1ab
388 sa.select(households_to_tools.c.tool_id)
389 .join(User, households_to_tools.c.household_id == User.household_id)
390 .filter(
391 sa.not_(households_to_tools.c.tool_id.in_(tool_ids_with_on_hand)),
392 User.id == self.user_id,
393 )
394 )
395 tools_on_hand = self.session.execute(tools_on_hand_query).scalars().all() 1ab
396 tool_ids_with_on_hand.extend(tools_on_hand) 1ab
398 ## Build suggestion query
399 settings_alias = orm.aliased(RecipeSettings) 1ab
400 ingredients_alias = orm.aliased(RecipeIngredientModel) 1ab
401 tools_alias = orm.aliased(Tool) 1ab
403 q = sa.select(self.model) 1ab
404 fltr = self._filter_builder() 1ab
405 q = q.filter_by(**fltr) 1ab
407 # Tools goes first so we can order by missing tools count before foods
408 if user_tool_ids: 408 ↛ 409line 408 didn't jump to line 409 because the condition on line 408 was never true1ab
409 unmatched_tools_query = (
410 sa.select(recipes_to_tools.c.recipe_id, sa.func.count().label("unmatched_tools_count"))
411 .join(tools_alias, recipes_to_tools.c.tool_id == tools_alias.id)
412 .filter(sa.not_(tools_alias.id.in_(tool_ids_with_on_hand)))
413 .group_by(recipes_to_tools.c.recipe_id)
414 .subquery()
415 )
416 q = (
417 q.outerjoin(unmatched_tools_query, self.model.id == unmatched_tools_query.c.recipe_id)
418 .filter(
419 sa.or_(
420 unmatched_tools_query.c.unmatched_tools_count.is_(None),
421 unmatched_tools_query.c.unmatched_tools_count <= params.max_missing_tools,
422 )
423 )
424 .order_by(unmatched_tools_query.c.unmatched_tools_count.asc().nulls_first())
425 )
427 if user_food_ids: 427 ↛ 428line 427 didn't jump to line 428 because the condition on line 427 was never true1ab
428 unmatched_foods_query = (
429 sa.select(ingredients_alias.recipe_id, sa.func.count().label("unmatched_foods_count"))
430 .filter(sa.not_(ingredients_alias.food_id.in_(food_ids_with_on_hand)))
431 .filter(ingredients_alias.food_id.isnot(None))
432 .group_by(ingredients_alias.recipe_id)
433 .subquery()
434 )
435 total_user_foods_query = (
436 sa.select(ingredients_alias.recipe_id, sa.func.count().label("total_user_foods_count"))
437 .filter(ingredients_alias.food_id.in_(user_food_ids))
438 .group_by(ingredients_alias.recipe_id)
439 .subquery()
440 )
441 q = (
442 q.join(settings_alias, self.model.settings)
443 .outerjoin(unmatched_foods_query, self.model.id == unmatched_foods_query.c.recipe_id)
444 .outerjoin(total_user_foods_query, self.model.id == total_user_foods_query.c.recipe_id)
445 .filter(
446 sa.or_(
447 unmatched_foods_query.c.unmatched_foods_count.is_(None),
448 unmatched_foods_query.c.unmatched_foods_count <= params.max_missing_foods,
449 ),
450 )
451 .order_by(
452 unmatched_foods_query.c.unmatched_foods_count.asc().nulls_first(),
453 # favor recipes with more matched foods, in case the user is looking for something specific
454 total_user_foods_query.c.total_user_foods_count.desc().nulls_last(),
455 )
456 )
458 # only include recipes that have at least one food in the user's list
459 if user_food_ids:
460 q = q.filter(total_user_foods_query.c.total_user_foods_count > 0)
462 ## Add filters and loader options
463 if self.group_id: 463 ↛ 465line 463 didn't jump to line 465 because the condition on line 463 was always true1ab
464 q = q.filter(self.model.group_id == self.group_id) 1ab
465 if self.household_id: 465 ↛ 466line 465 didn't jump to line 466 because the condition on line 465 was never true1ab
466 q = q.filter(self.model.household_id == self.household_id)
467 if params.query_filter: 1ab
468 try: 1ab
469 query_filter_builder = QueryFilterBuilder(params.query_filter) 1ab
470 q = query_filter_builder.filter_query(q, model=self.model) 1ab
472 except ValueError as e:
473 self.logger.error(e)
474 raise HTTPException(status_code=400, detail=str(e)) from e
476 q = self.add_order_by_to_query(q, params) 1ab
477 q = q.limit(params.limit).options(*RecipeSummary.loader_options()) 1ab
479 ## Execute query
480 try: 1ab
481 data = self.session.execute(q).scalars().unique().all() 1ab
482 except Exception as e:
483 self._log_exception(e)
484 self.session.rollback()
485 raise e
487 suggestions: list[RecipeSuggestionResponseItem] = [] 1ab
488 for result in data: 488 ↛ 489line 488 didn't jump to line 489 because the loop on line 488 never started1ab
489 recipe = cast(RecipeModel, result)
491 missing_foods: list[IngredientFood] = []
492 if user_food_ids: # only check for missing foods if the user has provided a list of foods
493 seen_food_ids: set[UUID4] = set()
494 seen_food_ids.update(food_ids_with_on_hand)
495 for ingredient in recipe.recipe_ingredient:
496 if not ingredient.food:
497 continue
498 if ingredient.food.id in seen_food_ids:
499 continue
501 seen_food_ids.add(ingredient.food.id)
502 missing_foods.append(IngredientFood.model_validate(ingredient.food))
504 missing_tools: list[RecipeToolOut] = []
505 if user_tool_ids: # only check for missing tools if the user has provided a list of tools
506 seen_tool_ids: set[UUID4] = set()
507 seen_tool_ids.update(tool_ids_with_on_hand)
508 for tool in recipe.tools:
509 if tool.id in seen_tool_ids:
510 continue
512 seen_tool_ids.add(tool.id)
513 missing_tools.append(RecipeToolOut.model_validate(tool))
515 suggestion = RecipeSuggestionResponseItem(
516 recipe=RecipeSummary.model_validate(recipe),
517 missing_foods=missing_foods,
518 missing_tools=missing_tools,
519 )
520 suggestions.append(suggestion)
522 return suggestions 1ab