Coverage for opt/mealie/lib/python3.12/site-packages/mealie/services/household_services/shopping_lists.py: 20%
257 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-25 15:32 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-25 15:32 +0000
1from typing import cast 1b
3from pydantic import UUID4 1b
5from mealie.core.exceptions import UnexpectedNone 1b
6from mealie.repos.all_repositories import get_repositories 1b
7from mealie.repos.repository_factory import AllRepositories 1b
8from mealie.schema.household.group_shopping_list import ( 1b
9 ShoppingListAddRecipeParamsBulk,
10 ShoppingListCreate,
11 ShoppingListItemBase,
12 ShoppingListItemCreate,
13 ShoppingListItemOut,
14 ShoppingListItemRecipeRefCreate,
15 ShoppingListItemRecipeRefOut,
16 ShoppingListItemsCollectionOut,
17 ShoppingListItemUpdate,
18 ShoppingListItemUpdateBulk,
19 ShoppingListMultiPurposeLabelCreate,
20 ShoppingListOut,
21 ShoppingListSave,
22)
23from mealie.schema.recipe.recipe_ingredient import ( 1b
24 IngredientFood,
25 IngredientUnit,
26 RecipeIngredient,
27)
28from mealie.schema.response.pagination import OrderDirection, PaginationQuery 1b
29from mealie.services.parser_services._base import DataMatcher 1b
class ShoppingListService:
    """Service-layer operations on shopping lists and their items.

    Wraps the shopping list repositories exposed by ``AllRepositories`` and
    implements the merge rules used when items are created, updated, or
    added/removed on behalf of a recipe.
    """

    DEFAULT_FOOD_FUZZY_MATCH_THRESHOLD = 80

    # separator used when several notes are combined into a single item note
    _NOTE_SEPARATOR = " | "

    def __init__(self, repos: AllRepositories):
        self.repos = repos
        self.shopping_lists = repos.group_shopping_lists
        self.list_items = repos.group_shopping_list_item
        self.list_item_refs = repos.group_shopping_list_item_references
        self.list_refs = repos.group_shopping_list_recipe_refs
        self.data_matcher = DataMatcher(self.repos, food_fuzzy_match_threshold=self.DEFAULT_FOOD_FUZZY_MATCH_THRESHOLD)

    @staticmethod
    def _merge_notes(existing: str | None, incoming: str | None) -> str | None:
        """
        Combine two item notes into one separator-joined note

        Both notes are split on the separator, empty parts are dropped and duplicates are removed
        while keeping first-seen order, so the result is deterministic (the parts of `existing`
        always come first). Identical notes are returned untouched. If `incoming` is empty,
        `existing` is returned (normalized to an empty string when it is `None`).
        """

        if existing == incoming:
            return existing
        if not incoming:
            return existing or ""

        separator = ShoppingListService._NOTE_SEPARATOR
        parts = (existing or "").split(separator) + incoming.split(separator)

        # dict.fromkeys de-duplicates while preserving insertion order (a set would not)
        return separator.join(dict.fromkeys(part for part in parts if part))

    @staticmethod
    def can_merge(item1: ShoppingListItemBase, item2: ShoppingListItemBase) -> bool:
        """
        Check to see if this item can be merged with another item

        Items are mergeable only when neither is checked and both share the same food and unit.
        If they have a food, that is enough; without a food their notes must also be identical.
        """

        if item1.checked or item2.checked:
            return False
        if item1.food_id != item2.food_id or item1.unit_id != item2.unit_id:
            return False

        # if foods match, we can merge, otherwise compare the notes
        return bool(item1.food_id) or item1.note == item2.note

    @staticmethod
    def merge_items(
        from_item: ShoppingListItemCreate | ShoppingListItemUpdateBulk,
        to_item: ShoppingListItemCreate | ShoppingListItemUpdateBulk | ShoppingListItemOut,
    ) -> ShoppingListItemUpdate:
        """
        Takes an item and merges it into an already-existing item, then returns a copy

        Attributes of the `to_item` take priority over the `from_item`, except extras with overlapping keys

        Note that `to_item` (quantity, note, extras) and the recipe references of `from_item`
        (recipe scale) are modified in place before the copy is made.
        """

        to_item.quantity += from_item.quantity

        # notes are combined in a stable order: the notes of `to_item` first, then any new ones
        if to_item.note != from_item.note:
            to_item.note = ShoppingListService._merge_notes(to_item.note, from_item.note)

        # extras are only merged when both sides have some; overlapping keys come from `from_item`
        if to_item.extras and from_item.extras:
            to_item.extras.update(from_item.extras)

        updated_refs = {ref.recipe_id: ref for ref in from_item.recipe_references}
        for to_ref in to_item.recipe_references:
            if to_ref.recipe_id not in updated_refs:
                updated_refs[to_ref.recipe_id] = to_ref
                continue

            # merge recipe scales
            base_ref = updated_refs[to_ref.recipe_id]

            # if the scale is missing we assume it's 1 for backwards compatibility
            # if the scale is 0 we leave it alone
            if base_ref.recipe_scale is None:
                base_ref.recipe_scale = 1

            if to_ref.recipe_scale is None:
                to_ref.recipe_scale = 1

            base_ref.recipe_scale += to_ref.recipe_scale

        return to_item.cast(ShoppingListItemUpdate, recipe_references=list(updated_refs.values()))

    def _load_unchecked_items(self, shopping_list_id: UUID4) -> list[ShoppingListItemOut]:
        """Fetch every unchecked item of a shopping list (candidates for merging)"""

        query = PaginationQuery(per_page=-1, query_filter=f"shopping_list_id={shopping_list_id} AND checked=false")
        return self.list_items.page_all(query).items

    def remove_unused_recipe_references(self, shopping_list_id: UUID4) -> None:
        """Delete list-level recipe references whose recipe is no longer referenced by any item on the list"""

        shopping_list = cast(ShoppingListOut, self.shopping_lists.get_one(shopping_list_id))

        recipe_ids_to_keep: set[UUID4] = {
            ref.recipe_id for item in shopping_list.list_items for ref in item.recipe_references
        }
        list_refs_to_delete: set[UUID4] = {
            list_ref.id for list_ref in shopping_list.recipe_references if list_ref.recipe_id not in recipe_ids_to_keep
        }

        if list_refs_to_delete:
            self.list_refs.delete_many(list_refs_to_delete)

    def find_matching_label(self, item: ShoppingListItemBase) -> UUID4 | None:
        """
        Pick a label for an item

        Preference order: the item's own label, the label of its food, then the label of
        whatever food the data matcher finds for the item's display text (if any).
        """

        if item.label_id:
            return item.label_id
        if item.food:
            return item.food.label_id

        food_search = self.data_matcher.find_food_match(item.display)
        return food_search.label_id if food_search else None

    def bulk_create_items(
        self, create_items: list[ShoppingListItemCreate], auto_find_labels: bool = True
    ) -> ShoppingListItemsCollectionOut:
        """
        Create a list of items, merging into existing ones where possible.
        Optionally try to find a label for each item if one isn't provided using the item's food data or display name.
        """

        # consolidate items to be created
        consolidated_create_items: list[ShoppingListItemCreate] = []
        for create_item in create_items:
            for i, filtered_item in enumerate(consolidated_create_items):
                if not self.can_merge(create_item, filtered_item):
                    continue

                merged_item = self.merge_items(create_item, filtered_item)
                consolidated_create_items[i] = merged_item.cast(ShoppingListItemCreate)
                break
            else:
                # nothing to merge into
                consolidated_create_items.append(create_item)

        # check to see if we can merge into any existing items
        filtered_create_items: list[ShoppingListItemCreate] = []
        update_items: list[ShoppingListItemUpdateBulk] = []
        existing_items_map: dict[UUID4, list[ShoppingListItemOut]] = {}
        for create_item in consolidated_create_items:
            list_id = create_item.shopping_list_id
            if list_id not in existing_items_map:
                existing_items_map[list_id] = self._load_unchecked_items(list_id)

            merged = False
            for existing_item in existing_items_map[list_id]:
                if not self.can_merge(existing_item, create_item):
                    continue

                merged_item = self.merge_items(create_item, existing_item)
                update_items.append(merged_item.cast(ShoppingListItemUpdateBulk, id=existing_item.id))
                merged = True
                break

            # items with a negative quantity are never created
            if merged or create_item.quantity < 0:
                continue

            # create the item
            if create_item.checked:
                # checked items should not have recipe references
                create_item.recipe_references = []
            if auto_find_labels:
                create_item.label_id = self.find_matching_label(create_item)

            filtered_create_items.append(create_item)

        created_items = self.list_items.create_many(filtered_create_items) if filtered_create_items else []
        updated_items = self.list_items.update_many(update_items) if update_items else []

        for list_id in {item.shopping_list_id for item in created_items + updated_items}:
            self.remove_unused_recipe_references(list_id)

        return ShoppingListItemsCollectionOut(
            created_items=created_items, updated_items=updated_items, deleted_items=[]
        )

    def bulk_update_items(self, update_items: list[ShoppingListItemUpdateBulk]) -> ShoppingListItemsCollectionOut:
        """
        Update a list of items, merging them into each other and into existing items where possible.

        An item that is merged into another one is deleted, as is any item whose quantity is negative.
        """

        # consolidate items to be updated
        consolidated_update_items: list[ShoppingListItemUpdateBulk] = []
        delete_items: set[UUID4] = set()
        seen_update_ids: set[UUID4] = set()
        for update_item in update_items:
            # if the same item appears multiple times in one request, ignore all but the first instance
            if update_item.id in seen_update_ids:
                continue

            seen_update_ids.add(update_item.id)

            for i, filtered_item in enumerate(consolidated_update_items):
                if not self.can_merge(update_item, filtered_item):
                    continue

                consolidated_update_items[i] = self.merge_items(update_item, filtered_item).cast(
                    ShoppingListItemUpdateBulk, id=filtered_item.id
                )
                delete_items.add(update_item.id)
                break
            else:
                # nothing to merge into
                consolidated_update_items.append(update_item)

        # check to see if we can merge into any existing items
        filtered_update_items: list[ShoppingListItemUpdateBulk] = []
        existing_items_map: dict[UUID4, list[ShoppingListItemOut]] = {}
        for update_item in consolidated_update_items:
            list_id = update_item.shopping_list_id
            if list_id not in existing_items_map:
                existing_items_map[list_id] = self._load_unchecked_items(list_id)

            merged = False
            for existing_item in existing_items_map[list_id]:
                # never merge into an item that is going away, or into the item itself
                if existing_item.id in delete_items or existing_item.id == update_item.id:
                    continue

                if not self.can_merge(update_item, existing_item):
                    continue

                updated_existing_item = self.merge_items(update_item, existing_item).cast(
                    ShoppingListItemUpdateBulk, id=existing_item.id
                )
                filtered_update_items.append(updated_existing_item)
                delete_items.add(update_item.id)
                merged = True
                break

            if merged:
                continue

            # update or delete the item
            if update_item.quantity < 0:
                delete_items.add(update_item.id)
                continue

            if update_item.checked:
                # checked items should not have recipe references
                update_item.recipe_references = []

            filtered_update_items.append(update_item)

        updated_items = cast(
            list[ShoppingListItemOut],
            self.list_items.update_many(filtered_update_items) if filtered_update_items else [],  # type: ignore
        )

        deleted_items = cast(
            list[ShoppingListItemOut],
            self.list_items.delete_many(delete_items) if delete_items else [],  # type: ignore
        )

        for list_id in {item.shopping_list_id for item in updated_items + deleted_items}:
            self.remove_unused_recipe_references(list_id)

        return ShoppingListItemsCollectionOut(
            created_items=[], updated_items=updated_items, deleted_items=deleted_items
        )

    def bulk_delete_items(self, delete_items: list[UUID4]) -> ShoppingListItemsCollectionOut:
        """Delete the given items, then drop list-level recipe references that are no longer used"""

        deleted_items = cast(
            list[ShoppingListItemOut],
            self.list_items.delete_many(set(delete_items)) if delete_items else [],  # type: ignore
        )

        for list_id in {item.shopping_list_id for item in deleted_items}:
            self.remove_unused_recipe_references(list_id)

        return ShoppingListItemsCollectionOut(created_items=[], updated_items=[], deleted_items=deleted_items)

    def get_shopping_list_items_from_recipe(
        self,
        list_id: UUID4,
        recipe_id: UUID4,
        scale: float = 1,
        recipe_ingredients: list[RecipeIngredient] | None = None,
    ) -> list[ShoppingListItemCreate]:
        """
        Generates a list of new list items based on a recipe

        When `recipe_ingredients` is not provided, the ingredients are read from the stored recipe,
        and `UnexpectedNone` is raised if that recipe cannot be found.
        Nothing is persisted here; the caller decides what to do with the generated items.
        """

        if recipe_ingredients is None:
            group_recipes_repo = get_repositories(
                self.repos.session, group_id=self.repos.group_id, household_id=None
            ).recipes
            recipe = group_recipes_repo.get_one(recipe_id, "id")
            if not recipe:
                raise UnexpectedNone("Recipe not found")

            recipe_ingredients = recipe.recipe_ingredient

        list_items: list[ShoppingListItemCreate] = []
        for ingredient in recipe_ingredients:
            if isinstance(ingredient.food, IngredientFood):
                food_id = ingredient.food.id
                label_id = ingredient.food.label_id
            else:
                food_id = None
                label_id = None

            unit_id = ingredient.unit.id if isinstance(ingredient.unit, IngredientUnit) else None

            new_item = ShoppingListItemCreate(
                shopping_list_id=list_id,
                note=ingredient.note,
                quantity=ingredient.quantity * scale if ingredient.quantity else 0,
                food_id=food_id,
                label_id=label_id,
                unit_id=unit_id,
                recipe_references=[
                    ShoppingListItemRecipeRefCreate(
                        recipe_id=recipe_id,
                        recipe_quantity=ingredient.quantity,
                        recipe_scale=scale,
                        recipe_note=ingredient.note or None,
                    )
                ],
            )

            # some recipes have the same ingredient multiple times, so we check to see if we can combine them
            merged = False
            for existing_item in list_items:
                if not self.can_merge(existing_item, new_item):
                    continue

                # since this is the same recipe, we combine the quantities, rather than the scales
                # all items will have exactly one recipe reference
                if ingredient.quantity:
                    # the item carries the scaled quantity, the reference carries the unscaled one,
                    # so that item quantity == recipe_scale * recipe_quantity keeps holding
                    existing_item.quantity += new_item.quantity
                    existing_item.recipe_references[0].recipe_quantity += ingredient.quantity  # type: ignore

                # merge notes (first-seen order, no duplicates)
                if new_item.note and existing_item.note != new_item.note:
                    existing_item.note = self._merge_notes(existing_item.note, new_item.note)

                merged = True
                break

            if not merged:
                list_items.append(new_item)

        return list_items

    def add_recipe_ingredients_to_list(
        self,
        list_id: UUID4,
        recipe_items: list[ShoppingListAddRecipeParamsBulk],
    ) -> tuple[ShoppingListOut, ShoppingListItemsCollectionOut]:
        """
        Adds recipe ingredients to a list

        Returns a tuple of:
            - Updated Shopping List
            - Impacted Shopping List Items
        """

        items_to_create = [
            item
            for recipe in recipe_items
            for item in self.get_shopping_list_items_from_recipe(
                list_id, recipe.recipe_id, recipe.recipe_increment_quantity, recipe.recipe_ingredients
            )
        ]
        item_changes = self.bulk_create_items(items_to_create)
        updated_list = cast(ShoppingListOut, self.shopping_lists.get_one(list_id))

        # update list-level recipe references
        for recipe in recipe_items:
            for ref in updated_list.recipe_references:
                if ref.recipe_id != recipe.recipe_id:
                    continue

                ref.recipe_quantity += recipe.recipe_increment_quantity
                break
            else:
                # first time this recipe is added to the list
                updated_list.recipe_references.append(
                    ShoppingListItemRecipeRefCreate(
                        recipe_id=recipe.recipe_id, recipe_quantity=recipe.recipe_increment_quantity
                    )
                )

        updated_list = self.shopping_lists.update(updated_list.id, updated_list)
        return updated_list, item_changes

    def remove_recipe_ingredients_from_list(
        self, list_id: UUID4, recipe_id: UUID4, recipe_decrement: float = 1
    ) -> tuple[ShoppingListOut, ShoppingListItemsCollectionOut]:
        """
        Removes a recipe's ingredients from a list

        Raises `UnexpectedNone` if the shopping list does not exist.

        Returns a tuple of:
            - Updated Shopping List
            - Impacted Shopping List Items
        """

        shopping_list = self.shopping_lists.get_one(list_id)
        if shopping_list is None:
            raise UnexpectedNone("Shopping list not found, cannot remove recipe ingredients")

        update_items: list[ShoppingListItemUpdateBulk] = []
        delete_items: list[UUID4] = []
        for item in shopping_list.list_items:
            found = False

            refs = cast(list[ShoppingListItemRecipeRefOut], item.recipe_references)
            for ref in refs:
                if ref.recipe_id != recipe_id:
                    continue

                # if the scale is missing we assume it's 1 for backwards compatibility
                # if the scale is 0 we leave it alone
                if ref.recipe_scale is None:
                    ref.recipe_scale = 1

                # Set Quantity
                if ref.recipe_scale > recipe_decrement:
                    # remove only part of the reference
                    item.quantity -= recipe_decrement * ref.recipe_quantity
                else:
                    # remove everything that's left on the reference
                    item.quantity -= ref.recipe_scale * ref.recipe_quantity

                # Set Reference Scale
                ref.recipe_scale -= recipe_decrement
                if ref.recipe_scale <= 0:
                    # removing while iterating is safe only because we break right after
                    item.recipe_references.remove(ref)

                found = True
                break

            if found:
                # only remove a 0 quantity item if we removed its last recipe reference
                if item.quantity < 0 or (item.quantity == 0 and not item.recipe_references):
                    delete_items.append(item.id)
                else:
                    update_items.append(item.cast(ShoppingListItemUpdateBulk))

        response_update = self.bulk_update_items(update_items)

        # the bulk update may already have deleted some items; don't try to delete those twice
        already_deleted_ids = {item.id for item in response_update.deleted_items}
        response_delete = self.bulk_delete_items(
            [item_id for item_id in delete_items if item_id not in already_deleted_ids]
        )

        items = ShoppingListItemsCollectionOut(
            created_items=response_update.created_items + response_delete.created_items,
            updated_items=response_update.updated_items + response_delete.updated_items,
            deleted_items=response_update.deleted_items + response_delete.deleted_items,
        )

        # Decrement the list recipe reference count
        updated_list = self.shopping_lists.get_one(shopping_list.id)
        for recipe_ref in updated_list.recipe_references:  # type: ignore
            if recipe_ref.recipe_id != recipe_id or recipe_ref.recipe_quantity is None:
                continue

            recipe_ref.recipe_quantity -= recipe_decrement

            if recipe_ref.recipe_quantity <= 0.0:
                self.list_refs.delete(recipe_ref.id)
            else:
                self.list_refs.update(recipe_ref.id, recipe_ref)

            break

        return self.shopping_lists.get_one(shopping_list.id), items  # type: ignore

    def create_one_list(self, data: ShoppingListCreate, owner_id: UUID4):
        """
        Create a shopping list owned by `owner_id` in the current group

        One label setting is created on the new list for every multi-purpose label, positioned
        by the label's place in the name-ascending ordering. The list is then re-read and returned.
        """

        create_data = data.cast(ShoppingListSave, group_id=self.repos.group_id, user_id=owner_id)
        new_list = self.shopping_lists.create(create_data)  # type: ignore

        labels = self.repos.group_multi_purpose_labels.page_all(
            PaginationQuery(page=1, per_page=-1, order_by="name", order_direction=OrderDirection.asc)
        )
        label_settings = [
            ShoppingListMultiPurposeLabelCreate(shopping_list_id=new_list.id, label_id=label.id, position=i)
            for i, label in enumerate(labels.items)
        ]

        self.repos.shopping_list_multi_purpose_labels.create_many(label_settings)
        return self.shopping_lists.get_one(new_list.id)