Coverage for opt/mealie/lib/python3.12/site-packages/mealie/services/migrations/plantoeat.py: 22%

77 statements  

coverage.py v7.10.6, created at 2025-11-25 15:32 +0000

import asyncio
import csv
import tempfile
import zipfile
from pathlib import Path

from slugify import slugify

from mealie.pkgs.cache import cache_key
from mealie.services.scraper import cleaner

from ._migration_base import BaseMigrator
from .utils.migration_alias import MigrationAlias
from .utils.migration_helpers import scrape_image, split_by_comma

def plantoeat_recipes(file: Path):
    """Yields all recipes inside the export file as dict"""
    with tempfile.TemporaryDirectory() as tmpdir:
        with zipfile.ZipFile(file) as zip_file:
            zip_file.extractall(tmpdir)

        for name in Path(tmpdir).glob("**/[!.]*.csv"):
            with open(name, newline="") as csvfile:
                reader = csv.DictReader(csvfile)
                yield from reader
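# A minimal usage sketch (illustrative only, not part of the original module;
# "export.zip" stands in for a real Plan to Eat export archive):
#
#     for row in plantoeat_recipes(Path("export.zip")):
#         print(row.get("Title"))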

def get_value_as_string_or_none(dictionary: dict, key: str):
    value = dictionary.get(key)
    if value is not None:
        try:
            return str(value)
        except Exception:
            return None
    else:
        return None
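# Expected behaviour, shown as a small hedged example (row values are made up):
#
#     get_value_as_string_or_none({"Rating": 5}, "Rating")  # -> "5"
#     get_value_as_string_or_none({"Rating": 5}, "Course")  # -> None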

nutrition_map = {
    "Calories": "calories",
    "Fat": "fatContent",
    "Saturated Fat": "saturatedFatContent",
    "Cholesterol": "cholesterolContent",
    "Sodium": "sodiumContent",
    "Sugar": "sugarContent",
    "Carbohydrate": "carbohydrateContent",
    "Fiber": "fiberContent",
    "Protein": "proteinContent",
}

class PlanToEatMigrator(BaseMigrator):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)

        self.name = "plantoeat"

        self.key_aliases = [
            MigrationAlias(key="name", alias="Title"),
            MigrationAlias(key="description", alias="Description"),
            MigrationAlias(
                key="recipeIngredient",
                alias="Ingredients",
                func=lambda x: [z for z in x.splitlines() if z.strip() and not z.startswith(", ")],
            ),
            MigrationAlias(key="recipeInstructions", alias="Directions"),
            MigrationAlias(key="recipeYield", alias="Servings"),
            MigrationAlias(key="orgURL", alias="Url"),
            MigrationAlias(key="rating", alias="Rating"),
            MigrationAlias(key="prepTime", alias="Prep Time"),
            MigrationAlias(key="performTime", alias="Cook Time"),
            MigrationAlias(key="totalTime", alias="Total Time"),
            MigrationAlias(key="dateAdded", alias="Created At", func=lambda x: x[: x.find(" ")]),
        ]
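    # A hedged illustration of the "Ingredients" alias above (input text is made
    # up): the lambda drops blank lines and the ", "-prefixed lines (apparently
    # section separators in the export), so
    #
    #     "1 cup flour\n, For the sauce\n\n2 tbsp butter"
    #
    # becomes ["1 cup flour", "2 tbsp butter"].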

    def _parse_recipe_nutrition_from_row(self, row: dict) -> dict:
        """Parses the nutrition data from the row"""
        nut_dict = {normalized_k: row[k] for k, normalized_k in nutrition_map.items() if k in row}

        return cleaner.clean_nutrition(nut_dict)
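    # For example, a hypothetical row fragment {"Calories": "240", "Protein": "8g"}
    # is remapped through nutrition_map to {"calories": "240", "proteinContent": "8g"}
    # before being handed to cleaner.clean_nutrition.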

    def _get_categories_from_row(self, row: dict) -> list[str]:
        """Parses various category-like columns into categories"""

        categories: list[str] = []
        columns = ["Course", "Cuisine"]
        for column in columns:
            value = get_value_as_string_or_none(row, column)
            if value:
                categories.append(value)

        return categories

    def _get_tags_from_row(self, row: dict) -> list[str]:
        tag_str = get_value_as_string_or_none(row, "Tags")
        tags = split_by_comma(tag_str) or []
        main_ingredient = get_value_as_string_or_none(row, "Main Ingredient")
        if main_ingredient:
            tags.append(main_ingredient)

        return tags
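    # Hedged example with made-up columns: a row such as
    # {"Tags": "dinner, quick", "Main Ingredient": "Chicken"} produces a tag list
    # containing both comma-separated tags plus "Chicken", assuming split_by_comma
    # splits the string on commas.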

    def _process_recipe_row(self, row: dict) -> dict:
        """Reads a single recipe's row, merges columns, and converts the row to a dictionary"""

        recipe_dict: dict = row

        recipe_dict["recipeCategory"] = self._get_categories_from_row(row)
        recipe_dict["tags"] = self._get_tags_from_row(row)
        recipe_dict["nutrition"] = self._parse_recipe_nutrition_from_row(row)

        return recipe_dict
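    # Note: the incoming row dictionary is mutated in place and returned, so the
    # original CSV columns stay alongside the derived recipeCategory, tags, and
    # nutrition keys; clean_recipe_dictionary presumably applies key_aliases to
    # remap the remaining columns afterwards.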

    def _migrate(self) -> None:
        recipe_image_urls = {}

        recipes = []
        for recipe in plantoeat_recipes(self.archive):
            if "Title" not in recipe:
                continue

            if "Photo Url" in recipe:
                recipe_image_urls[slugify(recipe["Title"])] = recipe["Photo Url"]
                recipe["image"] = cache_key.new_key(4)

            preprocess_recipe = self._process_recipe_row(recipe)

            recipe_model = self.clean_recipe_dictionary(preprocess_recipe)

            recipes.append(recipe_model)

        results = self.import_recipes_to_database(recipes)

        for slug, recipe_id, status in results:
            if not status:
                continue

            try:
                asyncio.run(scrape_image(recipe_image_urls[slug], recipe_id))
            except Exception as e:
                self.logger.error(f"Failed to download image for {slug}: {e}")
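
# A hedged end-to-end sketch (the constructor arguments and public entry point
# below are assumptions; they come from BaseMigrator, which is not shown here):
#
#     migrator = PlanToEatMigrator(archive=Path("plantoeat-export.zip"), ...)
#     migrator.migrate("plantoeat migration")  # would end up calling _migrate()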