Coverage for opt/mealie/lib/python3.12/site-packages/mealie/services/migrations/cookn.py: 8%

278 statements  

coverage.py v7.10.6, created at 2025-11-25 17:29 +0000

import os
import re
import tempfile
import zipfile
from pathlib import Path
from typing import Any

from mealie.schema.recipe.recipe_ingredient import RecipeIngredient, SaveIngredientFood, SaveIngredientUnit
from mealie.schema.reports.reports import ReportEntryCreate
from mealie.services.parser_services._base import DataMatcher
from mealie.services.parser_services.parser_utils.string_utils import extract_quantity_from_string

from ._migration_base import BaseMigrator
from .utils.migration_helpers import format_time


class DSVParser:
    def __init__(self, directory: Path):
        self.directory = directory
        self.tables: dict[str, list[dict[str, Any]]] = {}
        self.load_files()

    def load_files(self) -> None:
        """Loads all .dsv files from the directory into lists of dictionaries."""
        for file in self.directory.glob("*.dsv"):
            with open(file, "rb") as f:
                file_contents = f.read().decode("utf-8", errors="ignore")

            # Replace unique delimiters
            file_contents = file_contents.replace("||||", "\x06")
            file_contents = file_contents.replace("!@#%^&*()", "\x07")

            # Manually parse rows
            rows = file_contents.strip().split("\x07")
            if not rows:
                continue  # Skip empty files

            # Extract header
            headers = rows[0].split("\x06")
            data = [dict(zip(headers, row.split("\x06"), strict=False)) for row in rows[1:] if row]

            self.tables[file.stem] = data  # Store parsed table
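
    # Illustrative example (hypothetical file contents): a "temp_unit.dsv" holding
    #   ID||||NAME!@#%^&*()1||||cup
    # parses to {"temp_unit": [{"ID": "1", "NAME": "cup"}]} -- "||||" delimits
    # columns and "!@#%^&*()" delimits rows.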

    def query_by_id(self, table_name: str, column_name: str, ids: list[str]) -> list[dict[str, Any]]:
        """Returns rows from a specified table where column_name matches any of the provided IDs."""
        if table_name not in self.tables:
            raise ValueError(f"Table '{table_name}' not found.")

        results = [row for row in self.tables[table_name] if row.get(column_name) in ids]

        if len(results) == 0:
            results.append({})

        return results
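
    # Note: when nothing matches, an empty dict is appended so callers can safely
    # index [0]; get_data() then resolves any missing column to "".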

    def get_data(self, row: dict[str, Any], column: str) -> Any:
        """Get column data from row. Handles a few bad data cases."""
        data = row.get(column, "")
        if data is None or data == "[null]":
            data = ""
        return data

    def get_table(self, table_name: str) -> list[dict[str, Any]]:
        """Returns the entire table as a list of dictionaries."""
        if table_name not in self.tables:
            raise ValueError(f"Table '{table_name}' not found.")
        return self.tables[table_name]

    def list_tables(self) -> list[str]:
        """Returns a list of available tables."""
        return list(self.tables.keys())
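
    # Usage sketch (hypothetical export directory):
    #   parser = DSVParser(Path("/tmp/cookn_export"))
    #   parser.list_tables()                             # e.g. ["temp_unit", "temp_food", ...]
    #   parser.query_by_id("temp_unit", "ID", ["1"])[0]  # first matching row, or {}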


class CooknMigrator(BaseMigrator):
    def __init__(self, **kwargs):
        super().__init__(**kwargs)
        self.name = "cookn"
        self.key_aliases = []
        self.matcher = DataMatcher(self.db, food_fuzzy_match_threshold=95, unit_fuzzy_match_threshold=100)
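
    # Threshold note: foods may fuzzy-match near-misses (score >= 95), while the
    # unit threshold of 100 presumably only accepts perfect fuzzy scores, i.e.
    # effectively exact matches after DataMatcher's normalization.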

    def _parse_units_table(self, db: DSVParser):
        """Parses the Cook'n units table and adds missing units to Mealie DB."""
        _units_table = db.get_table("temp_unit")
        for _unit_row in _units_table:
            name = db.get_data(_unit_row, "NAME")
            plural_name = db.get_data(_unit_row, "PLURAL_NAME")
            abbreviation = db.get_data(_unit_row, "ABBREVIATION")

            # exact match
            if not name or name in self.matcher.units_by_alias:
                continue

            # fuzzy match
            match = self.matcher.find_unit_match(name)
            if match is None:
                save = SaveIngredientUnit(
                    group_id=self.group.id,
                    name=name,
                    plural_name=plural_name,
                    abbreviation=abbreviation,
                )
                try:
                    self.db.ingredient_units.create(save)
                except Exception as e:
                    self.logger.error(e)
                # update DataMatcher after the create so the new unit is visible to later rows
                self.matcher = DataMatcher(self.db, food_fuzzy_match_threshold=95, unit_fuzzy_match_threshold=100)
            else:
                self.logger.debug("Fuzzy match for unit (%s -> %s)", name, match.name)
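
    # Per-row flow above: skip names already known by alias, then try a fuzzy
    # match, and only create a new Mealie unit when both fail.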

    def _parse_foods_table(self, db: DSVParser):
        """Parses the Cook'n food table and adds missing foods to Mealie DB."""
        _foods_table = db.get_table("temp_food")
        for _food_row in _foods_table:
            name = db.get_data(_food_row, "NAME")
            plural_name = db.get_data(_food_row, "PLURAL_NAME")

            # exact match
            if not name or name in self.matcher.foods_by_alias:
                continue

            # fuzzy match
            match = self.matcher.find_food_match(name)
            if match is None:
                save = SaveIngredientFood(group_id=self.group.id, name=name, plural_name=plural_name, description="")
                try:
                    self.db.ingredient_foods.create(save)
                except Exception as e:
                    self.logger.error(e)
                # update DataMatcher after the create so the new food is visible to later rows
                self.matcher = DataMatcher(self.db, food_fuzzy_match_threshold=95, unit_fuzzy_match_threshold=100)
            else:
                self.logger.debug("Fuzzy match for food (%s -> %s)", name, match.name)

    def _parse_media(self, _cookbook_id: str, _chapter_id: str, _recipe_id: str, db: DSVParser) -> str | None:
        """Checks recipe, chapter, and cookbook for images. Returns the path to the most specific available image."""
        _media_recipe_row = db.query_by_id("temp_media", "ENTITY_ID", [_recipe_id])[0]
        _media_chapter_row = db.query_by_id("temp_media", "ENTITY_ID", [_chapter_id])[0]
        _media_cookbook_row = db.query_by_id("temp_media", "ENTITY_ID", [_cookbook_id])[0]

        # Get recipe image
        _media_row = _media_recipe_row
        _media_id = db.get_data(_media_row, "ID")
        if _media_id == "":
            # Get chapter image if no recipe image
            _media_row = _media_chapter_row
            _media_id = db.get_data(_media_row, "ID")
            if _media_id == "":
                # Get cookbook image if no chapter image
                _media_row = _media_cookbook_row
                _media_id = db.get_data(_media_row, "ID")

        # If we found an image
        if _media_id != "":
            _media_type = db.get_data(_media_row, "MEDIA_CONTENT_TYPE")
            # If the file has no extension, add one (this is the normal case)
            if Path(str(_media_id)).suffix == "":
                if _media_type != "":
                    # Determine file extension based on media type
                    _extension = _media_type.split("/")[-1]
                    _old_image_path = os.path.join(db.directory, str(_media_id))
                    new_image_path = f"{_old_image_path}.{_extension}"
                    # Rename the file if it exists and has no extension
                    if os.path.exists(_old_image_path) and not os.path.exists(new_image_path):
                        os.rename(_old_image_path, new_image_path)
                    if Path(new_image_path).exists():
                        return new_image_path
            else:
                return os.path.join(db.directory, str(_media_id))
        return None
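
    # Illustrative example (hypothetical IDs): a media row with ID "img_001" and
    # MEDIA_CONTENT_TYPE "image/jpeg" resolves to "<export dir>/img_001.jpeg",
    # renaming the extensionless file on disk first when necessary.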

    def _parse_ingredients(self, _recipe_id: str, db: DSVParser) -> list[RecipeIngredient]:
        """Parses ingredients for recipe from Cook'n ingredients table."""
        ingredients = []
        ingredients_order = []
        _ingredient_rows = db.query_by_id("temp_ingredient", "PARENT_ID", [_recipe_id])
        for _ingredient_row in _ingredient_rows:
            _unit_id = db.get_data(_ingredient_row, "AMOUNT_UNIT")
            _unit_row = db.query_by_id("temp_unit", "ID", [_unit_id])[0]
            _food_id = db.get_data(_ingredient_row, "INGREDIENT_FOOD_ID")
            _food_row = db.query_by_id("temp_food", "ID", [_food_id])[0]
            _brand_id = db.get_data(_ingredient_row, "BRAND_ID")
            _brand_row = db.query_by_id("temp_brand", "ID", [_brand_id])[0]

            amount_str = db.get_data(_ingredient_row, "AMOUNT_QTY_STRING")
            amount, _ = extract_quantity_from_string(amount_str)
            unit_name = db.get_data(_unit_row, "NAME")
            food_name = db.get_data(_food_row, "NAME")

            # Match unit and food from Mealie DB
            unit = self.matcher.find_unit_match(unit_name)
            food = self.matcher.find_food_match(food_name)

            pre_qualifier = db.get_data(_ingredient_row, "PRE_QUALIFIER").strip()
            post_qualifier = db.get_data(_ingredient_row, "POST_QUALIFIER").strip()
            brand = db.get_data(_brand_row, "NAME")

            # Combine pre-qualifier and post-qualifier into single note
            note = ""
            if pre_qualifier != "":
                if pre_qualifier[-1] == ",":
                    pre_qualifier = pre_qualifier[:-1]
                note += pre_qualifier
            if post_qualifier != "":
                if pre_qualifier != "":
                    note += ", "
                if post_qualifier[-1] == ",":
                    post_qualifier = post_qualifier[:-1]
                if post_qualifier[0] == ",":
                    post_qualifier = post_qualifier[1:].lstrip()
                note += post_qualifier

            # Remove empty lines (unless amount was a text input)
            if not amount and not unit and not food and not note:
                self.logger.debug("%s, %s", amount_str, type(amount_str))
                if amount_str and amount_str != "0":
                    note = amount_str
                else:
                    continue

            og_text = ""
            if amount_str != "0":
                og_text += amount_str + " "
            if unit_name:
                og_text += unit_name + " "
            if pre_qualifier:
                og_text += pre_qualifier + " "
            if food_name:
                og_text += food_name + " "
            if post_qualifier:
                og_text += post_qualifier + " "
            if brand:
                og_text += brand

            base_ingredient = RecipeIngredient(
                quantity=amount,
                unit=unit,
                food=food,
                note=note,
                original_text=og_text.strip(),
                disable_amount=False,
            )
            try:
                _display_order = db.get_data(_ingredient_row, "DISPLAY_ORDER")
                ingredients_order.append(int(_display_order))
                ingredients.append(base_ingredient)
            except ValueError:
                self.logger.warning("Invalid ingredient order: %s, %s", _display_order, base_ingredient.original_text)
                continue
        # Sort on display order only; RecipeIngredient itself is not orderable
        return [obj for _, obj in sorted(zip(ingredients_order, ingredients, strict=False), key=lambda pair: pair[0])]
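
    # Worked example (hypothetical row): AMOUNT_QTY_STRING "1 1/2", unit "cup",
    # PRE_QUALIFIER "finely chopped,", food "onion", POST_QUALIFIER ", divided"
    # yields note "finely chopped, divided" and original_text
    # "1 1/2 cup finely chopped onion divided" (assuming
    # extract_quantity_from_string parses "1 1/2" as the quantity 1.5).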

    def _parse_instructions(self, instructions: str) -> list[str]:
        """
        Parses recipe instructions into a list of steps.
        Detects numbered lists, bulleted lists, and plain new-line-separated steps.
        """
        # Detects numbered lists (1., 1), 1-, etc.) and bulleted lists (-, *, •)
        numbered_pattern = re.compile(r"^(\d+)[.)-]\s*(.*)")
        bullet_pattern = re.compile(r"^[\-*•]\s*(.*)")

        lines = instructions.splitlines()
        steps = []
        current_step: list[str] = []

        for line in lines:
            line = line.strip()

            if not line:
                continue  # Skip empty lines

            num_match = numbered_pattern.match(line)
            bullet_match = bullet_pattern.match(line)

            if num_match:
                # If there's a current step, store it before starting a new one
                if current_step:
                    steps.append("\n".join(current_step))
                    current_step = []

                current_step.append(num_match.group(2))
            elif bullet_match:
                if current_step:
                    steps.append("\n".join(current_step))
                    current_step = []

                current_step.append(bullet_match.group(1))
            else:
                # Continuation of a previous step
                if current_step:
                    current_step.append(line)
                else:
                    # If no clear separator is found, treat each new line as a new step
                    steps.append(line)

        if current_step:
            steps.append(" ".join(current_step))

        return steps
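
    # Worked example:
    #   _parse_instructions("1. Mix dry ingredients\n2) Fold in eggs\nuntil combined")
    #   -> ["Mix dry ingredients", "Fold in eggs until combined"]
    # Note the asymmetry: steps flushed mid-loop join continuation lines with "\n",
    # while the final step joins them with a space.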

    def _process_recipe_document(self, _recipe_row: dict[str, Any], db: DSVParser) -> dict:
        """Parses recipe row from the Cook'n recipe table."""
        recipe_data: dict[str, str | list[str] | list[RecipeIngredient]] = {}

        # Select db values
        _recipe_id = db.get_data(_recipe_row, "ID")
        _recipe_desc_row = db.query_by_id("temp_recipe_desc", "ID", [_recipe_id])[0]
        _chapter_id = db.get_data(_recipe_desc_row, "PARENT")
        _chapter_row = db.query_by_id("temp_chapter_desc", "ID", [_chapter_id])[0]
        _cookbook_id = db.get_data(_chapter_row, "PARENT")
        _cookbook_row = db.query_by_id("temp_cookBook_desc", "ID", [_cookbook_id])[0]

        # Parse general recipe info
        cookbook = db.get_data(_cookbook_row, "TITLE")
        chapter = db.get_data(_chapter_row, "TITLE")
        name = db.get_data(_recipe_desc_row, "TITLE")
        description = db.get_data(_recipe_desc_row, "DESCRIPTION")
        serves = db.get_data(_recipe_row, "SERVES")
        try:
            prep_time = int(db.get_data(_recipe_row, "PREPTIME"))
        except ValueError:
            prep_time = 0
        try:
            cook_time = int(db.get_data(_recipe_row, "COOKTIME"))
        except ValueError:
            cook_time = 0

        recipe_data["recipeCategory"] = [cookbook + " - " + chapter]
        recipe_data["name"] = name
        recipe_data["description"] = description
        recipe_data["recipeYield"] = serves
        recipe_data["prepTime"] = format_time(prep_time)
        recipe_data["performTime"] = format_time(cook_time)
        recipe_data["totalTime"] = format_time(prep_time + cook_time)

        # Parse image file
        image_path = self._parse_media(_cookbook_id, _chapter_id, _recipe_id, db)
        if image_path is not None:
            recipe_data["image"] = [image_path]

        # Parse ingredients
        recipe_data["_parsed_ingredients"] = self._parse_ingredients(_recipe_id, db)

        # Parse instructions
        recipe_data["recipeInstructions"] = self._parse_instructions(db.get_data(_recipe_row, "INSTRUCTIONS"))

        return recipe_data
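
    # The keys mirror schema.org Recipe naming ("recipeYield", "prepTime",
    # "recipeInstructions", ...) so clean_recipe_dictionary() in the base migrator
    # can map them onto a Mealie recipe; "_parsed_ingredients" is a private
    # carrier that _process_cookbook() re-attaches after cleaning.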

    def _process_cookbook(self, path: Path) -> None:
        """Processes the contents of a zip file."""
        source_dir = self.get_zip_base_path(path)
        db = DSVParser(source_dir)
        # Load units and foods from Cook'n
        self._parse_units_table(db)
        self._parse_foods_table(db)
        # Reload DataMatcher with updated tables
        self.matcher = DataMatcher(self.db, food_fuzzy_match_threshold=95, unit_fuzzy_match_threshold=100)

        # Load recipes from Cook'n
        _recipe_table = db.get_table("temp_recipe")

        recipes_as_dicts = []
        for _recipe_row in _recipe_table:
            try:
                recipes_as_dicts.append(self._process_recipe_document(_recipe_row, db))
            except Exception as e:
                self.report_entries.append(
                    ReportEntryCreate(
                        report_id=self.report_id,
                        success=False,
                        message="Failed to parse recipe",
                        exception=f"{type(e).__name__}: {e}",
                    )
                )

        recipes = []
        for r in recipes_as_dicts:
            # Clean recipes and re-add ingredients w/ amounts
            ingredients = r["_parsed_ingredients"]
            r = self.clean_recipe_dictionary(r)
            r.recipe_ingredient = ingredients
            recipes.append(r)

        # Add recipes and images to the database
        results = self.import_recipes_to_database(recipes)
        recipe_lookup = {r.slug: r for r in recipes}
        for slug, recipe_id, status in results:
            if status:
                recipe = recipe_lookup.get(slug)
                if recipe and recipe.image:
                    self.import_image(slug, recipe.image, recipe_id)
            else:
                index_len = len(slug.split("-")[-1])
                recipe = recipe_lookup.get(slug[: -(index_len + 1)])
                if recipe:
                    self.logger.warning("Duplicate recipe (%s) found! Saved as copy...", recipe.name)
                    if recipe.image:
                        self.import_image(slug, recipe.image, recipe_id)
                else:
                    self.logger.warning("Failed to lookup recipe! (%s)", slug)
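
    # The failure path above treats a failed status as a slug collision: the
    # recipe was apparently saved under a suffixed slug, so for e.g. "brownies-1"
    # index_len is 1 and slug[:-2] recovers the original "brownies" for lookup.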

    def _migrate(self) -> None:
        """Migrates recipes from a Cook'n cookbook .zip. Also handles a .zip folder of .zip folders."""
        with tempfile.TemporaryDirectory() as tmpdir:
            with zipfile.ZipFile(self.archive) as zip_file:
                zip_file.extractall(tmpdir)

            # Process single zipped cookbook
            if Path(f"{tmpdir}/temp_recipe.dsv").exists():
                self._process_cookbook(Path(tmpdir))

            # Process a zip folder of zipped cookbooks
            for file in Path(tmpdir).glob("*.zip"):
                with tempfile.TemporaryDirectory() as tmpdir2:
                    with zipfile.ZipFile(file) as zip_file2:
                        zip_file2.extractall(tmpdir2)

                    self._process_cookbook(Path(tmpdir2))