Coverage for opt/mealie/lib/python3.12/site-packages/mealie/services/scraper/scraper_strategies.py: 35% of 211 statements
coverage.py v7.10.6, created at 2025-11-25 15:48 +0000

import time
from abc import ABC, abstractmethod
from collections.abc import Callable
from typing import Any

import bs4
import extruct
from fastapi import HTTPException, status
from httpx import AsyncClient, Response
from recipe_scrapers import NoSchemaFoundInWildMode, SchemaScraperFactory, scrape_html
from slugify import slugify
from w3lib.html import get_base_url

from mealie.core.config import get_app_settings
from mealie.core.root_logger import get_logger
from mealie.lang.providers import Translator
from mealie.pkgs import safehttp
from mealie.schema.recipe.recipe import Recipe, RecipeStep
from mealie.services.openai import OpenAIService
from mealie.services.scraper.scraped_extras import ScrapedExtras

from . import cleaner
from .user_agents_manager import get_user_agents_manager

SCRAPER_TIMEOUT = 15
logger = get_logger()


class ForceTimeoutException(Exception):
    pass


async def safe_scrape_html(url: str) -> str:
    """
    Scrapes the html from a url but will cancel the request
    if the request takes longer than 15 seconds. This is used to mitigate
    denial-of-service attacks from users providing a URL with arbitrarily large content.
    """
    user_agents_manager = get_user_agents_manager()

    logger.debug(f"Scraping URL: {url}")
    async with AsyncClient(transport=safehttp.AsyncSafeTransport()) as client:
        for user_agent in user_agents_manager.user_agents:
            logger.debug(f'Trying User-Agent: "{user_agent}"')

            response: Response | None = None
            html_bytes = b""
            async with client.stream(
                "GET",
                url,
                timeout=SCRAPER_TIMEOUT,
                headers=user_agents_manager.get_scrape_headers(user_agent),
                follow_redirects=True,
            ) as resp:
                if resp.status_code == status.HTTP_403_FORBIDDEN:
                    logger.debug(f'403 Forbidden with User-Agent: "{user_agent}"')
                    continue

                start_time = time.time()

                async for chunk in resp.aiter_bytes(chunk_size=1024):
                    html_bytes += chunk

                    if time.time() - start_time > SCRAPER_TIMEOUT:
                        raise ForceTimeoutException()

                response = resp
                break

        if not (response and html_bytes):
            return ""

    # =====================================
    # Copied from requests text property

    # Try charset from content-type
    content = None
    encoding = response.encoding

    # Fallback to auto-detected encoding.
    if encoding is None:
        encoding = response.apparent_encoding

    # Decode unicode from given encoding.
    try:
        content = str(html_bytes, encoding, errors="replace")
    except (LookupError, TypeError):
        # A LookupError is raised if the encoding was not found which could
        # indicate a misspelling or similar mistake.
        #
        # A TypeError can be raised if encoding is None
        #
        # So we try blindly encoding.
        content = str(html_bytes, errors="replace")

    return content
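

# Illustrative usage sketch (not part of the upstream module): safe_scrape_html is a
# coroutine, so callers must await it from a running event loop. The URL below is a
# placeholder, and an empty return value simply means no usable HTML was fetched.
async def _example_safe_scrape(url: str = "https://example.com/recipe") -> None:
    html = await safe_scrape_html(url)
    if html:
        logger.debug(f"Fetched {len(html)} characters from {url}")
    else:
        logger.debug(f"No HTML could be fetched from {url}")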


class ABCScraperStrategy(ABC):
    """
    Abstract class for all recipe parsers.
    """

    url: str

    def __init__(
        self,
        url: str,
        translator: Translator,
        raw_html: str | None = None,
    ) -> None:
        self.logger = get_logger()
        self.url = url
        self.raw_html = raw_html
        self.translator = translator

    @abstractmethod
    async def get_html(self, url: str) -> str: ...

    @abstractmethod
    async def parse(self) -> tuple[Recipe, ScrapedExtras] | tuple[None, None]:
        """Parse a recipe from a web URL.

        Args:
            recipe_url (str): Full URL of the recipe to scrape.

        Returns:
            Recipe: Recipe object.
        """
        ...
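

# Illustrative sketch (not part of the upstream module): a minimal concrete strategy only
# needs to implement get_html and parse. This stub shows the required shape; it fetches
# HTML with safe_scrape_html and reports "nothing found" by returning (None, None).
class _ExampleNoopStrategy(ABCScraperStrategy):
    async def get_html(self, url: str) -> str:
        return self.raw_html or await safe_scrape_html(url)

    async def parse(self) -> tuple[Recipe, ScrapedExtras] | tuple[None, None]:
        html = await self.get_html(self.url)
        if not html:
            return None, None
        # A real strategy would build a Recipe and ScrapedExtras from the HTML here.
        return None, None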


class RecipeScraperPackage(ABCScraperStrategy):
    @staticmethod
    def ld_json_to_html(ld_json: str) -> str:
        return (
            "<!DOCTYPE html><html><head>"
            f'<script type="application/ld+json">{ld_json}</script>'
            "</head><body></body></html>"
        )

    async def get_html(self, url: str) -> str:
        return self.raw_html or await safe_scrape_html(url)

    def clean_scraper(self, scraped_data: SchemaScraperFactory.SchemaScraper, url: str) -> tuple[Recipe, ScrapedExtras]:
        def try_get_default(
            func_call: Callable | None,
            get_attr: str,
            default: Any,
            clean_func=None,
            **clean_func_kwargs,
        ):
            value = default

            if func_call:
                try:
                    value = func_call()
                except Exception:
                    self.logger.error(f"Error parsing recipe func_call for '{get_attr}'")

            if value == default:
                try:
                    value = scraped_data.schema.data.get(get_attr)
                except Exception:
                    self.logger.error(f"Error parsing recipe attribute '{get_attr}'")

            if clean_func:
                value = clean_func(value, **clean_func_kwargs)

            return value

        def get_instructions() -> list[RecipeStep]:
            instruction_as_text = try_get_default(
                scraped_data.instructions,
                "recipeInstructions",
                ["No Instructions Found"],
            )

            self.logger.debug(f"Scraped Instructions: (Type: {type(instruction_as_text)}) \n {instruction_as_text}")

            instruction_as_text = cleaner.clean_instructions(instruction_as_text)

            self.logger.debug(f"Cleaned Instructions: (Type: {type(instruction_as_text)}) \n {instruction_as_text}")

            try:
                return [RecipeStep(title="", text=x.get("text")) for x in instruction_as_text]
            except TypeError:
                return []

        cook_time = try_get_default(
            None, "performTime", None, cleaner.clean_time, translator=self.translator
        ) or try_get_default(scraped_data.cook_time, "cookTime", None, cleaner.clean_time, translator=self.translator)

        extras = ScrapedExtras()

        extras.set_tags(try_get_default(scraped_data.keywords, "keywords", "", cleaner.clean_tags))

        recipe = Recipe(
            name=try_get_default(scraped_data.title, "name", "No Name Found", cleaner.clean_string),
            slug="",
            image=try_get_default(scraped_data.image, "image", None, cleaner.clean_image),
            description=try_get_default(scraped_data.description, "description", "", cleaner.clean_string),
            nutrition=try_get_default(scraped_data.nutrients, "nutrition", None, cleaner.clean_nutrition),
            recipe_yield=try_get_default(scraped_data.yields, "recipeYield", "1", cleaner.clean_string),
            recipe_ingredient=try_get_default(
                scraped_data.ingredients,
                "recipeIngredient",
                [""],
                cleaner.clean_ingredients,
            ),
            recipe_instructions=get_instructions(),
            total_time=try_get_default(
                scraped_data.total_time, "totalTime", None, cleaner.clean_time, translator=self.translator
            ),
            prep_time=try_get_default(
                scraped_data.prep_time, "prepTime", None, cleaner.clean_time, translator=self.translator
            ),
            perform_time=cook_time,
            org_url=url or try_get_default(None, "url", None, cleaner.clean_string),
        )

        return recipe, extras

    async def scrape_url(self) -> SchemaScraperFactory.SchemaScraper | Any | None:
        recipe_html = await self.get_html(self.url)

        try:
            # scrape_html requires a URL, but we might not have one, so we default to a dummy URL
            scraped_schema = scrape_html(recipe_html, org_url=self.url or "https://example.com", supported_only=False)
        except (NoSchemaFoundInWildMode, AttributeError):
            self.logger.error(f"Recipe Scraper was unable to extract a recipe from {self.url}")
            return None

        except ConnectionError as e:
            raise HTTPException(status.HTTP_400_BAD_REQUEST, {"details": "CONNECTION_ERROR"}) from e

        # Check to see if the recipe is valid
        try:
            ingredients = scraped_schema.ingredients()
        except Exception:
            ingredients = []

        try:
            instruct: list | str = scraped_schema.instructions()
        except Exception:
            instruct = []

        if instruct or ingredients:
            return scraped_schema

        self.logger.debug(f"Recipe Scraper [Package] was unable to extract a recipe from {self.url}")
        return None

    async def parse(self):
        """
        Parse a recipe from a given url.
        """
        scraped_data = await self.scrape_url()

        if scraped_data is None:
            return None

        return self.clean_scraper(scraped_data, self.url)
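

# Illustrative usage sketch (not part of the upstream module): driving the package-based
# strategy end to end. The URL is a placeholder, and the translator is assumed to be
# supplied by the caller (e.g. from the request context), since its construction is not
# shown in this module.
async def _example_scrape_with_package(url: str, translator: Translator) -> Recipe | None:
    strategy = RecipeScraperPackage(url, translator)
    result = await strategy.parse()
    if result is None:
        return None
    recipe, _extras = result
    return recipe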


class RecipeScraperOpenAI(RecipeScraperPackage):
    """
    A wrapper around the `RecipeScraperPackage` class that uses OpenAI to extract the recipe from the URL,
    rather than trying to scrape it directly.
    """

    def extract_json_ld_data_from_html(self, soup: bs4.BeautifulSoup) -> str:
        data_parts: list[str] = []
        for script in soup.find_all("script", type="application/ld+json"):
            try:
                script_data = script.string
                if script_data:
                    data_parts.append(str(script_data))
            except AttributeError:
                pass

        return "\n\n".join(data_parts)

    def find_image(self, soup: bs4.BeautifulSoup) -> str | None:
        # find the open graph image tag
        og_image = soup.find("meta", property="og:image")
        if og_image and og_image.get("content"):
            return og_image["content"]

        # find the largest image on the page
        largest_img = None
        max_size = 0
        for img in soup.find_all("img"):
            width = img.get("width", 0)
            height = img.get("height", 0)
            if not width or not height:
                continue

            try:
                size = int(width) * int(height)
            except (ValueError, TypeError):
                size = 1
            if size > max_size:
                max_size = size
                largest_img = img

        if largest_img:
            return largest_img.get("src")

        return None

    def format_html_to_text(self, html: str) -> str:
        soup = bs4.BeautifulSoup(html, "lxml")

        text = soup.get_text(separator="\n", strip=True)
        text += self.extract_json_ld_data_from_html(soup)
        if not text:
            raise Exception("No text or ld+json data found in HTML")

        try:
            image = self.find_image(soup)
        except Exception:
            image = None

        components = [f"Convert this content to JSON: {text}"]
        if image:
            components.append(f"Recipe Image: {image}")
        return "\n".join(components)

    async def get_html(self, url: str) -> str:
        settings = get_app_settings()
        if not settings.OPENAI_ENABLED:
            return ""

        html = self.raw_html or await safe_scrape_html(url)
        text = self.format_html_to_text(html)
        try:
            service = OpenAIService()
            prompt = service.get_prompt("recipes.scrape-recipe")

            response_json = await service.get_response(prompt, text, force_json_response=True)
            if not response_json:
                raise Exception("OpenAI did not return any data")

            return self.ld_json_to_html(response_json)
        except Exception:
            self.logger.exception(f"OpenAI was unable to extract a recipe from {url}")
            return ""
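

# Illustrative sketch (not part of the upstream module): the OpenAI strategy degrades
# gracefully. When OPENAI_ENABLED is off, get_html returns an empty string, so the
# downstream schema scrape has nothing to work with and parse() is expected to come back
# empty; callers can skip the strategy entirely, as this hypothetical helper does.
async def _example_scrape_with_openai(url: str, translator: Translator) -> Recipe | None:
    settings = get_app_settings()
    if not settings.OPENAI_ENABLED:
        # Nothing to do without OpenAI configured; fall back to another strategy instead.
        return None

    strategy = RecipeScraperOpenAI(url, translator)
    result = await strategy.parse()
    if result is None:
        return None
    recipe, _extras = result
    return recipe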


class RecipeScraperOpenGraph(ABCScraperStrategy):
    async def get_html(self, url: str) -> str:
        return self.raw_html or await safe_scrape_html(url)

    def get_recipe_fields(self, html) -> dict | None:
        """
        Get the recipe fields from the Open Graph data.
        """

        def og_field(properties: dict, field_name: str) -> str:
            return next((val for name, val in properties if name == field_name), "")

        def og_fields(properties: list[tuple[str, str]], field_name: str) -> list[str]:
            return list({val for name, val in properties if name == field_name})

        base_url = get_base_url(html, self.url)
        data = extruct.extract(html, base_url=base_url, errors="log")
        try:
            properties = data["opengraph"][0]["properties"]
        except Exception:
            return None

        return {
            "name": og_field(properties, "og:title"),
            "description": og_field(properties, "og:description"),
            "image": og_field(properties, "og:image"),
            "recipeYield": "",
            "recipeIngredient": ["Could not detect ingredients"],
            "recipeInstructions": [{"text": "Could not detect instructions"}],
            "slug": slugify(og_field(properties, "og:title")),
            "orgURL": self.url or og_field(properties, "og:url"),
            "categories": [],
            "tags": og_fields(properties, "og:article:tag"),
            "dateAdded": None,
            "notes": [],
            "extras": [],
        }

    async def parse(self):
        """
        Parse a recipe from a given url.
        """
        html = await self.get_html(self.url)

        og_data = self.get_recipe_fields(html)

        if og_data is None:
            return None

        return Recipe(**og_data), ScrapedExtras()
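

# Illustrative sketch (not part of the upstream module): one way to combine the strategies
# above into a fallback chain, trying the structured-data scraper first and then falling
# back to Open Graph metadata. Mealie's own orchestration lives elsewhere; this only shows
# how the shared (Recipe, ScrapedExtras) return shape makes the strategies interchangeable.
async def _example_fallback_chain(
    url: str, translator: Translator
) -> tuple[Recipe, ScrapedExtras] | tuple[None, None]:
    for strategy_cls in (RecipeScraperPackage, RecipeScraperOpenGraph):
        strategy = strategy_cls(url, translator)
        result = await strategy.parse()
        if result is not None:
            return result
    return None, None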