Coverage for opt/mealie/lib/python3.12/site-packages/mealie/services/migrations/utils/migration_helpers.py: 17%
96 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-25 15:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-25 15:48 +0000
1import json 1a
2from datetime import timedelta 1a
3from pathlib import Path 1a
4from typing import cast 1a
6import isodate 1a
7import yaml 1a
8from PIL import UnidentifiedImageError 1a
9from pydantic import UUID4 1a
11from mealie.services.recipe.recipe_data_service import RecipeDataService 1a
14class MigrationReaders: 1a
15 @staticmethod 1a
16 def json(json_file: Path) -> dict: 1a
17 with open(json_file) as f:
18 return json.loads(f.read())
20 @staticmethod 1a
21 def yaml(yaml_file: Path) -> dict: 1a
22 """A helper function to read in a yaml file from a Path. This assumes that the
23 first yaml document is the recipe data and the second, if exists, is the description.
25 Args:
26 yaml_file (Path): Path to yaml file
28 Returns:
29 dict: representing the yaml file as a dictionary
30 """
31 with open(yaml_file) as f:
32 contents = f.read().split("---")
33 recipe_data = {}
34 for document in contents:
35 # Check if None or Empty String
36 if document is None or document == "":
37 continue
39 # Check if 'title:' present
40 elif "title:" in document:
41 recipe_data.update(yaml.safe_load(document))
43 else:
44 recipe_data["description"] = document
46 return recipe_data
49def split_by_comma(tag_string: str): 1a
50 """Splits a single string by ',' performs a line strip and then title cases the resulting string
52 Args:
53 tag_string (str): [description]
55 Returns:
56 [type]: [description]
57 """
58 if not isinstance(tag_string, str):
59 return None
60 return [x.title().lstrip() for x in tag_string.split(",") if x != ""]
63def split_by_semicolon(input: str): 1a
64 """Splits a single string by ';', performs a line strip removes empty strings"""
66 if not isinstance(input, str):
67 return None
68 return [x.strip() for x in input.split(";") if x]
71def split_by_line_break(input: str): 1a
72 """Splits a single string by line break, performs a line strip removes empty strings"""
73 if not isinstance(input, str):
74 return None
75 return [x.strip() for x in input.split("\n") if x]
78def glob_walker(directory: Path, glob_str: str, return_parent=True) -> list[Path]: # TODO: 1a
79 """A Helper function that will return the glob matches for the temporary directotry
80 that was unpacked and passed in as the `directory` parameter. If `return_parent` is
81 True the return Paths will be the parent directory for the file that was matched. If
82 false the file itself will be returned.
84 Args:
85 directory (Path): Path to search directory
86 glob_str ([type]): glob style match string
87 return_parent (bool, optional): To return parent directory of match. Defaults to True.
89 Returns:
90 list[Path]:
91 """
92 directory = directory if isinstance(directory, Path) else Path(directory)
93 matches = []
94 for match in directory.glob(glob_str):
95 if return_parent:
96 matches.append(match.parent)
97 else:
98 matches.append(match)
100 return matches
103def import_image(src: str | Path, recipe_id: UUID4): 1a
104 """Read the successful migrations attribute and for each import the image
105 appropriately into the image directory. Minification is done in mass
106 after the migration occurs.
107 May raise an UnidentifiedImageError if the file is not a recognised format.
108 """
110 if isinstance(src, str):
111 src = Path(src)
113 if not src.exists():
114 return
116 data_service = RecipeDataService(recipe_id=recipe_id)
117 data_service.write_image(src, src.suffix)
120async def scrape_image(image_url: str, recipe_id: UUID4): 1a
121 """Read the successful migrations attribute and for each scrape the image
122 appropriately into the image directory. Minification is done in mass
123 after the migration occurs.
124 """
126 if not isinstance(image_url, str):
127 return
129 data_service = RecipeDataService(recipe_id=recipe_id)
131 try:
132 await data_service.scrape_image(image_url)
133 except UnidentifiedImageError:
134 return
137def parse_iso8601_duration(time: str | None) -> str: 1a
138 """
139 Parses an ISO8601 duration string
141 https://en.wikipedia.org/wiki/ISO_8601#Durations
142 """
144 if not time:
145 return ""
146 if time[0] == "P":
147 try:
148 delta = isodate.parse_duration(time)
149 if not isinstance(delta, timedelta):
150 return time
151 except isodate.ISO8601Error:
152 return time
154 # TODO: make singular and plural translatable
155 time_part_map: dict[str, dict] = {
156 "days": {"singular": "day", "plural": "days"},
157 "hours": {"singular": "hour", "plural": "hours"},
158 "minutes": {"singular": "minute", "plural": "minutes"},
159 "seconds": {"singular": "second", "plural": "seconds"},
160 }
162 delta = cast(timedelta, delta)
163 time_part_map["days"]["value"] = delta.days
164 time_part_map["hours"]["value"] = delta.seconds // 3600
165 time_part_map["minutes"]["value"] = (delta.seconds // 60) % 60
166 time_part_map["seconds"]["value"] = delta.seconds % 60
168 return_strings: list[str] = []
169 for value_map in time_part_map.values():
170 if not (value := value_map["value"]):
171 continue
173 unit_key = "singular" if value == 1 else "plural"
174 return_strings.append(f"{value} {value_map[unit_key]}")
176 return " ".join(return_strings) if return_strings else time
179def format_time(minutes: int) -> str: 1a
180 # TODO: make this translatable
181 hour_label = "hour"
182 hours_label = "hours"
183 minute_label = "minute"
184 minutes_label = "minutes"
186 hours, minutes = divmod(minutes, 60)
187 parts: list[str] = []
189 if hours:
190 parts.append(f"{int(hours)} {hour_label if hours == 1 else hours_label}")
191 if minutes:
192 parts.append(f"{minutes} {minute_label if minutes == 1 else minutes_label}")
194 return " ".join(parts)