Coverage for opt/mealie/lib/python3.12/site-packages/mealie/services/recipe/recipe_data_service.py: 16%
105 statements
coverage.py v7.10.6, created at 2025-11-25 15:32 +0000

import asyncio
import shutil
from pathlib import Path
from typing import IO

from httpx import AsyncClient, Response
from pydantic import UUID4

from mealie.pkgs import img, safehttp
from mealie.pkgs.safehttp.transport import AsyncSafeTransport
from mealie.schema.recipe.recipe import Recipe
from mealie.services._base_service import BaseService
from mealie.services.scraper.user_agents_manager import get_user_agents_manager


async def gather_with_concurrency(n, *coros, ignore_exceptions=False):
    semaphore = asyncio.Semaphore(n)

    async def sem_coro(coro):
        async with semaphore:
            return await coro

    results = await asyncio.gather(*(sem_coro(c) for c in coros), return_exceptions=ignore_exceptions)
    if ignore_exceptions:
        results = [r for r in results if not isinstance(r, Exception)]
    return results
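
# Usage sketch (illustrative, not part of this module): run many coroutines with at
# most `n` in flight at once; with ignore_exceptions=True, coroutines that raised are
# dropped from the result list. `fetch` and `urls` are hypothetical placeholders.
#
#     pages = await gather_with_concurrency(3, *(fetch(u) for u in urls), ignore_exceptions=True)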


async def largest_content_len(urls: list[str]) -> tuple[str, int]:
    user_agent_manager = get_user_agents_manager()

    largest_url = ""
    largest_len = 0

    max_concurrency = 10

    async def do(client: AsyncClient, url: str) -> Response:
        return await client.head(url, headers=user_agent_manager.get_scrape_headers())

    async with AsyncClient(transport=safehttp.AsyncSafeTransport()) as client:
        tasks = [do(client, url) for url in urls]
        responses: list[Response] = await gather_with_concurrency(max_concurrency, *tasks, ignore_exceptions=True)
        for response in responses:
            len_int = int(response.headers.get("Content-Length", 0))
            if len_int > largest_len:
                largest_url = str(response.url)
                largest_len = len_int

    return largest_url, largest_len
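
# Usage sketch (illustrative): issue HEAD requests for candidate image URLs and keep
# the one advertising the largest Content-Length. The URLs below are placeholders.
#
#     url, size = await largest_content_len(
#         ["https://example.com/img-320.jpg", "https://example.com/img-1280.jpg"]
#     )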


class NotAnImageError(Exception):
    pass


class InvalidDomainError(Exception):
    pass


class RecipeDataService(BaseService):
    minifier: img.ABCMinifier

    def __init__(self, recipe_id: UUID4) -> None:
        """
        RecipeDataService is a service that consolidates the reading/writing actions
        related to assets and images for a recipe.
        """
        super().__init__()

        self.recipe_id = recipe_id
        self.minifier = img.PillowMinifier(purge=True, logger=self.logger)

        self.dir_data = Recipe.directory_from_id(self.recipe_id)
        self.dir_image = self.dir_data.joinpath("images")
        self.dir_image_timeline = self.dir_image.joinpath("timeline")
        self.dir_assets = self.dir_data.joinpath("assets")

        for dir in [self.dir_image, self.dir_image_timeline, self.dir_assets]:
            dir.mkdir(parents=True, exist_ok=True)
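
    # Usage sketch (illustrative): constructing the service creates the image,
    # timeline, and asset directories for the recipe if they do not exist.
    # The UUID below is a placeholder.
    #
    #     from uuid import UUID
    #     service = RecipeDataService(UUID("00000000-0000-0000-0000-000000000000"))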

    def delete_all_data(self) -> None:
        try:
            shutil.rmtree(self.dir_data)
        except Exception as e:
            self.logger.exception(f"Failed to delete recipe data: {e}")

    def write_image(self, file_data: bytes | Path | IO[bytes], extension: str, image_dir: Path | None = None) -> Path:
        if not image_dir:
            image_dir = self.dir_image

        extension = extension.replace(".", "")
        image_path = image_dir.joinpath(f"original.{extension}")
        image_path.unlink(missing_ok=True)

        if isinstance(file_data, Path):
            shutil.copy2(file_data, image_path)
        elif isinstance(file_data, bytes):
            with open(image_path, "ab") as f:
                f.write(file_data)
        else:
            # file-like objects fall through to a streamed copy
            with open(image_path, "ab") as f:
                shutil.copyfileobj(file_data, f)

        self.minifier.minify(image_path)

        return image_path
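
    # Usage sketch (illustrative): store raw bytes as the recipe's original image and
    # let the minifier produce the derived sizes. `raw` is a hypothetical variable.
    #
    #     raw: bytes = b"..."
    #     path = service.write_image(raw, "jpg")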

    async def scrape_image(self, image_url: str | dict[str, str] | list[str]) -> None:
        self.logger.info(f"Image URL: {image_url}")
        user_agent = get_user_agents_manager().user_agents[0]

        image_url_str = ""

        if isinstance(image_url, str):  # Handles String Types
            image_url_str = image_url

        elif isinstance(image_url, list):  # Handles List Types
            # Multiple images have been defined in the schema - usually different resolutions
            # Typically would be in smallest->biggest order, but can't be certain so test each.
            # 'Google will pick the best image to display in Search results based on the aspect ratio and resolution.'
            image_url_str, _ = await largest_content_len(image_url)

        elif isinstance(image_url, dict):  # Handles Dictionary Types
            for key in image_url:
                if key == "url":
                    image_url_str = image_url.get("url", "")

        if not image_url_str:
            raise ValueError(f"image url could not be parsed from input: {image_url}")

        ext = image_url_str.split(".")[-1]

        if ext not in img.IMAGE_EXTENSIONS:
            ext = "jpg"  # Guess the extension

        file_name = f"{self.recipe_id!s}.{ext}"
        file_path = Recipe.directory_from_id(self.recipe_id).joinpath("images", file_name)

        async with AsyncClient(transport=AsyncSafeTransport()) as client:
            try:
                r = await client.get(image_url_str, headers={"User-Agent": user_agent})
            except Exception:
                self.logger.exception("Fatal Image Request Exception")
                return None

        if r.status_code != 200:
            # TODO: Probably should throw an exception in this case as well, but before these changes
            # we were returning None if it failed anyways.
            return None

        content_type = r.headers.get("content-type", "")

        if "image" not in content_type:
            self.logger.error(f"Content-Type: {content_type} is not an image")
            raise NotAnImageError(f"Content-Type {content_type} is not an image")

        self.logger.debug(f"File Name Suffix {file_path.suffix}")
        self.write_image(r.read(), file_path.suffix)
        file_path.unlink(missing_ok=True)
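
# Usage sketch (illustrative): fetch and store a recipe image from scraped schema.org
# data, which may be a plain URL, a list of candidate URLs, or a dict with a "url"
# key. The URL below is a placeholder.
#
#     await service.scrape_image("https://example.com/recipe.jpg")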