Coverage for opt/mealie/lib/python3.12/site-packages/mealie/services/backups_v2/alchemy_exporter.py: 19%
174 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-25 15:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-25 15:48 +0000
1import datetime 1a
2import os 1a
3import uuid 1a
4from logging import Logger 1a
5from os import path 1a
6from textwrap import dedent 1a
7from typing import Any 1a
9from alembic import command 1a
10from alembic.config import Config 1a
11from fastapi.encoders import jsonable_encoder 1a
12from pydantic import BaseModel 1a
13from sqlalchemy import Connection, ForeignKey, ForeignKeyConstraint, MetaData, Table, create_engine, insert, text 1a
14from sqlalchemy.engine import base 1a
15from sqlalchemy.orm import sessionmaker 1a
17from mealie.db import init_db 1a
18from mealie.db.fixes.fix_migration_data import fix_migration_data 1a
19from mealie.db.init_db import ALEMBIC_DIR 1a
20from mealie.db.models._model_utils.guid import GUID 1a
21from mealie.services._base_service import BaseService 1a
class ForeignKeyDisabler:
    """
    Context manager that switches off foreign key enforcement on a connection for the duration of the block.

    On entry the current setting is read from the database and remembered, then enforcement is disabled:
    via `session_replication_role = 'replica'` when the dialect is postgresql, and via
    `PRAGMA foreign_keys = OFF` for every other dialect. On exit the remembered setting is written back.
    """

    def __init__(self, connection: Connection, dialect_name: str, *, logger: Logger | None = None):
        """
        connection: the connection on which all statements are executed
        dialect_name: name of the database dialect; only "postgresql" is treated specially
        logger: optional logger used to report a failure while restoring the original setting
        """
        self.connection = connection
        self.is_postgres = dialect_name == "postgresql"
        self.logger = logger

        # Whatever the database reported in __enter__: a role name for postgres, and the result of
        # `PRAGMA foreign_keys;` otherwise (SQLite reports this as the integer 0 or 1).
        self._initial_fk_state: str | int | None = None

    def __enter__(self):
        """Remembers the current foreign key setting, then disables foreign key enforcement."""
        if self.is_postgres:
            self._initial_fk_state = self.connection.execute(text("SHOW session_replication_role;")).scalar()
            self.connection.execute(text("SET session_replication_role = 'replica';"))
        else:
            self._initial_fk_state = self.connection.execute(text("PRAGMA foreign_keys;")).scalar()
            self.connection.execute(text("PRAGMA foreign_keys = OFF;"))

        return self

    def __exit__(self, exc_type, exc_val, exc_tb):
        """
        Restores the setting remembered by __enter__. A failure while restoring is logged (when a logger
        was provided) and re-raised. Exceptions raised inside the with-block are never suppressed.
        """
        try:
            if self.is_postgres:
                initial_state = self._initial_fk_state or "origin"
                self.connection.execute(text(f"SET session_replication_role = '{initial_state}';"))
            else:
                # Compare against None explicitly: a remembered state of 0 (enforcement was already off)
                # is falsy, and falling back to "ON" for it would enable foreign keys that were disabled.
                initial_state = "ON" if self._initial_fk_state is None else self._initial_fk_state
                self.connection.execute(text(f"PRAGMA foreign_keys = {initial_state};"))
        except Exception:
            if self.logger:
                self.logger.exception("Error when re-enabling foreign keys")
            raise
class AlchemyExporter(BaseService):
    """
    Dumps a SQLAlchemy database to plain python dictionaries, restores such a dump back into the
    database, and drops every table of the database.
    """

    connection_str: str
    engine: base.Engine
    meta: MetaData

    # Keys whose string values convert_types parses back into datetime / date / time objects
    look_for_datetime = {"created_at", "update_at", "date_updated", "timestamp", "expires_at", "locked_at", "last_made"}
    look_for_date = {"date_added", "date"}
    look_for_time = {"scheduled_time"}

    # Only used by convert_types to coerce strings into date, datetime and time values
    class DateTimeParser(BaseModel):
        date: datetime.date | None = None
        dt: datetime.datetime | None = None
        time: datetime.time | None = None

    def __init__(self, connection_str: str) -> None:
        """Creates the engine, an empty MetaData and a session factory for `connection_str`."""
        super().__init__()

        self.connection_str = connection_str
        self.engine = create_engine(connection_str)
        self.meta = MetaData()
        self.session_maker = sessionmaker(bind=self.engine)

    @staticmethod
    def is_uuid(value: Any) -> bool:
        """
        Returns True if `value` is a UUID or a string that uuid.UUID can parse, otherwise False.
        Values of any other type (None, numbers, bytes, ...) return False instead of raising.
        """
        if isinstance(value, uuid.UUID):
            return True

        try:
            uuid.UUID(value)
        except (ValueError, TypeError, AttributeError):
            # ValueError: malformed string. TypeError / AttributeError: the value is not a string at all.
            return False

        return True

    @staticmethod
    def is_valid_foreign_key(db_dump: dict[str, list[dict]], fk: ForeignKey, fk_value: Any) -> bool:
        """
        Returns True if `fk_value` is falsy (nothing to check) or if any row of the referenced table
        in `db_dump` holds `fk_value` in the referenced column. A table missing from the dump is
        treated as having no rows.
        """
        if not fk_value:
            return True

        foreign_table_name = fk.column.table.name
        foreign_field_name = fk.column.name

        return any(row[foreign_field_name] == fk_value for row in db_dump.get(foreign_table_name, []))

    def convert_types(self, data: dict) -> dict:
        """
        walks the dictionary to restore all things that look like string representations of their complex types
        used in the context of reading a json file into a database via SQLAlchemy.

        The dictionary is modified in place and also returned. Nested dictionaries, and dictionaries inside
        lists, are converted recursively; list items that are not dictionaries are left untouched.
        """
        for key, value in data.items():
            if isinstance(value, dict):
                # store the converted nested dict under its key; rebinding `data` here would make every
                # later assignment (and the return value) target the nested dict instead of this one
                data[key] = self.convert_types(value)
            elif isinstance(value, list):
                data[key] = [self.convert_types(item) if isinstance(item, dict) else item for item in value]
            elif isinstance(value, str):
                if self.is_uuid(value):
                    # convert the data to the current database's native GUID type
                    data[key] = GUID.convert_value_to_guid(value, self.engine.dialect)
                if key in self.look_for_datetime:
                    data[key] = self.DateTimeParser(dt=value).dt
                if key in self.look_for_date:
                    data[key] = self.DateTimeParser(date=value).date
                if key in self.look_for_time:
                    data[key] = self.DateTimeParser(time=value).time
        return data

    def clean_rows(self, db_dump: dict[str, list[dict]], table: Table, rows: list[dict]) -> list[dict]:
        """
        Checks rows against foreign key restraints and removes any rows that would violate them

        Returns a new list with the rows that passed; every removed row is logged as a warning.
        """

        fks = table.foreign_keys

        valid_rows = []
        for row in rows:
            for fk in fks:
                fk_value = row.get(fk.parent.name)
                if self.is_valid_foreign_key(db_dump, fk, fk_value):
                    continue

                self.logger.warning(
                    f"Removing row from table {table.name} because of invalid foreign key {fk.parent.name}: {fk_value}"
                )
                self.logger.warning(f"Row: {row}")
                break
            else:
                # no foreign key check failed for this row
                valid_rows.append(row)

        return valid_rows

    def dump_schema(self) -> dict:
        """
        Returns the schema of the SQLAlchemy database as a python dictionary. This dictionary is wrapped by
        jsonable_encoder to ensure that the object can be converted to a json string.

        Every reflected table name maps to an empty list; only "alembic_version" carries its actual rows.
        """
        with self.engine.connect() as connection:
            self.meta.reflect(bind=self.engine)

            all_tables = self.meta.tables.values()

            results = {
                **{table.name: [] for table in all_tables},
                "alembic_version": [
                    dict(row) for row in connection.execute(text("SELECT * FROM alembic_version")).mappings()
                ],
            }

        return jsonable_encoder(results)

    def dump(self) -> dict[str, list[dict]]:
        """
        Returns the entire SQLAlchemy database as a python dictionary. This dictionary is wrapped by
        jsonable_encoder to ensure that the object can be converted to a json string.

        A failure of the data fixes that run beforehand is logged and does not abort the export.
        """

        # run database fixes first so we aren't backing up bad data
        with self.session_maker() as session:
            try:
                fix_migration_data(session)
            except Exception:
                # best-effort step: keep exporting, but record the traceback so the failure can be diagnosed
                self.logger.exception("Error fixing migration data during export; continuing anyway")

        with self.engine.connect() as connection:
            self.meta.reflect(bind=self.engine)  # http://docs.sqlalchemy.org/en/rel_0_9/core/reflection.html

            result = {
                table.name: [dict(row) for row in connection.execute(table.select()).mappings()]
                for table in self.meta.sorted_tables
            }

        return jsonable_encoder(result)

    def restore(self, db_dump: dict) -> None:
        """
        Restores all data from dictionary into the database

        The database is first migrated to the alembic version recorded in the dump. Afterwards, for every
        table in the dump that has rows, the existing rows are deleted and the dumped rows (minus those
        with invalid foreign keys) are inserted, all inside one transaction with foreign key enforcement
        disabled. Finally the database is re-initialized.

        `db_dump` is modified: its "alembic_version" entry is removed and its values are converted in place.

        Raises an Exception if the alembic config file cannot be found.
        """
        # setup alembic to run migrations up the version of the backup
        alembic_data = db_dump["alembic_version"]
        alembic_version = alembic_data[0]["version_num"]

        alembic_cfg_path = os.getenv("ALEMBIC_CONFIG_FILE", default=str(ALEMBIC_DIR / "alembic.ini"))

        if not path.isfile(alembic_cfg_path):
            raise Exception("Provided alembic config path doesn't exist")

        alembic_cfg = Config(alembic_cfg_path)
        command.upgrade(alembic_cfg, alembic_version)

        del db_dump["alembic_version"]

        with self.engine.begin() as connection:
            with ForeignKeyDisabler(connection, self.engine.dialect.name, logger=self.logger):
                data = self.convert_types(db_dump)

                self.meta.reflect(bind=self.engine)
                for table_name, rows in data.items():
                    if not rows:
                        continue
                    table = self.meta.tables[table_name]
                    rows = self.clean_rows(db_dump, table, rows)

                    connection.execute(table.delete())
                    if rows:
                        # every row may have been removed by clean_rows; an insert with an empty
                        # parameter list would not be executed as a multi-row insert
                        connection.execute(insert(table), rows)

                if self.engine.dialect.name == "postgresql":
                    # Restore postgres sequence numbers
                    sequences = [
                        ("api_extras_id_seq", "api_extras"),
                        ("group_meal_plans_id_seq", "group_meal_plans"),
                        ("ingredient_food_extras_id_seq", "ingredient_food_extras"),
                        ("invite_tokens_id_seq", "invite_tokens"),
                        ("long_live_tokens_id_seq", "long_live_tokens"),
                        ("notes_id_seq", "notes"),
                        ("password_reset_tokens_id_seq", "password_reset_tokens"),
                        ("recipe_assets_id_seq", "recipe_assets"),
                        ("recipe_ingredient_ref_link_id_seq", "recipe_ingredient_ref_link"),
                        ("recipe_nutrition_id_seq", "recipe_nutrition"),
                        ("recipe_settings_id_seq", "recipe_settings"),
                        ("recipes_ingredients_id_seq", "recipes_ingredients"),
                        ("server_tasks_id_seq", "server_tasks"),
                        ("shopping_list_extras_id_seq", "shopping_list_extras"),
                        ("shopping_list_item_extras_id_seq", "shopping_list_item_extras"),
                    ]

                    # one statement per sequence, setting it to the highest id currently in its table
                    sql = "\n".join(
                        [f"SELECT SETVAL('{seq}', (SELECT MAX(id) FROM {table}));" for seq, table in sequences]
                    )
                    connection.execute(text(dedent(sql)))

        # Re-init database to finish migrations
        init_db.main()

    def drop_all(self) -> None:
        """
        Drops all data from the database

        Every table reported by the inspector is dropped inside a single transaction. On postgresql all
        named foreign key constraints are dropped first and the `authmethod` type is dropped last.
        """
        from sqlalchemy import inspect as sa_inspect
        from sqlalchemy.schema import DropConstraint, DropTable, MetaData, Table

        is_postgres = self.engine.dialect.name == "postgresql"

        with self.engine.begin() as connection:
            # sqlalchemy.inspect() is the supported replacement for the deprecated Inspector.from_engine()
            inspector = sa_inspect(self.engine)

            # We need to re-create a minimal metadata with only the required things to
            # successfully emit drop constraints and tables commands for postgres (based
            # on the actual schema of the running instance)
            meta = MetaData()
            tables = []
            all_fkeys = []
            for table_name in inspector.get_table_names():
                # unnamed constraints cannot be addressed by a DROP CONSTRAINT statement, so skip them
                fkeys = [
                    ForeignKeyConstraint((), (), name=fkey["name"])
                    for fkey in inspector.get_foreign_keys(table_name)
                    if fkey["name"]
                ]

                tables.append(Table(table_name, meta, *fkeys))
                all_fkeys.extend(fkeys)

            if is_postgres:
                # Only pg needs foreign key dropping
                for fkey in all_fkeys:
                    connection.execute(DropConstraint(fkey))

            for table in tables:
                connection.execute(DropTable(table))

            if is_postgres:
                # I have no idea how to drop all custom types with sqlalchemy
                # Since we only have one, this will have to do for now
                # IF EXISTS keeps the whole drop from failing when the type was never created
                connection.execute(text("DROP TYPE IF EXISTS authmethod"))