Coverage for opt/mealie/lib/python3.12/site-packages/mealie/services/migrations/utils/migration_helpers.py: 17%

96 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-11-25 17:29 +0000

1import json 1a

2from datetime import timedelta 1a

3from pathlib import Path 1a

4from typing import cast 1a

5 

6import isodate 1a

7import yaml 1a

8from PIL import UnidentifiedImageError 1a

9from pydantic import UUID4 1a

10 

11from mealie.services.recipe.recipe_data_service import RecipeDataService 1a

12 

13 

14class MigrationReaders: 1a

15 @staticmethod 1a

16 def json(json_file: Path) -> dict: 1a

17 with open(json_file) as f: 

18 return json.loads(f.read()) 

19 

20 @staticmethod 1a

21 def yaml(yaml_file: Path) -> dict: 1a

22 """A helper function to read in a yaml file from a Path. This assumes that the 

23 first yaml document is the recipe data and the second, if exists, is the description. 

24 

25 Args: 

26 yaml_file (Path): Path to yaml file 

27 

28 Returns: 

29 dict: representing the yaml file as a dictionary 

30 """ 

31 with open(yaml_file) as f: 

32 contents = f.read().split("---") 

33 recipe_data = {} 

34 for document in contents: 

35 # Check if None or Empty String 

36 if document is None or document == "": 

37 continue 

38 

39 # Check if 'title:' present 

40 elif "title:" in document: 

41 recipe_data.update(yaml.safe_load(document)) 

42 

43 else: 

44 recipe_data["description"] = document 

45 

46 return recipe_data 

47 

48 

49def split_by_comma(tag_string: str): 1a

50 """Splits a single string by ',' performs a line strip and then title cases the resulting string 

51 

52 Args: 

53 tag_string (str): [description] 

54 

55 Returns: 

56 [type]: [description] 

57 """ 

58 if not isinstance(tag_string, str): 

59 return None 

60 return [x.title().lstrip() for x in tag_string.split(",") if x != ""] 

61 

62 

63def split_by_semicolon(input: str): 1a

64 """Splits a single string by ';', performs a line strip removes empty strings""" 

65 

66 if not isinstance(input, str): 

67 return None 

68 return [x.strip() for x in input.split(";") if x] 

69 

70 

71def split_by_line_break(input: str): 1a

72 """Splits a single string by line break, performs a line strip removes empty strings""" 

73 if not isinstance(input, str): 

74 return None 

75 return [x.strip() for x in input.split("\n") if x] 

76 

77 

78def glob_walker(directory: Path, glob_str: str, return_parent=True) -> list[Path]: # TODO: 1a

79 """A Helper function that will return the glob matches for the temporary directotry 

80 that was unpacked and passed in as the `directory` parameter. If `return_parent` is 

81 True the return Paths will be the parent directory for the file that was matched. If 

82 false the file itself will be returned. 

83 

84 Args: 

85 directory (Path): Path to search directory 

86 glob_str ([type]): glob style match string 

87 return_parent (bool, optional): To return parent directory of match. Defaults to True. 

88 

89 Returns: 

90 list[Path]: 

91 """ 

92 directory = directory if isinstance(directory, Path) else Path(directory) 

93 matches = [] 

94 for match in directory.glob(glob_str): 

95 if return_parent: 

96 matches.append(match.parent) 

97 else: 

98 matches.append(match) 

99 

100 return matches 

101 

102 

103def import_image(src: str | Path, recipe_id: UUID4): 1a

104 """Read the successful migrations attribute and for each import the image 

105 appropriately into the image directory. Minification is done in mass 

106 after the migration occurs. 

107 May raise an UnidentifiedImageError if the file is not a recognised format. 

108 """ 

109 

110 if isinstance(src, str): 

111 src = Path(src) 

112 

113 if not src.exists(): 

114 return 

115 

116 data_service = RecipeDataService(recipe_id=recipe_id) 

117 data_service.write_image(src, src.suffix) 

118 

119 

120async def scrape_image(image_url: str, recipe_id: UUID4): 1a

121 """Read the successful migrations attribute and for each scrape the image 

122 appropriately into the image directory. Minification is done in mass 

123 after the migration occurs. 

124 """ 

125 

126 if not isinstance(image_url, str): 

127 return 

128 

129 data_service = RecipeDataService(recipe_id=recipe_id) 

130 

131 try: 

132 await data_service.scrape_image(image_url) 

133 except UnidentifiedImageError: 

134 return 

135 

136 

137def parse_iso8601_duration(time: str | None) -> str: 1a

138 """ 

139 Parses an ISO8601 duration string 

140 

141 https://en.wikipedia.org/wiki/ISO_8601#Durations 

142 """ 

143 

144 if not time: 

145 return "" 

146 if time[0] == "P": 

147 try: 

148 delta = isodate.parse_duration(time) 

149 if not isinstance(delta, timedelta): 

150 return time 

151 except isodate.ISO8601Error: 

152 return time 

153 

154 # TODO: make singular and plural translatable 

155 time_part_map: dict[str, dict] = { 

156 "days": {"singular": "day", "plural": "days"}, 

157 "hours": {"singular": "hour", "plural": "hours"}, 

158 "minutes": {"singular": "minute", "plural": "minutes"}, 

159 "seconds": {"singular": "second", "plural": "seconds"}, 

160 } 

161 

162 delta = cast(timedelta, delta) 

163 time_part_map["days"]["value"] = delta.days 

164 time_part_map["hours"]["value"] = delta.seconds // 3600 

165 time_part_map["minutes"]["value"] = (delta.seconds // 60) % 60 

166 time_part_map["seconds"]["value"] = delta.seconds % 60 

167 

168 return_strings: list[str] = [] 

169 for value_map in time_part_map.values(): 

170 if not (value := value_map["value"]): 

171 continue 

172 

173 unit_key = "singular" if value == 1 else "plural" 

174 return_strings.append(f"{value} {value_map[unit_key]}") 

175 

176 return " ".join(return_strings) if return_strings else time 

177 

178 

179def format_time(minutes: int) -> str: 1a

180 # TODO: make this translatable 

181 hour_label = "hour" 

182 hours_label = "hours" 

183 minute_label = "minute" 

184 minutes_label = "minutes" 

185 

186 hours, minutes = divmod(minutes, 60) 

187 parts: list[str] = [] 

188 

189 if hours: 

190 parts.append(f"{int(hours)} {hour_label if hours == 1 else hours_label}") 

191 if minutes: 

192 parts.append(f"{minutes} {minute_label if minutes == 1 else minutes_label}") 

193 

194 return " ".join(parts)