Coverage for opt/mealie/lib/python3.12/site-packages/mealie/services/scraper/cleaner.py: 10%
288 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-25 17:29 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-25 17:29 +0000
1import contextlib 1a
2import functools 1a
3import html 1a
4import json 1a
5import numbers 1a
6import operator 1a
7import re 1a
8import typing 1a
9from datetime import datetime, timedelta 1a
11from slugify import slugify 1a
13from mealie.core.root_logger import get_logger 1a
14from mealie.lang.providers import Translator, get_all_translations 1a
15from mealie.schema.recipe.recipe import Recipe 1a
16from mealie.services.parser_services.parser_utils import extract_quantity_from_string 1a
# Module-wide logger used by the cleaning helpers in this file.
logger = get_logger("recipe-scraper")
MATCH_DIGITS = re.compile(r"\d+([.,]\d+)?")
""" Allow for commas as decimals (common in Europe) """

# The time part (everything from the `T` designator on) is optional so that
# date-only durations such as `P1D` are accepted. The `(?!$)` lookahead keeps a
# bare `P` from matching. Only the named groups (days, hours, minutes, seconds)
# are meaningful; the year and month groups are matched but left unnamed.
MATCH_ISO_STR = re.compile(
    r"^P(?!$)((\d+)Y)?((\d+)M)?((?P<days>\d+)D)?"
    r"(?:T((?P<hours>\d+)H)?((?P<minutes>\d+)M)?((?P<seconds>\d+(?:\.\d+)?)S)?)?$",
)
""" Match Duration Strings, e.g. `PT1H30M`, `P1D` or `P1DT2H` """

MATCH_HTML_TAGS = re.compile(r"<[^<]+?>")
""" Matches HTML tags `<p>Text</p>` -> `Text` """

MATCH_MULTI_SPACE = re.compile(r" +")
""" Matches runs of spaces `Hello   World` -> `Hello World` """

MATCH_ERRONEOUS_WHITE_SPACE = re.compile(r"\n\s*\n")
""" Matches multiple new lines and removes erroneous white space """
def clean(recipe_data: Recipe | dict, translator: Translator, url=None) -> Recipe:
    """Normalise a scraped recipe and build a `Recipe` from it.

    Every known field is passed through its dedicated `clean_*` helper. When a
    dictionary is passed it is updated in place before the `Recipe` is built.

    Args:
        recipe_data: a scraped recipe dictionary, or any non-dict object
            offering `model_dump(by_alias=True)` and `recipe_ingredient`
            (each ingredient is replaced by its `display` value).
        translator: forwarded to `clean_time` for the time fields.
        url: when truthy, stored as `orgURL`; otherwise the existing `orgURL`
            entry (if any) is kept.

    Returns:
        Recipe: a `Recipe` constructed from the cleaned dictionary.
    """
    if isinstance(recipe_data, dict):
        data = recipe_data
    else:
        # format the recipe like a scraped dictionary
        data = recipe_data.model_dump(by_alias=True)
        data["recipeIngredient"] = [ing.display for ing in recipe_data.recipe_ingredient]

    data["slug"] = slugify(data.get("name", ""))
    data["description"] = clean_string(data.get("description", ""))

    for time_key in ("prepTime", "performTime", "totalTime"):
        data[time_key] = clean_time(data.get(time_key), translator)

    servings, yield_quantity, yield_text = clean_yield(data.get("recipeYield"))
    data["recipeServings"] = servings
    data["recipeYieldQuantity"] = yield_quantity
    data["recipeYield"] = yield_text

    data["recipeCategory"] = clean_categories(data.get("recipeCategory", []))
    data["recipeIngredient"] = clean_ingredients(data.get("recipeIngredient", []))
    data["recipeInstructions"] = clean_instructions(data.get("recipeInstructions", []))

    # clean_image returns a list of urls; only the first one is stored
    image_urls = clean_image(data.get("image"))
    data["image"] = image_urls[0]
    data["orgURL"] = url or data.get("orgURL")
    data["notes"] = clean_notes(data.get("notes"))
    data["rating"] = clean_int(data.get("rating"))

    return Recipe(**data)
78def clean_string(text: str | list | int | float) -> str: 1a
79 """Cleans a string of HTML tags and extra white space"""
80 if not isinstance(text, str):
81 if isinstance(text, list):
82 if text:
83 return clean_string(text[0])
84 else:
85 text = ""
86 elif text is None:
87 text = ""
88 else:
89 text = str(text)
91 if not text:
92 return ""
94 text = typing.cast(str, text) # at this point we know text is a string
96 cleaned_text = html.unescape(text)
97 cleaned_text = MATCH_HTML_TAGS.sub("", cleaned_text)
98 cleaned_text = MATCH_MULTI_SPACE.sub(" ", cleaned_text)
99 cleaned_text = MATCH_ERRONEOUS_WHITE_SPACE.sub("\n\n", cleaned_text)
101 cleaned_text = cleaned_text.replace("</p>", "\n").replace("\xa0", " ").replace("\t", " ").strip()
102 return cleaned_text
105def clean_image(image: str | list | dict | None = None, default: str = "no image") -> list[str]: 1a
106 """
107 image attempts to parse the image field from a recipe and return a string. Currenty
109 Supported Structures:
110 - `https://example.com` - A string
111 - `{ "url": "https://example.com" }` - A dictionary with a `url` key
112 - `["https://example.com"]` - A list of strings
113 - `[{ "url": "https://example.com" }]` - A list of dictionaries with a `url` key
115 Raises:
116 TypeError: If the image field is not a supported type a TypeError is raised.
118 Returns:
119 list[str]: list of urls, or [default] if input is empty
120 """
121 if not image:
122 return [default]
124 match image:
125 case str(image):
126 return [image]
127 case [str(_), *_]:
128 # Only return non-null strings in list
129 return [x for x in image if x]
130 case [{"url": str(_)}, *_]:
131 return [x["url"] for x in image if "url" in x]
132 case {"url": str(image)}:
133 return [image]
134 case [{"@id": str(_)}, *_]:
135 return [x["@id"] for x in image if "@id" in x]
136 case _:
137 logger.exception(f"Unexpected type for image: {type(image)}, {image}")
138 return [default]
141def clean_instructions(steps_object: list | dict | str, default: list | None = None) -> list[dict]: 1a
142 """
143 instructions attempts to parse the instructions field from a recipe and return a list of
144 dictionaries. See match statement for supported types and structures
146 Raises:
147 TypeError: If the instructions field is not a supported type a TypeError is raised.
149 Returns:
150 list[dict]: An ordered list of dictionaries with the keys `text`
151 """
152 if not steps_object:
153 return default or []
155 match steps_object:
156 case [{"text": str()}]: # Base Case
157 return steps_object
158 case [{"text": str()}, *_]:
159 # The is the most common case. Most other operations eventually resolve to this
160 # match case before being converted to a list of instructions
161 #
162 # [
163 # {"text": "Instruction A"},
164 # {"text": "Instruction B"},
165 # ]
166 #
167 return [
168 {"text": _sanitize_instruction_text(instruction["text"])}
169 for instruction in steps_object
170 if "text" in instruction and instruction["text"].strip()
171 ]
172 case {0: {"text": str()}} | {"0": {"text": str()}} | {1: {"text": str()}} | {"1": {"text": str()}}:
173 # Some recipes have a dict with a string key representing the index, unsure if these can
174 # be an int or not so we match against both. Additionally, we match against both 0 and 1 indexed
175 # list like dicts.
176 #
177 # {
178 # "0": {"text": "Instruction A"},
179 # "1": {"text": "Instruction B"},
180 # }
181 #
182 steps_object = typing.cast(dict, steps_object)
183 return clean_instructions(list(steps_object.values()))
184 case str(step_as_str):
185 # Strings are weird, some sites return a single string with newlines
186 # others returns a json string for some reasons
187 #
188 # "Instruction A\nInstruction B\nInstruction C"
189 # '{"0": {"text": "Instruction A"}, "1": {"text": "Instruction B"}, "2": {"text": "Instruction C"}}'
190 #
191 if step_as_str.startswith("[") or step_as_str.startswith("{"):
192 try:
193 return clean_instructions(json.loads(step_as_str))
194 except json.JSONDecodeError:
195 pass
196 return [
197 {"text": _sanitize_instruction_text(instruction)}
198 for instruction in step_as_str.splitlines()
199 if instruction.strip()
200 ]
201 case [str(), *_]:
202 # Assume list of strings is a valid list of instructions
203 #
204 # [
205 # "Instruction A",
206 # "Instruction B",
207 # ]
208 #
209 return [
210 {"text": _sanitize_instruction_text(instruction)} for instruction in steps_object if instruction.strip()
211 ]
212 case [{"@type": "HowToSection"}, *_] | [{"type": "HowToSection"}, *_]:
213 # HowToSections should have the following layout,
214 # {
215 # "@type": "HowToSection",
216 # "itemListElement": [
217 # {
218 # "@type": "HowToStep",
219 # "text": "Instruction A"
220 # },
221 # }
222 #
223 steps_object = typing.cast(list[dict[str, str]], steps_object)
224 return clean_instructions(
225 functools.reduce(
226 operator.concat, # type: ignore
227 [x["itemListElement"] for x in steps_object],
228 [],
229 )
230 )
231 case _:
232 raise TypeError(f"Unexpected type for instructions: {type(steps_object)}, {steps_object}")
235def _sanitize_instruction_text(line: str | dict) -> str: 1a
236 """
237 _sanitize_instructions_text does some basic checking if the value is a string or dictionary
238 and returns the value of the `text` key if it is a dictionary. The returned string is passed through the
239 `clean_string` function to remove any html tags and extra whitespace in a loop until the string
240 is stable.
242 Calling `clean_string` in a loop is necessary because some sites return a string with erroneously escaped
243 html tags or markup.
244 """
245 if isinstance(line, dict):
246 # Some Recipes dotnot adhear to schema
247 try:
248 line = line["text"]
249 except Exception:
250 line = ""
252 if not line:
253 return ""
255 line = typing.cast(str, line)
256 clean_line = clean_string(line.strip())
258 while not clean_line == (clean_line := clean_string(clean_line)):
259 pass
261 return clean_line
264def clean_ingredients(ingredients: list | str | None, default: list | None = None) -> list[str | dict]: 1a
265 """
266 ingredient attempts to parse the ingredients field from a recipe and return a list of
268 Supported Structures:
269 - `["1 cup flour"]` - A list of strings
270 - `"1 cup flour"` - A string
271 - `None` - returns an empty list
273 Raises:
274 TypeError: If the ingredients field is not a supported type a TypeError is raised.
275 """
276 match ingredients:
277 case None:
278 return default or []
279 case list(ingredients):
280 cleaned_ingredients: list[str | dict] = []
281 for ing in ingredients:
282 if isinstance(ing, dict):
283 cleaned_ingredients.append({clean_string(k): clean_string(v) for k, v in ing.items()})
284 else:
285 cleaned_ingredients.append(clean_string(ing))
286 return cleaned_ingredients
287 case [str()]:
288 return [clean_string(ingredient) for ingredient in ingredients]
289 case str(ingredients):
290 return [clean_string(ingredient) for ingredient in ingredients.splitlines() if ingredient.strip()]
291 case _:
292 raise TypeError(f"Unexpected type for ingredients: {type(ingredients)}, {ingredients}")
295def clean_int(val: str | int | None, min: int | None = None, max: int | None = None): 1a
296 if val is None or isinstance(val, int):
297 return val
299 filtered_val = "".join(c for c in val if c.isnumeric())
300 if not filtered_val:
301 return None
303 val = int(filtered_val)
304 if min is None or max is None:
305 return val
307 if not (min <= val <= max):
308 return None
310 return val
313def clean_notes(notes: typing.Any) -> list[dict] | None: 1a
314 if not isinstance(notes, list):
315 return None
317 parsed_notes: list[dict] = []
318 for note in notes:
319 if not isinstance(note, str | dict):
320 continue
322 if isinstance(note, dict):
323 if "text" not in note:
324 continue
326 if "title" not in note:
327 note["title"] = ""
329 parsed_notes.append(note)
330 continue
332 parsed_notes.append({"title": "", "text": note})
334 return parsed_notes
@functools.lru_cache
def _get_servings_options() -> set[str]:
    """Collect every translation of the servings-related phrases.

    The values returned by `get_all_translations` for each key are stripped
    and lower-cased; the result is cached after the first call.
    """
    translation_keys = (
        "recipe.servings-text.makes",
        "recipe.servings-text.serves",
        "recipe.servings-text.serving",
        "recipe.servings-text.servings",
        "recipe.servings-text.yield",
        "recipe.servings-text.yields",
    )
    return {
        phrase.strip().lower()
        for translation_key in translation_keys
        for phrase in get_all_translations(translation_key).values()
    }
def _is_serving_string(txt: str) -> bool:
    """Tell whether `txt` contains any known servings phrase (case-insensitive)."""
    normalized = txt.strip().lower()
    return any(phrase in normalized for phrase in _get_servings_options())
361def clean_yield(yields: str | list[str] | None) -> tuple[float, float, str]: 1a
362 """
363 yield_amount attemps to parse out the yield amount from a recipe.
365 Supported Structures:
366 - `"4 servings"` - returns the string unmodified
367 - `["4 servings", "4 Pies"]` - returns the last value
369 Returns:
370 float: The servings, if it can be parsed else 0
371 float: The yield quantity, if it can be parsed else 0
372 str: The yield amount, if it can be parsed else an empty string
373 """
374 servings_qty: float = 0
375 yld_qty: float = 0
376 yld_str = ""
378 if not yields:
379 return servings_qty, yld_qty, yld_str
381 if not isinstance(yields, list):
382 yields = [yields]
384 for yld in yields:
385 if not yld:
386 continue
387 if not isinstance(yld, str):
388 yld = str(yld)
390 qty, txt = extract_quantity_from_string(yld)
391 if qty and _is_serving_string(yld):
392 servings_qty = qty
393 else:
394 yld_qty = qty
395 yld_str = txt
397 return servings_qty, yld_qty, yld_str
400def clean_time(time_entry: str | timedelta | int | float | None, translator: Translator) -> None | str: 1a
401 """_summary_
403 Supported Structures:
404 - `None` - returns None
405 - `"PT1H"` - returns "1 hour"
406 - `"PT1H30M"` - returns "1 hour 30 minutes"
407 - `timedelta(hours=1, minutes=30)` - returns "1 hour 30 minutes"
408 - `{"minValue": "PT1H30M"}` - returns "1 hour 30 minutes"
409 - `30` - as a `int` or `float` assumed to be in minutes, returns "30 minutes"
411 Raises:
412 TypeError: if the type is not supported a TypeError is raised
414 Returns:
415 None | str: None if the time_entry is None, otherwise a string representing the time
416 """
417 if not time_entry:
418 return None
420 match time_entry:
421 case numbers.Number():
422 # type checked by case statement
423 time_delta = timedelta(minutes=time_entry) # type: ignore
424 return pretty_print_timedelta(time_delta, translator)
425 case str(time_entry):
426 if not time_entry.strip():
427 return None
429 try:
430 time_delta_instructionsect = parse_duration(time_entry)
431 return pretty_print_timedelta(time_delta_instructionsect, translator)
432 except ValueError:
433 return str(time_entry)
434 case timedelta():
435 return pretty_print_timedelta(time_entry, translator)
436 case {"minValue": str(value)}:
437 return clean_time(value, translator)
438 case [str(), *_]:
439 return clean_time(time_entry[0], translator)
440 case datetime():
441 # TODO: Not sure what to do here
442 return str(time_entry)
443 case _:
444 logger.warning(
445 "[SCRAPER] Unexpected type(%s) or structure for variable time_entry: %s", type(time_entry), time_entry
446 )
447 return None
def parse_duration(iso_duration: str) -> timedelta:
    """
    Turn an ISO 8601 duration string into a `datetime.timedelta`.

    Only days, hours, minutes and seconds contribute to the result, each
    truncated to a whole number. Years and months are ignored: a timedelta is
    stored as days, seconds and microseconds, and the string alone does not
    say which year or month is meant, so they cannot be converted to days.

    Args:
        iso_duration: an ISO 8601 duration string.

    Raises:
        ValueError: if the input string is not a valid ISO 8601 duration string.
    """
    parsed = MATCH_ISO_STR.match(iso_duration)
    if parsed is None:
        raise ValueError("invalid ISO 8601 duration string")

    components = {
        unit: int(float(raw)) if (raw := parsed.group(unit)) else 0
        for unit in ("days", "hours", "minutes", "seconds")
    }
    return timedelta(**components)
480def pretty_print_timedelta(t: timedelta, translator: Translator, max_components=None, max_decimal_places=2): 1a
481 """
482 Print a pretty string for a timedelta.
483 For example datetime.timedelta(days=2, seconds=17280) will be printed as '2 days 4 Hours 48 Minutes'.
484 Setting max_components to e.g. 1 will change this to '2.2 days', where the number of decimal
485 points can also be set.
486 """
487 time_scale_translation_keys_dict = {
488 timedelta(days=365): "datetime.year",
489 timedelta(days=1): "datetime.day",
490 timedelta(hours=1): "datetime.hour",
491 timedelta(minutes=1): "datetime.minute",
492 timedelta(seconds=1): "datetime.second",
493 timedelta(microseconds=1000): "datetime.millisecond",
494 timedelta(microseconds=1): "datetime.microsecond",
495 }
496 count = 0
497 out_list = []
498 for scale, scale_translation_key in time_scale_translation_keys_dict.items():
499 if t >= scale:
500 count += 1
501 n = t / scale if count == max_components else int(t / scale)
502 t -= n * scale
504 n_txt = str(round(n, max_decimal_places))
505 if n_txt[-2:] == ".0":
506 n_txt = n_txt[:-2]
508 scale_value = translator.t(scale_translation_key, count=n)
509 out_list.append(f"{n_txt} {scale_value}")
511 if out_list == []:
512 return "none"
513 return " ".join(out_list)
516def clean_categories(category: str | list) -> list[str]: 1a
517 if not category:
518 return []
520 match category:
521 case str(category):
522 if not category.strip():
523 return []
525 return [category]
526 case [str(), *_]:
527 return [cat.strip().title() for cat in category if cat.strip()]
528 case [{"name": str(), "slug": str()}, *_]:
529 # Special case for when we use the cleaner to cleanup a migration.
530 #
531 # [
532 # { "name": "Dessert", "slug": "dessert"}
533 # ]
534 #
535 return [cat["name"] for cat in category if "name" in cat]
536 case _:
537 raise TypeError(f"Unexpected type for category: {type(category)}, {category}")
540def clean_tags(data: str | list[str]) -> list[str]: 1a
541 """
542 Gets keywords as a list or natural language list and returns
543 them into a list of strings of individual tags
544 """
545 if not data:
546 return []
548 match data:
549 case [str(), *_]:
550 return [tag.strip().title() for tag in data if tag.strip()]
551 case str(data):
552 return clean_tags(data.split(","))
553 case _:
554 return []
555 # should probably raise exception
556 # raise TypeError(f"Unexpected type for tags: {type(data)}, {data}")
559def clean_nutrition(nutrition: dict | None) -> dict[str, str]: 1a
560 """
561 clean_nutrition takes a dictionary of nutrition information and cleans it up
562 to be stored in the database. It will remove any keys that are not in the
563 list of valid keys
565 Assumptionas:
566 - All units are supplied in grams, expect sodium and cholesterol which maybe be in milligrams
568 Returns:
569 dict[str, str]: If the argument is None, or not a dictionary, an empty dictionary is returned
570 """
571 if not isinstance(nutrition, dict):
572 return {}
574 output_nutrition = {}
575 for key, val in nutrition.items():
576 with contextlib.suppress(AttributeError, TypeError):
577 if matched_digits := MATCH_DIGITS.search(val):
578 output_nutrition[key] = matched_digits.group(0).replace(",", ".")
580 for key in ["sodiumContent", "cholesterolContent"]:
581 if val := nutrition.get(key, None):
582 if isinstance(val, str) and "m" not in val and "g" in val:
583 with contextlib.suppress(AttributeError, TypeError):
584 output_nutrition[key] = str(float(output_nutrition[key]) * 1000)
586 for key in ["calories"]:
587 if val := nutrition.get(key, None):
588 if isinstance(val, int | float):
589 with contextlib.suppress(AttributeError, TypeError):
590 output_nutrition[key] = str(val)
592 return output_nutrition