Coverage for opt/mealie/lib/python3.12/site-packages/mealie/services/migrations/myrecipebox.py: 16%

86 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-11-25 15:32 +0000

1import asyncio 1a

2import csv 1a

3from pathlib import Path 1a

4from typing import Any 1a

5 

6from slugify import slugify 1a

7 

8from mealie.schema.recipe.recipe import Recipe 1a

9from mealie.services.migrations.utils.migration_alias import MigrationAlias 1a

10from mealie.services.scraper import cleaner 1a

11 

12from ._migration_base import BaseMigrator 1a

13from .utils.migration_helpers import scrape_image, split_by_line_break, split_by_semicolon 1a

14 

15nutrition_map = { 1a

16 "carbohydrate": "carbohydrateContent", 

17 "protein": "proteinContent", 

18 "fat": "fatContent", 

19 "saturatedfat": "saturatedFatContent", 

20 "transfat": "transFatContent", 

21 "sodium": "sodiumContent", 

22 "fiber": "fiberContent", 

23 "sugar": "sugarContent", 

24 "unsaturatedfat": "unsaturatedFatContent", 

25} 

26 

27 

28class MyRecipeBoxMigrator(BaseMigrator): 1a

29 def __init__(self, **kwargs): 1a

30 super().__init__(**kwargs) 

31 

32 self.name = "myrecipebox" 

33 

34 self.key_aliases = [ 

35 MigrationAlias(key="name", alias="title", func=None), 

36 MigrationAlias(key="prepTime", alias="preparationTime", func=self.parse_time), 

37 MigrationAlias(key="performTime", alias="cookingTime", func=self.parse_time), 

38 MigrationAlias(key="totalTime", alias="totalTime", func=self.parse_time), 

39 MigrationAlias(key="recipeYield", alias="quantity", func=str), 

40 MigrationAlias(key="recipeIngredient", alias="ingredients", func=None), 

41 MigrationAlias(key="recipeInstructions", alias="instructions", func=split_by_line_break), 

42 MigrationAlias(key="notes", alias="notes", func=split_by_line_break), 

43 MigrationAlias(key="nutrition", alias="nutrition", func=self.parse_nutrition), 

44 MigrationAlias(key="recipeCategory", alias="categories", func=split_by_semicolon), 

45 MigrationAlias(key="tags", alias="tags", func=split_by_semicolon), 

46 MigrationAlias(key="orgURL", alias="source", func=None), 

47 ] 

48 

49 def parse_time(self, time: Any) -> str | None: 1a

50 """Converts a time value to a string with minutes""" 

51 try: 

52 if not time: 

53 return None 

54 if not (isinstance(time, int) or isinstance(time, float) or isinstance(time, str)): 

55 time = str(time) 

56 

57 if isinstance(time, str): 

58 try: 

59 time = int(time) 

60 except ValueError: 

61 return time 

62 

63 unit = self.translator.t("datetime.minute", count=time) 

64 return f"{time} {unit}" 

65 except Exception: 

66 return None 

67 

68 def parse_nutrition(self, input_: Any) -> dict | None: 1a

69 if not input_ or not isinstance(input_, str): 

70 return None 

71 

72 nutrition = {} 

73 

74 vals = (x.strip() for x in input_.split("\n") if x) 

75 for val in vals: 

76 try: 

77 key, value = (x.strip() for x in val.split(":", maxsplit=1)) 

78 

79 if not (key and value): 

80 continue 

81 

82 key = nutrition_map.get(key.lower(), key) 

83 

84 except ValueError: 

85 continue 

86 

87 nutrition[key] = value 

88 

89 return cleaner.clean_nutrition(nutrition) if nutrition else None 

90 

91 def extract_rows(self, file: Path) -> list[dict]: 1a

92 """Extracts the rows from the CSV file and returns a list of dictionaries""" 

93 rows: list[dict] = [] 

94 with open(file, newline="", encoding="utf-8", errors="ignore") as f: 

95 reader = csv.DictReader(f) 

96 for row in reader: 

97 rows.append(row) 

98 

99 return rows 

100 

101 def pre_process_row(self, row: dict) -> dict: 1a

102 if not (video := row.get("video")): 

103 return row 

104 

105 # if there is no source, use the video as the source 

106 if not row.get("source"): 

107 row["source"] = video 

108 return row 

109 

110 # otherwise, add the video as a note 

111 notes = row.get("notes", "") 

112 if notes: 

113 notes = f"{notes}\n{video}" 

114 else: 

115 notes = video 

116 

117 row["notes"] = notes 

118 return row 

119 

120 def _migrate(self) -> None: 1a

121 recipe_image_urls: dict = {} 

122 

123 recipes: list[Recipe] = [] 

124 for row in self.extract_rows(self.archive): 

125 recipe_dict = self.pre_process_row(row) 

126 if (title := recipe_dict.get("title")) and (image_url := recipe_dict.get("originalPicture")): 

127 try: 

128 slug = slugify(title) 

129 recipe_image_urls[slug] = image_url 

130 except Exception: 

131 pass 

132 

133 recipe_model = self.clean_recipe_dictionary(recipe_dict) 

134 recipes.append(recipe_model) 

135 

136 results = self.import_recipes_to_database(recipes) 

137 for slug, recipe_id, status in results: 

138 if not status or not (recipe_image_url := recipe_image_urls.get(slug)): 

139 continue 

140 

141 try: 

142 asyncio.run(scrape_image(recipe_image_url, recipe_id)) 

143 except Exception as e: 

144 self.logger.error(f"Failed to download image for {slug}: {e}")