Coverage for opt/mealie/lib/python3.12/site-packages/mealie/services/recipe/recipe_service.py: 32%
296 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-25 15:32 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-25 15:32 +0000
1import json 1a
2import os 1a
3import shutil 1a
4from datetime import UTC, datetime 1a
5from pathlib import Path 1a
6from shutil import copytree, rmtree 1a
7from typing import Any 1a
8from uuid import UUID, uuid4 1a
9from zipfile import ZipFile 1a
11from fastapi import UploadFile 1a
13from mealie.core import exceptions 1a
14from mealie.core.config import get_app_settings 1a
15from mealie.core.dependencies.dependencies import get_temporary_path 1a
16from mealie.lang.providers import Translator 1a
17from mealie.pkgs import cache 1a
18from mealie.repos.all_repositories import get_repositories 1a
19from mealie.repos.repository_factory import AllRepositories 1a
20from mealie.repos.repository_generic import RepositoryGeneric 1a
21from mealie.schema.household.household import HouseholdInDB, HouseholdRecipeUpdate 1a
22from mealie.schema.openai.recipe import OpenAIRecipe 1a
23from mealie.schema.recipe.recipe import CreateRecipe, Recipe, create_recipe_slug 1a
24from mealie.schema.recipe.recipe_ingredient import RecipeIngredient 1a
25from mealie.schema.recipe.recipe_notes import RecipeNote 1a
26from mealie.schema.recipe.recipe_settings import RecipeSettings 1a
27from mealie.schema.recipe.recipe_step import RecipeStep 1a
28from mealie.schema.recipe.recipe_timeline_events import RecipeTimelineEventCreate, TimelineEventType 1a
29from mealie.schema.recipe.request_helpers import RecipeDuplicate 1a
30from mealie.schema.user.user import PrivateUser, UserRatingCreate 1a
31from mealie.services._base_service import BaseService 1a
32from mealie.services.household_services.household_service import HouseholdService 1a
33from mealie.services.openai import OpenAIDataInjection, OpenAILocalImage, OpenAIService 1a
34from mealie.services.recipe.recipe_data_service import RecipeDataService 1a
35from mealie.services.scraper import cleaner 1a
37from .template_service import TemplateService 1a
40class RecipeServiceBase(BaseService): 1a
41 def __init__(self, repos: AllRepositories, user: PrivateUser, household: HouseholdInDB, translator: Translator): 1a
42 self.repos = repos 1cdefghijkb
43 self.user = user 1cdefghijkb
44 self.household = household 1cdefghijkb
46 if repos.group_id != user.group_id != household.group_id: 46 ↛ 47line 46 didn't jump to line 47 because the condition on line 46 was never true1cdefghijkb
47 raise Exception("group ids do not match")
48 if repos.household_id != user.household_id != household.id: 48 ↛ 49line 48 didn't jump to line 49 because the condition on line 48 was never true1cdefghijkb
49 raise Exception("household ids do not match")
51 self.group_recipes = get_repositories(repos.session, group_id=repos.group_id, household_id=None).recipes 1cdefghijkb
52 """Recipes repo without a Household filter""" 1cdefghijkb
54 self.translator = translator 1cdefghijkb
55 self.t = translator.t 1cdefghijkb
57 super().__init__() 1cdefghijkb
60class RecipeService(RecipeServiceBase): 1a
61 def _get_recipe(self, data: str | UUID, key: str | None = None) -> Recipe: 1a
62 recipe = self.group_recipes.get_one(data, key) 1cdefghijkb
63 if recipe is None: 63 ↛ 65line 63 didn't jump to line 65 because the condition on line 63 was always true1cdefghijkb
64 raise exceptions.NoEntryFound("Recipe not found.") 1cdefghijkb
65 return recipe
67 def can_delete(self, recipe: Recipe) -> bool: 1a
68 if self.user.admin:
69 return True
70 else:
71 return self.can_update(recipe)
73 def can_update(self, recipe: Recipe) -> bool: 1a
74 if recipe.settings is None:
75 raise exceptions.UnexpectedNone("Recipe Settings is None")
77 # Check if this user owns the recipe
78 if self.user.id == recipe.user_id:
79 return True
81 # Check if this user has permission to edit this recipe
82 if self.household.id != recipe.household_id:
83 other_household = self.repos.households.get_one(recipe.household_id)
84 if not (other_household and other_household.preferences):
85 return False
86 if other_household.preferences.lock_recipe_edits_from_other_households:
87 return False
88 if recipe.settings.locked:
89 return False
91 return True
93 def can_lock_unlock(self, recipe: Recipe) -> bool: 1a
94 return recipe.user_id == self.user.id
96 def check_assets(self, recipe: Recipe, original_slug: str) -> None: 1a
97 """Checks if the recipe slug has changed, and if so moves the assets to a new file with the new slug."""
98 if original_slug != recipe.slug:
99 current_dir = self.directories.RECIPE_DATA_DIR.joinpath(original_slug)
101 try:
102 copytree(current_dir, recipe.directory, dirs_exist_ok=True)
103 self.logger.debug(f"Renaming Recipe Directory: {original_slug} -> {recipe.slug}")
104 except FileNotFoundError:
105 self.logger.error(f"Recipe Directory not Found: {original_slug}")
107 if recipe.assets is None:
108 recipe.assets = []
110 all_asset_files = [x.file_name for x in recipe.assets]
112 for file in recipe.asset_dir.iterdir():
113 if file.is_dir():
114 continue
115 if file.name not in all_asset_files:
116 file.unlink()
118 def delete_assets(self, recipe: Recipe) -> None: 1a
119 recipe_dir = recipe.directory
120 rmtree(recipe_dir, ignore_errors=True)
121 self.logger.info(f"Recipe Directory Removed: {recipe.slug}")
123 def _recipe_creation_factory(self, name: str, additional_attrs: dict | None = None) -> Recipe: 1a
124 """
125 The main creation point for recipes. The factor method returns an instance of the
126 Recipe Schema class with the appropriate defaults set. Recipes should not be created
127 elsewhere to avoid conflicts.
128 """
129 additional_attrs = additional_attrs or {}
130 additional_attrs["name"] = name
131 additional_attrs["user_id"] = self.user.id
132 additional_attrs["household_id"] = self.household.id
133 additional_attrs["group_id"] = self.household.group_id
135 if additional_attrs.get("tags"): 135 ↛ 136line 135 didn't jump to line 136 because the condition on line 135 was never true
136 for i in range(len(additional_attrs.get("tags", []))):
137 additional_attrs["tags"][i]["group_id"] = self.user.group_id
139 if not additional_attrs.get("recipe_ingredient"): 139 ↛ 144line 139 didn't jump to line 144 because the condition on line 139 was always true
140 additional_attrs["recipe_ingredient"] = [
141 RecipeIngredient(note=self.t("recipe.recipe-defaults.ingredient-note"))
142 ]
144 if not additional_attrs.get("recipe_instructions"): 144 ↛ 147line 144 didn't jump to line 147 because the condition on line 144 was always true
145 additional_attrs["recipe_instructions"] = [RecipeStep(text=self.t("recipe.recipe-defaults.step-text"))]
147 return Recipe(**additional_attrs)
149 def get_one(self, slug_or_id: str | UUID) -> Recipe: 1a
150 if isinstance(slug_or_id, str): 1cdefghijkb
151 try: 1cdefghijkb
152 slug_or_id = UUID(slug_or_id) 1cdefghijkb
153 except ValueError: 1cdefghijkb
154 pass 1cdefghijkb
156 if isinstance(slug_or_id, UUID): 156 ↛ 157line 156 didn't jump to line 157 because the condition on line 156 was never true1cdefghijkb
157 return self._get_recipe(slug_or_id, "id")
159 else:
160 return self._get_recipe(slug_or_id, "slug") 1cdefghijkb
162 def create_one(self, create_data: Recipe | CreateRecipe) -> Recipe: 1a
163 if create_data.name is None: 163 ↛ 164line 163 didn't jump to line 164 because the condition on line 163 was never true
164 create_data.name = "New Recipe"
166 data: Recipe = self._recipe_creation_factory(name=create_data.name, additional_attrs=create_data.model_dump())
168 if isinstance(create_data, CreateRecipe) or create_data.settings is None: 168 ↛ 180line 168 didn't jump to line 180 because the condition on line 168 was always true
169 if self.household.preferences is not None: 169 ↛ 178line 169 didn't jump to line 178 because the condition on line 169 was always true
170 data.settings = RecipeSettings(
171 public=self.household.preferences.recipe_public,
172 show_nutrition=self.household.preferences.recipe_show_nutrition,
173 show_assets=self.household.preferences.recipe_show_assets,
174 landscape_view=self.household.preferences.recipe_landscape_view,
175 disable_comments=self.household.preferences.recipe_disable_comments,
176 )
177 else:
178 data.settings = RecipeSettings()
180 rating_input = data.rating
181 data.last_made = None
182 new_recipe = self.repos.recipes.create(data)
184 # convert rating into user rating
185 if rating_input:
186 self.repos.user_ratings.create(
187 UserRatingCreate(
188 user_id=self.user.id,
189 recipe_id=new_recipe.id,
190 rating=rating_input,
191 is_favorite=False,
192 )
193 )
195 # create first timeline entry
196 timeline_event_data = RecipeTimelineEventCreate(
197 user_id=new_recipe.user_id,
198 recipe_id=new_recipe.id,
199 subject=self.t("recipe.recipe-created"),
200 event_type=TimelineEventType.system,
201 timestamp=new_recipe.created_at or datetime.now(UTC),
202 )
204 self.repos.recipe_timeline_events.create(timeline_event_data)
205 return new_recipe
207 def _transform_user_id(self, user_id: str) -> str: 1a
208 query = self.repos.users.get_one(user_id)
209 if query:
210 return user_id
211 else:
212 # default to the current user
213 return str(self.user.id)
215 def _transform_category_or_tag(self, data: dict, repo: RepositoryGeneric) -> dict: 1a
216 slug = data.get("slug")
217 if not slug:
218 return data
220 # if the item exists, return the actual data
221 query = repo.get_one(slug, "slug")
222 if query:
223 return query.model_dump()
225 # otherwise, create the item
226 new_item = repo.create(data)
227 return new_item.model_dump()
229 def _process_recipe_data(self, key: str, data: list | dict | Any): 1a
230 if isinstance(data, list):
231 return [self._process_recipe_data(key, item) for item in data]
233 elif isinstance(data, str):
234 # make sure the user is valid
235 if key == "user_id":
236 return self._transform_user_id(str(data))
238 return data
240 elif not isinstance(data, dict):
241 return data
243 # force group_id and household_id to match the group id of the current user
244 data["group_id"] = str(self.user.group_id)
245 data["household_id"] = str(self.user.household_id)
247 # make sure categories and tags are valid
248 if key == "recipe_category":
249 return self._transform_category_or_tag(data, self.repos.categories)
250 elif key == "tags":
251 return self._transform_category_or_tag(data, self.repos.tags)
253 # recursively process other objects
254 for k, v in data.items():
255 data[k] = self._process_recipe_data(k, v)
257 return data
259 def clean_recipe_dict(self, recipe: dict[str, Any]) -> dict[str, Any]: 1a
260 return self._process_recipe_data("recipe", recipe)
262 def create_from_zip(self, archive: UploadFile, temp_path: Path) -> Recipe: 1a
263 """
264 `create_from_zip` creates a recipe in the database from a zip file exported from Mealie. This is NOT
265 a generic import from a zip file.
266 """
267 with temp_path.open("wb") as buffer:
268 shutil.copyfileobj(archive.file, buffer)
270 recipe_dict: dict | None = None
271 recipe_image: bytes | None = None
273 with ZipFile(temp_path) as myzip:
274 for file in myzip.namelist():
275 if file.endswith(".json"):
276 with myzip.open(file) as myfile:
277 recipe_dict = json.loads(myfile.read())
278 elif file.endswith(".webp"):
279 with myzip.open(file) as myfile:
280 recipe_image = myfile.read()
282 if recipe_dict is None:
283 raise exceptions.UnexpectedNone("No json data found in Zip")
285 recipe = self.create_one(Recipe(**self.clean_recipe_dict(recipe_dict)))
287 if recipe and recipe.id:
288 data_service = RecipeDataService(recipe.id)
290 if recipe_image:
291 data_service.write_image(recipe_image, "webp")
293 return recipe
295 async def create_from_images(self, images: list[UploadFile], translate_language: str | None = None) -> Recipe: 1a
296 openai_recipe_service = OpenAIRecipeService(self.repos, self.user, self.household, self.translator)
297 with get_temporary_path() as temp_path:
298 local_images: list[Path] = []
299 for image in images:
300 with temp_path.joinpath(image.filename).open("wb") as buffer:
301 shutil.copyfileobj(image.file, buffer)
302 local_images.append(temp_path.joinpath(image.filename))
304 recipe_data = await openai_recipe_service.build_recipe_from_images(
305 local_images, translate_language=translate_language
306 )
307 recipe_data = cleaner.clean(recipe_data, self.translator)
309 recipe = self.create_one(recipe_data)
310 data_service = RecipeDataService(recipe.id)
312 with open(local_images[0], "rb") as f:
313 data_service.write_image(f.read(), "webp")
314 return recipe
316 def duplicate_one(self, old_slug_or_id: str | UUID, dup_data: RecipeDuplicate) -> Recipe: 1a
317 """Duplicates a recipe and returns the new recipe."""
319 old_recipe = self.get_one(old_slug_or_id) 1defghijkb
320 new_recipe_data = old_recipe.model_dump(exclude={"id", "name", "slug", "image", "comments"}, round_trip=True)
321 new_recipe = Recipe.model_validate(new_recipe_data)
323 # Asset images in steps directly link to the original recipe, so we
324 # need to update them to references to the assets we copy below
325 def replace_recipe_step(step: RecipeStep) -> RecipeStep:
326 new_id = uuid4()
327 new_text = step.text.replace(str(old_recipe.id), str(new_recipe.id))
328 new_step = step.model_copy(update={"id": new_id, "text": new_text})
329 return new_step
331 # Copy ingredients to make them independent of the original
332 def copy_recipe_ingredient(ingredient: RecipeIngredient):
333 new_reference_id = uuid4()
334 new_ingredient = ingredient.model_copy(update={"reference_id": new_reference_id})
335 return new_ingredient
337 new_name = dup_data.name if dup_data.name else old_recipe.name or ""
338 new_recipe.id = uuid4()
339 new_recipe.slug = create_recipe_slug(new_name)
340 new_recipe.image = cache.cache_key.new_key() if old_recipe.image else None
341 new_recipe.recipe_instructions = (
342 None
343 if old_recipe.recipe_instructions is None
344 else list(map(replace_recipe_step, old_recipe.recipe_instructions))
345 )
346 new_recipe.recipe_ingredient = (
347 None
348 if old_recipe.recipe_ingredient is None
349 else list(map(copy_recipe_ingredient, old_recipe.recipe_ingredient))
350 )
351 new_recipe.last_made = None
353 new_recipe = self._recipe_creation_factory(new_name, additional_attrs=new_recipe.model_dump())
355 new_recipe = self.repos.recipes.create(new_recipe)
357 # Copy all assets (including images) to the new recipe directory
358 # This assures that replaced links in recipe steps continue to work when the old recipe is deleted
359 try:
360 new_service = RecipeDataService(new_recipe.id)
361 old_service = RecipeDataService(old_recipe.id)
362 copytree(
363 old_service.dir_data,
364 new_service.dir_data,
365 dirs_exist_ok=True,
366 )
367 except Exception as e:
368 self.logger.error(f"Failed to copy assets from {old_recipe.slug} to {new_recipe.slug}: {e}")
370 return new_recipe
372 def _pre_update_check(self, slug_or_id: str | UUID, new_data: Recipe) -> Recipe: 1a
373 """
374 gets the recipe from the database and performs a check to see if the user can update the recipe.
375 If the user can't update the recipe, an exception is raised.
377 Checks:
378 - That the recipe exists
379 - That the user can update the recipe (recipe is not locked or the user is the owner)
380 - _if_ the user is locking the recipe, that they can lock the recipe (user is the owner)
382 Args:
383 slug_or_id (str | UUID): recipe slug or id
384 new_data (Recipe): the new recipe data
386 Raises:
387 exceptions.PermissionDenied (403)
388 """
390 recipe = self.get_one(slug_or_id)
392 if recipe is None or recipe.settings is None:
393 raise exceptions.NoEntryFound("Recipe not found.")
395 if not self.can_update(recipe):
396 raise exceptions.PermissionDenied("You do not have permission to edit this recipe.")
398 setting_lock = new_data.settings is not None and recipe.settings.locked != new_data.settings.locked
399 if setting_lock and not self.can_lock_unlock(recipe):
400 raise exceptions.PermissionDenied("You do not have permission to lock/unlock this recipe.")
402 return recipe
404 def update_one(self, slug_or_id: str | UUID, update_data: Recipe) -> Recipe: 1a
405 recipe = self._pre_update_check(slug_or_id, update_data)
407 new_data = self.group_recipes.update(recipe.slug, update_data)
408 self.check_assets(new_data, recipe.slug)
409 return new_data
411 def update_recipe_image(self, slug: str, image: bytes, extension: str): 1a
412 recipe = self.get_one(slug)
413 if not self.can_update(recipe):
414 raise exceptions.PermissionDenied("You do not have permission to edit this recipe.")
416 data_service = RecipeDataService(recipe.id)
417 data_service.write_image(image, extension)
419 return self.group_recipes.update_image(slug, extension)
421 def patch_one(self, slug_or_id: str | UUID, patch_data: Recipe) -> Recipe: 1a
422 recipe: Recipe = self._pre_update_check(slug_or_id, patch_data)
424 new_data = self.group_recipes.patch(recipe.slug, patch_data.model_dump(exclude_unset=True))
426 self.check_assets(new_data, recipe.slug)
427 return new_data
429 def update_last_made(self, slug_or_id: str | UUID, timestamp: datetime) -> Recipe: 1a
430 # we bypass the pre update check since any user can update a recipe's last made date, even if it's locked,
431 # or if the user belongs to a different household
433 household_service = HouseholdService(self.user.group_id, self.user.household_id, self.repos)
434 household_service.set_household_recipe(slug_or_id, HouseholdRecipeUpdate(last_made=timestamp))
436 return self.get_one(slug_or_id)
438 def delete_one(self, slug_or_id: str | UUID) -> Recipe: 1a
439 recipe = self.get_one(slug_or_id) 1c
441 if not self.can_delete(recipe):
442 raise exceptions.PermissionDenied("You do not have permission to delete this recipe.")
444 data = self.group_recipes.delete(recipe.id, "id")
445 self.delete_assets(data)
446 return data
448 # =================================================================
449 # Recipe Template Methods
451 def render_template(self, recipe: Recipe, temp_dir: Path, template: str) -> Path: 1a
452 t_service = TemplateService(temp_dir)
453 return t_service.render(recipe, template)
456class OpenAIRecipeService(RecipeServiceBase): 1a
457 def _convert_recipe(self, openai_recipe: OpenAIRecipe) -> Recipe: 1a
458 return Recipe(
459 user_id=self.user.id,
460 group_id=self.user.group_id,
461 household_id=self.household.id,
462 name=openai_recipe.name,
463 slug=create_recipe_slug(openai_recipe.name),
464 description=openai_recipe.description,
465 recipe_yield=openai_recipe.recipe_yield,
466 total_time=openai_recipe.total_time,
467 prep_time=openai_recipe.prep_time,
468 perform_time=openai_recipe.perform_time,
469 recipe_ingredient=[
470 RecipeIngredient(title=ingredient.title, note=ingredient.text)
471 for ingredient in openai_recipe.ingredients
472 if ingredient.text
473 ],
474 recipe_instructions=[
475 RecipeStep(title=instruction.title, text=instruction.text)
476 for instruction in openai_recipe.instructions
477 if instruction.text
478 ],
479 notes=[RecipeNote(title=note.title or "", text=note.text) for note in openai_recipe.notes if note.text],
480 )
482 async def build_recipe_from_images(self, images: list[Path], translate_language: str | None) -> Recipe: 1a
483 settings = get_app_settings()
484 if not (settings.OPENAI_ENABLED and settings.OPENAI_ENABLE_IMAGE_SERVICES):
485 raise ValueError("OpenAI image services are not available")
487 openai_service = OpenAIService()
488 prompt = openai_service.get_prompt(
489 "recipes.parse-recipe-image",
490 data_injections=[
491 OpenAIDataInjection(
492 description=(
493 "This is the JSON response schema. You must respond in valid JSON that follows this schema. "
494 "Your payload should be as compact as possible, eliminating unncessesary whitespace. "
495 "Any fields with default values which you do not populate should not be in the payload."
496 ),
497 value=OpenAIRecipe,
498 )
499 ],
500 )
502 openai_images = [OpenAILocalImage(filename=os.path.basename(image), path=image) for image in images]
503 message = (
504 f"Please extract the recipe from the {'images' if len(openai_images) > 1 else 'image'} provided."
505 "There should be exactly one recipe."
506 )
508 if translate_language:
509 message += f" Please translate the recipe to {translate_language}."
511 try:
512 response = await openai_service.get_response(
513 prompt, message, images=openai_images, force_json_response=True
514 )
515 except Exception as e:
516 raise Exception("Failed to call OpenAI services") from e
518 try:
519 openai_recipe = OpenAIRecipe.parse_openai_response(response)
520 recipe = self._convert_recipe(openai_recipe)
521 except Exception as e:
522 raise ValueError("Unable to parse recipe from image") from e
524 return recipe