Coverage for opt/mealie/lib/python3.12/site-packages/mealie/services/recipe/recipe_service.py: 50%

296 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-11-25 15:48 +0000

1import json 1b

2import os 1b

3import shutil 1b

4from datetime import UTC, datetime 1b

5from pathlib import Path 1b

6from shutil import copytree, rmtree 1b

7from typing import Any 1b

8from uuid import UUID, uuid4 1b

9from zipfile import ZipFile 1b

10 

11from fastapi import UploadFile 1b

12 

13from mealie.core import exceptions 1b

14from mealie.core.config import get_app_settings 1b

15from mealie.core.dependencies.dependencies import get_temporary_path 1b

16from mealie.lang.providers import Translator 1b

17from mealie.pkgs import cache 1b

18from mealie.repos.all_repositories import get_repositories 1b

19from mealie.repos.repository_factory import AllRepositories 1b

20from mealie.repos.repository_generic import RepositoryGeneric 1b

21from mealie.schema.household.household import HouseholdInDB, HouseholdRecipeUpdate 1b

22from mealie.schema.openai.recipe import OpenAIRecipe 1b

23from mealie.schema.recipe.recipe import CreateRecipe, Recipe, create_recipe_slug 1b

24from mealie.schema.recipe.recipe_ingredient import RecipeIngredient 1b

25from mealie.schema.recipe.recipe_notes import RecipeNote 1b

26from mealie.schema.recipe.recipe_settings import RecipeSettings 1b

27from mealie.schema.recipe.recipe_step import RecipeStep 1b

28from mealie.schema.recipe.recipe_timeline_events import RecipeTimelineEventCreate, TimelineEventType 1b

29from mealie.schema.recipe.request_helpers import RecipeDuplicate 1b

30from mealie.schema.user.user import PrivateUser, UserRatingCreate 1b

31from mealie.services._base_service import BaseService 1b

32from mealie.services.household_services.household_service import HouseholdService 1b

33from mealie.services.openai import OpenAIDataInjection, OpenAILocalImage, OpenAIService 1b

34from mealie.services.recipe.recipe_data_service import RecipeDataService 1b

35from mealie.services.scraper import cleaner 1b

36 

37from .template_service import TemplateService 1b

38 

39 

40class RecipeServiceBase(BaseService): 1b

41 def __init__(self, repos: AllRepositories, user: PrivateUser, household: HouseholdInDB, translator: Translator): 1b

42 self.repos = repos 1qcdefghijklmnopa

43 self.user = user 1qcdefghijklmnopa

44 self.household = household 1qcdefghijklmnopa

45 

46 if repos.group_id != user.group_id != household.group_id: 46 ↛ 47line 46 didn't jump to line 47 because the condition on line 46 was never true1qcdefghijklmnopa

47 raise Exception("group ids do not match") 

48 if repos.household_id != user.household_id != household.id: 48 ↛ 49line 48 didn't jump to line 49 because the condition on line 48 was never true1qcdefghijklmnopa

49 raise Exception("household ids do not match") 

50 

51 self.group_recipes = get_repositories(repos.session, group_id=repos.group_id, household_id=None).recipes 1qcdefghijklmnopa

52 """Recipes repo without a Household filter""" 1qcdefghijklmnopa

53 

54 self.translator = translator 1qcdefghijklmnopa

55 self.t = translator.t 1qcdefghijklmnopa

56 

57 super().__init__() 1qcdefghijklmnopa

58 

59 

60class RecipeService(RecipeServiceBase): 1b

61 def _get_recipe(self, data: str | UUID, key: str | None = None) -> Recipe: 1b

62 recipe = self.group_recipes.get_one(data, key) 1qcdefghijklmnopa

63 if recipe is None: 1qcdefghijklmnopa

64 raise exceptions.NoEntryFound("Recipe not found.") 1qcdefghijklmnopa

65 return recipe 1cda

66 

67 def can_delete(self, recipe: Recipe) -> bool: 1b

68 if self.user.admin: 

69 return True 

70 else: 

71 return self.can_update(recipe) 

72 

73 def can_update(self, recipe: Recipe) -> bool: 1b

74 if recipe.settings is None: 74 ↛ 75line 74 didn't jump to line 75 because the condition on line 74 was never true1cda

75 raise exceptions.UnexpectedNone("Recipe Settings is None") 

76 

77 # Check if this user owns the recipe 

78 if self.user.id == recipe.user_id: 78 ↛ 82line 78 didn't jump to line 82 because the condition on line 78 was always true1cda

79 return True 1cda

80 

81 # Check if this user has permission to edit this recipe 

82 if self.household.id != recipe.household_id: 

83 other_household = self.repos.households.get_one(recipe.household_id) 

84 if not (other_household and other_household.preferences): 

85 return False 

86 if other_household.preferences.lock_recipe_edits_from_other_households: 

87 return False 

88 if recipe.settings.locked: 

89 return False 

90 

91 return True 

92 

93 def can_lock_unlock(self, recipe: Recipe) -> bool: 1b

94 return recipe.user_id == self.user.id 

95 

96 def check_assets(self, recipe: Recipe, original_slug: str) -> None: 1b

97 """Checks if the recipe slug has changed, and if so moves the assets to a new file with the new slug.""" 

98 if original_slug != recipe.slug: 98 ↛ 99line 98 didn't jump to line 99 because the condition on line 98 was never true1ca

99 current_dir = self.directories.RECIPE_DATA_DIR.joinpath(original_slug) 

100 

101 try: 

102 copytree(current_dir, recipe.directory, dirs_exist_ok=True) 

103 self.logger.debug(f"Renaming Recipe Directory: {original_slug} -> {recipe.slug}") 

104 except FileNotFoundError: 

105 self.logger.error(f"Recipe Directory not Found: {original_slug}") 

106 

107 if recipe.assets is None: 107 ↛ 108line 107 didn't jump to line 108 because the condition on line 107 was never true1ca

108 recipe.assets = [] 

109 

110 all_asset_files = [x.file_name for x in recipe.assets] 1ca

111 

112 for file in recipe.asset_dir.iterdir(): 1ca

113 if file.is_dir(): 113 ↛ 114line 113 didn't jump to line 114 because the condition on line 113 was never true1ca

114 continue 

115 if file.name not in all_asset_files: 115 ↛ 116line 115 didn't jump to line 116 because the condition on line 115 was never true1ca

116 file.unlink() 

117 

118 def delete_assets(self, recipe: Recipe) -> None: 1b

119 recipe_dir = recipe.directory 

120 rmtree(recipe_dir, ignore_errors=True) 

121 self.logger.info(f"Recipe Directory Removed: {recipe.slug}") 

122 

123 def _recipe_creation_factory(self, name: str, additional_attrs: dict | None = None) -> Recipe: 1b

124 """ 

125 The main creation point for recipes. The factor method returns an instance of the 

126 Recipe Schema class with the appropriate defaults set. Recipes should not be created 

127 elsewhere to avoid conflicts. 

128 """ 

129 additional_attrs = additional_attrs or {} 

130 additional_attrs["name"] = name 

131 additional_attrs["user_id"] = self.user.id 

132 additional_attrs["household_id"] = self.household.id 

133 additional_attrs["group_id"] = self.household.group_id 

134 

135 if additional_attrs.get("tags"): 135 ↛ 136line 135 didn't jump to line 136 because the condition on line 135 was never true

136 for i in range(len(additional_attrs.get("tags", []))): 

137 additional_attrs["tags"][i]["group_id"] = self.user.group_id 

138 

139 if not additional_attrs.get("recipe_ingredient"): 

140 additional_attrs["recipe_ingredient"] = [ 

141 RecipeIngredient(note=self.t("recipe.recipe-defaults.ingredient-note")) 

142 ] 

143 

144 if not additional_attrs.get("recipe_instructions"): 

145 additional_attrs["recipe_instructions"] = [RecipeStep(text=self.t("recipe.recipe-defaults.step-text"))] 

146 

147 return Recipe(**additional_attrs) 

148 

149 def get_one(self, slug_or_id: str | UUID) -> Recipe: 1b

150 if isinstance(slug_or_id, str): 1qcdefghijklmnopa

151 try: 1cdefghijklmnopa

152 slug_or_id = UUID(slug_or_id) 1cdefghijklmnopa

153 except ValueError: 1cdefghijklmnopa

154 pass 1cdefghijklmnopa

155 

156 if isinstance(slug_or_id, UUID): 156 ↛ 157line 156 didn't jump to line 157 because the condition on line 156 was never true1qcdefghijklmnopa

157 return self._get_recipe(slug_or_id, "id") 

158 

159 else: 

160 return self._get_recipe(slug_or_id, "slug") 1qcdefghijklmnopa

161 

162 def create_one(self, create_data: Recipe | CreateRecipe) -> Recipe: 1b

163 if create_data.name is None: 163 ↛ 164line 163 didn't jump to line 164 because the condition on line 163 was never true

164 create_data.name = "New Recipe" 

165 

166 data: Recipe = self._recipe_creation_factory(name=create_data.name, additional_attrs=create_data.model_dump()) 

167 

168 if isinstance(create_data, CreateRecipe) or create_data.settings is None: 168 ↛ 180line 168 didn't jump to line 180 because the condition on line 168 was always true

169 if self.household.preferences is not None: 169 ↛ 178line 169 didn't jump to line 178 because the condition on line 169 was always true

170 data.settings = RecipeSettings( 

171 public=self.household.preferences.recipe_public, 

172 show_nutrition=self.household.preferences.recipe_show_nutrition, 

173 show_assets=self.household.preferences.recipe_show_assets, 

174 landscape_view=self.household.preferences.recipe_landscape_view, 

175 disable_comments=self.household.preferences.recipe_disable_comments, 

176 ) 

177 else: 

178 data.settings = RecipeSettings() 

179 

180 rating_input = data.rating 

181 data.last_made = None 

182 new_recipe = self.repos.recipes.create(data) 

183 

184 # convert rating into user rating 

185 if rating_input: 185 ↛ 186line 185 didn't jump to line 186 because the condition on line 185 was never true

186 self.repos.user_ratings.create( 

187 UserRatingCreate( 

188 user_id=self.user.id, 

189 recipe_id=new_recipe.id, 

190 rating=rating_input, 

191 is_favorite=False, 

192 ) 

193 ) 

194 

195 # create first timeline entry 

196 timeline_event_data = RecipeTimelineEventCreate( 

197 user_id=new_recipe.user_id, 

198 recipe_id=new_recipe.id, 

199 subject=self.t("recipe.recipe-created"), 

200 event_type=TimelineEventType.system, 

201 timestamp=new_recipe.created_at or datetime.now(UTC), 

202 ) 

203 

204 self.repos.recipe_timeline_events.create(timeline_event_data) 

205 return new_recipe 

206 

207 def _transform_user_id(self, user_id: str) -> str: 1b

208 query = self.repos.users.get_one(user_id) 

209 if query: 

210 return user_id 

211 else: 

212 # default to the current user 

213 return str(self.user.id) 

214 

215 def _transform_category_or_tag(self, data: dict, repo: RepositoryGeneric) -> dict: 1b

216 slug = data.get("slug") 

217 if not slug: 

218 return data 

219 

220 # if the item exists, return the actual data 

221 query = repo.get_one(slug, "slug") 

222 if query: 

223 return query.model_dump() 

224 

225 # otherwise, create the item 

226 new_item = repo.create(data) 

227 return new_item.model_dump() 

228 

229 def _process_recipe_data(self, key: str, data: list | dict | Any): 1b

230 if isinstance(data, list): 

231 return [self._process_recipe_data(key, item) for item in data] 

232 

233 elif isinstance(data, str): 

234 # make sure the user is valid 

235 if key == "user_id": 

236 return self._transform_user_id(str(data)) 

237 

238 return data 

239 

240 elif not isinstance(data, dict): 

241 return data 

242 

243 # force group_id and household_id to match the group id of the current user 

244 data["group_id"] = str(self.user.group_id) 

245 data["household_id"] = str(self.user.household_id) 

246 

247 # make sure categories and tags are valid 

248 if key == "recipe_category": 

249 return self._transform_category_or_tag(data, self.repos.categories) 

250 elif key == "tags": 

251 return self._transform_category_or_tag(data, self.repos.tags) 

252 

253 # recursively process other objects 

254 for k, v in data.items(): 

255 data[k] = self._process_recipe_data(k, v) 

256 

257 return data 

258 

259 def clean_recipe_dict(self, recipe: dict[str, Any]) -> dict[str, Any]: 1b

260 return self._process_recipe_data("recipe", recipe) 

261 

262 def create_from_zip(self, archive: UploadFile, temp_path: Path) -> Recipe: 1b

263 """ 

264 `create_from_zip` creates a recipe in the database from a zip file exported from Mealie. This is NOT 

265 a generic import from a zip file. 

266 """ 

267 with temp_path.open("wb") as buffer: 1c

268 shutil.copyfileobj(archive.file, buffer) 1c

269 

270 recipe_dict: dict | None = None 1c

271 recipe_image: bytes | None = None 1c

272 

273 with ZipFile(temp_path) as myzip: 1c

274 for file in myzip.namelist(): 

275 if file.endswith(".json"): 

276 with myzip.open(file) as myfile: 

277 recipe_dict = json.loads(myfile.read()) 

278 elif file.endswith(".webp"): 

279 with myzip.open(file) as myfile: 

280 recipe_image = myfile.read() 

281 

282 if recipe_dict is None: 

283 raise exceptions.UnexpectedNone("No json data found in Zip") 

284 

285 recipe = self.create_one(Recipe(**self.clean_recipe_dict(recipe_dict))) 

286 

287 if recipe and recipe.id: 

288 data_service = RecipeDataService(recipe.id) 

289 

290 if recipe_image: 

291 data_service.write_image(recipe_image, "webp") 

292 

293 return recipe 

294 

295 async def create_from_images(self, images: list[UploadFile], translate_language: str | None = None) -> Recipe: 1b

296 openai_recipe_service = OpenAIRecipeService(self.repos, self.user, self.household, self.translator) 

297 with get_temporary_path() as temp_path: 

298 local_images: list[Path] = [] 

299 for image in images: 

300 with temp_path.joinpath(image.filename).open("wb") as buffer: 

301 shutil.copyfileobj(image.file, buffer) 

302 local_images.append(temp_path.joinpath(image.filename)) 

303 

304 recipe_data = await openai_recipe_service.build_recipe_from_images( 

305 local_images, translate_language=translate_language 

306 ) 

307 recipe_data = cleaner.clean(recipe_data, self.translator) 

308 

309 recipe = self.create_one(recipe_data) 

310 data_service = RecipeDataService(recipe.id) 

311 

312 with open(local_images[0], "rb") as f: 

313 data_service.write_image(f.read(), "webp") 

314 return recipe 

315 

316 def duplicate_one(self, old_slug_or_id: str | UUID, dup_data: RecipeDuplicate) -> Recipe: 1b

317 """Duplicates a recipe and returns the new recipe.""" 

318 

319 old_recipe = self.get_one(old_slug_or_id) 1efghijklmnopa

320 new_recipe_data = old_recipe.model_dump(exclude={"id", "name", "slug", "image", "comments"}, round_trip=True) 

321 new_recipe = Recipe.model_validate(new_recipe_data) 

322 

323 # Asset images in steps directly link to the original recipe, so we 

324 # need to update them to references to the assets we copy below 

325 def replace_recipe_step(step: RecipeStep) -> RecipeStep: 

326 new_id = uuid4() 

327 new_text = step.text.replace(str(old_recipe.id), str(new_recipe.id)) 

328 new_step = step.model_copy(update={"id": new_id, "text": new_text}) 

329 return new_step 

330 

331 # Copy ingredients to make them independent of the original 

332 def copy_recipe_ingredient(ingredient: RecipeIngredient): 

333 new_reference_id = uuid4() 

334 new_ingredient = ingredient.model_copy(update={"reference_id": new_reference_id}) 

335 return new_ingredient 

336 

337 new_name = dup_data.name if dup_data.name else old_recipe.name or "" 

338 new_recipe.id = uuid4() 

339 new_recipe.slug = create_recipe_slug(new_name) 

340 new_recipe.image = cache.cache_key.new_key() if old_recipe.image else None 

341 new_recipe.recipe_instructions = ( 

342 None 

343 if old_recipe.recipe_instructions is None 

344 else list(map(replace_recipe_step, old_recipe.recipe_instructions)) 

345 ) 

346 new_recipe.recipe_ingredient = ( 

347 None 

348 if old_recipe.recipe_ingredient is None 

349 else list(map(copy_recipe_ingredient, old_recipe.recipe_ingredient)) 

350 ) 

351 new_recipe.last_made = None 

352 

353 new_recipe = self._recipe_creation_factory(new_name, additional_attrs=new_recipe.model_dump()) 

354 

355 new_recipe = self.repos.recipes.create(new_recipe) 

356 

357 # Copy all assets (including images) to the new recipe directory 

358 # This assures that replaced links in recipe steps continue to work when the old recipe is deleted 

359 try: 

360 new_service = RecipeDataService(new_recipe.id) 

361 old_service = RecipeDataService(old_recipe.id) 

362 copytree( 

363 old_service.dir_data, 

364 new_service.dir_data, 

365 dirs_exist_ok=True, 

366 ) 

367 except Exception as e: 

368 self.logger.error(f"Failed to copy assets from {old_recipe.slug} to {new_recipe.slug}: {e}") 

369 

370 return new_recipe 

371 

372 def _pre_update_check(self, slug_or_id: str | UUID, new_data: Recipe) -> Recipe: 1b

373 """ 

374 gets the recipe from the database and performs a check to see if the user can update the recipe. 

375 If the user can't update the recipe, an exception is raised. 

376 

377 Checks: 

378 - That the recipe exists 

379 - That the user can update the recipe (recipe is not locked or the user is the owner) 

380 - _if_ the user is locking the recipe, that they can lock the recipe (user is the owner) 

381 

382 Args: 

383 slug_or_id (str | UUID): recipe slug or id 

384 new_data (Recipe): the new recipe data 

385 

386 Raises: 

387 exceptions.PermissionDenied (403) 

388 """ 

389 

390 recipe = self.get_one(slug_or_id) 1qca

391 

392 if recipe is None or recipe.settings is None: 392 ↛ 393line 392 didn't jump to line 393 because the condition on line 392 was never true1ca

393 raise exceptions.NoEntryFound("Recipe not found.") 

394 

395 if not self.can_update(recipe): 395 ↛ 396line 395 didn't jump to line 396 because the condition on line 395 was never true1ca

396 raise exceptions.PermissionDenied("You do not have permission to edit this recipe.") 

397 

398 setting_lock = new_data.settings is not None and recipe.settings.locked != new_data.settings.locked 1ca

399 if setting_lock and not self.can_lock_unlock(recipe): 399 ↛ 400line 399 didn't jump to line 400 because the condition on line 399 was never true1ca

400 raise exceptions.PermissionDenied("You do not have permission to lock/unlock this recipe.") 

401 

402 return recipe 1ca

403 

404 def update_one(self, slug_or_id: str | UUID, update_data: Recipe) -> Recipe: 1b

405 recipe = self._pre_update_check(slug_or_id, update_data) 1ca

406 

407 new_data = self.group_recipes.update(recipe.slug, update_data) 1ca

408 self.check_assets(new_data, recipe.slug) 1ca

409 return new_data 1ca

410 

411 def update_recipe_image(self, slug: str, image: bytes, extension: str): 1b

412 recipe = self.get_one(slug) 1da

413 if not self.can_update(recipe): 413 ↛ 414line 413 didn't jump to line 414 because the condition on line 413 was never true1da

414 raise exceptions.PermissionDenied("You do not have permission to edit this recipe.") 

415 

416 data_service = RecipeDataService(recipe.id) 1da

417 data_service.write_image(image, extension) 1da

418 

419 return self.group_recipes.update_image(slug, extension) 

420 

421 def patch_one(self, slug_or_id: str | UUID, patch_data: Recipe) -> Recipe: 1b

422 recipe: Recipe = self._pre_update_check(slug_or_id, patch_data) 1qa

423 

424 new_data = self.group_recipes.patch(recipe.slug, patch_data.model_dump(exclude_unset=True)) 

425 

426 self.check_assets(new_data, recipe.slug) 

427 return new_data 

428 

429 def update_last_made(self, slug_or_id: str | UUID, timestamp: datetime) -> Recipe: 1b

430 # we bypass the pre update check since any user can update a recipe's last made date, even if it's locked, 

431 # or if the user belongs to a different household 

432 

433 household_service = HouseholdService(self.user.group_id, self.user.household_id, self.repos) 1q

434 household_service.set_household_recipe(slug_or_id, HouseholdRecipeUpdate(last_made=timestamp)) 1q

435 

436 return self.get_one(slug_or_id) 

437 

438 def delete_one(self, slug_or_id: str | UUID) -> Recipe: 1b

439 recipe = self.get_one(slug_or_id) 

440 

441 if not self.can_delete(recipe): 

442 raise exceptions.PermissionDenied("You do not have permission to delete this recipe.") 

443 

444 data = self.group_recipes.delete(recipe.id, "id") 

445 self.delete_assets(data) 

446 return data 

447 

448 # ================================================================= 

449 # Recipe Template Methods 

450 

451 def render_template(self, recipe: Recipe, temp_dir: Path, template: str) -> Path: 1b

452 t_service = TemplateService(temp_dir) 

453 return t_service.render(recipe, template) 

454 

455 

456class OpenAIRecipeService(RecipeServiceBase): 1b

457 def _convert_recipe(self, openai_recipe: OpenAIRecipe) -> Recipe: 1b

458 return Recipe( 

459 user_id=self.user.id, 

460 group_id=self.user.group_id, 

461 household_id=self.household.id, 

462 name=openai_recipe.name, 

463 slug=create_recipe_slug(openai_recipe.name), 

464 description=openai_recipe.description, 

465 recipe_yield=openai_recipe.recipe_yield, 

466 total_time=openai_recipe.total_time, 

467 prep_time=openai_recipe.prep_time, 

468 perform_time=openai_recipe.perform_time, 

469 recipe_ingredient=[ 

470 RecipeIngredient(title=ingredient.title, note=ingredient.text) 

471 for ingredient in openai_recipe.ingredients 

472 if ingredient.text 

473 ], 

474 recipe_instructions=[ 

475 RecipeStep(title=instruction.title, text=instruction.text) 

476 for instruction in openai_recipe.instructions 

477 if instruction.text 

478 ], 

479 notes=[RecipeNote(title=note.title or "", text=note.text) for note in openai_recipe.notes if note.text], 

480 ) 

481 

482 async def build_recipe_from_images(self, images: list[Path], translate_language: str | None) -> Recipe: 1b

483 settings = get_app_settings() 

484 if not (settings.OPENAI_ENABLED and settings.OPENAI_ENABLE_IMAGE_SERVICES): 

485 raise ValueError("OpenAI image services are not available") 

486 

487 openai_service = OpenAIService() 

488 prompt = openai_service.get_prompt( 

489 "recipes.parse-recipe-image", 

490 data_injections=[ 

491 OpenAIDataInjection( 

492 description=( 

493 "This is the JSON response schema. You must respond in valid JSON that follows this schema. " 

494 "Your payload should be as compact as possible, eliminating unncessesary whitespace. " 

495 "Any fields with default values which you do not populate should not be in the payload." 

496 ), 

497 value=OpenAIRecipe, 

498 ) 

499 ], 

500 ) 

501 

502 openai_images = [OpenAILocalImage(filename=os.path.basename(image), path=image) for image in images] 

503 message = ( 

504 f"Please extract the recipe from the {'images' if len(openai_images) > 1 else 'image'} provided." 

505 "There should be exactly one recipe." 

506 ) 

507 

508 if translate_language: 

509 message += f" Please translate the recipe to {translate_language}." 

510 

511 try: 

512 response = await openai_service.get_response( 

513 prompt, message, images=openai_images, force_json_response=True 

514 ) 

515 except Exception as e: 

516 raise Exception("Failed to call OpenAI services") from e 

517 

518 try: 

519 openai_recipe = OpenAIRecipe.parse_openai_response(response) 

520 recipe = self._convert_recipe(openai_recipe) 

521 except Exception as e: 

522 raise ValueError("Unable to parse recipe from image") from e 

523 

524 return recipe