Coverage for opt/mealie/lib/python3.12/site-packages/mealie/services/recipe/recipe_service.py: 40%
296 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-25 17:29 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-25 17:29 +0000
1import json 1b
2import os 1b
3import shutil 1b
4from datetime import UTC, datetime 1b
5from pathlib import Path 1b
6from shutil import copytree, rmtree 1b
7from typing import Any 1b
8from uuid import UUID, uuid4 1b
9from zipfile import ZipFile 1b
11from fastapi import UploadFile 1b
13from mealie.core import exceptions 1b
14from mealie.core.config import get_app_settings 1b
15from mealie.core.dependencies.dependencies import get_temporary_path 1b
16from mealie.lang.providers import Translator 1b
17from mealie.pkgs import cache 1b
18from mealie.repos.all_repositories import get_repositories 1b
19from mealie.repos.repository_factory import AllRepositories 1b
20from mealie.repos.repository_generic import RepositoryGeneric 1b
21from mealie.schema.household.household import HouseholdInDB, HouseholdRecipeUpdate 1b
22from mealie.schema.openai.recipe import OpenAIRecipe 1b
23from mealie.schema.recipe.recipe import CreateRecipe, Recipe, create_recipe_slug 1b
24from mealie.schema.recipe.recipe_ingredient import RecipeIngredient 1b
25from mealie.schema.recipe.recipe_notes import RecipeNote 1b
26from mealie.schema.recipe.recipe_settings import RecipeSettings 1b
27from mealie.schema.recipe.recipe_step import RecipeStep 1b
28from mealie.schema.recipe.recipe_timeline_events import RecipeTimelineEventCreate, TimelineEventType 1b
29from mealie.schema.recipe.request_helpers import RecipeDuplicate 1b
30from mealie.schema.user.user import PrivateUser, UserRatingCreate 1b
31from mealie.services._base_service import BaseService 1b
32from mealie.services.household_services.household_service import HouseholdService 1b
33from mealie.services.openai import OpenAIDataInjection, OpenAILocalImage, OpenAIService 1b
34from mealie.services.recipe.recipe_data_service import RecipeDataService 1b
35from mealie.services.scraper import cleaner 1b
37from .template_service import TemplateService 1b
class RecipeServiceBase(BaseService):
    """Common state shared by the recipe services in this module.

    Holds the repositories, acting user, household and translator, and opens an
    additional recipes repository for the group with no household filter.
    """

    def __init__(self, repos: AllRepositories, user: PrivateUser, household: HouseholdInDB, translator: Translator):
        """Store the collaborators and verify they belong to the same group/household.

        Args:
            repos: repositories used for all database access.
            user: the user the service acts on behalf of.
            household: the household the service acts within.
            translator: provider of translated strings (`translator.t`).

        Raises:
            Exception: if the group ids of `repos`, `user` and `household` are not
                all equal, or if their household ids are not all equal.
        """
        self.repos = repos
        self.user = user
        self.household = household

        # A chained `a != b != c` only evaluates `a != b and b != c`: it lets a
        # single mismatch through and never compares `a` with `c`. Require all
        # three ids to be equal instead.
        if not (repos.group_id == user.group_id == household.group_id):
            raise Exception("group ids do not match")
        if not (repos.household_id == user.household_id == household.id):
            raise Exception("household ids do not match")

        # Recipes repo without a Household filter
        self.group_recipes = get_repositories(repos.session, group_id=repos.group_id, household_id=None).recipes

        self.translator = translator
        self.t = translator.t

        super().__init__()
60class RecipeService(RecipeServiceBase): 1b
61 def _get_recipe(self, data: str | UUID, key: str | None = None) -> Recipe: 1b
62 recipe = self.group_recipes.get_one(data, key) 1FhGHIijdefgklmnopqrstuvwxyzABCDEa
63 if recipe is None: 1FhGHIijdefgklmnopqrstuvwxyzABCDEa
64 raise exceptions.NoEntryFound("Recipe not found.") 1FhGHIijdefgklmnopqrstuvwxyzABCDEa
65 return recipe
67 def can_delete(self, recipe: Recipe) -> bool: 1b
68 if self.user.admin:
69 return True
70 else:
71 return self.can_update(recipe)
73 def can_update(self, recipe: Recipe) -> bool: 1b
74 if recipe.settings is None: 74 ↛ 75line 74 didn't jump to line 75 because the condition on line 74 was never true
75 raise exceptions.UnexpectedNone("Recipe Settings is None")
77 # Check if this user owns the recipe
78 if self.user.id == recipe.user_id: 78 ↛ 82line 78 didn't jump to line 82 because the condition on line 78 was always true
79 return True
81 # Check if this user has permission to edit this recipe
82 if self.household.id != recipe.household_id:
83 other_household = self.repos.households.get_one(recipe.household_id)
84 if not (other_household and other_household.preferences):
85 return False
86 if other_household.preferences.lock_recipe_edits_from_other_households:
87 return False
88 if recipe.settings.locked:
89 return False
91 return True
93 def can_lock_unlock(self, recipe: Recipe) -> bool: 1b
94 return recipe.user_id == self.user.id
96 def check_assets(self, recipe: Recipe, original_slug: str) -> None: 1b
97 """Checks if the recipe slug has changed, and if so moves the assets to a new file with the new slug."""
98 if original_slug != recipe.slug: 98 ↛ 99line 98 didn't jump to line 99 because the condition on line 98 was never true
99 current_dir = self.directories.RECIPE_DATA_DIR.joinpath(original_slug)
101 try:
102 copytree(current_dir, recipe.directory, dirs_exist_ok=True)
103 self.logger.debug(f"Renaming Recipe Directory: {original_slug} -> {recipe.slug}")
104 except FileNotFoundError:
105 self.logger.error(f"Recipe Directory not Found: {original_slug}")
107 if recipe.assets is None: 107 ↛ 108line 107 didn't jump to line 108 because the condition on line 107 was never true
108 recipe.assets = []
110 all_asset_files = [x.file_name for x in recipe.assets]
112 for file in recipe.asset_dir.iterdir(): 112 ↛ 113line 112 didn't jump to line 113 because the loop on line 112 never started
113 if file.is_dir():
114 continue
115 if file.name not in all_asset_files:
116 file.unlink()
118 def delete_assets(self, recipe: Recipe) -> None: 1b
119 recipe_dir = recipe.directory
120 rmtree(recipe_dir, ignore_errors=True)
121 self.logger.info(f"Recipe Directory Removed: {recipe.slug}")
123 def _recipe_creation_factory(self, name: str, additional_attrs: dict | None = None) -> Recipe: 1b
124 """
125 The main creation point for recipes. The factor method returns an instance of the
126 Recipe Schema class with the appropriate defaults set. Recipes should not be created
127 elsewhere to avoid conflicts.
128 """
129 additional_attrs = additional_attrs or {} 1ca
130 additional_attrs["name"] = name 1ca
131 additional_attrs["user_id"] = self.user.id 1ca
132 additional_attrs["household_id"] = self.household.id 1ca
133 additional_attrs["group_id"] = self.household.group_id 1ca
135 if additional_attrs.get("tags"): 135 ↛ 136line 135 didn't jump to line 136 because the condition on line 135 was never true1ca
136 for i in range(len(additional_attrs.get("tags", []))):
137 additional_attrs["tags"][i]["group_id"] = self.user.group_id
139 if not additional_attrs.get("recipe_ingredient"): 139 ↛ 144line 139 didn't jump to line 144 because the condition on line 139 was always true1ca
140 additional_attrs["recipe_ingredient"] = [ 1ca
141 RecipeIngredient(note=self.t("recipe.recipe-defaults.ingredient-note"))
142 ]
144 if not additional_attrs.get("recipe_instructions"): 144 ↛ 147line 144 didn't jump to line 147 because the condition on line 144 was always true1ca
145 additional_attrs["recipe_instructions"] = [RecipeStep(text=self.t("recipe.recipe-defaults.step-text"))] 1ca
147 return Recipe(**additional_attrs) 1ca
149 def get_one(self, slug_or_id: str | UUID) -> Recipe: 1b
150 if isinstance(slug_or_id, str): 1FhGHIijdefgklmnopqrstuvwxyzABCDEa
151 try: 1FhijdefgklmnopqrstuvwxyzABCDEa
152 slug_or_id = UUID(slug_or_id) 1FhijdefgklmnopqrstuvwxyzABCDEa
153 except ValueError: 1FhijdefgklmnopqrstuvwxyzABCDEa
154 pass 1FhijdefgklmnopqrstuvwxyzABCDEa
156 if isinstance(slug_or_id, UUID): 156 ↛ 157line 156 didn't jump to line 157 because the condition on line 156 was never true1FhGHIijdefgklmnopqrstuvwxyzABCDEa
157 return self._get_recipe(slug_or_id, "id")
159 else:
160 return self._get_recipe(slug_or_id, "slug") 1FhGHIijdefgklmnopqrstuvwxyzABCDEa
162 def create_one(self, create_data: Recipe | CreateRecipe) -> Recipe: 1b
163 if create_data.name is None: 163 ↛ 164line 163 didn't jump to line 164 because the condition on line 163 was never true1ca
164 create_data.name = "New Recipe"
166 data: Recipe = self._recipe_creation_factory(name=create_data.name, additional_attrs=create_data.model_dump()) 1ca
168 if isinstance(create_data, CreateRecipe) or create_data.settings is None: 168 ↛ 180line 168 didn't jump to line 180 because the condition on line 168 was always true1ca
169 if self.household.preferences is not None: 169 ↛ 178line 169 didn't jump to line 178 because the condition on line 169 was always true1ca
170 data.settings = RecipeSettings( 1ca
171 public=self.household.preferences.recipe_public,
172 show_nutrition=self.household.preferences.recipe_show_nutrition,
173 show_assets=self.household.preferences.recipe_show_assets,
174 landscape_view=self.household.preferences.recipe_landscape_view,
175 disable_comments=self.household.preferences.recipe_disable_comments,
176 )
177 else:
178 data.settings = RecipeSettings()
180 rating_input = data.rating 1ca
181 data.last_made = None 1ca
182 new_recipe = self.repos.recipes.create(data) 1ca
184 # convert rating into user rating
185 if rating_input: 185 ↛ 186line 185 didn't jump to line 186 because the condition on line 185 was never true1ca
186 self.repos.user_ratings.create(
187 UserRatingCreate(
188 user_id=self.user.id,
189 recipe_id=new_recipe.id,
190 rating=rating_input,
191 is_favorite=False,
192 )
193 )
195 # create first timeline entry
196 timeline_event_data = RecipeTimelineEventCreate( 1ca
197 user_id=new_recipe.user_id,
198 recipe_id=new_recipe.id,
199 subject=self.t("recipe.recipe-created"),
200 event_type=TimelineEventType.system,
201 timestamp=new_recipe.created_at or datetime.now(UTC),
202 )
204 self.repos.recipe_timeline_events.create(timeline_event_data) 1ca
205 return new_recipe 1ca
207 def _transform_user_id(self, user_id: str) -> str: 1b
208 query = self.repos.users.get_one(user_id)
209 if query:
210 return user_id
211 else:
212 # default to the current user
213 return str(self.user.id)
215 def _transform_category_or_tag(self, data: dict, repo: RepositoryGeneric) -> dict: 1b
216 slug = data.get("slug")
217 if not slug:
218 return data
220 # if the item exists, return the actual data
221 query = repo.get_one(slug, "slug")
222 if query:
223 return query.model_dump()
225 # otherwise, create the item
226 new_item = repo.create(data)
227 return new_item.model_dump()
229 def _process_recipe_data(self, key: str, data: list | dict | Any): 1b
230 if isinstance(data, list):
231 return [self._process_recipe_data(key, item) for item in data]
233 elif isinstance(data, str):
234 # make sure the user is valid
235 if key == "user_id":
236 return self._transform_user_id(str(data))
238 return data
240 elif not isinstance(data, dict):
241 return data
243 # force group_id and household_id to match the group id of the current user
244 data["group_id"] = str(self.user.group_id)
245 data["household_id"] = str(self.user.household_id)
247 # make sure categories and tags are valid
248 if key == "recipe_category":
249 return self._transform_category_or_tag(data, self.repos.categories)
250 elif key == "tags":
251 return self._transform_category_or_tag(data, self.repos.tags)
253 # recursively process other objects
254 for k, v in data.items():
255 data[k] = self._process_recipe_data(k, v)
257 return data
259 def clean_recipe_dict(self, recipe: dict[str, Any]) -> dict[str, Any]: 1b
260 return self._process_recipe_data("recipe", recipe)
262 def create_from_zip(self, archive: UploadFile, temp_path: Path) -> Recipe: 1b
263 """
264 `create_from_zip` creates a recipe in the database from a zip file exported from Mealie. This is NOT
265 a generic import from a zip file.
266 """
267 with temp_path.open("wb") as buffer: 1Ja
268 shutil.copyfileobj(archive.file, buffer) 1Ja
270 recipe_dict: dict | None = None 1Ja
271 recipe_image: bytes | None = None 1Ja
273 with ZipFile(temp_path) as myzip: 1Ja
274 for file in myzip.namelist():
275 if file.endswith(".json"):
276 with myzip.open(file) as myfile:
277 recipe_dict = json.loads(myfile.read())
278 elif file.endswith(".webp"):
279 with myzip.open(file) as myfile:
280 recipe_image = myfile.read()
282 if recipe_dict is None:
283 raise exceptions.UnexpectedNone("No json data found in Zip")
285 recipe = self.create_one(Recipe(**self.clean_recipe_dict(recipe_dict)))
287 if recipe and recipe.id:
288 data_service = RecipeDataService(recipe.id)
290 if recipe_image:
291 data_service.write_image(recipe_image, "webp")
293 return recipe
295 async def create_from_images(self, images: list[UploadFile], translate_language: str | None = None) -> Recipe: 1b
296 openai_recipe_service = OpenAIRecipeService(self.repos, self.user, self.household, self.translator)
297 with get_temporary_path() as temp_path:
298 local_images: list[Path] = []
299 for image in images:
300 with temp_path.joinpath(image.filename).open("wb") as buffer:
301 shutil.copyfileobj(image.file, buffer)
302 local_images.append(temp_path.joinpath(image.filename))
304 recipe_data = await openai_recipe_service.build_recipe_from_images(
305 local_images, translate_language=translate_language
306 )
307 recipe_data = cleaner.clean(recipe_data, self.translator)
309 recipe = self.create_one(recipe_data)
310 data_service = RecipeDataService(recipe.id)
312 with open(local_images[0], "rb") as f:
313 data_service.write_image(f.read(), "webp")
314 return recipe
    def duplicate_one(self, old_slug_or_id: str | UUID, dup_data: RecipeDuplicate) -> Recipe:
        """Duplicates a recipe and returns the new recipe.

        The copy gets a fresh id and slug, fresh step ids and ingredient reference
        ids, a new image cache key when the original has an image, and no
        `last_made`. The original's data directory is copied for the new recipe on
        a best-effort basis.

        Args:
            old_slug_or_id: slug or id of the recipe to duplicate.
            dup_data: duplication options; `dup_data.name` (when truthy) becomes the
                new name, otherwise the original name (or "") is used.

        Returns:
            The new recipe as returned by the recipes repository.
        """

        old_recipe = self.get_one(old_slug_or_id)
        # Identity fields and comments are dropped so the copy starts out clean
        new_recipe_data = old_recipe.model_dump(exclude={"id", "name", "slug", "image", "comments"}, round_trip=True)
        new_recipe = Recipe.model_validate(new_recipe_data)

        # Asset images in steps directly link to the original recipe, so we
        # need to update them to references to the assets we copy below
        def replace_recipe_step(step: RecipeStep) -> RecipeStep:
            new_id = uuid4()
            # `new_recipe.id` is read when this runs, i.e. after the new id has been assigned below
            new_text = step.text.replace(str(old_recipe.id), str(new_recipe.id))
            new_step = step.model_copy(update={"id": new_id, "text": new_text})
            return new_step

        # Copy ingredients to make them independent of the original
        def copy_recipe_ingredient(ingredient: RecipeIngredient) -> RecipeIngredient:
            new_reference_id = uuid4()
            new_ingredient = ingredient.model_copy(update={"reference_id": new_reference_id})
            return new_ingredient

        new_name = dup_data.name if dup_data.name else old_recipe.name or ""
        new_recipe.id = uuid4()
        new_recipe.slug = create_recipe_slug(new_name)
        new_recipe.image = cache.cache_key.new_key() if old_recipe.image else None
        new_recipe.recipe_instructions = (
            None
            if old_recipe.recipe_instructions is None
            else list(map(replace_recipe_step, old_recipe.recipe_instructions))
        )
        new_recipe.recipe_ingredient = (
            None
            if old_recipe.recipe_ingredient is None
            else list(map(copy_recipe_ingredient, old_recipe.recipe_ingredient))
        )
        new_recipe.last_made = None

        # Re-run through the factory so name/ownership fields and defaults are applied
        new_recipe = self._recipe_creation_factory(new_name, additional_attrs=new_recipe.model_dump())

        new_recipe = self.repos.recipes.create(new_recipe)

        # Copy all assets (including images) to the new recipe directory
        # This assures that replaced links in recipe steps continue to work when the old recipe is deleted
        try:
            new_service = RecipeDataService(new_recipe.id)
            old_service = RecipeDataService(old_recipe.id)
            copytree(
                old_service.dir_data,
                new_service.dir_data,
                dirs_exist_ok=True,
            )
        except Exception as e:
            # Best effort: a failed copy is logged and the duplicate is still returned
            self.logger.error(f"Failed to copy assets from {old_recipe.slug} to {new_recipe.slug}: {e}")

        return new_recipe
372 def _pre_update_check(self, slug_or_id: str | UUID, new_data: Recipe) -> Recipe: 1b
373 """
374 gets the recipe from the database and performs a check to see if the user can update the recipe.
375 If the user can't update the recipe, an exception is raised.
377 Checks:
378 - That the recipe exists
379 - That the user can update the recipe (recipe is not locked or the user is the owner)
380 - _if_ the user is locking the recipe, that they can lock the recipe (user is the owner)
382 Args:
383 slug_or_id (str | UUID): recipe slug or id
384 new_data (Recipe): the new recipe data
386 Raises:
387 exceptions.PermissionDenied (403)
388 """
390 recipe = self.get_one(slug_or_id) 1GHIdefga
392 if recipe is None or recipe.settings is None: 392 ↛ 393line 392 didn't jump to line 393 because the condition on line 392 was never true
393 raise exceptions.NoEntryFound("Recipe not found.")
395 if not self.can_update(recipe): 395 ↛ 396line 395 didn't jump to line 396 because the condition on line 395 was never true
396 raise exceptions.PermissionDenied("You do not have permission to edit this recipe.")
398 setting_lock = new_data.settings is not None and recipe.settings.locked != new_data.settings.locked
399 if setting_lock and not self.can_lock_unlock(recipe): 399 ↛ 400line 399 didn't jump to line 400 because the condition on line 399 was never true
400 raise exceptions.PermissionDenied("You do not have permission to lock/unlock this recipe.")
402 return recipe
404 def update_one(self, slug_or_id: str | UUID, update_data: Recipe) -> Recipe: 1b
405 recipe = self._pre_update_check(slug_or_id, update_data) 1GHIga
407 new_data = self.group_recipes.update(recipe.slug, update_data)
408 self.check_assets(new_data, recipe.slug)
409 return new_data
411 def update_recipe_image(self, slug: str, image: bytes, extension: str): 1b
412 recipe = self.get_one(slug) 1ija
413 if not self.can_update(recipe):
414 raise exceptions.PermissionDenied("You do not have permission to edit this recipe.")
416 data_service = RecipeDataService(recipe.id)
417 data_service.write_image(image, extension)
419 return self.group_recipes.update_image(slug, extension)
421 def patch_one(self, slug_or_id: str | UUID, patch_data: Recipe) -> Recipe: 1b
422 recipe: Recipe = self._pre_update_check(slug_or_id, patch_data) 1defa
424 new_data = self.group_recipes.patch(recipe.slug, patch_data.model_dump(exclude_unset=True))
426 self.check_assets(new_data, recipe.slug)
427 return new_data
429 def update_last_made(self, slug_or_id: str | UUID, timestamp: datetime) -> Recipe: 1b
430 # we bypass the pre update check since any user can update a recipe's last made date, even if it's locked,
431 # or if the user belongs to a different household
433 household_service = HouseholdService(self.user.group_id, self.user.household_id, self.repos)
434 household_service.set_household_recipe(slug_or_id, HouseholdRecipeUpdate(last_made=timestamp))
436 return self.get_one(slug_or_id)
438 def delete_one(self, slug_or_id: str | UUID) -> Recipe: 1b
439 recipe = self.get_one(slug_or_id)
441 if not self.can_delete(recipe):
442 raise exceptions.PermissionDenied("You do not have permission to delete this recipe.")
444 data = self.group_recipes.delete(recipe.id, "id")
445 self.delete_assets(data)
446 return data
448 # =================================================================
449 # Recipe Template Methods
451 def render_template(self, recipe: Recipe, temp_dir: Path, template: str) -> Path: 1b
452 t_service = TemplateService(temp_dir)
453 return t_service.render(recipe, template)
class OpenAIRecipeService(RecipeServiceBase):
    """Builds `Recipe` objects from images by delegating the extraction to OpenAI."""

    def _convert_recipe(self, openai_recipe: OpenAIRecipe) -> Recipe:
        """Map an `OpenAIRecipe` onto a `Recipe` owned by the current user and household.

        Ingredients, instructions and notes whose text is empty are dropped; a
        note without a title gets an empty string as its title.
        """
        return Recipe(
            user_id=self.user.id,
            group_id=self.user.group_id,
            household_id=self.household.id,
            name=openai_recipe.name,
            slug=create_recipe_slug(openai_recipe.name),
            description=openai_recipe.description,
            recipe_yield=openai_recipe.recipe_yield,
            total_time=openai_recipe.total_time,
            prep_time=openai_recipe.prep_time,
            perform_time=openai_recipe.perform_time,
            recipe_ingredient=[
                RecipeIngredient(title=ingredient.title, note=ingredient.text)
                for ingredient in openai_recipe.ingredients
                if ingredient.text
            ],
            recipe_instructions=[
                RecipeStep(title=instruction.title, text=instruction.text)
                for instruction in openai_recipe.instructions
                if instruction.text
            ],
            notes=[RecipeNote(title=note.title or "", text=note.text) for note in openai_recipe.notes if note.text],
        )

    async def build_recipe_from_images(self, images: list[Path], translate_language: str | None) -> Recipe:
        """Ask OpenAI to extract a single recipe from the given image files.

        Args:
            images: paths of the images to send.
            translate_language: when truthy, the model is additionally asked to
                translate the recipe to this language.

        Returns:
            The extracted recipe converted with `_convert_recipe`.

        Raises:
            ValueError: if OpenAI image services are disabled in the app settings,
                or if the response cannot be parsed/converted into a recipe.
            Exception: if the call to the OpenAI service fails.
        """
        settings = get_app_settings()
        if not (settings.OPENAI_ENABLED and settings.OPENAI_ENABLE_IMAGE_SERVICES):
            raise ValueError("OpenAI image services are not available")

        openai_service = OpenAIService()
        prompt = openai_service.get_prompt(
            "recipes.parse-recipe-image",
            data_injections=[
                OpenAIDataInjection(
                    description=(
                        "This is the JSON response schema. You must respond in valid JSON that follows this schema. "
                        "Your payload should be as compact as possible, eliminating unncessesary whitespace. "
                        "Any fields with default values which you do not populate should not be in the payload."
                    ),
                    value=OpenAIRecipe,
                )
            ],
        )

        openai_images = [OpenAILocalImage(filename=os.path.basename(image), path=image) for image in images]
        # Adjacent literals are concatenated verbatim, so the second sentence
        # carries its own leading space to keep the two sentences apart.
        message = (
            f"Please extract the recipe from the {'images' if len(openai_images) > 1 else 'image'} provided."
            " There should be exactly one recipe."
        )

        if translate_language:
            message += f" Please translate the recipe to {translate_language}."

        try:
            response = await openai_service.get_response(
                prompt, message, images=openai_images, force_json_response=True
            )
        except Exception as e:
            raise Exception("Failed to call OpenAI services") from e

        try:
            openai_recipe = OpenAIRecipe.parse_openai_response(response)
            recipe = self._convert_recipe(openai_recipe)
        except Exception as e:
            raise ValueError("Unable to parse recipe from image") from e

        return recipe