Coverage for opt/mealie/lib/python3.12/site-packages/mealie/services/recipe/recipe_service.py: 50%
296 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-25 15:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-25 15:48 +0000
1import json 1b
2import os 1b
3import shutil 1b
4from datetime import UTC, datetime 1b
5from pathlib import Path 1b
6from shutil import copytree, rmtree 1b
7from typing import Any 1b
8from uuid import UUID, uuid4 1b
9from zipfile import ZipFile 1b
11from fastapi import UploadFile 1b
13from mealie.core import exceptions 1b
14from mealie.core.config import get_app_settings 1b
15from mealie.core.dependencies.dependencies import get_temporary_path 1b
16from mealie.lang.providers import Translator 1b
17from mealie.pkgs import cache 1b
18from mealie.repos.all_repositories import get_repositories 1b
19from mealie.repos.repository_factory import AllRepositories 1b
20from mealie.repos.repository_generic import RepositoryGeneric 1b
21from mealie.schema.household.household import HouseholdInDB, HouseholdRecipeUpdate 1b
22from mealie.schema.openai.recipe import OpenAIRecipe 1b
23from mealie.schema.recipe.recipe import CreateRecipe, Recipe, create_recipe_slug 1b
24from mealie.schema.recipe.recipe_ingredient import RecipeIngredient 1b
25from mealie.schema.recipe.recipe_notes import RecipeNote 1b
26from mealie.schema.recipe.recipe_settings import RecipeSettings 1b
27from mealie.schema.recipe.recipe_step import RecipeStep 1b
28from mealie.schema.recipe.recipe_timeline_events import RecipeTimelineEventCreate, TimelineEventType 1b
29from mealie.schema.recipe.request_helpers import RecipeDuplicate 1b
30from mealie.schema.user.user import PrivateUser, UserRatingCreate 1b
31from mealie.services._base_service import BaseService 1b
32from mealie.services.household_services.household_service import HouseholdService 1b
33from mealie.services.openai import OpenAIDataInjection, OpenAILocalImage, OpenAIService 1b
34from mealie.services.recipe.recipe_data_service import RecipeDataService 1b
35from mealie.services.scraper import cleaner 1b
37from .template_service import TemplateService 1b
40class RecipeServiceBase(BaseService): 1b
41 def __init__(self, repos: AllRepositories, user: PrivateUser, household: HouseholdInDB, translator: Translator): 1b
42 self.repos = repos 1qcdefghijklmnopa
43 self.user = user 1qcdefghijklmnopa
44 self.household = household 1qcdefghijklmnopa
46 if repos.group_id != user.group_id != household.group_id: 46 ↛ 47line 46 didn't jump to line 47 because the condition on line 46 was never true1qcdefghijklmnopa
47 raise Exception("group ids do not match")
48 if repos.household_id != user.household_id != household.id: 48 ↛ 49line 48 didn't jump to line 49 because the condition on line 48 was never true1qcdefghijklmnopa
49 raise Exception("household ids do not match")
51 self.group_recipes = get_repositories(repos.session, group_id=repos.group_id, household_id=None).recipes 1qcdefghijklmnopa
52 """Recipes repo without a Household filter""" 1qcdefghijklmnopa
54 self.translator = translator 1qcdefghijklmnopa
55 self.t = translator.t 1qcdefghijklmnopa
57 super().__init__() 1qcdefghijklmnopa
60class RecipeService(RecipeServiceBase): 1b
61 def _get_recipe(self, data: str | UUID, key: str | None = None) -> Recipe: 1b
62 recipe = self.group_recipes.get_one(data, key) 1qcdefghijklmnopa
63 if recipe is None: 1qcdefghijklmnopa
64 raise exceptions.NoEntryFound("Recipe not found.") 1qcdefghijklmnopa
65 return recipe 1cda
67 def can_delete(self, recipe: Recipe) -> bool: 1b
68 if self.user.admin:
69 return True
70 else:
71 return self.can_update(recipe)
73 def can_update(self, recipe: Recipe) -> bool: 1b
74 if recipe.settings is None: 74 ↛ 75line 74 didn't jump to line 75 because the condition on line 74 was never true1cda
75 raise exceptions.UnexpectedNone("Recipe Settings is None")
77 # Check if this user owns the recipe
78 if self.user.id == recipe.user_id: 78 ↛ 82line 78 didn't jump to line 82 because the condition on line 78 was always true1cda
79 return True 1cda
81 # Check if this user has permission to edit this recipe
82 if self.household.id != recipe.household_id:
83 other_household = self.repos.households.get_one(recipe.household_id)
84 if not (other_household and other_household.preferences):
85 return False
86 if other_household.preferences.lock_recipe_edits_from_other_households:
87 return False
88 if recipe.settings.locked:
89 return False
91 return True
93 def can_lock_unlock(self, recipe: Recipe) -> bool: 1b
94 return recipe.user_id == self.user.id
96 def check_assets(self, recipe: Recipe, original_slug: str) -> None: 1b
97 """Checks if the recipe slug has changed, and if so moves the assets to a new file with the new slug."""
98 if original_slug != recipe.slug: 98 ↛ 99line 98 didn't jump to line 99 because the condition on line 98 was never true1ca
99 current_dir = self.directories.RECIPE_DATA_DIR.joinpath(original_slug)
101 try:
102 copytree(current_dir, recipe.directory, dirs_exist_ok=True)
103 self.logger.debug(f"Renaming Recipe Directory: {original_slug} -> {recipe.slug}")
104 except FileNotFoundError:
105 self.logger.error(f"Recipe Directory not Found: {original_slug}")
107 if recipe.assets is None: 107 ↛ 108line 107 didn't jump to line 108 because the condition on line 107 was never true1ca
108 recipe.assets = []
110 all_asset_files = [x.file_name for x in recipe.assets] 1ca
112 for file in recipe.asset_dir.iterdir(): 1ca
113 if file.is_dir(): 113 ↛ 114line 113 didn't jump to line 114 because the condition on line 113 was never true1ca
114 continue
115 if file.name not in all_asset_files: 115 ↛ 116line 115 didn't jump to line 116 because the condition on line 115 was never true1ca
116 file.unlink()
118 def delete_assets(self, recipe: Recipe) -> None: 1b
119 recipe_dir = recipe.directory
120 rmtree(recipe_dir, ignore_errors=True)
121 self.logger.info(f"Recipe Directory Removed: {recipe.slug}")
123 def _recipe_creation_factory(self, name: str, additional_attrs: dict | None = None) -> Recipe: 1b
124 """
125 The main creation point for recipes. The factor method returns an instance of the
126 Recipe Schema class with the appropriate defaults set. Recipes should not be created
127 elsewhere to avoid conflicts.
128 """
129 additional_attrs = additional_attrs or {}
130 additional_attrs["name"] = name
131 additional_attrs["user_id"] = self.user.id
132 additional_attrs["household_id"] = self.household.id
133 additional_attrs["group_id"] = self.household.group_id
135 if additional_attrs.get("tags"): 135 ↛ 136line 135 didn't jump to line 136 because the condition on line 135 was never true
136 for i in range(len(additional_attrs.get("tags", []))):
137 additional_attrs["tags"][i]["group_id"] = self.user.group_id
139 if not additional_attrs.get("recipe_ingredient"):
140 additional_attrs["recipe_ingredient"] = [
141 RecipeIngredient(note=self.t("recipe.recipe-defaults.ingredient-note"))
142 ]
144 if not additional_attrs.get("recipe_instructions"):
145 additional_attrs["recipe_instructions"] = [RecipeStep(text=self.t("recipe.recipe-defaults.step-text"))]
147 return Recipe(**additional_attrs)
149 def get_one(self, slug_or_id: str | UUID) -> Recipe: 1b
150 if isinstance(slug_or_id, str): 1qcdefghijklmnopa
151 try: 1cdefghijklmnopa
152 slug_or_id = UUID(slug_or_id) 1cdefghijklmnopa
153 except ValueError: 1cdefghijklmnopa
154 pass 1cdefghijklmnopa
156 if isinstance(slug_or_id, UUID): 156 ↛ 157line 156 didn't jump to line 157 because the condition on line 156 was never true1qcdefghijklmnopa
157 return self._get_recipe(slug_or_id, "id")
159 else:
160 return self._get_recipe(slug_or_id, "slug") 1qcdefghijklmnopa
162 def create_one(self, create_data: Recipe | CreateRecipe) -> Recipe: 1b
163 if create_data.name is None: 163 ↛ 164line 163 didn't jump to line 164 because the condition on line 163 was never true
164 create_data.name = "New Recipe"
166 data: Recipe = self._recipe_creation_factory(name=create_data.name, additional_attrs=create_data.model_dump())
168 if isinstance(create_data, CreateRecipe) or create_data.settings is None: 168 ↛ 180line 168 didn't jump to line 180 because the condition on line 168 was always true
169 if self.household.preferences is not None: 169 ↛ 178line 169 didn't jump to line 178 because the condition on line 169 was always true
170 data.settings = RecipeSettings(
171 public=self.household.preferences.recipe_public,
172 show_nutrition=self.household.preferences.recipe_show_nutrition,
173 show_assets=self.household.preferences.recipe_show_assets,
174 landscape_view=self.household.preferences.recipe_landscape_view,
175 disable_comments=self.household.preferences.recipe_disable_comments,
176 )
177 else:
178 data.settings = RecipeSettings()
180 rating_input = data.rating
181 data.last_made = None
182 new_recipe = self.repos.recipes.create(data)
184 # convert rating into user rating
185 if rating_input: 185 ↛ 186line 185 didn't jump to line 186 because the condition on line 185 was never true
186 self.repos.user_ratings.create(
187 UserRatingCreate(
188 user_id=self.user.id,
189 recipe_id=new_recipe.id,
190 rating=rating_input,
191 is_favorite=False,
192 )
193 )
195 # create first timeline entry
196 timeline_event_data = RecipeTimelineEventCreate(
197 user_id=new_recipe.user_id,
198 recipe_id=new_recipe.id,
199 subject=self.t("recipe.recipe-created"),
200 event_type=TimelineEventType.system,
201 timestamp=new_recipe.created_at or datetime.now(UTC),
202 )
204 self.repos.recipe_timeline_events.create(timeline_event_data)
205 return new_recipe
207 def _transform_user_id(self, user_id: str) -> str: 1b
208 query = self.repos.users.get_one(user_id)
209 if query:
210 return user_id
211 else:
212 # default to the current user
213 return str(self.user.id)
215 def _transform_category_or_tag(self, data: dict, repo: RepositoryGeneric) -> dict: 1b
216 slug = data.get("slug")
217 if not slug:
218 return data
220 # if the item exists, return the actual data
221 query = repo.get_one(slug, "slug")
222 if query:
223 return query.model_dump()
225 # otherwise, create the item
226 new_item = repo.create(data)
227 return new_item.model_dump()
229 def _process_recipe_data(self, key: str, data: list | dict | Any): 1b
230 if isinstance(data, list):
231 return [self._process_recipe_data(key, item) for item in data]
233 elif isinstance(data, str):
234 # make sure the user is valid
235 if key == "user_id":
236 return self._transform_user_id(str(data))
238 return data
240 elif not isinstance(data, dict):
241 return data
243 # force group_id and household_id to match the group id of the current user
244 data["group_id"] = str(self.user.group_id)
245 data["household_id"] = str(self.user.household_id)
247 # make sure categories and tags are valid
248 if key == "recipe_category":
249 return self._transform_category_or_tag(data, self.repos.categories)
250 elif key == "tags":
251 return self._transform_category_or_tag(data, self.repos.tags)
253 # recursively process other objects
254 for k, v in data.items():
255 data[k] = self._process_recipe_data(k, v)
257 return data
259 def clean_recipe_dict(self, recipe: dict[str, Any]) -> dict[str, Any]: 1b
260 return self._process_recipe_data("recipe", recipe)
262 def create_from_zip(self, archive: UploadFile, temp_path: Path) -> Recipe: 1b
263 """
264 `create_from_zip` creates a recipe in the database from a zip file exported from Mealie. This is NOT
265 a generic import from a zip file.
266 """
267 with temp_path.open("wb") as buffer: 1c
268 shutil.copyfileobj(archive.file, buffer) 1c
270 recipe_dict: dict | None = None 1c
271 recipe_image: bytes | None = None 1c
273 with ZipFile(temp_path) as myzip: 1c
274 for file in myzip.namelist():
275 if file.endswith(".json"):
276 with myzip.open(file) as myfile:
277 recipe_dict = json.loads(myfile.read())
278 elif file.endswith(".webp"):
279 with myzip.open(file) as myfile:
280 recipe_image = myfile.read()
282 if recipe_dict is None:
283 raise exceptions.UnexpectedNone("No json data found in Zip")
285 recipe = self.create_one(Recipe(**self.clean_recipe_dict(recipe_dict)))
287 if recipe and recipe.id:
288 data_service = RecipeDataService(recipe.id)
290 if recipe_image:
291 data_service.write_image(recipe_image, "webp")
293 return recipe
295 async def create_from_images(self, images: list[UploadFile], translate_language: str | None = None) -> Recipe: 1b
296 openai_recipe_service = OpenAIRecipeService(self.repos, self.user, self.household, self.translator)
297 with get_temporary_path() as temp_path:
298 local_images: list[Path] = []
299 for image in images:
300 with temp_path.joinpath(image.filename).open("wb") as buffer:
301 shutil.copyfileobj(image.file, buffer)
302 local_images.append(temp_path.joinpath(image.filename))
304 recipe_data = await openai_recipe_service.build_recipe_from_images(
305 local_images, translate_language=translate_language
306 )
307 recipe_data = cleaner.clean(recipe_data, self.translator)
309 recipe = self.create_one(recipe_data)
310 data_service = RecipeDataService(recipe.id)
312 with open(local_images[0], "rb") as f:
313 data_service.write_image(f.read(), "webp")
314 return recipe
316 def duplicate_one(self, old_slug_or_id: str | UUID, dup_data: RecipeDuplicate) -> Recipe: 1b
317 """Duplicates a recipe and returns the new recipe."""
319 old_recipe = self.get_one(old_slug_or_id) 1efghijklmnopa
320 new_recipe_data = old_recipe.model_dump(exclude={"id", "name", "slug", "image", "comments"}, round_trip=True)
321 new_recipe = Recipe.model_validate(new_recipe_data)
323 # Asset images in steps directly link to the original recipe, so we
324 # need to update them to references to the assets we copy below
325 def replace_recipe_step(step: RecipeStep) -> RecipeStep:
326 new_id = uuid4()
327 new_text = step.text.replace(str(old_recipe.id), str(new_recipe.id))
328 new_step = step.model_copy(update={"id": new_id, "text": new_text})
329 return new_step
331 # Copy ingredients to make them independent of the original
332 def copy_recipe_ingredient(ingredient: RecipeIngredient):
333 new_reference_id = uuid4()
334 new_ingredient = ingredient.model_copy(update={"reference_id": new_reference_id})
335 return new_ingredient
337 new_name = dup_data.name if dup_data.name else old_recipe.name or ""
338 new_recipe.id = uuid4()
339 new_recipe.slug = create_recipe_slug(new_name)
340 new_recipe.image = cache.cache_key.new_key() if old_recipe.image else None
341 new_recipe.recipe_instructions = (
342 None
343 if old_recipe.recipe_instructions is None
344 else list(map(replace_recipe_step, old_recipe.recipe_instructions))
345 )
346 new_recipe.recipe_ingredient = (
347 None
348 if old_recipe.recipe_ingredient is None
349 else list(map(copy_recipe_ingredient, old_recipe.recipe_ingredient))
350 )
351 new_recipe.last_made = None
353 new_recipe = self._recipe_creation_factory(new_name, additional_attrs=new_recipe.model_dump())
355 new_recipe = self.repos.recipes.create(new_recipe)
357 # Copy all assets (including images) to the new recipe directory
358 # This assures that replaced links in recipe steps continue to work when the old recipe is deleted
359 try:
360 new_service = RecipeDataService(new_recipe.id)
361 old_service = RecipeDataService(old_recipe.id)
362 copytree(
363 old_service.dir_data,
364 new_service.dir_data,
365 dirs_exist_ok=True,
366 )
367 except Exception as e:
368 self.logger.error(f"Failed to copy assets from {old_recipe.slug} to {new_recipe.slug}: {e}")
370 return new_recipe
372 def _pre_update_check(self, slug_or_id: str | UUID, new_data: Recipe) -> Recipe: 1b
373 """
374 gets the recipe from the database and performs a check to see if the user can update the recipe.
375 If the user can't update the recipe, an exception is raised.
377 Checks:
378 - That the recipe exists
379 - That the user can update the recipe (recipe is not locked or the user is the owner)
380 - _if_ the user is locking the recipe, that they can lock the recipe (user is the owner)
382 Args:
383 slug_or_id (str | UUID): recipe slug or id
384 new_data (Recipe): the new recipe data
386 Raises:
387 exceptions.PermissionDenied (403)
388 """
390 recipe = self.get_one(slug_or_id) 1qca
392 if recipe is None or recipe.settings is None: 392 ↛ 393line 392 didn't jump to line 393 because the condition on line 392 was never true1ca
393 raise exceptions.NoEntryFound("Recipe not found.")
395 if not self.can_update(recipe): 395 ↛ 396line 395 didn't jump to line 396 because the condition on line 395 was never true1ca
396 raise exceptions.PermissionDenied("You do not have permission to edit this recipe.")
398 setting_lock = new_data.settings is not None and recipe.settings.locked != new_data.settings.locked 1ca
399 if setting_lock and not self.can_lock_unlock(recipe): 399 ↛ 400line 399 didn't jump to line 400 because the condition on line 399 was never true1ca
400 raise exceptions.PermissionDenied("You do not have permission to lock/unlock this recipe.")
402 return recipe 1ca
404 def update_one(self, slug_or_id: str | UUID, update_data: Recipe) -> Recipe: 1b
405 recipe = self._pre_update_check(slug_or_id, update_data) 1ca
407 new_data = self.group_recipes.update(recipe.slug, update_data) 1ca
408 self.check_assets(new_data, recipe.slug) 1ca
409 return new_data 1ca
411 def update_recipe_image(self, slug: str, image: bytes, extension: str): 1b
412 recipe = self.get_one(slug) 1da
413 if not self.can_update(recipe): 413 ↛ 414line 413 didn't jump to line 414 because the condition on line 413 was never true1da
414 raise exceptions.PermissionDenied("You do not have permission to edit this recipe.")
416 data_service = RecipeDataService(recipe.id) 1da
417 data_service.write_image(image, extension) 1da
419 return self.group_recipes.update_image(slug, extension)
421 def patch_one(self, slug_or_id: str | UUID, patch_data: Recipe) -> Recipe: 1b
422 recipe: Recipe = self._pre_update_check(slug_or_id, patch_data) 1qa
424 new_data = self.group_recipes.patch(recipe.slug, patch_data.model_dump(exclude_unset=True))
426 self.check_assets(new_data, recipe.slug)
427 return new_data
429 def update_last_made(self, slug_or_id: str | UUID, timestamp: datetime) -> Recipe: 1b
430 # we bypass the pre update check since any user can update a recipe's last made date, even if it's locked,
431 # or if the user belongs to a different household
433 household_service = HouseholdService(self.user.group_id, self.user.household_id, self.repos) 1q
434 household_service.set_household_recipe(slug_or_id, HouseholdRecipeUpdate(last_made=timestamp)) 1q
436 return self.get_one(slug_or_id)
438 def delete_one(self, slug_or_id: str | UUID) -> Recipe: 1b
439 recipe = self.get_one(slug_or_id)
441 if not self.can_delete(recipe):
442 raise exceptions.PermissionDenied("You do not have permission to delete this recipe.")
444 data = self.group_recipes.delete(recipe.id, "id")
445 self.delete_assets(data)
446 return data
448 # =================================================================
449 # Recipe Template Methods
451 def render_template(self, recipe: Recipe, temp_dir: Path, template: str) -> Path: 1b
452 t_service = TemplateService(temp_dir)
453 return t_service.render(recipe, template)
456class OpenAIRecipeService(RecipeServiceBase): 1b
457 def _convert_recipe(self, openai_recipe: OpenAIRecipe) -> Recipe: 1b
458 return Recipe(
459 user_id=self.user.id,
460 group_id=self.user.group_id,
461 household_id=self.household.id,
462 name=openai_recipe.name,
463 slug=create_recipe_slug(openai_recipe.name),
464 description=openai_recipe.description,
465 recipe_yield=openai_recipe.recipe_yield,
466 total_time=openai_recipe.total_time,
467 prep_time=openai_recipe.prep_time,
468 perform_time=openai_recipe.perform_time,
469 recipe_ingredient=[
470 RecipeIngredient(title=ingredient.title, note=ingredient.text)
471 for ingredient in openai_recipe.ingredients
472 if ingredient.text
473 ],
474 recipe_instructions=[
475 RecipeStep(title=instruction.title, text=instruction.text)
476 for instruction in openai_recipe.instructions
477 if instruction.text
478 ],
479 notes=[RecipeNote(title=note.title or "", text=note.text) for note in openai_recipe.notes if note.text],
480 )
482 async def build_recipe_from_images(self, images: list[Path], translate_language: str | None) -> Recipe: 1b
483 settings = get_app_settings()
484 if not (settings.OPENAI_ENABLED and settings.OPENAI_ENABLE_IMAGE_SERVICES):
485 raise ValueError("OpenAI image services are not available")
487 openai_service = OpenAIService()
488 prompt = openai_service.get_prompt(
489 "recipes.parse-recipe-image",
490 data_injections=[
491 OpenAIDataInjection(
492 description=(
493 "This is the JSON response schema. You must respond in valid JSON that follows this schema. "
494 "Your payload should be as compact as possible, eliminating unncessesary whitespace. "
495 "Any fields with default values which you do not populate should not be in the payload."
496 ),
497 value=OpenAIRecipe,
498 )
499 ],
500 )
502 openai_images = [OpenAILocalImage(filename=os.path.basename(image), path=image) for image in images]
503 message = (
504 f"Please extract the recipe from the {'images' if len(openai_images) > 1 else 'image'} provided."
505 "There should be exactly one recipe."
506 )
508 if translate_language:
509 message += f" Please translate the recipe to {translate_language}."
511 try:
512 response = await openai_service.get_response(
513 prompt, message, images=openai_images, force_json_response=True
514 )
515 except Exception as e:
516 raise Exception("Failed to call OpenAI services") from e
518 try:
519 openai_recipe = OpenAIRecipe.parse_openai_response(response)
520 recipe = self._convert_recipe(openai_recipe)
521 except Exception as e:
522 raise ValueError("Unable to parse recipe from image") from e
524 return recipe