Coverage for opt/mealie/lib/python3.12/site-packages/mealie/services/scraper/scraper_strategies.py: 43%
211 statements
coverage.py v7.10.6, created at 2025-12-05 14:03 +0000

import time
from abc import ABC, abstractmethod
from collections.abc import Callable
from typing import Any

import bs4
import extruct
from fastapi import HTTPException, status
from httpx import AsyncClient, Response
from recipe_scrapers import NoSchemaFoundInWildMode, SchemaScraperFactory, scrape_html
from slugify import slugify
from w3lib.html import get_base_url

from mealie.core.config import get_app_settings
from mealie.core.root_logger import get_logger
from mealie.lang.providers import Translator
from mealie.pkgs import safehttp
from mealie.schema.recipe.recipe import Recipe, RecipeStep
from mealie.services.openai import OpenAIService
from mealie.services.scraper.scraped_extras import ScrapedExtras

from . import cleaner
from .user_agents_manager import get_user_agents_manager

SCRAPER_TIMEOUT = 15
logger = get_logger()


class ForceTimeoutException(Exception):
    pass


async def safe_scrape_html(url: str) -> str:
    """
    Scrapes the html from a url but will cancel the request
    if the request takes longer than 15 seconds. This is used to mitigate
    DDOS attacks from users providing a url with arbitrarily large content.
    """
    user_agents_manager = get_user_agents_manager()

    logger.debug(f"Scraping URL: {url}")
    async with AsyncClient(transport=safehttp.AsyncSafeTransport()) as client:
        for user_agent in user_agents_manager.user_agents:  # coverage: loop never ran to completion in this run
            logger.debug(f'Trying User-Agent: "{user_agent}"')

            response: Response | None = None
            html_bytes = b""
            async with client.stream(
                "GET",
                url,
                timeout=SCRAPER_TIMEOUT,
                headers=user_agents_manager.get_scrape_headers(user_agent),
                follow_redirects=True,
            ) as resp:
                if resp.status_code == status.HTTP_403_FORBIDDEN:  # coverage: never true in this run
                    logger.debug(f'403 Forbidden with User-Agent: "{user_agent}"')
                    continue

                start_time = time.time()

                async for chunk in resp.aiter_bytes(chunk_size=1024):
                    html_bytes += chunk

                    if time.time() - start_time > SCRAPER_TIMEOUT:  # coverage: never true in this run
                        raise ForceTimeoutException()

                response = resp
                break

        if not (response and html_bytes):  # coverage: never true in this run
            return ""

        # =====================================
        # Copied from requests text property

        # Try charset from content-type
        content = None
        encoding = response.encoding

        # Fallback to auto-detected encoding.
        if encoding is None:  # coverage: never true in this run
            encoding = response.apparent_encoding

        # Decode unicode from given encoding.
        try:
            content = str(html_bytes, encoding, errors="replace")
        except (LookupError, TypeError):
            # A LookupError is raised if the encoding was not found which could
            # indicate a misspelling or similar mistake.
            #
            # A TypeError can be raised if encoding is None
            #
            # So we try blindly encoding.
            content = str(html_bytes, errors="replace")

        return content
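

# Usage sketch (illustrative only, not part of the module): the strategies below await
# safe_scrape_html from their get_html implementations; a standalone call would look like
#
#     import asyncio
#     html = asyncio.run(safe_scrape_html("https://example.com/recipe"))
#
# The helper streams the body in 1 KiB chunks and aborts with ForceTimeoutException once
# SCRAPER_TIMEOUT seconds have elapsed, so a URL serving an arbitrarily large body cannot
# hold the request open indefinitely.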


class ABCScraperStrategy(ABC):
    """
    Abstract class for all recipe parsers.
    """

    url: str

    def __init__(
        self,
        url: str,
        translator: Translator,
        raw_html: str | None = None,
    ) -> None:
        self.logger = get_logger()
        self.url = url
        self.raw_html = raw_html
        self.translator = translator

    @abstractmethod
    async def get_html(self, url: str) -> str: ...

    @abstractmethod
    async def parse(self) -> tuple[Recipe, ScrapedExtras] | tuple[None, None]:
        """Parse a recipe from a web URL.

        Args:
            recipe_url (str): Full URL of the recipe to scrape.

        Returns:
            Recipe: Recipe object.
        """
        ...
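

# A concrete strategy only has to implement get_html and parse. A minimal sketch (hypothetical,
# not part of Mealie) of a subclass that reuses pre-fetched HTML could look like:
#
#     class RawHtmlStrategy(ABCScraperStrategy):
#         async def get_html(self, url: str) -> str:
#             return self.raw_html or ""
#
#         async def parse(self) -> tuple[Recipe, ScrapedExtras] | tuple[None, None]:
#             html = await self.get_html(self.url)
#             if not html:
#                 return None, None
#             return Recipe(name="Untitled", slug=""), ScrapedExtras()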


class RecipeScraperPackage(ABCScraperStrategy):
    @staticmethod
    def ld_json_to_html(ld_json: str) -> str:
        return (
            "<!DOCTYPE html><html><head>"
            f'<script type="application/ld+json">{ld_json}</script>'
            "</head><body></body></html>"
        )

    async def get_html(self, url: str) -> str:
        return self.raw_html or await safe_scrape_html(url)

    def clean_scraper(self, scraped_data: SchemaScraperFactory.SchemaScraper, url: str) -> tuple[Recipe, ScrapedExtras]:
        def try_get_default(
            func_call: Callable | None,
            get_attr: str,
            default: Any,
            clean_func=None,
            **clean_func_kwargs,
        ):
            value = default

            if func_call:
                try:
                    value = func_call()
                except Exception:
                    self.logger.error(f"Error parsing recipe func_call for '{get_attr}'")

            if value == default:
                try:
                    value = scraped_data.schema.data.get(get_attr)
                except Exception:
                    self.logger.error(f"Error parsing recipe attribute '{get_attr}'")

            if clean_func:
                value = clean_func(value, **clean_func_kwargs)

            return value

        def get_instructions() -> list[RecipeStep]:
            instruction_as_text = try_get_default(
                scraped_data.instructions,
                "recipeInstructions",
                ["No Instructions Found"],
            )

            self.logger.debug(f"Scraped Instructions: (Type: {type(instruction_as_text)}) \n {instruction_as_text}")

            instruction_as_text = cleaner.clean_instructions(instruction_as_text)

            self.logger.debug(f"Cleaned Instructions: (Type: {type(instruction_as_text)}) \n {instruction_as_text}")

            try:
                return [RecipeStep(title="", text=x.get("text")) for x in instruction_as_text]
            except TypeError:
                return []

        cook_time = try_get_default(
            None, "performTime", None, cleaner.clean_time, translator=self.translator
        ) or try_get_default(scraped_data.cook_time, "cookTime", None, cleaner.clean_time, translator=self.translator)

        extras = ScrapedExtras()

        extras.set_tags(try_get_default(scraped_data.keywords, "keywords", "", cleaner.clean_tags))

        recipe = Recipe(
            name=try_get_default(scraped_data.title, "name", "No Name Found", cleaner.clean_string),
            slug="",
            image=try_get_default(scraped_data.image, "image", None, cleaner.clean_image),
            description=try_get_default(scraped_data.description, "description", "", cleaner.clean_string),
            nutrition=try_get_default(scraped_data.nutrients, "nutrition", None, cleaner.clean_nutrition),
            recipe_yield=try_get_default(scraped_data.yields, "recipeYield", "1", cleaner.clean_string),
            recipe_ingredient=try_get_default(
                scraped_data.ingredients,
                "recipeIngredient",
                [""],
                cleaner.clean_ingredients,
            ),
            recipe_instructions=get_instructions(),
            total_time=try_get_default(
                scraped_data.total_time, "totalTime", None, cleaner.clean_time, translator=self.translator
            ),
            prep_time=try_get_default(
                scraped_data.prep_time, "prepTime", None, cleaner.clean_time, translator=self.translator
            ),
            perform_time=cook_time,
            org_url=url or try_get_default(None, "url", None, cleaner.clean_string),
        )

        return recipe, extras
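
    # Note on the helper above: try_get_default prefers the recipe_scrapers accessor when one
    # is passed, falls back to the raw schema.org attribute whenever the accessor raises or
    # still yields the default, and only then applies the optional clean_func (with any extra
    # kwargs such as translator=...) to whichever value was selected before it lands on the
    # Recipe model.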

    async def scrape_url(self) -> SchemaScraperFactory.SchemaScraper | Any | None:
        recipe_html = await self.get_html(self.url)

        try:
            # scrape_html requires a URL, but we might not have one, so we default to a dummy URL
            scraped_schema = scrape_html(recipe_html, org_url=self.url or "https://example.com", supported_only=False)
        except (NoSchemaFoundInWildMode, AttributeError):
            self.logger.error(f"Recipe Scraper was unable to extract a recipe from {self.url}")
            return None

        except ConnectionError as e:
            raise HTTPException(status.HTTP_400_BAD_REQUEST, {"details": "CONNECTION_ERROR"}) from e

        # Check to see if the recipe is valid
        try:
            ingredients = scraped_schema.ingredients()
        except Exception:
            ingredients = []

        try:
            instruct: list | str = scraped_schema.instructions()
        except Exception:
            instruct = []

        if instruct or ingredients:
            return scraped_schema

        self.logger.debug(f"Recipe Scraper [Package] was unable to extract a recipe from {self.url}")
        return None
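
    # A schema only counts as usable if it yields at least one of ingredients or instructions;
    # anything else is discarded so parse() below reports failure with None.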

    async def parse(self):
        """
        Parse a recipe from a given url.
        """
        scraped_data = await self.scrape_url()

        if scraped_data is None:  # coverage: always true in this run
            return None

        return self.clean_scraper(scraped_data, self.url)


class RecipeScraperOpenAI(RecipeScraperPackage):
    """
    A wrapper around the `RecipeScraperPackage` class that uses OpenAI to extract the recipe from the URL,
    rather than trying to scrape it directly.
    """

    def extract_json_ld_data_from_html(self, soup: bs4.BeautifulSoup) -> str:
        data_parts: list[str] = []
        for script in soup.find_all("script", type="application/ld+json"):
            try:
                script_data = script.string
                if script_data:
                    data_parts.append(str(script_data))
            except AttributeError:
                pass

        return "\n\n".join(data_parts)

    def find_image(self, soup: bs4.BeautifulSoup) -> str | None:
        # find the open graph image tag
        og_image = soup.find("meta", property="og:image")
        if og_image and og_image.get("content"):
            return og_image["content"]

        # find the largest image on the page
        largest_img = None
        max_size = 0
        for img in soup.find_all("img"):
            width = img.get("width", 0)
            height = img.get("height", 0)
            if not width or not height:
                continue

            try:
                size = int(width) * int(height)
            except (ValueError, TypeError):
                size = 1
            if size > max_size:
                max_size = size
                largest_img = img

        if largest_img:
            return largest_img.get("src")

        return None
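
    # Image selection is best-effort: the og:image meta tag wins outright, otherwise the <img>
    # with the largest declared width * height is chosen, and images without explicit
    # dimensions are skipped entirely.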

    def format_html_to_text(self, html: str) -> str:
        soup = bs4.BeautifulSoup(html, "lxml")

        text = soup.get_text(separator="\n", strip=True)
        text += self.extract_json_ld_data_from_html(soup)
        if not text:
            raise Exception("No text or ld+json data found in HTML")

        try:
            image = self.find_image(soup)
        except Exception:
            image = None

        components = [f"Convert this content to JSON: {text}"]
        if image:
            components.append(f"Recipe Image: {image}")
        return "\n".join(components)

    async def get_html(self, url: str) -> str:
        settings = get_app_settings()
        if not settings.OPENAI_ENABLED:  # coverage: always true in this run
            return ""

        html = self.raw_html or await safe_scrape_html(url)
        text = self.format_html_to_text(html)
        try:
            service = OpenAIService()
            prompt = service.get_prompt("recipes.scrape-recipe")

            response_json = await service.get_response(prompt, text, force_json_response=True)
            if not response_json:
                raise Exception("OpenAI did not return any data")

            return self.ld_json_to_html(response_json)
        except Exception:
            self.logger.exception(f"OpenAI was unable to extract a recipe from {url}")
            return ""
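
    # The OpenAI strategy never asks the model for a Recipe object directly: get_html collapses
    # the page to plain text plus any embedded ld+json, sends that through the
    # "recipes.scrape-recipe" prompt with force_json_response=True, and wraps the returned JSON
    # in a minimal HTML shell via ld_json_to_html, so the inherited RecipeScraperPackage.parse
    # can process it like any other schema.org page.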


class RecipeScraperOpenGraph(ABCScraperStrategy):
    async def get_html(self, url: str) -> str:
        return self.raw_html or await safe_scrape_html(url)

    def get_recipe_fields(self, html) -> dict | None:
        """
        Get the recipe fields from the Open Graph data.
        """

        def og_field(properties: dict, field_name: str) -> str:
            return next((val for name, val in properties if name == field_name), "")

        def og_fields(properties: list[tuple[str, str]], field_name: str) -> list[str]:
            return list({val for name, val in properties if name == field_name})

        base_url = get_base_url(html, self.url)
        data = extruct.extract(html, base_url=base_url, errors="log")
        try:
            properties = data["opengraph"][0]["properties"]
        except Exception:
            return None

        return {
            "name": og_field(properties, "og:title"),
            "description": og_field(properties, "og:description"),
            "image": og_field(properties, "og:image"),
            "recipeYield": "",
            "recipeIngredient": ["Could not detect ingredients"],
            "recipeInstructions": [{"text": "Could not detect instructions"}],
            "slug": slugify(og_field(properties, "og:title")),
            "orgURL": self.url or og_field(properties, "og:url"),
            "categories": [],
            "tags": og_fields(properties, "og:article:tag"),
            "dateAdded": None,
            "notes": [],
            "extras": [],
        }
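
    # Open Graph metadata carries no structured recipe data, so only og:title, og:description,
    # og:image, og:url, and og:article:tag map onto real fields; ingredients and instructions
    # are filled with "Could not detect ..." placeholders.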

    async def parse(self):
        """
        Parse a recipe from a given url.
        """
        html = await self.get_html(self.url)

        og_data = self.get_recipe_fields(html)

        if og_data is None:  # coverage: always true in this run
            return None

        return Recipe(**og_data), ScrapedExtras()