Coverage for opt/mealie/lib/python3.12/site-packages/mealie/db/models/_model_utils/auto_init.py: 80%

104 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 15:32 +0000

1from functools import wraps 1j

2from uuid import UUID 1j

3 

4from pydantic import BaseModel, ConfigDict, Field 1j

5from sqlalchemy import select 1j

6from sqlalchemy.orm import MANYTOMANY, MANYTOONE, ONETOMANY, Session 1j

7from sqlalchemy.orm.mapper import Mapper 1j

8from sqlalchemy.orm.relationships import RelationshipProperty 1j

9from sqlalchemy.sql.base import ColumnCollection 1j

10 

11from .._model_base import SqlAlchemyBase 1j

12from .helpers import safe_call 1j

13 

14 

15def _default_exclusion() -> set[str]: 1j

16 return {"id"} 1jpklmoehifgqbrncdsa

17 

18 

19class AutoInitConfig(BaseModel): 1j

20 """ 

21 Config class for `auto_init` decorator. 

22 """ 

23 

24 get_attr: str | None = None 1j

25 exclude: set = Field(default_factory=_default_exclusion) 1j

26 # auto_create: bool = False 

27 

28 

29def _get_config(relation_cls: type[SqlAlchemyBase]) -> AutoInitConfig: 1j

30 """ 

31 Returns the config for the given class. 

32 """ 

33 cfg = AutoInitConfig() 1jpklmoehifgqbrncdsa

34 cfgKeys = cfg.model_dump().keys() 1jpklmoehifgqbrncdsa

35 # Get the config for the class 

36 try: 1jpklmoehifgqbrncdsa

37 class_config: ConfigDict = relation_cls.model_config 1jpklmoehifgqbrncdsa

38 except AttributeError: 1jpklmoehifgbrncdsa

39 return cfg 1jpklmoehifgbrncdsa

40 # Map all matching attributes in Config to all AutoInitConfig attributes 

41 for attr in class_config: 1jpklmehifgqbncda

42 if attr in cfgKeys: 42 ↛ 41line 42 didn't jump to line 41 because the condition on line 42 was always true1jpklmehifgqbncda

43 setattr(cfg, attr, class_config[attr]) 1jpklmehifgqbncda

44 

45 return cfg 1jpklmehifgqbncda

46 

47 

48def get_lookup_attr(relation_cls: type[SqlAlchemyBase]) -> str: 1j

49 """Returns the primary key attribute of the related class as a string. 

50 

51 Args: 

52 relation_cls (DeclarativeMeta): The SQLAlchemy class to get the primary_key from 

53 

54 Returns: 

55 Any: [description] 

56 """ 

57 

58 cfg = _get_config(relation_cls) 1klmoehifgbncda

59 

60 try: 1klmoehifgbncda

61 get_attr = cfg.get_attr 1klmoehifgbncda

62 if get_attr is None: 62 ↛ 66line 62 didn't jump to line 66 because the condition on line 62 was always true1klmoehifgbncda

63 get_attr = relation_cls.__table__.primary_key.columns.keys()[0] 1klmoehifgbncda

64 except Exception: 

65 get_attr = "id" 

66 return get_attr 1klmoehifgbncda

67 

68 

69def handle_many_to_many(session, get_attr, relation_cls, all_elements: list[dict]): 1j

70 """ 

71 Proxy call to `handle_one_to_many_list` for many-to-many relationships. Because functionally, they do the same 

72 """ 

73 return handle_one_to_many_list(session, get_attr, relation_cls, all_elements) 1bcda

74 

75 

76def handle_one_to_many_list( 1j

77 session: Session, get_attr, relation_cls: type[SqlAlchemyBase], all_elements: list[dict] | list[str] 

78): 

79 elems_to_create: list[dict] = [] 1klmehifgbncda

80 updated_elems: list[dict] = [] 1klmehifgbncda

81 

82 cfg = _get_config(relation_cls) 1klmehifgbncda

83 

84 for elem in all_elements: 1klmehifgbncda

85 elem_id = elem.get(get_attr, None) if isinstance(elem, dict) else elem 1ehifga

86 stmt = select(relation_cls).filter_by(**{get_attr: elem_id}) 1ehifga

87 existing_elem = session.execute(stmt).scalars().one_or_none() 1ehifga

88 

89 if existing_elem is None and isinstance(elem, dict): 89 ↛ 93line 89 didn't jump to line 93 because the condition on line 89 was always true1ehifga

90 elems_to_create.append(elem) 1ehifga

91 continue 1ehifga

92 

93 elif isinstance(elem, dict): 

94 for key, value in elem.items(): 

95 if key not in cfg.exclude: 

96 setattr(existing_elem, key, value) 

97 

98 updated_elems.append(existing_elem) 

99 

100 new_elems = [safe_call(relation_cls, elem.copy(), session=session) for elem in elems_to_create] 1klmehifgbncda

101 return new_elems + updated_elems 1klmehifgbncda

102 

103 

104def auto_init(): # sourcery no-metrics 1j

105 """Wraps the `__init__` method of a class to automatically set the common 

106 attributes. 

107 

108 Args: 

109 exclude (Union[set, list], optional): [description]. Defaults to None. 

110 """ 

111 

112 def decorator(init): 1j

113 @wraps(init) 1j

114 def wrapper(self: SqlAlchemyBase, *args, **kwargs): # sourcery no-metrics 1j

115 """ 

116 Custom initializer that allows nested children initialization. 

117 Only keys that are present as instance's class attributes are allowed. 

118 These could be, for example, any mapped columns or relationships. 

119 

120 Code inspired from GitHub. 

121 Ref: https://github.com/tiangolo/fastapi/issues/2194 

122 """ 

123 cls = self.__class__ 1jpklmoehifgqbrncdsa

124 config = _get_config(cls) 1jpklmoehifgqbrncdsa

125 exclude = config.exclude 1jpklmoehifgqbrncdsa

126 

127 alchemy_mapper: Mapper = self.__mapper__ 1jpklmoehifgqbrncdsa

128 model_columns: ColumnCollection = alchemy_mapper.columns 1jpklmoehifgqbrncdsa

129 relationships = alchemy_mapper.relationships 1jpklmoehifgqbrncdsa

130 

131 session: Session = kwargs.get("session", None) 1jpklmoehifgqbrncdsa

132 

133 if session is None: 133 ↛ 134line 133 didn't jump to line 134 because the condition on line 133 was never true1jpklmoehifgqbrncdsa

134 raise ValueError("Session is required to initialize the model with `auto_init`") 

135 

136 for key, val in kwargs.items(): 1jpklmoehifgqbrncdsa

137 if key in exclude: 1jpklmoehifgqbrncdsa

138 continue 1jpklmefgqbrncda

139 

140 if not hasattr(cls, key): 1jpklmoehifgqbrncdsa

141 continue 1jpklmoehifgqbrncdsa

142 # raise TypeError(f"Invalid keyword argument: {key}") 

143 

144 if key in model_columns: 1jpklmoehifgqbrncdsa

145 setattr(self, key, val) 1jpklmoehifgqbrncdsa

146 continue 1jpklmoehifgqbrncdsa

147 

148 if key in relationships: 1klmoehifgbncda

149 prop: RelationshipProperty = relationships[key] 1klmoehifgbncda

150 

151 # Identifies the type of relationship (ONETOMANY, MANYTOONE, many-to-one, many-to-many) 

152 relation_dir = prop.direction 1klmoehifgbncda

153 

154 # Identifies the parent class of the related object. 

155 relation_cls: type[SqlAlchemyBase] = prop.mapper.entity 1klmoehifgbncda

156 

157 # Identifies if the relationship was declared with use_list=True 

158 use_list: bool = prop.uselist 1klmoehifgbncda

159 

160 get_attr = get_lookup_attr(relation_cls) 1klmoehifgbncda

161 

162 if relation_dir == ONETOMANY and use_list: 1klmoehifgbncda

163 instances = handle_one_to_many_list(session, get_attr, relation_cls, val) 1klmehifgbncda

164 setattr(self, key, instances) 1klmehifgbncda

165 

166 elif relation_dir == ONETOMANY: 1obcda

167 instance = safe_call(relation_cls, val.copy() if val else None, session=session) 1oa

168 setattr(self, key, instance) 1oa

169 

170 elif relation_dir == MANYTOONE and not use_list: 1bcda

171 if isinstance(val, dict): 171 ↛ 172line 171 didn't jump to line 172 because the condition on line 171 was never true1bcd

172 val = val.get(get_attr) 

173 

174 if val is None: 

175 raise ValueError(f"Expected 'id' to be provided for {key}") 

176 

177 if isinstance(val, str | int | UUID): 177 ↛ 178line 177 didn't jump to line 178 because the condition on line 177 was never true1bcd

178 stmt = select(relation_cls).filter_by(**{get_attr: val}) 

179 instance = session.execute(stmt).scalars().one_or_none() 

180 setattr(self, key, instance) 

181 else: 

182 # If the value is not of the type defined above we assume that it isn't a valid id 

183 # and try a different approach. 

184 pass 1bcd

185 

186 elif relation_dir == MANYTOMANY: 186 ↛ 136line 186 didn't jump to line 136 because the condition on line 186 was always true1bcda

187 instances = handle_many_to_many(session, get_attr, relation_cls, val) 1bcda

188 setattr(self, key, instances) 1bcda

189 

190 return init(self, *args, **kwargs) 1jpklmoehifgqbrncdsa

191 

192 return wrapper 1j

193 

194 return decorator 1j