Coverage for opt/mealie/lib/python3.12/site-packages/mealie/services/scraper/cleaner.py: 10%

288 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-11-25 17:29 +0000

1import contextlib 1a

2import functools 1a

3import html 1a

4import json 1a

5import numbers 1a

6import operator 1a

7import re 1a

8import typing 1a

9from datetime import datetime, timedelta 1a

10 

11from slugify import slugify 1a

12 

13from mealie.core.root_logger import get_logger 1a

14from mealie.lang.providers import Translator, get_all_translations 1a

15from mealie.schema.recipe.recipe import Recipe 1a

16from mealie.services.parser_services.parser_utils import extract_quantity_from_string 1a

17 

# Module-wide logger, obtained from the project's get_logger helper under the
# name "recipe-scraper".
logger = get_logger("recipe-scraper")


# First run of digits in a string, optionally followed by a "." or "," and more digits.
MATCH_DIGITS = re.compile(r"\d+([.,]\d+)?")
""" Allow for commas as decimals (common in Europe) """

# Duration pattern of the form P[nY][nM][nD]T[nH][nM][nS]. Only days, hours, minutes
# and seconds are exposed as named groups; years and months are matched but unnamed.
# NOTE(review): the "T" separator is mandatory in this pattern, so a date-only
# duration such as "P1D" does not match — confirm whether that is intended.
MATCH_ISO_STR = re.compile(
    r"^P((\d+)Y)?((\d+)M)?((?P<days>\d+)D)?" r"T((?P<hours>\d+)H)?((?P<minutes>\d+)M)?((?P<seconds>\d+(?:\.\d+)?)S)?$",
)
""" Match Duration Strings """

# Non-greedy match of "<...>" where the content contains no further "<".
MATCH_HTML_TAGS = re.compile(r"<[^<]+?>")
""" Matches HTML tags `<p>Text</p>` -> `Text` """

# One or more consecutive literal space characters (tabs and newlines are not included).
MATCH_MULTI_SPACE = re.compile(r" +")
""" Matches multiple spaces `Hello World` -> `Hello World` """

# Two newlines separated only by optional whitespace.
MATCH_ERRONEOUS_WHITE_SPACE = re.compile(r"\n\s*\n")
""" Matches multiple new lines and removes erroneous white space """

37 

38 

def clean(recipe_data: Recipe | dict, translator: Translator, url=None) -> Recipe:
    """Main entrypoint: normalise a scraped recipe and build a ``Recipe`` from it.

    Args:
        recipe_data (Recipe | dict): a scraped recipe dictionary, or a Recipe that is
            first dumped (by alias) into the same dictionary layout. A dictionary
            argument is updated in place.
        translator (Translator): passed on to `clean_time` for the time fields.
        url: original recipe url; when falsy the existing "orgURL" entry is kept.

    Returns:
        Recipe: a Recipe constructed from the cleaned dictionary
    """
    if isinstance(recipe_data, dict):
        data = recipe_data
    else:
        # format the recipe like a scraped dictionary
        data = recipe_data.model_dump(by_alias=True)
        data["recipeIngredient"] = [ing.display for ing in recipe_data.recipe_ingredient]

    data["slug"] = slugify(data.get("name", ""))
    data["description"] = clean_string(data.get("description", ""))

    for time_key in ("prepTime", "performTime", "totalTime"):
        data[time_key] = clean_time(data.get(time_key), translator)

    servings, yield_quantity, yield_text = clean_yield(data.get("recipeYield"))
    data["recipeServings"] = servings
    data["recipeYieldQuantity"] = yield_quantity
    data["recipeYield"] = yield_text

    data["recipeCategory"] = clean_categories(data.get("recipeCategory", []))
    data["recipeIngredient"] = clean_ingredients(data.get("recipeIngredient", []))
    data["recipeInstructions"] = clean_instructions(data.get("recipeInstructions", []))

    # clean_image returns a list; only its first entry is stored
    data["image"] = clean_image(data.get("image"))[0]
    data["orgURL"] = url or data.get("orgURL")
    data["notes"] = clean_notes(data.get("notes"))
    data["rating"] = clean_int(data.get("rating"))

    return Recipe(**data)

76 

77 

78def clean_string(text: str | list | int | float) -> str: 1a

79 """Cleans a string of HTML tags and extra white space""" 

80 if not isinstance(text, str): 

81 if isinstance(text, list): 

82 if text: 

83 return clean_string(text[0]) 

84 else: 

85 text = "" 

86 elif text is None: 

87 text = "" 

88 else: 

89 text = str(text) 

90 

91 if not text: 

92 return "" 

93 

94 text = typing.cast(str, text) # at this point we know text is a string 

95 

96 cleaned_text = html.unescape(text) 

97 cleaned_text = MATCH_HTML_TAGS.sub("", cleaned_text) 

98 cleaned_text = MATCH_MULTI_SPACE.sub(" ", cleaned_text) 

99 cleaned_text = MATCH_ERRONEOUS_WHITE_SPACE.sub("\n\n", cleaned_text) 

100 

101 cleaned_text = cleaned_text.replace("</p>", "\n").replace("\xa0", " ").replace("\t", " ").strip() 

102 return cleaned_text 

103 

104 

105def clean_image(image: str | list | dict | None = None, default: str = "no image") -> list[str]: 1a

106 """ 

107 image attempts to parse the image field from a recipe and return a string. Currenty 

108 

109 Supported Structures: 

110 - `https://example.com` - A string 

111 - `{ "url": "https://example.com" }` - A dictionary with a `url` key 

112 - `["https://example.com"]` - A list of strings 

113 - `[{ "url": "https://example.com" }]` - A list of dictionaries with a `url` key 

114 

115 Raises: 

116 TypeError: If the image field is not a supported type a TypeError is raised. 

117 

118 Returns: 

119 list[str]: list of urls, or [default] if input is empty 

120 """ 

121 if not image: 

122 return [default] 

123 

124 match image: 

125 case str(image): 

126 return [image] 

127 case [str(_), *_]: 

128 # Only return non-null strings in list 

129 return [x for x in image if x] 

130 case [{"url": str(_)}, *_]: 

131 return [x["url"] for x in image if "url" in x] 

132 case {"url": str(image)}: 

133 return [image] 

134 case [{"@id": str(_)}, *_]: 

135 return [x["@id"] for x in image if "@id" in x] 

136 case _: 

137 logger.exception(f"Unexpected type for image: {type(image)}, {image}") 

138 return [default] 

139 

140 

141def clean_instructions(steps_object: list | dict | str, default: list | None = None) -> list[dict]: 1a

142 """ 

143 instructions attempts to parse the instructions field from a recipe and return a list of 

144 dictionaries. See match statement for supported types and structures 

145 

146 Raises: 

147 TypeError: If the instructions field is not a supported type a TypeError is raised. 

148 

149 Returns: 

150 list[dict]: An ordered list of dictionaries with the keys `text` 

151 """ 

152 if not steps_object: 

153 return default or [] 

154 

155 match steps_object: 

156 case [{"text": str()}]: # Base Case 

157 return steps_object 

158 case [{"text": str()}, *_]: 

159 # The is the most common case. Most other operations eventually resolve to this 

160 # match case before being converted to a list of instructions 

161 # 

162 # [ 

163 # {"text": "Instruction A"}, 

164 # {"text": "Instruction B"}, 

165 # ] 

166 # 

167 return [ 

168 {"text": _sanitize_instruction_text(instruction["text"])} 

169 for instruction in steps_object 

170 if "text" in instruction and instruction["text"].strip() 

171 ] 

172 case {0: {"text": str()}} | {"0": {"text": str()}} | {1: {"text": str()}} | {"1": {"text": str()}}: 

173 # Some recipes have a dict with a string key representing the index, unsure if these can 

174 # be an int or not so we match against both. Additionally, we match against both 0 and 1 indexed 

175 # list like dicts. 

176 # 

177 # { 

178 # "0": {"text": "Instruction A"}, 

179 # "1": {"text": "Instruction B"}, 

180 # } 

181 # 

182 steps_object = typing.cast(dict, steps_object) 

183 return clean_instructions(list(steps_object.values())) 

184 case str(step_as_str): 

185 # Strings are weird, some sites return a single string with newlines 

186 # others returns a json string for some reasons 

187 # 

188 # "Instruction A\nInstruction B\nInstruction C" 

189 # '{"0": {"text": "Instruction A"}, "1": {"text": "Instruction B"}, "2": {"text": "Instruction C"}}' 

190 # 

191 if step_as_str.startswith("[") or step_as_str.startswith("{"): 

192 try: 

193 return clean_instructions(json.loads(step_as_str)) 

194 except json.JSONDecodeError: 

195 pass 

196 return [ 

197 {"text": _sanitize_instruction_text(instruction)} 

198 for instruction in step_as_str.splitlines() 

199 if instruction.strip() 

200 ] 

201 case [str(), *_]: 

202 # Assume list of strings is a valid list of instructions 

203 # 

204 # [ 

205 # "Instruction A", 

206 # "Instruction B", 

207 # ] 

208 # 

209 return [ 

210 {"text": _sanitize_instruction_text(instruction)} for instruction in steps_object if instruction.strip() 

211 ] 

212 case [{"@type": "HowToSection"}, *_] | [{"type": "HowToSection"}, *_]: 

213 # HowToSections should have the following layout, 

214 # { 

215 # "@type": "HowToSection", 

216 # "itemListElement": [ 

217 # { 

218 # "@type": "HowToStep", 

219 # "text": "Instruction A" 

220 # }, 

221 # } 

222 # 

223 steps_object = typing.cast(list[dict[str, str]], steps_object) 

224 return clean_instructions( 

225 functools.reduce( 

226 operator.concat, # type: ignore 

227 [x["itemListElement"] for x in steps_object], 

228 [], 

229 ) 

230 ) 

231 case _: 

232 raise TypeError(f"Unexpected type for instructions: {type(steps_object)}, {steps_object}") 

233 

234 

235def _sanitize_instruction_text(line: str | dict) -> str: 1a

236 """ 

237 _sanitize_instructions_text does some basic checking if the value is a string or dictionary 

238 and returns the value of the `text` key if it is a dictionary. The returned string is passed through the 

239 `clean_string` function to remove any html tags and extra whitespace in a loop until the string 

240 is stable. 

241 

242 Calling `clean_string` in a loop is necessary because some sites return a string with erroneously escaped 

243 html tags or markup. 

244 """ 

245 if isinstance(line, dict): 

246 # Some Recipes dotnot adhear to schema 

247 try: 

248 line = line["text"] 

249 except Exception: 

250 line = "" 

251 

252 if not line: 

253 return "" 

254 

255 line = typing.cast(str, line) 

256 clean_line = clean_string(line.strip()) 

257 

258 while not clean_line == (clean_line := clean_string(clean_line)): 

259 pass 

260 

261 return clean_line 

262 

263 

264def clean_ingredients(ingredients: list | str | None, default: list | None = None) -> list[str | dict]: 1a

265 """ 

266 ingredient attempts to parse the ingredients field from a recipe and return a list of 

267 

268 Supported Structures: 

269 - `["1 cup flour"]` - A list of strings 

270 - `"1 cup flour"` - A string 

271 - `None` - returns an empty list 

272 

273 Raises: 

274 TypeError: If the ingredients field is not a supported type a TypeError is raised. 

275 """ 

276 match ingredients: 

277 case None: 

278 return default or [] 

279 case list(ingredients): 

280 cleaned_ingredients: list[str | dict] = [] 

281 for ing in ingredients: 

282 if isinstance(ing, dict): 

283 cleaned_ingredients.append({clean_string(k): clean_string(v) for k, v in ing.items()}) 

284 else: 

285 cleaned_ingredients.append(clean_string(ing)) 

286 return cleaned_ingredients 

287 case [str()]: 

288 return [clean_string(ingredient) for ingredient in ingredients] 

289 case str(ingredients): 

290 return [clean_string(ingredient) for ingredient in ingredients.splitlines() if ingredient.strip()] 

291 case _: 

292 raise TypeError(f"Unexpected type for ingredients: {type(ingredients)}, {ingredients}") 

293 

294 

295def clean_int(val: str | int | None, min: int | None = None, max: int | None = None): 1a

296 if val is None or isinstance(val, int): 

297 return val 

298 

299 filtered_val = "".join(c for c in val if c.isnumeric()) 

300 if not filtered_val: 

301 return None 

302 

303 val = int(filtered_val) 

304 if min is None or max is None: 

305 return val 

306 

307 if not (min <= val <= max): 

308 return None 

309 

310 return val 

311 

312 

313def clean_notes(notes: typing.Any) -> list[dict] | None: 1a

314 if not isinstance(notes, list): 

315 return None 

316 

317 parsed_notes: list[dict] = [] 

318 for note in notes: 

319 if not isinstance(note, str | dict): 

320 continue 

321 

322 if isinstance(note, dict): 

323 if "text" not in note: 

324 continue 

325 

326 if "title" not in note: 

327 note["title"] = "" 

328 

329 parsed_notes.append(note) 

330 continue 

331 

332 parsed_notes.append({"title": "", "text": note}) 

333 

334 return parsed_notes 

335 

336 

@functools.lru_cache
def _get_servings_options() -> set[str]:
    """Collect every translation of the servings-related phrases.

    The values returned by `get_all_translations` for each phrase key are stripped
    and lower-cased. The result is cached by `functools.lru_cache`.

    Returns:
        set[str]: the normalised phrases
    """
    translation_keys = (
        "recipe.servings-text.makes",
        "recipe.servings-text.serves",
        "recipe.servings-text.serving",
        "recipe.servings-text.servings",
        "recipe.servings-text.yield",
        "recipe.servings-text.yields",
    )
    return {
        phrase.strip().lower()
        for translation_key in translation_keys
        for phrase in get_all_translations(translation_key).values()
    }

351 

352 

def _is_serving_string(txt: str) -> bool:
    """Return True if `txt` (stripped, lower-cased) contains any servings phrase."""
    normalised = txt.strip().lower()
    return any(option in normalised for option in _get_servings_options())

359 

360 

361def clean_yield(yields: str | list[str] | None) -> tuple[float, float, str]: 1a

362 """ 

363 yield_amount attemps to parse out the yield amount from a recipe. 

364 

365 Supported Structures: 

366 - `"4 servings"` - returns the string unmodified 

367 - `["4 servings", "4 Pies"]` - returns the last value 

368 

369 Returns: 

370 float: The servings, if it can be parsed else 0 

371 float: The yield quantity, if it can be parsed else 0 

372 str: The yield amount, if it can be parsed else an empty string 

373 """ 

374 servings_qty: float = 0 

375 yld_qty: float = 0 

376 yld_str = "" 

377 

378 if not yields: 

379 return servings_qty, yld_qty, yld_str 

380 

381 if not isinstance(yields, list): 

382 yields = [yields] 

383 

384 for yld in yields: 

385 if not yld: 

386 continue 

387 if not isinstance(yld, str): 

388 yld = str(yld) 

389 

390 qty, txt = extract_quantity_from_string(yld) 

391 if qty and _is_serving_string(yld): 

392 servings_qty = qty 

393 else: 

394 yld_qty = qty 

395 yld_str = txt 

396 

397 return servings_qty, yld_qty, yld_str 

398 

399 

400def clean_time(time_entry: str | timedelta | int | float | None, translator: Translator) -> None | str: 1a

401 """_summary_ 

402 

403 Supported Structures: 

404 - `None` - returns None 

405 - `"PT1H"` - returns "1 hour" 

406 - `"PT1H30M"` - returns "1 hour 30 minutes" 

407 - `timedelta(hours=1, minutes=30)` - returns "1 hour 30 minutes" 

408 - `{"minValue": "PT1H30M"}` - returns "1 hour 30 minutes" 

409 - `30` - as a `int` or `float` assumed to be in minutes, returns "30 minutes" 

410 

411 Raises: 

412 TypeError: if the type is not supported a TypeError is raised 

413 

414 Returns: 

415 None | str: None if the time_entry is None, otherwise a string representing the time 

416 """ 

417 if not time_entry: 

418 return None 

419 

420 match time_entry: 

421 case numbers.Number(): 

422 # type checked by case statement 

423 time_delta = timedelta(minutes=time_entry) # type: ignore 

424 return pretty_print_timedelta(time_delta, translator) 

425 case str(time_entry): 

426 if not time_entry.strip(): 

427 return None 

428 

429 try: 

430 time_delta_instructionsect = parse_duration(time_entry) 

431 return pretty_print_timedelta(time_delta_instructionsect, translator) 

432 except ValueError: 

433 return str(time_entry) 

434 case timedelta(): 

435 return pretty_print_timedelta(time_entry, translator) 

436 case {"minValue": str(value)}: 

437 return clean_time(value, translator) 

438 case [str(), *_]: 

439 return clean_time(time_entry[0], translator) 

440 case datetime(): 

441 # TODO: Not sure what to do here 

442 return str(time_entry) 

443 case _: 

444 logger.warning( 

445 "[SCRAPER] Unexpected type(%s) or structure for variable time_entry: %s", type(time_entry), time_entry 

446 ) 

447 return None 

448 

449 

def parse_duration(iso_duration: str) -> timedelta:
    """
    Parses an ISO 8601 duration string into a datetime.timedelta instance.

    Only the days, hours, minutes and seconds captured by `MATCH_ISO_STR` are used,
    and each captured value is truncated to a whole number.

    Args:
        iso_duration: an ISO 8601 duration string.

    Raises:
        ValueError: if the input string is not a valid ISO 8601 duration string.
    """
    parsed = MATCH_ISO_STR.match(iso_duration)
    if parsed is None:
        raise ValueError("invalid ISO 8601 duration string")

    # Years and months are deliberately left out: without knowing which year and
    # which month is meant they cannot be converted to the days, seconds and
    # microseconds a timedelta is made of.
    components: dict[str, int] = {}
    for unit in ("days", "hours", "minutes", "seconds"):
        captured = parsed.group(unit)
        components[unit] = int(float(captured)) if captured else 0

    return timedelta(**components)

478 

479 

def pretty_print_timedelta(t: timedelta, translator: Translator, max_components=None, max_decimal_places=2):
    """
    Render a timedelta as text, largest unit first.

    For example datetime.timedelta(days=2, seconds=17280) is rendered with a days,
    an hours and a minutes component. With max_components set to e.g. 1 the last
    permitted component absorbs the remainder as a fraction ('2.2' days), rounded
    to max_decimal_places.

    Unit names come from `translator.t(<key>, count=<amount>)`. A timedelta smaller
    than every unit (zero or negative) is rendered as "none".
    """
    units = (
        (timedelta(days=365), "datetime.year"),
        (timedelta(days=1), "datetime.day"),
        (timedelta(hours=1), "datetime.hour"),
        (timedelta(minutes=1), "datetime.minute"),
        (timedelta(seconds=1), "datetime.second"),
        (timedelta(microseconds=1000), "datetime.millisecond"),
        (timedelta(microseconds=1), "datetime.microsecond"),
    )

    remaining = t
    used_components = 0
    parts = []
    for unit_length, translation_key in units:
        if remaining < unit_length:
            continue

        used_components += 1
        ratio = remaining / unit_length
        # the last permitted component keeps its fraction, all others are whole numbers
        amount = ratio if used_components == max_components else int(ratio)
        remaining -= amount * unit_length

        amount_text = str(round(amount, max_decimal_places)).removesuffix(".0")
        unit_name = translator.t(translation_key, count=amount)
        parts.append(f"{amount_text} {unit_name}")

    return " ".join(parts) if parts else "none"

514 

515 

516def clean_categories(category: str | list) -> list[str]: 1a

517 if not category: 

518 return [] 

519 

520 match category: 

521 case str(category): 

522 if not category.strip(): 

523 return [] 

524 

525 return [category] 

526 case [str(), *_]: 

527 return [cat.strip().title() for cat in category if cat.strip()] 

528 case [{"name": str(), "slug": str()}, *_]: 

529 # Special case for when we use the cleaner to cleanup a migration. 

530 # 

531 # [ 

532 # { "name": "Dessert", "slug": "dessert"} 

533 # ] 

534 # 

535 return [cat["name"] for cat in category if "name" in cat] 

536 case _: 

537 raise TypeError(f"Unexpected type for category: {type(category)}, {category}") 

538 

539 

540def clean_tags(data: str | list[str]) -> list[str]: 1a

541 """ 

542 Gets keywords as a list or natural language list and returns 

543 them into a list of strings of individual tags 

544 """ 

545 if not data: 

546 return [] 

547 

548 match data: 

549 case [str(), *_]: 

550 return [tag.strip().title() for tag in data if tag.strip()] 

551 case str(data): 

552 return clean_tags(data.split(",")) 

553 case _: 

554 return [] 

555 # should probably raise exception 

556 # raise TypeError(f"Unexpected type for tags: {type(data)}, {data}") 

557 

558 

559def clean_nutrition(nutrition: dict | None) -> dict[str, str]: 1a

560 """ 

561 clean_nutrition takes a dictionary of nutrition information and cleans it up 

562 to be stored in the database. It will remove any keys that are not in the 

563 list of valid keys 

564 

565 Assumptionas: 

566 - All units are supplied in grams, expect sodium and cholesterol which maybe be in milligrams 

567 

568 Returns: 

569 dict[str, str]: If the argument is None, or not a dictionary, an empty dictionary is returned 

570 """ 

571 if not isinstance(nutrition, dict): 

572 return {} 

573 

574 output_nutrition = {} 

575 for key, val in nutrition.items(): 

576 with contextlib.suppress(AttributeError, TypeError): 

577 if matched_digits := MATCH_DIGITS.search(val): 

578 output_nutrition[key] = matched_digits.group(0).replace(",", ".") 

579 

580 for key in ["sodiumContent", "cholesterolContent"]: 

581 if val := nutrition.get(key, None): 

582 if isinstance(val, str) and "m" not in val and "g" in val: 

583 with contextlib.suppress(AttributeError, TypeError): 

584 output_nutrition[key] = str(float(output_nutrition[key]) * 1000) 

585 

586 for key in ["calories"]: 

587 if val := nutrition.get(key, None): 

588 if isinstance(val, int | float): 

589 with contextlib.suppress(AttributeError, TypeError): 

590 output_nutrition[key] = str(val) 

591 

592 return output_nutrition