Coverage for opt/mealie/lib/python3.12/site-packages/mealie/services/migrations/_migration_base.py: 17%
152 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-25 17:29 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-25 17:29 +0000
1import contextlib 1a
2from pathlib import Path 1a
4from PIL import UnidentifiedImageError 1a
5from pydantic import UUID4 1a
7from mealie.core import root_logger 1a
8from mealie.core.exceptions import UnexpectedNone 1a
9from mealie.lang.providers import Translator 1a
10from mealie.repos.all_repositories import AllRepositories 1a
11from mealie.schema.recipe import Recipe 1a
12from mealie.schema.recipe.recipe_settings import RecipeSettings 1a
13from mealie.schema.reports.reports import ( 1a
14 ReportCategory,
15 ReportCreate,
16 ReportEntryCreate,
17 ReportEntryOut,
18 ReportOut,
19 ReportSummary,
20 ReportSummaryStatus,
21)
22from mealie.services.recipe.recipe_service import RecipeService 1a
23from mealie.services.scraper import cleaner 1a
25from .._base_service import BaseService 1a
26from .utils.database_helpers import DatabaseMigrationHelpers 1a
27from .utils.migration_alias import MigrationAlias 1a
28from .utils.migration_helpers import import_image 1a
class BaseMigrator(BaseService):
    """Common scaffolding for migrators that import recipes from an export archive.

    ``migrate`` is the public entry point: it creates a report, calls the
    ``_migrate`` hook (which raises ``NotImplementedError`` here and must be
    overridden), persists the collected report entries and returns the stored
    report.
    """

    # Aliases consumed by ``rewrite_alias``. This is an annotation only, so no
    # class attribute exists unless a subclass (or instance) assigns one.
    key_aliases: list[MigrationAlias]

    # Entries accumulated while importing; written out by ``_save_all_entries``.
    report_entries: list[ReportEntryCreate]
    # Both set by ``_create_report``.
    report_id: UUID4
    report: ReportOut

    helpers: DatabaseMigrationHelpers

    def __init__(
        self,
        archive: Path,
        db: AllRepositories,
        session,
        user_id: UUID4,
        household_id: UUID4,
        group_id: UUID4,
        add_migration_tag: bool,
        translator: Translator,
    ):
        """Resolve the owning user, household and group and wire up helpers.

        Args:
            archive: Path stored on the instance as ``self.archive``.
            db: Repository collection used for every lookup and write.
            session: Session object; rolled back when a recipe fails to import.
            user_id: ID looked up through ``db.users``.
            household_id: ID looked up through ``db.households``.
            group_id: ID looked up through ``db.groups``.
            add_migration_tag: When true, every imported recipe is tagged with
                ``self.name``.
            translator: Passed to ``RecipeService`` and to ``cleaner.clean``.

        Raises:
            UnexpectedNone: If the user, household or group lookup returns a
                falsy value.
        """
        self.archive = archive
        self.db = db
        self.session = session
        self.add_migration_tag = add_migration_tag
        self.translator = translator

        user = db.users.get_one(user_id)
        if not user:
            raise UnexpectedNone(f"Cannot find user {user_id}")

        household = db.households.get_one(household_id)
        if not household:
            raise UnexpectedNone(f"Cannot find household {household_id}")

        group = db.groups.get_one(group_id)
        if not group:
            raise UnexpectedNone(f"Cannot find group {group_id}")

        self.user = user
        self.household = household
        self.group = group

        # Also used as the name of the tag added when ``add_migration_tag`` is set.
        self.name = "migration"

        self.report_entries = []

        self.logger = root_logger.get_logger()

        self.helpers = DatabaseMigrationHelpers(self.db, self.session)
        self.recipe_service = RecipeService(db, user, household, translator=self.translator)

        super().__init__()

    @classmethod
    def get_zip_base_path(cls, path: Path) -> Path:
        """Return the directory that holds the real contents of an extracted ZIP.

        Args:
            path: Root directory to inspect.

        Returns:
            The single non-dunder child directory when ``path`` contains exactly
            one directory starting with ``"__"`` and exactly one that does not;
            otherwise ``path`` itself.
        """
        # Safari mangles our ZIP structure and adds a "__MACOSX" directory at the root along with
        # an arbitrarily-named directory containing the actual contents. So, if we find a dunder directory
        # at the root (i.e. __MACOSX) we traverse down the first non-dunder directory and assume this is the base.
        # We assume migration exports never contain a directory that starts with "__".
        normal_dirs: list[Path] = []
        dunder_dirs: list[Path] = []
        for entry in path.iterdir():
            if not entry.is_dir():
                continue

            if entry.name.startswith("__"):
                dunder_dirs.append(entry)
            else:
                normal_dirs.append(entry)

        if len(normal_dirs) == 1 and len(dunder_dirs) == 1:
            return normal_dirs[0]
        return path

    def _migrate(self) -> None:
        """Hook called by ``migrate``; subclasses must override it."""
        raise NotImplementedError

    def _create_report(self, report_name: str) -> None:
        """Create an in-progress migration report and remember it on the instance.

        Args:
            report_name: Name stored on the new report.
        """
        report_to_save = ReportCreate(
            name=report_name,
            category=ReportCategory.migration,
            status=ReportSummaryStatus.in_progress,
            group_id=self.group.id,
        )

        self.report = self.db.group_reports.create(report_to_save)
        self.report_id = self.report.id

    def _save_all_entries(self) -> None:
        """Persist every collected report entry and set the final report status.

        The status is ``failure`` when no entry succeeded (this includes the
        case of no entries at all), ``success`` when every entry succeeded and
        ``partial`` otherwise.
        """
        any_succeeded = False
        any_failed = False

        saved_entries: list[ReportEntryOut] = []
        for entry in self.report_entries:
            if entry.success:
                any_succeeded = True
            else:
                any_failed = True

            saved_entries.append(self.db.group_report_entries.create(entry))

        if not any_succeeded:
            # Checked first so that an empty entry list resolves to failure.
            self.report.status = ReportSummaryStatus.failure
        elif not any_failed:
            self.report.status = ReportSummaryStatus.success
        else:
            self.report.status = ReportSummaryStatus.partial

        self.report.entries = saved_entries
        self.db.group_reports.update(self.report.id, self.report)

    def migrate(self, report_name: str) -> ReportSummary:
        """Run the migration and return the stored report.

        Args:
            report_name: Name given to the report created for this run.

        Returns:
            The report re-read from ``db.group_reports`` after all entries
            have been saved.

        Raises:
            ValueError: If the report cannot be read back.
        """
        self._create_report(report_name)
        self._migrate()
        self._save_all_entries()

        result = self.db.group_reports.get_one(self.report_id)

        if not result:
            raise ValueError("Report not found")

        return result

    def import_recipes_to_database(self, validated_recipes: list[Recipe]) -> list[tuple[str, UUID4, bool]]:
        """
        Used as a single access point to process a list of Recipe objects into the
        database in a predictable way. If an error occurs while creating a recipe
        the exception is logged, the session is rolled back and processing
        continues with the next recipe. One entry per recipe is appended to the
        'report_entries' attribute for later persistence.

        Args:
            validated_recipes (list[Recipe]): recipes to create.

        Returns:
            One ``(slug, id, status)`` tuple per recipe, where ``status`` is True
            when the recipe was created.

        Raises:
            ValueError: If the household has no preferences.
        """
        migration_tag = None
        if self.add_migration_tag:
            migration_tag = self.helpers.get_or_set_tags([self.name])[0]

        return_vars: list[tuple[str, UUID4, bool]] = []

        preferences = self.household.preferences
        if not preferences:
            raise ValueError("Household preferences not found")

        default_settings = RecipeSettings(
            public=preferences.recipe_public,
            show_nutrition=preferences.recipe_show_nutrition,
            show_assets=preferences.recipe_show_assets,
            landscape_view=preferences.recipe_landscape_view,
            disable_comments=preferences.recipe_disable_comments,
        )

        for recipe in validated_recipes:
            recipe.settings = default_settings

            recipe.user_id = self.user.id
            recipe.group_id = self.group.id

            if recipe.tags:
                recipe.tags = self.helpers.get_or_set_tags(x.name for x in recipe.tags)
            else:
                recipe.tags = []

            if recipe.recipe_category:
                recipe.recipe_category = self.helpers.get_or_set_category(x.name for x in recipe.recipe_category)

            if self.add_migration_tag:
                recipe.tags.append(migration_tag)

            error_text = ""
            status = False
            try:
                recipe = self.recipe_service.create_one(recipe)
                status = True

            except Exception as inst:
                # Best effort: record the failure and keep importing the rest.
                error_text = str(inst)
                self.logger.exception(inst)
                self.session.rollback()

            if status:
                message = f"Imported {recipe.name} successfully"
            else:
                message = f"Failed to import {recipe.name}"

            return_vars.append((recipe.slug, recipe.id, status))  # type: ignore

            self.report_entries.append(
                ReportEntryCreate(
                    report_id=self.report_id,
                    success=status,
                    message=message,
                    exception=error_text,
                )
            )

        return return_vars

    def rewrite_alias(self, recipe_dict: dict) -> dict:
        """Move values stored under alias keys to their canonical keys.

        For each ``MigrationAlias`` in ``self.key_aliases`` whose ``alias`` key
        is present, the value is removed from ``recipe_dict``, passed through
        ``alias.func`` when one is set, and stored under ``alias.key``. The
        dictionary is modified in place.

        Args:
            recipe_dict (dict): raw recipe data to rewrite.

        Returns:
            dict: the same ``recipe_dict`` object.
        """
        # ``key_aliases`` is only annotated on the class, so it may be missing
        # entirely; treat that the same as an empty list.
        aliases = getattr(self, "key_aliases", None)
        if not aliases:
            return recipe_dict

        for alias in aliases:
            try:
                prop_value = recipe_dict.pop(alias.alias)
            except KeyError:
                continue

            if alias.func:
                try:
                    prop_value = alias.func(prop_value)
                except Exception as e:
                    # The value was already popped, so on a failed conversion
                    # it ends up under neither the alias nor the target key.
                    self.logger.exception(e)
                    continue

            recipe_dict[alias.key] = prop_value

        return recipe_dict

    def clean_recipe_dictionary(self, recipe_dict: dict) -> Recipe:
        """
        Calls the rewrite_alias function, drops any "id" key and returns the
        result of cleaner.clean for the dictionary, passing the "org_url" value
        (if any) as the url.
        """
        recipe_dict = self.rewrite_alias(recipe_dict)

        recipe_dict.pop("id", None)

        return cleaner.clean(recipe_dict, self.translator, url=recipe_dict.get("org_url", None))

    def import_image(self, slug: str, src: str | Path, recipe_id: UUID4):
        """Import an image for a recipe, logging instead of raising on bad images.

        Args:
            slug: Used only in the error log message.
            src: Image source handed to the ``import_image`` helper.
            recipe_id: Recipe ID handed to the ``import_image`` helper.
        """
        try:
            import_image(src, recipe_id)
        except UnidentifiedImageError as e:
            self.logger.error(f"Failed to import image for {slug}: {e}")