Coverage for opt/mealie/lib/python3.12/site-packages/mealie/schema/_mealie/mealie_model.py: 74%

90 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-11-25 17:29 +0000

1from __future__ import annotations 1a

2 

3import re 1a

4from collections.abc import Sequence 1a

5from datetime import UTC, datetime 1a

6from enum import Enum 1a

7from typing import ClassVar, Protocol, Self 1a

8 

9from humps.main import camelize 1a

10from pydantic import UUID4, AliasChoices, BaseModel, ConfigDict, Field, model_validator 1a

11from sqlalchemy import Select, desc, func, or_, text 1a

12from sqlalchemy.orm import InstrumentedAttribute, Session 1a

13from sqlalchemy.orm.interfaces import LoaderOption 1a

14 

15from mealie.db.models._model_base import SqlAlchemyBase 1a

16 

# Matches a string that ends in a sign followed by exactly two digits (e.g. "+05"),
# i.e. a timezone offset that carries the hour but no ":MM" minutes part.
# NOTE(review): the same pattern also matches the tail of a date-only string such
# as "2024-01-15" ("-15"); callers should confirm a time portion is present.
HOUR_ONLY_TZ_PATTERN = re.compile(r"[+-]\d{2}$")

18 

19 

def UpdatedAtField(*args, **kwargs):
    """
    Build a Pydantic ``Field`` preconfigured with the aliases used for "updated at" values.

    Any caller-supplied ``alias`` / ``alias_generator`` is discarded. The field then
    validates from ``update_at``, ``updateAt``, ``updated_at`` or ``updatedAt`` and
    always serializes as ``updatedAt``. All other positional and keyword arguments
    are forwarded to ``Field`` untouched.
    """

    # these two would compete with the fixed aliases configured below
    for overridden in ("alias", "alias_generator"):
        kwargs.pop(overridden, None)

    kwargs.update(
        # ensure the alias is not overwritten by the generator, if there is one
        # https://docs.pydantic.dev/latest/concepts/alias/#alias-priority
        alias_priority=2,
        validation_alias=AliasChoices("update_at", "updateAt", "updated_at", "updatedAt"),
        serialization_alias="updatedAt",
    )

    return Field(*args, **kwargs)

38 

39 

class SearchType(Enum):
    # Search strategies understood by MealieModel.filter_search_query.
    fuzzy = "fuzzy"  # trigram word-similarity matching (pg_trgm operators)
    tokenized = "tokenized"  # LIKE matching against each entry of the search list

43 

44 

45class MealieModel(BaseModel): 1a

46 _fuzzy_similarity_threshold: ClassVar[float] = 0.5 1a

47 _normalize_search: ClassVar[bool] = False 1a

48 _searchable_properties: ClassVar[list[str]] = [] 1a

49 """ 1a

50 Searchable properties for the search API. 

51 The first property will be used for sorting (order_by) 

52 """ 

53 model_config = ConfigDict(alias_generator=camelize, populate_by_name=True) 1a

54 

55 @model_validator(mode="before") 1a

56 @classmethod 1a

57 def fix_hour_only_tz[T: BaseModel](cls, data: T) -> T: 1a

58 """ 

59 Fixes datetimes with timezones that only have the hour portion. 

60 

61 Pydantic assumes timezones are in the format +HH:MM, but postgres returns +HH. 

62 https://github.com/pydantic/pydantic/issues/8609 

63 """ 

64 for field, field_info in cls.model_fields.items(): 2a 5 tbJ ub# z K vbQ wbxbyb2 zbH R S Ab$ 6 7 O % BbCbP Db' ( 8 ) * T U + , - . / 9 : ; = V ? @ [ ] ^ _ ` W { | } ~ abbbcbdbebX fb! gbhbibjbY kbZ lbmbnbobpb0 qbrbsbc n o p d e f q r s A g D h i j t E k u F v l w B L x 1 m I C M N G y 3 4 b

65 if field_info.annotation != datetime: 2a 5 tbJ ub# z K vbQ wbxbyb2 zbH R S Ab$ 6 7 O % BbCbP Db' ( 8 ) * T U + , - . / 9 : ; = V ? @ [ ] ^ _ ` W { | } ~ abbbcbdbebX fb! gbhbibjbY kbZ lbmbnbobpb0 qbrbsbc n o p d e f q r s A g D h i j t E k u F v l w B L x 1 m I C M N G y 3 4 b

66 continue 2a 5 tbJ ub# z K vbQ wbxbyb2 zbH R S Ab$ 6 7 O % BbCbP Db' ( 8 ) * T U + , - . / 9 : ; = V ? @ [ ] ^ _ ` W { | } ~ abbbcbdbebX fb! gbhbibjbY kbZ lbmbnbobpb0 qbrbsbc n o p d e f q r s A g D h i j t E k u F v l w B L x 1 m I C M N G y 3 4 b

67 try: 1zQHOPTUVWXYZ0cnopdefqrsgDhijtEkuFvlwLxmIMNGyb

68 if not isinstance(val := getattr(data, field), str): 1zQHOPTUVWXYZ0cnopdefqrsgDhijtEkuFvlwLxmIMNGyb

69 continue 1OPcdefgDhijEkFlmGb

70 except AttributeError: 1zQHOPTUVWXYZ0cnopdefqrsgDhijtEkuFvlwLxmIMNGyb

71 continue 1zQHOPTUVWXYZ0cnopdefqrsgDhijtEkuFvlwLxmIMNGyb

72 if re.search(HOUR_ONLY_TZ_PATTERN, val): 72 ↛ 73line 72 didn't jump to line 73 because the condition on line 72 was never true

73 setattr(data, field, val + ":00") 

74 

75 return data 2a 5 tbJ ub# z K vbQ wbxbyb2 zbH R S Ab$ 6 7 O % BbCbP Db' ( 8 ) * T U + , - . / 9 : ; = V ? @ [ ] ^ _ ` W { | } ~ abbbcbdbebX fb! gbhbibjbY kbZ lbmbnbobpb0 qbrbsbc n o p d e f q r s A g D h i j t E k u F v l w B L x 1 m I C M N G y 3 4 b

76 

77 @model_validator(mode="after") 1a

78 def set_tz_info(self) -> Self: 1a

79 """ 

80 Adds UTC timezone information to all datetimes in the model. 

81 The server stores everything in UTC without timezone info. 

82 """ 

83 for field in self.__class__.model_fields: 2a 5 tbJ ub# z K vbQ wbxbyb2 zbH R S Ab$ 6 7 O % BbCbP Db' ( 8 ) * T U + , - . / 9 : ; = V ? @ [ ] ^ _ ` W { | } ~ abbbcbdbebX fb! gbhbibjbY kbZ lbmbnbobpb0 qbrbsbc n o p d e f q r s A g D h i j t E k u F v l w B L x 1 m I C M N G y 3 4 b

84 val = getattr(self, field) 2a 5 tbJ ub# z K vbQ wbxbyb2 zbH R S Ab$ 6 7 O % BbCbP Db' ( 8 ) * T U + , - . / 9 : ; = V ? @ [ ] ^ _ ` W { | } ~ abbbcbdbebX fb! gbhbibjbY kbZ lbmbnbobpb0 qbrbsbc n o p d e f q r s A g D h i j t E k u F v l w B L x 1 m I C M N G y 3 4 b

85 if not isinstance(val, datetime): 2a 5 tbJ ub# z K vbQ wbxbyb2 zbH R S Ab$ 6 7 O % BbCbP Db' ( 8 ) * T U + , - . / 9 : ; = V ? @ [ ] ^ _ ` W { | } ~ abbbcbdbebX fb! gbhbibjbY kbZ lbmbnbobpb0 qbrbsbc n o p d e f q r s A g D h i j t E k u F v l w B L x 1 m I C M N G y 3 4 b

86 continue 2a 5 tbJ ub# z K vbQ wbxbyb2 zbH R S Ab$ 6 7 O % BbCbP Db' ( 8 ) * T U + , - . / 9 : ; = V ? @ [ ] ^ _ ` W { | } ~ abbbcbdbebX fb! gbhbibjbY kbZ lbmbnbobpb0 qbrbsbc n o p d e f q r s A g D h i j t E k u F v l w B L x 1 m I C M N G y 3 4 b

87 if not val.tzinfo: 2z Q 2 H R S $ 6 7 O % P ' ( 8 ) * T U + , - . / 9 : ; = V ? @ [ ] ^ _ ` W { | } ~ abbbcbdbebX fb! gbhbibjbY kbZ lbmbnbobpb0 qbrbsbc n o p d e f q r s A g D h i j t E k u F v l w B L x 1 m I C M N G y 3 4 b

88 setattr(self, field, val.replace(tzinfo=UTC)) 

89 

90 return self 2a 5 tbJ ub# z K vbQ wbxbyb2 zbH R S Ab$ 6 7 O % BbCbP Db' ( 8 ) * T U + , - . / 9 : ; = V ? @ [ ] ^ _ ` W { | } ~ abbbcbdbebX fb! gbhbibjbY kbZ lbmbnbobpb0 qbrbsbc n o p d e f q r s A g D h i j t E k u F v l w B L x 1 m I C M N G y 3 4 b

91 

92 def cast[T: BaseModel](self, cls: type[T], **kwargs) -> T: 1a

93 """ 

94 Cast the current model to another with additional arguments. Useful for 

95 transforming DTOs into models that are saved to a database 

96 """ 

97 create_data = { 1a52HRScnopdefqrsAgDhijtEkuFvlwBLx1mICMNGy34b

98 field: getattr(self, field) for field in self.__class__.model_fields if field in cls.model_fields 

99 } 

100 create_data.update(kwargs or {}) 1a52HRScnopdefqrsAgDhijtEkuFvlwBLx1mICMNGy34b

101 return cls(**create_data) 1a52HRScnopdefqrsAgDhijtEkuFvlwBLx1mICMNGy34b

102 

103 def map_to[T: BaseModel](self, dest: T) -> T: 1a

104 """ 

105 Map matching values from the current model to another model. Model returned 

106 for method chaining. 

107 """ 

108 

109 for field in self.__class__.model_fields: 

110 if field in dest.__class__.model_fields: 

111 setattr(dest, field, getattr(self, field)) 

112 

113 return dest 

114 

115 def map_from(self, src: BaseModel): 1a

116 """ 

117 Map matching values from another model to the current model. 

118 """ 

119 

120 for field in src.__class__.model_fields: 

121 if field in self.__class__.model_fields: 

122 setattr(self, field, getattr(src, field)) 

123 

124 def merge[T: BaseModel](self, src: T, replace_null=False): 1a

125 """ 

126 Replace matching values from another instance to the current instance. 

127 """ 

128 

129 for field in src.__class__.model_fields: 

130 val = getattr(src, field) 

131 if field in self.__class__.model_fields and (val is not None or replace_null): 

132 setattr(self, field, val) 

133 

134 @classmethod 1a

135 def loader_options(cls) -> list[LoaderOption]: 1a

136 return [] 1J#zKHRS6789!cnopdefqrsAgDhijtEkuFvlwBx1mICGyb

137 

138 @classmethod 1a

139 def filter_search_query( 1a

140 cls, 

141 db_model: type[SqlAlchemyBase], 

142 query: Select, 

143 session: Session, 

144 search_type: SearchType, 

145 search: str, 

146 search_list: list[str], 

147 ) -> Select: 

148 """ 

149 Filters a search query based on model attributes 

150 

151 Can be overridden to support a more advanced search 

152 """ 

153 

154 if not cls._searchable_properties: 154 ↛ 155line 154 didn't jump to line 155 because the condition on line 154 was never true1JzKcnopdefqrsAghijtkuvlwBxmCyb

155 raise AttributeError("Not Implemented") 

156 

157 model_properties: list[InstrumentedAttribute] = [getattr(db_model, prop) for prop in cls._searchable_properties] 1JzKcnopdefqrsAghijtkuvlwBxmCyb

158 if search_type is SearchType.fuzzy: 158 ↛ 159line 158 didn't jump to line 159 because the condition on line 158 was never true1JzKcnopdefqrsAghijtkuvlwBxmCyb

159 session.execute(text(f"set pg_trgm.word_similarity_threshold = {cls._fuzzy_similarity_threshold};")) 

160 filters = [prop.op("%>")(search) for prop in model_properties] 

161 

162 # trigram ordering by the first searchable property 

163 return query.filter(or_(*filters)).order_by(func.least(model_properties[0].op("<->>")(search))) 

164 else: 

165 filters = [] 1JzKcnopdefqrsAghijtkuvlwBxmCyb

166 for prop in model_properties: 1JzKcnopdefqrsAghijtkuvlwBxmCyb

167 filters.extend([prop.like(f"%{s}%") for s in search_list]) 1JzKcnopdefqrsAghijtkuvlwBxmCyb

168 

169 # order by how close the result is to the first searchable property 

170 return query.filter(or_(*filters)).order_by(desc(model_properties[0].like(f"%{search}%"))) 1JzKcnopdefqrsAghijtkuvlwBxmCyb

171 

172 

class HasUUID(Protocol):
    """Structural type for any object that exposes a UUID4 ``id`` attribute."""

    id: UUID4

175 

176 

def extract_uuids(models: Sequence[HasUUID]) -> list[UUID4]:
    """Return the ``id`` of every model, in the same order as ``models``."""
    return [model.id for model in models]