Coverage for opt/mealie/lib/python3.12/site-packages/mealie/db/fixes/fix_migration_data.py: 9%

140 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-11-25 15:32 +0000

1from uuid import uuid4 1a

2 

3from slugify import slugify 1a

4from sqlalchemy import and_, update 1a

5from sqlalchemy.orm import Session 1a

6 

7from mealie.core import root_logger 1a

8from mealie.db.models._model_base import SqlAlchemyBase 1a

9from mealie.db.models.group.group import Group 1a

10from mealie.db.models.household.shopping_list import ShoppingList, ShoppingListMultiPurposeLabel 1a

11from mealie.db.models.labels import MultiPurposeLabel 1a

12from mealie.db.models.recipe.ingredient import IngredientFoodModel, IngredientUnitModel 1a

13from mealie.db.models.recipe.recipe import RecipeModel 1a

14from mealie.db.models.users.users import User 1a

15 

16logger = root_logger.get_logger("init_db") 1a

17 

18 

19def fix_dangling_refs(session: Session): 1a

20 REASSIGN_REF_TABLES = ["group_meal_plans", "recipes", "shopping_lists"] 

21 DELETE_REF_TABLES = ["long_live_tokens", "password_reset_tokens", "recipe_comments", "recipe_timeline_events"] 

22 

23 all_groups = session.query(Group).all() 

24 all_users = session.query(User).all() 

25 for group in all_groups: 

26 # Find an arbitrary admin user in the group 

27 default_user = session.query(User).filter(User.group_id == group.id, User.admin == True).first() # noqa: E712 - required for SQLAlchemy comparison 

28 if not default_user: 

29 # If there is no admin user, just pick the first user 

30 default_user = session.query(User).filter(User.group_id == group.id).first() 

31 

32 # If there are no users in the group, we can't do anything 

33 if not default_user: 

34 continue 

35 

36 valid_group_user_ids = {user.id for user in all_users if user.group_id == group.id} 

37 

38 for table_name in REASSIGN_REF_TABLES: 

39 table = SqlAlchemyBase.metadata.tables[table_name] 

40 update_stmt = ( 

41 update(table) 

42 .where( 

43 and_( 

44 table.c.user_id.notin_(valid_group_user_ids), 

45 table.c.group_id == group.id, 

46 ) 

47 ) 

48 .values(user_id=default_user.id) 

49 ) 

50 result = session.execute(update_stmt) 

51 

52 if result.rowcount: 

53 logger.info( 

54 f"Reassigned {result.rowcount} {'row' if result.rowcount == 1 else 'rows'} " 

55 f'in "{table_name}" table to default user ({default_user.id})' 

56 ) 

57 

58 valid_user_ids_all_groups = {user.id for user in all_users} 

59 for table_name in DELETE_REF_TABLES: 

60 table = SqlAlchemyBase.metadata.tables[table_name] 

61 delete_stmt = table.delete().where(table.c.user_id.notin_(valid_user_ids_all_groups)) 

62 result = session.execute(delete_stmt) 

63 

64 if result.rowcount: 

65 logger.info( 

66 f"Deleted {result.rowcount} {'row' if result.rowcount == 1 else 'rows'} " 

67 f'in "{table_name}" table with invalid user ids' 

68 ) 

69 

70 session.commit() 

71 

72 

73def fix_recipe_normalized_search_properties(session: Session): 1a

74 recipes = session.query(RecipeModel).all() 

75 recipes_fixed = False 

76 

77 for recipe in recipes: 

78 add_to_session = False 

79 if recipe.name and not recipe.name_normalized: 

80 recipe.name_normalized = RecipeModel.normalize(recipe.name) 

81 add_to_session = True 

82 if recipe.description and not recipe.description_normalized: 

83 recipe.description_normalized = RecipeModel.normalize(recipe.description) 

84 add_to_session = True 

85 

86 for ingredient in recipe.recipe_ingredient: 

87 if ingredient.note and not ingredient.note_normalized: 

88 ingredient.note_normalized = RecipeModel.normalize(ingredient.note) 

89 add_to_session = True 

90 if ingredient.original_text and not ingredient.original_text_normalized: 

91 ingredient.original_text = RecipeModel.normalize(ingredient.original_text_normalized) 

92 add_to_session = True 

93 

94 if add_to_session: 

95 recipes_fixed = True 

96 session.add(recipe) 

97 

98 if recipes_fixed: 

99 logger.info("Updating recipe normalized search properties") 

100 session.commit() 

101 

102 

103def fix_shopping_list_label_settings(session: Session): 1a

104 shopping_lists = session.query(ShoppingList).all() 

105 labels = session.query(MultiPurposeLabel).all() 

106 label_settings_fixed = False 

107 

108 for shopping_list in shopping_lists: 

109 labels_by_id = {label.id: label for label in labels if label.group_id == shopping_list.group_id} 

110 for label_setting in shopping_list.label_settings: 

111 if not labels_by_id.pop(label_setting.label_id, None): 

112 # label setting is no longer valid, so delete it 

113 session.delete(label_setting) 

114 label_settings_fixed = True 

115 

116 if not labels_by_id: 

117 # all labels are accounted for, so we don't need to add any 

118 continue 

119 

120 label_settings_fixed = True 

121 for i, label in enumerate(labels_by_id.values()): 

122 new_label_setting = ShoppingListMultiPurposeLabel( 

123 id=uuid4(), 

124 shopping_list_id=shopping_list.id, 

125 label_id=label.id, 

126 position=i + len(shopping_list.label_settings), 

127 ) 

128 

129 session.add(new_label_setting) 

130 

131 if label_settings_fixed: 

132 logger.info("Fixing shopping list label settings") 

133 session.commit() 

134 

135 

136def fix_group_slugs(session: Session): 1a

137 groups = session.query(Group).all() 

138 seen_slugs: set[str] = set() 

139 groups_fixed = False 

140 

141 for group in groups: 

142 if not group.slug: 

143 original_name = group.name 

144 new_name = original_name 

145 attempts = 0 

146 while True: 

147 slug = slugify(group.name) 

148 if slug not in seen_slugs: 

149 break 

150 

151 attempts += 1 

152 new_name = f"{original_name} ({attempts})" 

153 

154 groups_fixed = True 

155 group.name = new_name 

156 group.slug = slug 

157 

158 if groups_fixed: 

159 logger.info("Adding missing group slugs") 

160 session.commit() 

161 

162 

163def fix_normalized_unit_and_food_names(session: Session): 1a

164 units = session.query(IngredientUnitModel).all() 

165 units_fixed = False 

166 

167 for unit in units: 

168 add_to_session = False 

169 if unit.name and not unit.name_normalized: 

170 unit.name_normalized = IngredientUnitModel.normalize(unit.name) 

171 add_to_session = True 

172 if unit.abbreviation and not unit.abbreviation_normalized: 

173 unit.abbreviation_normalized = IngredientUnitModel.normalize(unit.abbreviation) 

174 add_to_session = True 

175 

176 if add_to_session: 

177 units_fixed = True 

178 session.add(unit) 

179 

180 if units_fixed: 

181 logger.info("Updating unit normalized search properties") 

182 session.commit() 

183 

184 foods = session.query(IngredientFoodModel).all() 

185 foods_fixed = False 

186 

187 for food in foods: 

188 add_to_session = False 

189 if food.name and not food.name_normalized: 

190 food.name_normalized = IngredientFoodModel.normalize(food.name) 

191 add_to_session = True 

192 

193 if add_to_session: 

194 foods_fixed = True 

195 session.add(food) 

196 

197 if foods_fixed: 

198 logger.info("Updating food normalized search properties") 

199 session.commit() 

200 

201 

202def fix_migration_data(session: Session): 1a

203 logger.info("Checking for migration data fixes") 

204 

205 fix_dangling_refs(session) 

206 fix_recipe_normalized_search_properties(session) 

207 fix_shopping_list_label_settings(session) 

208 fix_group_slugs(session) 

209 fix_normalized_unit_and_food_names(session)