Coverage for opt/mealie/lib/python3.12/site-packages/mealie/services/backups_v2/alchemy_exporter.py: 19%

174 statements  

coverage.py v7.10.6, created at 2025-11-25 15:48 +0000

import datetime
import os
import uuid
from logging import Logger
from os import path
from textwrap import dedent
from typing import Any

from alembic import command
from alembic.config import Config
from fastapi.encoders import jsonable_encoder
from pydantic import BaseModel
from sqlalchemy import Connection, ForeignKey, ForeignKeyConstraint, MetaData, Table, create_engine, insert, text
from sqlalchemy.engine import base
from sqlalchemy.orm import sessionmaker

from mealie.db import init_db
from mealie.db.fixes.fix_migration_data import fix_migration_data
from mealie.db.init_db import ALEMBIC_DIR
from mealie.db.models._model_utils.guid import GUID
from mealie.services._base_service import BaseService


class ForeignKeyDisabler:
    def __init__(self, connection: Connection, dialect_name: str, *, logger: Logger | None = None):
        self.connection = connection
        self.is_postgres = dialect_name == "postgresql"
        self.logger = logger

        self._initial_fk_state: str | None = None

    def __enter__(self):
        if self.is_postgres:
            self._initial_fk_state = self.connection.execute(text("SHOW session_replication_role;")).scalar()
            self.connection.execute(text("SET session_replication_role = 'replica';"))
        else:
            self._initial_fk_state = self.connection.execute(text("PRAGMA foreign_keys;")).scalar()
            self.connection.execute(text("PRAGMA foreign_keys = OFF;"))

    def __exit__(self, exc_type, exc_val, exc_tb):
        try:
            if self.is_postgres:
                initial_state = self._initial_fk_state or "origin"
                self.connection.execute(text(f"SET session_replication_role = '{initial_state}';"))
            else:
                initial_state = self._initial_fk_state or "ON"
                self.connection.execute(text(f"PRAGMA foreign_keys = {initial_state};"))
        except Exception:
            if self.logger:
                self.logger.exception("Error when re-enabling foreign keys")
            raise
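

# A minimal usage sketch of the context manager above (editor's illustration, not part of the
# original module): `engine` and `logger` are assumed to be provided by the caller, and the
# statement is a placeholder. Foreign-key enforcement is suspended inside the block and the
# previous PRAGMA / session_replication_role setting is restored on exit, even if the write fails.
def _example_bulk_write_with_fks_disabled(engine: base.Engine, logger: Logger) -> None:
    with engine.begin() as connection:
        with ForeignKeyDisabler(connection, engine.dialect.name, logger=logger):
            connection.execute(text("DELETE FROM example_table"))  # placeholder statement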


class AlchemyExporter(BaseService):
    connection_str: str
    engine: base.Engine
    meta: MetaData

    look_for_datetime = {"created_at", "update_at", "date_updated", "timestamp", "expires_at", "locked_at", "last_made"}
    look_for_date = {"date_added", "date"}
    look_for_time = {"scheduled_time"}

    class DateTimeParser(BaseModel):
        date: datetime.date | None = None
        dt: datetime.datetime | None = None
        time: datetime.time | None = None

    def __init__(self, connection_str: str) -> None:
        super().__init__()

        self.connection_str = connection_str
        self.engine = create_engine(connection_str)
        self.meta = MetaData()
        self.session_maker = sessionmaker(bind=self.engine)

    @staticmethod
    def is_uuid(value: Any) -> bool:
        try:
            uuid.UUID(value)
            return True
        except ValueError:
            return False

    @staticmethod
    def is_valid_foreign_key(db_dump: dict[str, list[dict]], fk: ForeignKey, fk_value: Any) -> bool:
        if not fk_value:
            return True

        foreign_table_name = fk.column.table.name
        foreign_field_name = fk.column.name

        for row in db_dump.get(foreign_table_name, []):
            if row[foreign_field_name] == fk_value:
                return True

        return False

    def convert_types(self, data: dict) -> dict:
        """
        Walks the dictionary and restores values that look like string representations of complex types
        (GUIDs, dates, datetimes, and times). Used when reading a JSON backup into the database via SQLAlchemy.
        """
        for key, value in data.items():
            if isinstance(value, dict):
                data = self.convert_types(value)
            elif isinstance(value, list):  # assume that this is a list of dictionaries
                data[key] = [self.convert_types(item) for item in value]
            elif isinstance(value, str):
                if self.is_uuid(value):
                    # convert the data to the current database's native GUID type
                    data[key] = GUID.convert_value_to_guid(value, self.engine.dialect)
                if key in self.look_for_datetime:
                    data[key] = self.DateTimeParser(dt=value).dt
                if key in self.look_for_date:
                    data[key] = self.DateTimeParser(date=value).date
                if key in self.look_for_time:
                    data[key] = self.DateTimeParser(time=value).time
        return data
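
    # Illustrative example of the conversion (editor's sketch with placeholder values): a row such as
    #   {"id": "e3e70682-c209-4cac-a29f-6fbed82c07cd", "created_at": "2024-01-01T10:00:00"}
    # comes back with the UUID string converted to the dialect's native GUID value and the timestamp
    # string parsed into a datetime.datetime before it is inserted.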


    def clean_rows(self, db_dump: dict[str, list[dict]], table: Table, rows: list[dict]) -> list[dict]:
        """
        Checks rows against foreign key constraints and removes any rows that would violate them
        """

        fks = table.foreign_keys

        valid_rows = []
        for row in rows:
            is_valid_row = True
            for fk in fks:
                fk_value = row.get(fk.parent.name)
                if self.is_valid_foreign_key(db_dump, fk, fk_value):
                    continue

                is_valid_row = False
                self.logger.warning(
                    f"Removing row from table {table.name} because of invalid foreign key {fk.parent.name}: {fk_value}"
                )
                self.logger.warning(f"Row: {row}")
                break

            if is_valid_row:
                valid_rows.append(row)

        return valid_rows
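
    # Illustrative example (editor's sketch, placeholder names): if a row in "recipes_ingredients"
    # points at a recipe id that is missing from db_dump["recipes"], the row is dropped and logged
    # rather than letting the INSERT fail on the foreign key during restore.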


    def dump_schema(self) -> dict:
        """
        Returns the schema of the SQLAlchemy database as a Python dictionary. This dictionary is wrapped by
        jsonable_encoder to ensure that the object can be converted to a JSON string.
        """
        with self.engine.connect() as connection:
            self.meta.reflect(bind=self.engine)

            all_tables = self.meta.tables.values()

            results = {
                **{table.name: [] for table in all_tables},
                "alembic_version": [
                    dict(row) for row in connection.execute(text("SELECT * FROM alembic_version")).mappings()
                ],
            }

            return jsonable_encoder(results)


    def dump(self) -> dict[str, list[dict]]:
        """
        Returns the entire SQLAlchemy database as a Python dictionary. This dictionary is wrapped by
        jsonable_encoder to ensure that the object can be converted to a JSON string.
        """

        # run database fixes first so we aren't backing up bad data
        with self.session_maker() as session:
            try:
                fix_migration_data(session)
            except Exception:
                self.logger.error("Error fixing migration data during export; continuing anyway")

        with self.engine.connect() as connection:
            self.meta.reflect(bind=self.engine)  # http://docs.sqlalchemy.org/en/rel_0_9/core/reflection.html

            result = {
                table.name: [dict(row) for row in connection.execute(table.select()).mappings()]
                for table in self.meta.sorted_tables
            }

            return jsonable_encoder(result)
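
    # Shape sketch (editor's illustration): the dump maps each table name to a list of JSON-safe
    # row dicts, e.g. {"alembic_version": [{"version_num": "..."}], "users": [...], ...}, which is
    # the structure restore() expects to receive back.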


    def restore(self, db_dump: dict) -> None:
        """Restores all data from the dictionary into the database"""
        # set up alembic to run migrations up to the version of the backup
        alembic_data = db_dump["alembic_version"]
        alembic_version = alembic_data[0]["version_num"]

        alembic_cfg_path = os.getenv("ALEMBIC_CONFIG_FILE", default=str(ALEMBIC_DIR / "alembic.ini"))

        if not path.isfile(alembic_cfg_path):
            raise Exception("Provided alembic config path doesn't exist")

        alembic_cfg = Config(alembic_cfg_path)
        command.upgrade(alembic_cfg, alembic_version)

        del db_dump["alembic_version"]

        with self.engine.begin() as connection:
            with ForeignKeyDisabler(connection, self.engine.dialect.name, logger=self.logger):
                data = self.convert_types(db_dump)

                self.meta.reflect(bind=self.engine)
                for table_name, rows in data.items():
                    if not rows:
                        continue
                    table = self.meta.tables[table_name]
                    rows = self.clean_rows(db_dump, table, rows)

                    connection.execute(table.delete())
                    connection.execute(insert(table), rows)
                if self.engine.dialect.name == "postgresql":
                    # Restore postgres sequence numbers
                    sequences = [
                        ("api_extras_id_seq", "api_extras"),
                        ("group_meal_plans_id_seq", "group_meal_plans"),
                        ("ingredient_food_extras_id_seq", "ingredient_food_extras"),
                        ("invite_tokens_id_seq", "invite_tokens"),
                        ("long_live_tokens_id_seq", "long_live_tokens"),
                        ("notes_id_seq", "notes"),
                        ("password_reset_tokens_id_seq", "password_reset_tokens"),
                        ("recipe_assets_id_seq", "recipe_assets"),
                        ("recipe_ingredient_ref_link_id_seq", "recipe_ingredient_ref_link"),
                        ("recipe_nutrition_id_seq", "recipe_nutrition"),
                        ("recipe_settings_id_seq", "recipe_settings"),
                        ("recipes_ingredients_id_seq", "recipes_ingredients"),
                        ("server_tasks_id_seq", "server_tasks"),
                        ("shopping_list_extras_id_seq", "shopping_list_extras"),
                        ("shopping_list_item_extras_id_seq", "shopping_list_item_extras"),
                    ]

                    sql = "\n".join(
                        [f"SELECT SETVAL('{seq}', (SELECT MAX(id) FROM {table}));" for seq, table in sequences]
                    )
                    connection.execute(text(dedent(sql)))

        # Re-init database to finish migrations
        init_db.main()


    def drop_all(self) -> None:
        """Drops all tables from the database"""
        from sqlalchemy.engine.reflection import Inspector
        from sqlalchemy.schema import DropConstraint, DropTable, MetaData, Table

        with self.engine.begin() as connection:
            inspector = Inspector.from_engine(self.engine)

            # We need to re-create a minimal metadata with only the required things to
            # successfully emit drop constraints and tables commands for postgres (based
            # on the actual schema of the running instance)
            meta = MetaData()
            tables = []
            all_fkeys = []
            for table_name in inspector.get_table_names():
                fkeys = []

                for fkey in inspector.get_foreign_keys(table_name):
                    if not fkey["name"]:
                        continue

                    fkeys.append(ForeignKeyConstraint((), (), name=fkey["name"]))

                tables.append(Table(table_name, meta, *fkeys))
                all_fkeys.extend(fkeys)

            if self.engine.dialect.name == "postgresql":
                # Only pg needs foreign key dropping
                for fkey in all_fkeys:
                    connection.execute(DropConstraint(fkey))

                for table in tables:
                    connection.execute(DropTable(table))
                # I have no idea how to drop all custom types with sqlalchemy
                # Since we only have one, this will have to do for now
                connection.execute(text("DROP TYPE authmethod"))
            else:
                for table in tables:
                    connection.execute(DropTable(table))
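

# A minimal end-to-end sketch of how the exporter is typically driven (editor's illustration;
# the connection string is a placeholder and error handling is omitted).
def _example_backup_roundtrip() -> None:
    exporter = AlchemyExporter("sqlite:////tmp/mealie-example.db")  # placeholder connection string
    db_dump = exporter.dump()    # snapshot every table as JSON-safe rows
    exporter.drop_all()          # wipe the existing schema
    exporter.restore(db_dump)    # migrate to the dumped alembic revision, then re-insert the rows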