Coverage for opt/mealie/lib/python3.12/site-packages/mealie/services/household_services/shopping_lists.py: 20%

257 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-11-25 15:48 +0000

1from typing import cast 1a

2 

3from pydantic import UUID4 1a

4 

5from mealie.core.exceptions import UnexpectedNone 1a

6from mealie.repos.all_repositories import get_repositories 1a

7from mealie.repos.repository_factory import AllRepositories 1a

8from mealie.schema.household.group_shopping_list import ( 1a

9 ShoppingListAddRecipeParamsBulk, 

10 ShoppingListCreate, 

11 ShoppingListItemBase, 

12 ShoppingListItemCreate, 

13 ShoppingListItemOut, 

14 ShoppingListItemRecipeRefCreate, 

15 ShoppingListItemRecipeRefOut, 

16 ShoppingListItemsCollectionOut, 

17 ShoppingListItemUpdate, 

18 ShoppingListItemUpdateBulk, 

19 ShoppingListMultiPurposeLabelCreate, 

20 ShoppingListOut, 

21 ShoppingListSave, 

22) 

23from mealie.schema.recipe.recipe_ingredient import ( 1a

24 IngredientFood, 

25 IngredientUnit, 

26 RecipeIngredient, 

27) 

28from mealie.schema.response.pagination import OrderDirection, PaginationQuery 1a

29from mealie.services.parser_services._base import DataMatcher 1a

30 

31 

32class ShoppingListService: 1a

33 DEFAULT_FOOD_FUZZY_MATCH_THRESHOLD = 80 1a

34 

def __init__(self, repos: AllRepositories):
    """Store the repository factory and the repositories this service uses.

    Args:
        repos: Repository factory. The shopping list, list item and reference
            repositories are read from it once and kept on the instance.
    """
    self.repos = repos

    # Shortcuts to the repositories used by the methods of this service.
    self.shopping_lists = repos.group_shopping_lists
    self.list_items = repos.group_shopping_list_item
    self.list_item_refs = repos.group_shopping_list_item_references
    self.list_refs = repos.group_shopping_list_recipe_refs

    # Matcher used by `find_matching_label` to look foods up by display name.
    fuzzy_threshold = self.DEFAULT_FOOD_FUZZY_MATCH_THRESHOLD
    self.data_matcher = DataMatcher(repos, food_fuzzy_match_threshold=fuzzy_threshold)

42 

43 @staticmethod 1a

44 def can_merge(item1: ShoppingListItemBase, item2: ShoppingListItemBase) -> bool: 1a

45 """Check to see if this item can be merged with another item""" 

46 

47 if any( 

48 [ 

49 item1.checked, 

50 item2.checked, 

51 item1.food_id != item2.food_id, 

52 item1.unit_id != item2.unit_id, 

53 ] 

54 ): 

55 return False 

56 

57 # if foods match, we can merge, otherwise compare the notes 

58 return bool(item1.food_id) or item1.note == item2.note 

59 

@staticmethod
def merge_items(
    from_item: ShoppingListItemCreate | ShoppingListItemUpdateBulk,
    to_item: ShoppingListItemCreate | ShoppingListItemUpdateBulk | ShoppingListItemOut,
) -> ShoppingListItemUpdate:
    """
    Takes an item and merges it into an already-existing item, then returns a copy

    Attributes of the `to_item` take priority over the `from_item`, except extras with overlapping keys

    Note that `to_item` is modified in place (quantity, note, extras) before the copy is made, and the
    recipe references of both items may have their `recipe_scale` filled in or increased.

    Args:
        from_item: the item being merged away
        to_item: the item that receives the quantity, notes, extras and recipe references

    Returns:
        `to_item` cast to a `ShoppingListItemUpdate` carrying the merged recipe references
    """

    to_item.quantity += from_item.quantity

    if to_item.note != from_item.note:
        # Notes are stored joined by " | ". Combine both sides, dropping empty and repeated
        # parts. A dict keeps first-seen order, so the result is stable between runs
        # (iterating a set of strings is not).
        note_parts = [
            part
            for raw_note in (to_item.note, from_item.note)
            if raw_note
            for part in raw_note.split(" | ")
            if part
        ]
        to_item.note = " | ".join(dict.fromkeys(note_parts))

    # extras are only merged when both sides have some; overlapping keys come from `from_item`
    if to_item.extras and from_item.extras:
        to_item.extras.update(from_item.extras)

    # start from the incoming references, then fold the existing ones in by recipe id
    updated_refs = {ref.recipe_id: ref for ref in from_item.recipe_references}
    for to_ref in to_item.recipe_references:
        if to_ref.recipe_id not in updated_refs:
            updated_refs[to_ref.recipe_id] = to_ref
            continue

        # merge recipe scales
        base_ref = updated_refs[to_ref.recipe_id]

        # if the scale is missing we assume it's 1 for backwards compatibility
        # if the scale is 0 we leave it alone
        if base_ref.recipe_scale is None:
            base_ref.recipe_scale = 1

        if to_ref.recipe_scale is None:
            to_ref.recipe_scale = 1

        base_ref.recipe_scale += to_ref.recipe_scale

    return to_item.cast(ShoppingListItemUpdate, recipe_references=list(updated_refs.values()))

103 

def remove_unused_recipe_references(self, shopping_list_id: UUID4) -> None:
    """Delete list-level recipe references that no item on the list refers to.

    Args:
        shopping_list_id: id of the shopping list to clean up
    """
    shopping_list = cast(ShoppingListOut, self.shopping_lists.get_one(shopping_list_id))

    # every recipe still referenced by at least one item on the list
    referenced_recipe_ids: set[UUID4] = {
        item_ref.recipe_id for list_item in shopping_list.list_items for item_ref in list_item.recipe_references
    }

    # list-level references whose recipe is no longer referenced by any item
    stale_ref_ids: set[UUID4] = {
        list_ref.id for list_ref in shopping_list.recipe_references if list_ref.recipe_id not in referenced_recipe_ids
    }

    if not stale_ref_ids:
        return

    self.list_refs.delete_many(stale_ref_ids)

118 

def find_matching_label(self, item: ShoppingListItemBase) -> UUID4 | None:
    """Pick a label id for an item.

    The item's own label wins, then the label of its food. Failing both, the
    item's display text is run through the food matcher and the label of the
    matched food (if any) is used.

    Returns:
        The chosen label id, or None when the matcher finds no food.
    """
    own_label_id = item.label_id
    if own_label_id:
        return own_label_id

    food = item.food
    if food:
        return food.label_id

    matched_food = self.data_matcher.find_food_match(item.display)
    if not matched_food:
        return None

    return matched_food.label_id

127 

def bulk_create_items(
    self, create_items: list[ShoppingListItemCreate], auto_find_labels: bool = True
) -> ShoppingListItemsCollectionOut:
    """
    Create a list of items, merging into existing ones where possible.
    Optionally try to find a label for each item if one isn't provided using the item's food data or display name.

    Args:
        create_items: the items to create
        auto_find_labels: when true, each newly created item gets the label returned by `find_matching_label`

    Returns:
        A collection holding the created items and the existing items that were updated by a merge.
        `deleted_items` is always empty.
    """

    # consolidate items to be created: merge the incoming items with each other first
    consolidated_create_items: list[ShoppingListItemCreate] = []
    for create_item in create_items:
        for i, filtered_item in enumerate(consolidated_create_items):
            if not self.can_merge(create_item, filtered_item):
                continue

            consolidated_create_items[i] = self.merge_items(create_item, filtered_item).cast(ShoppingListItemCreate)
            break
        else:
            # nothing to merge with, keep it as its own item
            consolidated_create_items.append(create_item)

    filtered_create_items: list[ShoppingListItemCreate] = []

    # check to see if we can merge into any existing items
    update_items: list[ShoppingListItemUpdateBulk] = []
    # unchecked items already stored on each shopping list, fetched once per list
    existing_items_map: dict[UUID4, list[ShoppingListItemOut]] = {}
    for create_item in consolidated_create_items:
        if create_item.shopping_list_id not in existing_items_map:
            query = PaginationQuery(
                per_page=-1, query_filter=f"shopping_list_id={create_item.shopping_list_id} AND checked=false"
            )
            items_data = self.list_items.page_all(query)
            existing_items_map[create_item.shopping_list_id] = items_data.items

        merged = False
        for existing_item in existing_items_map[create_item.shopping_list_id]:
            if not self.can_merge(existing_item, create_item):
                continue

            # a single cast is enough to turn the merged item into a bulk update for the existing row
            update_items.append(
                self.merge_items(create_item, existing_item).cast(ShoppingListItemUpdateBulk, id=existing_item.id)
            )
            merged = True
            break

        # merged items are handled as updates; items with a negative quantity are not created
        if merged or create_item.quantity < 0:
            continue

        # create the item
        if create_item.checked:
            # checked items should not have recipe references
            create_item.recipe_references = []
        if auto_find_labels:
            create_item.label_id = self.find_matching_label(create_item)

        filtered_create_items.append(create_item)

    created_items = self.list_items.create_many(filtered_create_items) if filtered_create_items else []
    updated_items = self.list_items.update_many(update_items) if update_items else []

    # clean up list-level recipe references on every list that was touched
    for list_id in {item.shopping_list_id for item in created_items + updated_items}:
        self.remove_unused_recipe_references(list_id)

    return ShoppingListItemsCollectionOut(
        created_items=created_items, updated_items=updated_items, deleted_items=[]
    )

198 

def bulk_update_items(self, update_items: list[ShoppingListItemUpdateBulk]) -> ShoppingListItemsCollectionOut:
    """Update a batch of items, merging mergeable ones and deleting the leftovers.

    The request is first consolidated against itself, then each remaining item
    is merged into a stored unchecked item of the same list where possible.
    An item that was merged into another one, or whose quantity is negative,
    is deleted instead of updated.

    Args:
        update_items: the requested item updates

    Returns:
        A collection holding the updated and deleted items. `created_items` is always empty.
    """
    # consolidate the items of this request with each other
    deduped_items: list[ShoppingListItemUpdateBulk] = []
    ids_to_delete: set[UUID4] = set()
    handled_ids: set[UUID4] = set()
    for candidate in update_items:
        # if the same item appears multiple times in one request, ignore all but the first instance
        if candidate.id in handled_ids:
            continue

        handled_ids.add(candidate.id)

        for position, kept_item in enumerate(deduped_items):
            if self.can_merge(candidate, kept_item):
                merged_item = self.merge_items(candidate, kept_item)
                deduped_items[position] = merged_item.cast(ShoppingListItemUpdateBulk, id=kept_item.id)
                # the candidate now lives on inside the kept item
                ids_to_delete.add(candidate.id)
                break
        else:
            deduped_items.append(candidate)

    # check to see if we can merge into any existing items
    items_to_update: list[ShoppingListItemUpdateBulk] = []
    # unchecked items already stored on each shopping list, fetched once per list
    unchecked_by_list: dict[UUID4, list[ShoppingListItemOut]] = {}
    for candidate in deduped_items:
        if candidate.shopping_list_id not in unchecked_by_list:
            list_query = PaginationQuery(
                per_page=-1, query_filter=f"shopping_list_id={candidate.shopping_list_id} AND checked=false"
            )
            unchecked_by_list[candidate.shopping_list_id] = self.list_items.page_all(list_query).items

        for stored_item in unchecked_by_list[candidate.shopping_list_id]:
            # never merge into an item that is about to be deleted, or into the item itself
            if stored_item.id in ids_to_delete or stored_item.id == candidate.id:
                continue

            if not self.can_merge(candidate, stored_item):
                continue

            merged_item = self.merge_items(candidate, stored_item)
            items_to_update.append(merged_item.cast(ShoppingListItemUpdateBulk, id=stored_item.id))
            ids_to_delete.add(candidate.id)
            break
        else:
            # not merged: update or delete the item
            if candidate.quantity < 0:
                ids_to_delete.add(candidate.id)
            else:
                if candidate.checked:
                    # checked items should not have recipe references
                    candidate.recipe_references = []

                items_to_update.append(candidate)

    updated_items = cast(
        list[ShoppingListItemOut],
        self.list_items.update_many(items_to_update) if items_to_update else [],  # type: ignore
    )

    deleted_items = cast(
        list[ShoppingListItemOut],
        self.list_items.delete_many(ids_to_delete) if ids_to_delete else [],  # type: ignore
    )

    # clean up list-level recipe references on every list that was touched
    touched_list_ids = {item.shopping_list_id for item in updated_items + deleted_items}
    for list_id in touched_list_ids:
        self.remove_unused_recipe_references(list_id)

    return ShoppingListItemsCollectionOut(
        created_items=[], updated_items=updated_items, deleted_items=deleted_items
    )

285 

def bulk_delete_items(self, delete_items: list[UUID4]) -> ShoppingListItemsCollectionOut:
    """Delete the items with the given ids and tidy up the affected lists.

    Args:
        delete_items: ids of the items to delete; duplicates are collapsed

    Returns:
        A collection whose `deleted_items` holds what the repository reported as deleted.
        `created_items` and `updated_items` are always empty.
    """
    if delete_items:
        removed = self.list_items.delete_many(set(delete_items))  # type: ignore
    else:
        # nothing requested, skip the repository call
        removed = []

    deleted_items = cast(list[ShoppingListItemOut], removed)

    # clean up list-level recipe references on every list that lost an item
    touched_list_ids = {item.shopping_list_id for item in deleted_items}
    for list_id in touched_list_ids:
        self.remove_unused_recipe_references(list_id)

    return ShoppingListItemsCollectionOut(created_items=[], updated_items=[], deleted_items=deleted_items)

296 

def get_shopping_list_items_from_recipe(
    self,
    list_id: UUID4,
    recipe_id: UUID4,
    scale: float = 1,
    recipe_ingredients: list[RecipeIngredient] | None = None,
) -> list[ShoppingListItemCreate]:
    """Generates a list of new list items based on a recipe

    Args:
        list_id: the shopping list the generated items belong to
        recipe_id: the recipe the items are generated from
        scale: multiplier applied to every ingredient quantity
        recipe_ingredients: ingredients to use; when None they are loaded from the recipe

    Returns:
        One item per ingredient, with ingredients that can be merged combined into a single item.
        Every item carries exactly one recipe reference.

    Raises:
        UnexpectedNone: if the ingredients have to be loaded and the recipe cannot be found
    """

    if recipe_ingredients is None:
        # look the recipe up through repositories that are not restricted to one household
        group_recipes_repo = get_repositories(
            self.repos.session, group_id=self.repos.group_id, household_id=None
        ).recipes
        recipe = group_recipes_repo.get_one(recipe_id, "id")
        if not recipe:
            raise UnexpectedNone("Recipe not found")

        recipe_ingredients = recipe.recipe_ingredient

    list_items: list[ShoppingListItemCreate] = []
    for ingredient in recipe_ingredients:
        if isinstance(ingredient.food, IngredientFood):
            food_id = ingredient.food.id
            label_id = ingredient.food.label_id
        else:
            food_id = None
            label_id = None

        unit_id = ingredient.unit.id if isinstance(ingredient.unit, IngredientUnit) else None

        new_item = ShoppingListItemCreate(
            shopping_list_id=list_id,
            note=ingredient.note,
            quantity=ingredient.quantity * scale if ingredient.quantity else 0,
            food_id=food_id,
            label_id=label_id,
            unit_id=unit_id,
            recipe_references=[
                ShoppingListItemRecipeRefCreate(
                    recipe_id=recipe_id,
                    recipe_quantity=ingredient.quantity,
                    recipe_scale=scale,
                    recipe_note=ingredient.note or None,
                )
            ],
        )

        # some recipes have the same ingredient multiple times, so we check to see if we can combine them
        for existing_item in list_items:
            if not self.can_merge(existing_item, new_item):
                continue

            # since this is the same recipe, we combine the quantities, rather than the scales
            # all items will have exactly one recipe reference
            if ingredient.quantity:
                # the item quantity is scaled, the reference quantity is not: keeping both in step
                # means the item quantity stays equal to recipe_quantity * recipe_scale
                existing_item.quantity += ingredient.quantity * scale
                existing_item.recipe_references[0].recipe_quantity += ingredient.quantity  # type: ignore

            # merge notes, keeping first-seen order so the result is stable between runs
            if new_item.note and existing_item.note != new_item.note:
                note_parts = existing_item.note.split(" | ") if existing_item.note else []
                note_parts.append(new_item.note)
                existing_item.note = " | ".join(dict.fromkeys(part for part in note_parts if part))

            break
        else:
            # nothing to combine with, keep it as its own item
            list_items.append(new_item)

    return list_items

374 

def add_recipe_ingredients_to_list(
    self,
    list_id: UUID4,
    recipe_items: list[ShoppingListAddRecipeParamsBulk],
) -> tuple[ShoppingListOut, ShoppingListItemsCollectionOut]:
    """
    Adds the ingredients of one or more recipes to a shopping list

    Returns a tuple of:
        - Updated Shopping List
        - Impacted Shopping List Items
    """

    # build the items for every requested recipe, then create them in one go
    items_to_create: list[ShoppingListItemCreate] = []
    for recipe in recipe_items:
        items_to_create.extend(
            self.get_shopping_list_items_from_recipe(
                list_id, recipe.recipe_id, recipe.recipe_increment_quantity, recipe.recipe_ingredients
            )
        )

    item_changes = self.bulk_create_items(items_to_create)
    updated_list = cast(ShoppingListOut, self.shopping_lists.get_one(list_id))

    # update list-level recipe references
    for recipe in recipe_items:
        for ref in updated_list.recipe_references:
            if ref.recipe_id == recipe.recipe_id:
                # the recipe is already on the list: bump its quantity
                ref.recipe_quantity += recipe.recipe_increment_quantity
                break
        else:
            # first time this recipe is added to the list
            updated_list.recipe_references.append(
                ShoppingListItemRecipeRefCreate(
                    recipe_id=recipe.recipe_id, recipe_quantity=recipe.recipe_increment_quantity
                )
            )

    updated_list = self.shopping_lists.update(updated_list.id, updated_list)
    return updated_list, item_changes

418 

def remove_recipe_ingredients_from_list(
    self, list_id: UUID4, recipe_id: UUID4, recipe_decrement: float = 1
) -> tuple[ShoppingListOut, ShoppingListItemsCollectionOut]:
    """
    Removes a recipe's ingredients from a list

    Args:
        list_id: the shopping list to remove the ingredients from
        recipe_id: the recipe whose ingredients are removed
        recipe_decrement: how much of the recipe's scale to remove

    Returns a tuple of:
        - Updated Shopping List
        - Impacted Shopping List Items

    Raises:
        UnexpectedNone: if the shopping list cannot be found
    """

    shopping_list = self.shopping_lists.get_one(list_id)
    if shopping_list is None:
        raise UnexpectedNone("Shopping list not found, cannot remove recipe ingredients")

    update_items: list[ShoppingListItemUpdateBulk] = []
    delete_items: list[UUID4] = []
    for item in shopping_list.list_items:
        refs = cast(list[ShoppingListItemRecipeRefOut], item.recipe_references)

        # only the first reference to this recipe is adjusted; items without one are left alone
        ref = next((candidate for candidate in refs if candidate.recipe_id == recipe_id), None)
        if ref is None:
            continue

        # if the scale is missing we assume it's 1 for backwards compatibility
        # if the scale is 0 we leave it alone
        if ref.recipe_scale is None:
            ref.recipe_scale = 1

        # Set Quantity
        if ref.recipe_scale > recipe_decrement:
            # remove only part of the reference
            item.quantity -= recipe_decrement * ref.recipe_quantity
        else:
            # remove everything that's left on the reference
            item.quantity -= ref.recipe_scale * ref.recipe_quantity

        # Set Reference Scale
        ref.recipe_scale -= recipe_decrement
        if ref.recipe_scale <= 0:
            item.recipe_references.remove(ref)

        # only remove a 0 quantity item if we removed its last recipe reference
        if item.quantity < 0 or (item.quantity == 0 and not item.recipe_references):
            delete_items.append(item.id)
        else:
            update_items.append(item.cast(ShoppingListItemUpdateBulk))

    response_update = self.bulk_update_items(update_items)

    # the update may already have deleted some items (merges); do not delete those twice
    already_deleted_ids = {item.id for item in response_update.deleted_items}
    response_delete = self.bulk_delete_items(
        [item_id for item_id in delete_items if item_id not in already_deleted_ids]
    )

    items = ShoppingListItemsCollectionOut(
        created_items=response_update.created_items + response_delete.created_items,
        updated_items=response_update.updated_items + response_delete.updated_items,
        deleted_items=response_update.deleted_items + response_delete.deleted_items,
    )

    # Decrement the list recipe reference count
    updated_list = self.shopping_lists.get_one(shopping_list.id)
    for recipe_ref in updated_list.recipe_references:  # type: ignore
        if recipe_ref.recipe_id != recipe_id or recipe_ref.recipe_quantity is None:
            continue

        recipe_ref.recipe_quantity -= recipe_decrement

        if recipe_ref.recipe_quantity <= 0.0:
            self.list_refs.delete(recipe_ref.id)
        else:
            self.list_refs.update(recipe_ref.id, recipe_ref)

        # only the first matching list-level reference is adjusted
        break

    # fetch the list once more so the returned value reflects the reference change above
    return self.shopping_lists.get_one(shopping_list.id), items  # type: ignore

502 

def create_one_list(self, data: ShoppingListCreate, owner_id: UUID4):
    """Create a shopping list and give it one label setting per existing label.

    Args:
        data: the list to create
        owner_id: stored as the `user_id` of the new list

    Returns:
        The new list, fetched again after its label settings were created.
    """
    save_data = data.cast(ShoppingListSave, group_id=self.repos.group_id, user_id=owner_id)
    new_list = self.shopping_lists.create(save_data)  # type: ignore

    # fetch every label in one page (per_page=-1), ordered by name ascending
    label_query = PaginationQuery(page=1, per_page=-1, order_by="name", order_direction=OrderDirection.asc)
    labels = self.repos.group_multi_purpose_labels.page_all(label_query)

    # one setting per label; its position is the label's index in that ordering
    label_settings = []
    for position, label in enumerate(labels.items):
        label_settings.append(
            ShoppingListMultiPurposeLabelCreate(shopping_list_id=new_list.id, label_id=label.id, position=position)
        )

    self.repos.shopping_list_multi_purpose_labels.create_many(label_settings)
    return self.shopping_lists.get_one(new_list.id)