Coverage for opt/mealie/lib/python3.12/site-packages/mealie/services/scraper/scraper_strategies.py: 35%

211 statements  

coverage.py v7.10.6, created at 2025-11-25 17:29 +0000

import time

from abc import ABC, abstractmethod
from collections.abc import Callable
from typing import Any

import bs4
import extruct
from fastapi import HTTPException, status
from httpx import AsyncClient, Response
from recipe_scrapers import NoSchemaFoundInWildMode, SchemaScraperFactory, scrape_html
from slugify import slugify
from w3lib.html import get_base_url

from mealie.core.config import get_app_settings
from mealie.core.root_logger import get_logger
from mealie.lang.providers import Translator
from mealie.pkgs import safehttp
from mealie.schema.recipe.recipe import Recipe, RecipeStep
from mealie.services.openai import OpenAIService
from mealie.services.scraper.scraped_extras import ScrapedExtras

from . import cleaner
from .user_agents_manager import get_user_agents_manager

SCRAPER_TIMEOUT = 15
logger = get_logger()


class ForceTimeoutException(Exception):
    pass


async def safe_scrape_html(url: str) -> str:
    """
    Scrapes the html from a url but will cancel the request
    if the request takes longer than 15 seconds. This is used to mitigate
    DDOS attacks from users providing a url with arbitrarily large content.
    """
    user_agents_manager = get_user_agents_manager()

    logger.debug(f"Scraping URL: {url}")
    async with AsyncClient(transport=safehttp.AsyncSafeTransport()) as client:
        # coverage: the loop's "no break" exit was never taken in the recorded runs
        for user_agent in user_agents_manager.user_agents:
            logger.debug(f'Trying User-Agent: "{user_agent}"')

            response: Response | None = None
            html_bytes = b""
            async with client.stream(
                "GET",
                url,
                timeout=SCRAPER_TIMEOUT,
                headers=user_agents_manager.get_scrape_headers(user_agent),
                follow_redirects=True,
            ) as resp:
                if resp.status_code == status.HTTP_403_FORBIDDEN:
                    logger.debug(f'403 Forbidden with User-Agent: "{user_agent}"')
                    continue

                start_time = time.time()

                async for chunk in resp.aiter_bytes(chunk_size=1024):
                    html_bytes += chunk

                    if time.time() - start_time > SCRAPER_TIMEOUT:
                        raise ForceTimeoutException()

                response = resp
                break

        if not (response and html_bytes):
            return ""

        # =====================================
        # Copied from requests text property

        # Try charset from content-type
        content = None
        encoding = response.encoding

        # Fallback to auto-detected encoding.
        if encoding is None:
            encoding = response.apparent_encoding

        # Decode unicode from given encoding.
        try:
            content = str(html_bytes, encoding, errors="replace")
        except (LookupError, TypeError):
            # A LookupError is raised if the encoding was not found which could
            # indicate a misspelling or similar mistake.
            #
            # A TypeError can be raised if encoding is None
            #
            # So we try blindly encoding.
            content = str(html_bytes, errors="replace")

        return content

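# --- Illustrative sketch (not part of the original module) ---------------------
# A hypothetical caller, showing how safe_scrape_html's failure modes surface:
# it raises ForceTimeoutException when the body takes longer than SCRAPER_TIMEOUT
# seconds to stream, and returns "" when every User-Agent is rejected.
async def example_fetch(url: str) -> str:
    try:
        return await safe_scrape_html(url)
    except ForceTimeoutException:
        logger.warning(f"Aborted scrape of {url} after {SCRAPER_TIMEOUT}s")
        return ""
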

class ABCScraperStrategy(ABC):
    """
    Abstract class for all recipe parsers.
    """

    url: str

    def __init__(
        self,
        url: str,
        translator: Translator,
        raw_html: str | None = None,
    ) -> None:
        self.logger = get_logger()
        self.url = url
        self.raw_html = raw_html
        self.translator = translator

    @abstractmethod
    async def get_html(self, url: str) -> str: ...

    @abstractmethod
    async def parse(self) -> tuple[Recipe, ScrapedExtras] | tuple[None, None]:
        """Parse a recipe from a web URL.

        Args:
            recipe_url (str): Full URL of the recipe to scrape.

        Returns:
            Recipe: Recipe object.
        """
        ...

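# --- Illustrative sketch (not part of the original module) ---------------------
# A hypothetical minimal subclass, showing the two methods a concrete strategy
# must implement (get_html and parse); the real strategies below follow this shape.
class ExampleNullStrategy(ABCScraperStrategy):
    async def get_html(self, url: str) -> str:
        return self.raw_html or await safe_scrape_html(url)

    async def parse(self) -> tuple[Recipe, ScrapedExtras] | tuple[None, None]:
        # A real strategy would build a Recipe from await self.get_html(self.url).
        return None, None
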

class RecipeScraperPackage(ABCScraperStrategy):
    @staticmethod
    def ld_json_to_html(ld_json: str) -> str:
        return (
            "<!DOCTYPE html><html><head>"
            f'<script type="application/ld+json">{ld_json}</script>'
            "</head><body></body></html>"
        )

    async def get_html(self, url: str) -> str:
        return self.raw_html or await safe_scrape_html(url)

    def clean_scraper(self, scraped_data: SchemaScraperFactory.SchemaScraper, url: str) -> tuple[Recipe, ScrapedExtras]:
        def try_get_default(
            func_call: Callable | None,
            get_attr: str,
            default: Any,
            clean_func=None,
            **clean_func_kwargs,
        ):
            value = default

            if func_call:
                try:
                    value = func_call()
                except Exception:
                    self.logger.error(f"Error parsing recipe func_call for '{get_attr}'")

            if value == default:
                try:
                    value = scraped_data.schema.data.get(get_attr)
                except Exception:
                    self.logger.error(f"Error parsing recipe attribute '{get_attr}'")

            if clean_func:
                value = clean_func(value, **clean_func_kwargs)

            return value

        def get_instructions() -> list[RecipeStep]:
            instruction_as_text = try_get_default(
                scraped_data.instructions,
                "recipeInstructions",
                ["No Instructions Found"],
            )

            self.logger.debug(f"Scraped Instructions: (Type: {type(instruction_as_text)}) \n {instruction_as_text}")

            instruction_as_text = cleaner.clean_instructions(instruction_as_text)

            self.logger.debug(f"Cleaned Instructions: (Type: {type(instruction_as_text)}) \n {instruction_as_text}")

            try:
                return [RecipeStep(title="", text=x.get("text")) for x in instruction_as_text]
            except TypeError:
                return []

        cook_time = try_get_default(
            None, "performTime", None, cleaner.clean_time, translator=self.translator
        ) or try_get_default(scraped_data.cook_time, "cookTime", None, cleaner.clean_time, translator=self.translator)

        extras = ScrapedExtras()

        extras.set_tags(try_get_default(scraped_data.keywords, "keywords", "", cleaner.clean_tags))

        recipe = Recipe(
            name=try_get_default(scraped_data.title, "name", "No Name Found", cleaner.clean_string),
            slug="",
            image=try_get_default(scraped_data.image, "image", None, cleaner.clean_image),
            description=try_get_default(scraped_data.description, "description", "", cleaner.clean_string),
            nutrition=try_get_default(scraped_data.nutrients, "nutrition", None, cleaner.clean_nutrition),
            recipe_yield=try_get_default(scraped_data.yields, "recipeYield", "1", cleaner.clean_string),
            recipe_ingredient=try_get_default(
                scraped_data.ingredients,
                "recipeIngredient",
                [""],
                cleaner.clean_ingredients,
            ),
            recipe_instructions=get_instructions(),
            total_time=try_get_default(
                scraped_data.total_time, "totalTime", None, cleaner.clean_time, translator=self.translator
            ),
            prep_time=try_get_default(
                scraped_data.prep_time, "prepTime", None, cleaner.clean_time, translator=self.translator
            ),
            perform_time=cook_time,
            org_url=url or try_get_default(None, "url", None, cleaner.clean_string),
        )

        return recipe, extras

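    # try_get_default (above) resolves each field in three steps:
    #   1. call the recipe_scrapers accessor (e.g. scraped_data.title) if one was given,
    #   2. if that left the default, fall back to the raw schema attribute
    #      (scraped_data.schema.data.get(get_attr)),
    #   3. finally pass the value through the optional clean_func from `cleaner`.
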

    async def scrape_url(self) -> SchemaScraperFactory.SchemaScraper | Any | None:
        recipe_html = await self.get_html(self.url)

        try:
            # scrape_html requires a URL, but we might not have one, so we default to a dummy URL
            scraped_schema = scrape_html(recipe_html, org_url=self.url or "https://example.com", supported_only=False)
        except (NoSchemaFoundInWildMode, AttributeError):
            self.logger.error(f"Recipe Scraper was unable to extract a recipe from {self.url}")
            return None

        except ConnectionError as e:
            raise HTTPException(status.HTTP_400_BAD_REQUEST, {"details": "CONNECTION_ERROR"}) from e

        # Check to see if the recipe is valid
        try:
            ingredients = scraped_schema.ingredients()
        except Exception:
            ingredients = []

        try:
            instruct: list | str = scraped_schema.instructions()
        except Exception:
            instruct = []

        if instruct or ingredients:
            return scraped_schema

        self.logger.debug(f"Recipe Scraper [Package] was unable to extract a recipe from {self.url}")
        return None

    async def parse(self):
        """
        Parse a recipe from a given url.
        """
        scraped_data = await self.scrape_url()

        if scraped_data is None:  # coverage: always true in the recorded runs
            return None

        return self.clean_scraper(scraped_data, self.url)

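# --- Illustrative sketch (not part of the original module) ---------------------
# Hypothetical usage of RecipeScraperPackage; assumes a Translator instance is
# available from the caller. Passing raw_html skips the network fetch entirely.
async def example_package_scrape(url: str, translator: Translator, html: str | None = None) -> Recipe | None:
    strategy = RecipeScraperPackage(url, translator, raw_html=html)
    result = await strategy.parse()  # None when no usable schema.org data is found
    if result is None:
        return None
    recipe, _extras = result
    return recipe
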

class RecipeScraperOpenAI(RecipeScraperPackage):
    """
    A wrapper around the `RecipeScraperPackage` class that uses OpenAI to extract the recipe from the URL,
    rather than trying to scrape it directly.
    """

    def extract_json_ld_data_from_html(self, soup: bs4.BeautifulSoup) -> str:
        data_parts: list[str] = []
        for script in soup.find_all("script", type="application/ld+json"):
            try:
                script_data = script.string
                if script_data:
                    data_parts.append(str(script_data))
            except AttributeError:
                pass

        return "\n\n".join(data_parts)

    def find_image(self, soup: bs4.BeautifulSoup) -> str | None:
        # find the open graph image tag
        og_image = soup.find("meta", property="og:image")
        if og_image and og_image.get("content"):
            return og_image["content"]

        # find the largest image on the page
        largest_img = None
        max_size = 0
        for img in soup.find_all("img"):
            width = img.get("width", 0)
            height = img.get("height", 0)
            if not width or not height:
                continue

            try:
                size = int(width) * int(height)
            except (ValueError, TypeError):
                size = 1
            if size > max_size:
                max_size = size
                largest_img = img

        if largest_img:
            return largest_img.get("src")

        return None

    def format_html_to_text(self, html: str) -> str:
        soup = bs4.BeautifulSoup(html, "lxml")

        text = soup.get_text(separator="\n", strip=True)
        text += self.extract_json_ld_data_from_html(soup)
        if not text:
            raise Exception("No text or ld+json data found in HTML")

        try:
            image = self.find_image(soup)
        except Exception:
            image = None

        components = [f"Convert this content to JSON: {text}"]
        if image:
            components.append(f"Recipe Image: {image}")
        return "\n".join(components)

    async def get_html(self, url: str) -> str:
        settings = get_app_settings()
        if not settings.OPENAI_ENABLED:  # coverage: always true in the recorded runs
            return ""

        html = self.raw_html or await safe_scrape_html(url)
        text = self.format_html_to_text(html)
        try:
            service = OpenAIService()
            prompt = service.get_prompt("recipes.scrape-recipe")

            response_json = await service.get_response(prompt, text, force_json_response=True)
            if not response_json:
                raise Exception("OpenAI did not return any data")

            return self.ld_json_to_html(response_json)
        except Exception:
            self.logger.exception(f"OpenAI was unable to extract a recipe from {url}")
            return ""

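# Note: RecipeScraperOpenAI reuses the parent RecipeScraperPackage pipeline. Its
# get_html() wraps the model's ld+json response in a minimal HTML document via
# ld_json_to_html(), so the inherited scrape_url()/clean_scraper() path can parse
# it like any other schema.org page.
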

class RecipeScraperOpenGraph(ABCScraperStrategy):
    async def get_html(self, url: str) -> str:
        return self.raw_html or await safe_scrape_html(url)

    def get_recipe_fields(self, html) -> dict | None:
        """
        Get the recipe fields from the Open Graph data.
        """

        def og_field(properties: dict, field_name: str) -> str:
            return next((val for name, val in properties if name == field_name), "")

        def og_fields(properties: list[tuple[str, str]], field_name: str) -> list[str]:
            return list({val for name, val in properties if name == field_name})

        base_url = get_base_url(html, self.url)
        data = extruct.extract(html, base_url=base_url, errors="log")
        try:
            properties = data["opengraph"][0]["properties"]
        except Exception:
            return None

        return {
            "name": og_field(properties, "og:title"),
            "description": og_field(properties, "og:description"),
            "image": og_field(properties, "og:image"),
            "recipeYield": "",
            "recipeIngredient": ["Could not detect ingredients"],
            "recipeInstructions": [{"text": "Could not detect instructions"}],
            "slug": slugify(og_field(properties, "og:title")),
            "orgURL": self.url or og_field(properties, "og:url"),
            "categories": [],
            "tags": og_fields(properties, "og:article:tag"),
            "dateAdded": None,
            "notes": [],
            "extras": [],
        }

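    # extruct's Open Graph extractor returns properties roughly as (name, value)
    # pairs, e.g. data["opengraph"][0]["properties"] == [("og:title", "..."), ...],
    # which is why og_field/og_fields above iterate `for name, val in properties`.
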

    async def parse(self):
        """
        Parse a recipe from a given url.
        """
        html = await self.get_html(self.url)

        og_data = self.get_recipe_fields(html)

        if og_data is None:  # coverage: always true in the recorded runs
            return None

        return Recipe(**og_data), ScrapedExtras()
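

# --- Illustrative sketch (not part of the original module) ---------------------
# Mealie composes these strategies elsewhere; this hypothetical driver only shows
# the shared interface: try each strategy's parse() until one yields a recipe.
# The ordering here is an assumption, not the application's actual fallback order.
async def example_scrape_with_fallback(url: str, translator: Translator) -> tuple[Recipe, ScrapedExtras] | None:
    for strategy_cls in (RecipeScraperPackage, RecipeScraperOpenAI, RecipeScraperOpenGraph):
        result = await strategy_cls(url, translator).parse()
        if result and result[0] is not None:
            return result
    return None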