Coverage for opt/mealie/lib/python3.12/site-packages/mealie/services/migrations/cookn.py: 8%
278 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-25 17:29 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-25 17:29 +0000
1import os 1a
2import re 1a
3import tempfile 1a
4import zipfile 1a
5from pathlib import Path 1a
6from typing import Any 1a
8from mealie.schema.recipe.recipe_ingredient import RecipeIngredient, SaveIngredientFood, SaveIngredientUnit 1a
9from mealie.schema.reports.reports import ReportEntryCreate 1a
10from mealie.services.parser_services._base import DataMatcher 1a
11from mealie.services.parser_services.parser_utils.string_utils import extract_quantity_from_string 1a
13from ._migration_base import BaseMigrator 1a
14from .utils.migration_helpers import format_time 1a
17class DSVParser: 1a
18 def __init__(self, directory: Path): 1a
19 self.directory = directory
20 self.tables: dict[str, list[dict[str, Any]]] = {}
21 self.load_files()
23 def load_files(self) -> None: 1a
24 """Loads all .dsv files from the directory into lists of dictionaries."""
25 for file in self.directory.glob("*.dsv"):
26 with open(file, "rb") as f:
27 file_contents = f.read().decode("utf-8", errors="ignore")
29 # Replace unique delimiters
30 file_contents = file_contents.replace("||||", "\x06")
31 file_contents = file_contents.replace("!@#%^&*()", "\x07")
33 # Manually parse rows
34 rows = file_contents.strip().split("\x07")
35 if not rows:
36 continue # Skip empty files
38 # Extract header
39 headers = rows[0].split("\x06")
40 data = [dict(zip(headers, row.split("\x06"), strict=False)) for row in rows[1:] if row]
42 self.tables[file.stem] = data # Store parsed table
44 def query_by_id(self, table_name: str, column_name: str, ids: list[str]) -> list[dict[str, Any]]: 1a
45 """Returns rows from a specified table where column_name matches any of the provided IDs."""
46 if table_name not in self.tables:
47 raise ValueError(f"Table '{table_name}' not found.")
49 results = [row for row in self.tables[table_name] if row.get(column_name) in ids]
51 if len(results) == 0:
52 results.append({})
54 return results
56 def get_data(self, row: dict[str, Any], column: str) -> Any: 1a
57 """Get column data from row. Handles a few bad data cases."""
58 data = row.get(column, "")
59 if data is None or data == "[null]":
60 data = ""
61 return data
63 def get_table(self, table_name: str) -> list[dict[str, Any]]: 1a
64 """Returns the entire table as a list of dictionaries."""
65 if table_name not in self.tables:
66 raise ValueError(f"Table '{table_name}' not found.")
67 return self.tables[table_name]
69 def list_tables(self) -> list[str]: 1a
70 """Returns a list of available tables."""
71 return list(self.tables.keys())
74class CooknMigrator(BaseMigrator): 1a
75 def __init__(self, **kwargs): 1a
76 super().__init__(**kwargs)
77 self.name = "cookn"
78 self.key_aliases = []
79 self.matcher = DataMatcher(self.db, food_fuzzy_match_threshold=95, unit_fuzzy_match_threshold=100)
81 def _parse_units_table(self, db: DSVParser): 1a
82 """Parses the Cook'n units table and adds missing units to Mealie DB."""
83 _units_table = db.get_table("temp_unit")
84 for _unit_row in _units_table:
85 name = db.get_data(_unit_row, "NAME")
86 plural_name = db.get_data(_unit_row, "PLURAL_NAME")
87 abbreviation = db.get_data(_unit_row, "ABBREVIATION")
89 # exact match
90 if not name or name in self.matcher.units_by_alias:
91 continue
93 # fuzzy match
94 match = self.matcher.find_unit_match(name)
95 if match is None:
96 save = SaveIngredientUnit(
97 group_id=self.group.id,
98 name=name,
99 plural_name=plural_name,
100 abbreviation=abbreviation,
101 )
102 # update DataMatcher
103 self.matcher = DataMatcher(self.db, food_fuzzy_match_threshold=95, unit_fuzzy_match_threshold=100)
104 try:
105 self.db.ingredient_units.create(save)
106 except Exception as e:
107 self.logger.error(e)
108 else:
109 self.logger.debug("Fuzzy match for unit (%s -> %s)", name, match.name)
111 def _parse_foods_table(self, db: DSVParser): 1a
112 """Parses the Cook'n food table and adds missing foods to Mealie DB."""
113 _foods_table = db.get_table("temp_food")
114 for _food_row in _foods_table:
115 name = db.get_data(_food_row, "NAME")
116 plural_name = db.get_data(_food_row, "PLURAL_NAME")
118 # exact match
119 if not name or name in self.matcher.foods_by_alias:
120 continue
122 match = self.matcher.find_food_match(name)
123 if match is None:
124 save = SaveIngredientFood(group_id=self.group.id, name=name, plural_name=plural_name, description="")
125 # update DataMatcher
126 self.matcher = DataMatcher(self.db, food_fuzzy_match_threshold=95, unit_fuzzy_match_threshold=100)
127 try:
128 self.db.ingredient_foods.create(save)
129 except Exception as e:
130 self.logger.error(e)
131 else:
132 self.logger.debug("Fuzzy match for food (%s -> %s)", name, match.name)
134 def _parse_media(self, _cookbook_id: str, _chapter_id: str, _recipe_id: str, db: DSVParser) -> str | None: 1a
135 """Checks recipe, chapter, and cookbook for images. Return path to most specific available image."""
136 _media_recipe_row = db.query_by_id("temp_media", "ENTITY_ID", [_recipe_id])[0]
137 _media_chapter_row = db.query_by_id("temp_media", "ENTITY_ID", [_chapter_id])[0]
138 _media_cookbook_row = db.query_by_id("temp_media", "ENTITY_ID", [_cookbook_id])[0]
140 # Get recipe image
141 _media_row = _media_recipe_row
142 _media_id = db.get_data(_media_row, "ID")
143 if _media_id == "":
144 # Get chapter image if no recipe image
145 _media_row = _media_chapter_row
146 _media_id = db.get_data(_media_row, "ID")
147 if _media_id == "":
148 # Get cookbook image if no chapter image
149 _media_row = _media_cookbook_row
150 _media_id = db.get_data(_media_row, "ID")
152 # If we found an image
153 if _media_id != "":
154 _media_type = db.get_data(_media_row, "MEDIA_CONTENT_TYPE")
155 # If the file has no extention add one (this is the normal case)
156 if Path(str(_media_id)).suffix == "":
157 if _media_type != "":
158 # Determine file extension based on media type
159 _extension = _media_type.split("/")[-1]
160 _old_image_path = os.path.join(db.directory, str(_media_id))
161 new_image_path = f"{_old_image_path}.{_extension}"
162 # Rename the file if it exists and has no extension
163 if os.path.exists(_old_image_path) and not os.path.exists(new_image_path):
164 os.rename(_old_image_path, new_image_path)
165 if Path(new_image_path).exists():
166 return new_image_path
167 else:
168 return os.path.join(db.directory, str(_media_id))
169 return None
171 def _parse_ingredients(self, _recipe_id: str, db: DSVParser) -> list[RecipeIngredient]: 1a
172 """Parses ingredients for recipe from Cook'n ingredients table."""
173 ingredients = []
174 ingredients_order = []
175 _ingredient_rows = db.query_by_id("temp_ingredient", "PARENT_ID", [_recipe_id])
176 for _ingredient_row in _ingredient_rows:
177 _unit_id = db.get_data(_ingredient_row, "AMOUNT_UNIT")
178 _unit_row = db.query_by_id("temp_unit", "ID", [_unit_id])[0]
179 _food_id = db.get_data(_ingredient_row, "INGREDIENT_FOOD_ID")
180 _food_row = db.query_by_id("temp_food", "ID", [_food_id])[0]
181 _brand_id = db.get_data(_ingredient_row, "BRAND_ID")
182 _brand_row = db.query_by_id("temp_brand", "ID", [_brand_id])[0]
184 amount_str = db.get_data(_ingredient_row, "AMOUNT_QTY_STRING")
185 amount, _ = extract_quantity_from_string(amount_str)
186 unit_name = db.get_data(_unit_row, "NAME")
187 food_name = db.get_data(_food_row, "NAME")
189 # Match unit and food from Mealie DB
190 unit = self.matcher.find_unit_match(unit_name)
191 food = self.matcher.find_food_match(food_name)
193 pre_qualifier = db.get_data(_ingredient_row, "PRE_QUALIFIER").lstrip().rstrip()
194 post_qualifier = db.get_data(_ingredient_row, "POST_QUALIFIER").lstrip().rstrip()
195 brand = db.get_data(_brand_row, "NAME")
197 # Combine pre-qualifier and post-qualifier into single note
198 note = ""
199 if pre_qualifier != "":
200 if pre_qualifier[-1] == ",":
201 pre_qualifier = pre_qualifier[:-1]
202 note += pre_qualifier
203 if post_qualifier != "":
204 if pre_qualifier != "":
205 note += ", "
206 if post_qualifier[-1] == ",":
207 post_qualifier = post_qualifier[:-1]
208 if post_qualifier[0] == ",":
209 post_qualifier = post_qualifier[1:].lstrip()
210 note += post_qualifier
212 # Remove empty lines (unless amount was a text input)
213 if not amount and not unit and not food and not note:
214 self.logger.debug("%s, %s", amount_str, type(amount_str))
215 if amount_str and amount_str != "0":
216 note = amount_str
217 else:
218 continue
220 og_text = ""
221 if amount_str != "0":
222 og_text += amount_str + " "
223 if unit_name:
224 og_text += unit_name + " "
225 if pre_qualifier:
226 og_text += pre_qualifier + " "
227 if food_name:
228 og_text += food_name + " "
229 if post_qualifier:
230 og_text += post_qualifier + " "
231 if brand:
232 og_text += brand
234 base_ingredient = RecipeIngredient(
235 quantity=amount,
236 unit=unit,
237 food=food,
238 note=note,
239 original_text=og_text.strip(),
240 disable_amount=False,
241 )
242 try:
243 _display_order = db.get_data(_ingredient_row, "DISPLAY_ORDER")
244 ingredients_order.append(int(_display_order))
245 ingredients.append(base_ingredient)
246 except ValueError:
247 self.logger.warning("Invalid ingredient order: %s, %s", _display_order, base_ingredient.original_text)
248 continue
249 return [obj for _, obj in sorted(zip(ingredients_order, ingredients, strict=False))]
251 def _parse_instructions(self, instructions: str) -> list[str]: 1a
252 """
253 Parses recipe instructions into a list of steps.
254 Detects numbered lists, bulleted lists, and plain new-line-separated steps.
255 """
256 # Detects numbered lists (1., 1), 1-, etc.) and bulleted lists (-, *, •)
257 numbered_pattern = re.compile(r"^(\d+)[.)-]\s*(.*)")
258 bullet_pattern = re.compile(r"^[\-*•]\s*(.*)")
260 lines = instructions.splitlines()
261 steps = []
262 current_step: list[str] = []
264 for line in lines:
265 line = line.strip()
267 if not line:
268 continue # Skip empty lines
270 num_match = numbered_pattern.match(line)
271 bullet_match = bullet_pattern.match(line)
273 if num_match:
274 # If there's a current step, store it before starting a new one
275 if current_step:
276 steps.append("\n".join(current_step))
277 current_step = []
279 current_step.append(num_match.group(2))
280 elif bullet_match:
281 if current_step:
282 steps.append("\n".join(current_step))
283 current_step = []
285 current_step.append(bullet_match.group(1))
286 else:
287 # Continuation of a previous step
288 if current_step:
289 current_step.append(line)
290 else:
291 # If no clear separator is found, treat each new line as a new step
292 steps.append(line)
294 if current_step:
295 steps.append(" ".join(current_step))
297 return steps
299 def _process_recipe_document(self, _recipe_row: dict[str, Any], db: DSVParser) -> dict: 1a
300 """Parses recipe row from the Cook'n recipe table."""
301 recipe_data: dict[str, str | list[str] | list[RecipeIngredient]] = {}
303 # Select db values
304 _recipe_id = db.get_data(_recipe_row, "ID")
305 _recipe_desc_row = db.query_by_id("temp_recipe_desc", "ID", [_recipe_id])[0]
306 _chapter_id = db.get_data(_recipe_desc_row, "PARENT")
307 _chapter_row = db.query_by_id("temp_chapter_desc", "ID", [_chapter_id])[0]
308 _cookbook_id = db.get_data(_chapter_row, "PARENT")
309 _cookbook_row = db.query_by_id("temp_cookBook_desc", "ID", [_cookbook_id])[0]
311 # Parse general recipe info
312 cookbook = db.get_data(_cookbook_row, "TITLE")
313 chapter = db.get_data(_chapter_row, "TITLE")
314 name = db.get_data(_recipe_desc_row, "TITLE")
315 description = db.get_data(_recipe_desc_row, "DESCRIPTION")
316 serves = db.get_data(_recipe_row, "SERVES")
317 try:
318 prep_time = int(db.get_data(_recipe_row, "PREPTIME"))
319 except ValueError:
320 prep_time = 0
321 try:
322 cook_time = int(db.get_data(_recipe_row, "COOKTIME"))
323 except ValueError:
324 cook_time = 0
326 recipe_data["recipeCategory"] = [cookbook + " - " + chapter]
327 recipe_data["name"] = name
328 recipe_data["description"] = description
329 recipe_data["recipeYield"] = serves
330 recipe_data["prepTime"] = format_time(prep_time)
331 recipe_data["performTime"] = format_time(cook_time)
332 recipe_data["totalTime"] = format_time(prep_time + cook_time)
334 # Parse image file
335 image_path = self._parse_media(_cookbook_id, _chapter_id, _recipe_id, db)
336 if image_path is not None:
337 recipe_data["image"] = [image_path]
339 # Parse ingredients
340 recipe_data["_parsed_ingredients"] = self._parse_ingredients(_recipe_id, db)
342 # Parse instructions
343 recipe_data["recipeInstructions"] = self._parse_instructions(db.get_data(_recipe_row, "INSTRUCTIONS"))
345 return recipe_data
347 def _process_cookbook(self, path: Path) -> None: 1a
348 """Processes contents of a zip file."""
349 source_dir = self.get_zip_base_path(path)
350 db = DSVParser(source_dir)
351 # Load units and foods from Cook'n
352 self._parse_units_table(db)
353 self._parse_foods_table(db)
354 # Reload DataMatcher with updated tables
355 self.matcher = DataMatcher(self.db, food_fuzzy_match_threshold=95, unit_fuzzy_match_threshold=100)
357 # Load recipes from cookn
358 _recipe_table = db.get_table("temp_recipe")
360 recipes_as_dicts = []
361 for _recipe_row in _recipe_table:
362 try:
363 recipes_as_dicts.append(self._process_recipe_document(_recipe_row, db))
365 except Exception as e:
366 self.report_entries.append(
367 ReportEntryCreate(
368 report_id=self.report_id,
369 success=False,
370 message="Failed to parse recipe",
371 exception=f"{type(e).__name__}: {e}",
372 )
373 )
375 recipes = []
376 for r in recipes_as_dicts:
377 # Clean recipes and re-add ingredient w/ amounts
378 ingredients = r["_parsed_ingredients"]
379 r = self.clean_recipe_dictionary(r)
380 r.recipe_ingredient = ingredients
381 recipes.append(r)
383 # add recipes and images to database
384 results = self.import_recipes_to_database(recipes)
385 recipe_lookup = {r.slug: r for r in recipes}
386 for slug, recipe_id, status in results:
387 if status:
388 recipe = recipe_lookup.get(slug)
389 if recipe:
390 if recipe.image:
391 self.import_image(slug, recipe.image, recipe_id)
392 else:
393 index_len = len(slug.split("-")[-1])
394 recipe = recipe_lookup.get(slug[: -(index_len + 1)])
395 if recipe:
396 self.logger.warning("Duplicate recipe (%s) found! Saved as copy...", recipe.name)
397 if recipe.image:
398 self.import_image(slug, recipe.image, recipe_id)
399 else:
400 self.logger.warning("Failed to lookup recipe! (%s)", slug)
402 def _migrate(self) -> None: 1a
403 """Migrates recipes from Cook'n cookboop .zip. Also will handle a .zip folder of .zip folders"""
404 with tempfile.TemporaryDirectory() as tmpdir:
405 with zipfile.ZipFile(self.archive) as zip_file:
406 zip_file.extractall(tmpdir)
408 # Process single zipped cookbook
409 if Path(f"{tmpdir}/temp_recipe.dsv").exists():
410 self._process_cookbook(Path(tmpdir))
412 # Process a zip folder of zipped cookbooks
413 for file in Path(tmpdir).glob("*.zip"):
414 with tempfile.TemporaryDirectory() as tmpdir2:
415 with zipfile.ZipFile(file) as zip_file2:
416 zip_file2.extractall(tmpdir2)
418 self._process_cookbook(Path(tmpdir2))