Coverage for opt/mealie/lib/python3.12/site-packages/mealie/schema/_mealie/datetime_parse.py: 23%

138 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-11-25 17:29 +0000

1""" 

2From Pydantic V1: https://github.com/pydantic/pydantic/blob/abcf81ec104d2da70894ac0402ae11a7186c5e47/pydantic/datetime_parse.py 

3""" 

4 

5import re 1a

6from datetime import UTC, date, datetime, time, timedelta, timezone 1a

7 

8date_expr = r"(?P<year>\d{4})-(?P<month>\d{1,2})-(?P<day>\d{1,2})" 1a

9time_expr = ( 1a

10 r"(?P<hour>\d{1,2}):(?P<minute>\d{1,2})" 

11 r"(?::(?P<second>\d{1,2})(?:\.(?P<microsecond>\d{1,6})\d{0,6})?)?" 

12 r"(?P<tzinfo>Z|[+-]\d{2}(?::?\d{2})?)?$" 

13) 

14 

15date_re = re.compile(f"{date_expr}$") 1a

16time_re = re.compile(time_expr) 1a

17datetime_re = re.compile(f"{date_expr}[T ]{time_expr}") 1a

18 

19standard_duration_re = re.compile( 1a

20 r"^" 

21 r"(?:(?P<days>-?\d+) (days?, )?)?" 

22 r"((?:(?P<hours>-?\d+):)(?=\d+:\d+))?" 

23 r"(?:(?P<minutes>-?\d+):)?" 

24 r"(?P<seconds>-?\d+)" 

25 r"(?:\.(?P<microseconds>\d{1,6})\d{0,6})?" 

26 r"$" 

27) 

28 

29# Support the sections of ISO 8601 date representation that are accepted by timedelta 

30iso8601_duration_re = re.compile( 1a

31 r"^(?P<sign>[-+]?)" 

32 r"P" 

33 r"(?:(?P<days>\d+(.\d+)?)D)?" 

34 r"(?:T" 

35 r"(?:(?P<hours>\d+(.\d+)?)H)?" 

36 r"(?:(?P<minutes>\d+(.\d+)?)M)?" 

37 r"(?:(?P<seconds>\d+(.\d+)?)S)?" 

38 r")?" 

39 r"$" 

40) 

41 

42EPOCH = datetime(1970, 1, 1, tzinfo=UTC) 1a

43# if greater than this, the number is in ms, if less than or equal it's in seconds 

44# (in seconds this is 11th October 2603, in ms it's 20th August 1970) 

45MS_WATERSHED = int(2e10) 1a

46# slightly more than datetime.max in ns - (datetime.max - EPOCH).total_seconds() * 1e9 

47MAX_NUMBER = int(3e20) 1a

48 

49 

50class DateError(ValueError): 1a

51 def __init__(self, *args: object) -> None: 1a

52 super().__init__("invalid date format") 

53 

54 

55class TimeError(ValueError): 1a

56 def __init__(self, *args: object) -> None: 1a

57 super().__init__("invalid time format") 

58 

59 

60class DateTimeError(ValueError): 1a

61 def __init__(self, *args: object) -> None: 1a

62 super().__init__("invalid datetime format") 1bcdefghijklmnopqrstuvwxyzABCDEF

63 

64 

65class DurationError(ValueError): 1a

66 def __init__(self, *args: object) -> None: 1a

67 super().__init__("invalid duration format") 

68 

69 

70def get_numeric(value: str | bytes | int | float, native_expected_type: str) -> None | int | float: 1a

71 if isinstance(value, int | float): 71 ↛ 72line 71 didn't jump to line 72 because the condition on line 71 was never true1bcdefghijklmnopqrstuvwxyzABCDEF

72 return value 

73 try: 1bcdefghijklmnopqrstuvwxyzABCDEF

74 return float(value) 1bcdefghijklmnopqrstuvwxyzABCDEF

75 except ValueError: 1bcdefghijklmnopqrstuvwxyzABCDEF

76 return None 1bcdefghijklmnopqrstuvwxyzABCDEF

77 except TypeError as e: 

78 raise TypeError(f"invalid type; expected {native_expected_type}, string, bytes, int or float") from e 

79 

80 

81def from_unix_seconds(seconds: int | float) -> datetime: 1a

82 if seconds > MAX_NUMBER: 

83 return datetime.max.replace(tzinfo=UTC) 

84 elif seconds < -MAX_NUMBER: 

85 return datetime.min.replace(tzinfo=UTC) 

86 

87 while abs(seconds) > MS_WATERSHED: 

88 seconds /= 1000 

89 dt = EPOCH + timedelta(seconds=seconds) 

90 return dt.replace(tzinfo=UTC) 

91 

92 

93def _parse_timezone(value: str | None, error: type[Exception]) -> None | int | timezone: 1a

94 if value == "Z": 

95 return UTC 

96 elif value is not None: 

97 offset_mins = int(value[-2:]) if len(value) > 3 else 0 

98 offset = 60 * int(value[1:3]) + offset_mins 

99 if value[0] == "-": 

100 offset = -offset 

101 try: 

102 return timezone(timedelta(minutes=offset)) 

103 except ValueError as e: 

104 raise error() from e 

105 else: 

106 return None 

107 

108 

109def parse_date(value: date | str | bytes | int | float) -> date: 1a

110 """ 

111 Parse a date/int/float/string and return a datetime.date. 

112 

113 Raise ValueError if the input is well formatted but not a valid date. 

114 Raise ValueError if the input isn't well formatted. 

115 """ 

116 if isinstance(value, date): 

117 if isinstance(value, datetime): 

118 return value.date() 

119 else: 

120 return value 

121 

122 number = get_numeric(value, "date") 

123 if number is not None: 

124 return from_unix_seconds(number).date() 

125 

126 if isinstance(value, bytes): 

127 value = value.decode() 

128 

129 match = date_re.match(value) # type: ignore 

130 if match is None: 

131 raise DateError() 

132 

133 kw = {k: int(v) for k, v in match.groupdict().items()} 

134 

135 try: 

136 return date(**kw) 

137 except ValueError as e: 

138 raise DateError() from e 

139 

140 

141def parse_time(value: time | str | bytes | int | float) -> time: 1a

142 """ 

143 Parse a time/string and return a datetime.time. 

144 

145 Raise ValueError if the input is well formatted but not a valid time. 

146 Raise ValueError if the input isn't well formatted, in particular if it contains an offset. 

147 """ 

148 if isinstance(value, time): 

149 return value 

150 

151 number = get_numeric(value, "time") 

152 if number is not None: 

153 if number >= 86400: 

154 # doesn't make sense since the time time loop back around to 0 

155 raise TimeError() 

156 return (datetime.min.replace(tzinfo=UTC) + timedelta(seconds=number)).time() 

157 

158 if isinstance(value, bytes): 

159 value = value.decode() 

160 

161 match = time_re.match(value) # type: ignore 

162 if match is None: 

163 raise TimeError() 

164 

165 kw = match.groupdict() 

166 if kw["microsecond"]: 

167 kw["microsecond"] = kw["microsecond"].ljust(6, "0") 

168 

169 tzinfo = _parse_timezone(kw.pop("tzinfo"), TimeError) 

170 kw_: dict[str, None | int | timezone] = {k: int(v) for k, v in kw.items() if v is not None} 

171 kw_["tzinfo"] = tzinfo 

172 

173 try: 

174 return time(**kw_) # type: ignore 

175 except ValueError as e: 

176 raise TimeError() from e 

177 

178 

179def parse_datetime(value: datetime | str | bytes | int | float) -> datetime: 1a

180 """ 

181 Parse a datetime/int/float/string and return a datetime.datetime. 

182 

183 This function supports time zone offsets. When the input contains one, 

184 the output uses a timezone with a fixed offset from UTC. 

185 

186 Raise ValueError if the input is well formatted but not a valid datetime. 

187 Raise ValueError if the input isn't well formatted. 

188 """ 

189 if isinstance(value, datetime): 189 ↛ 190line 189 didn't jump to line 190 because the condition on line 189 was never true1bcdefghijklmnopqrstuvwxyzABCDEF

190 return value 

191 

192 number = get_numeric(value, "datetime") 1bcdefghijklmnopqrstuvwxyzABCDEF

193 if number is not None: 193 ↛ 194line 193 didn't jump to line 194 because the condition on line 193 was never true1bcdefghijklmnopqrstuvwxyzABCDEF

194 return from_unix_seconds(number) 

195 

196 if isinstance(value, bytes): 196 ↛ 197line 196 didn't jump to line 197 because the condition on line 196 was never true1bcdefghijklmnopqrstuvwxyzABCDEF

197 value = value.decode() 

198 

199 match = datetime_re.match(value) # type: ignore 1bcdefghijklmnopqrstuvwxyzABCDEF

200 if match is None: 200 ↛ 203line 200 didn't jump to line 203 because the condition on line 200 was always true1bcdefghijklmnopqrstuvwxyzABCDEF

201 raise DateTimeError() 1bcdefghijklmnopqrstuvwxyzABCDEF

202 

203 kw = match.groupdict() 

204 if kw["microsecond"]: 

205 kw["microsecond"] = kw["microsecond"].ljust(6, "0") 

206 

207 tzinfo = _parse_timezone(kw.pop("tzinfo"), DateTimeError) 

208 kw_: dict[str, None | int | timezone] = {k: int(v) for k, v in kw.items() if v is not None} 

209 kw_["tzinfo"] = tzinfo 

210 

211 try: 

212 return datetime(**kw_) # type: ignore # noqa DTZ001 

213 except ValueError as e: 

214 raise DateTimeError() from e 

215 

216 

217def parse_duration(value: str | bytes | int | float) -> timedelta: 1a

218 """ 

219 Parse a duration int/float/string and return a datetime.timedelta. 

220 

221 The preferred format for durations in Django is '%d %H:%M:%S.%f'. 

222 

223 Also supports ISO 8601 representation. 

224 """ 

225 if isinstance(value, timedelta): 

226 return value 

227 

228 if isinstance(value, int | float): 

229 # below code requires a string 

230 value = f"{value:f}" 

231 elif isinstance(value, bytes): 

232 value = value.decode() 

233 

234 try: 

235 match = standard_duration_re.match(value) or iso8601_duration_re.match(value) 

236 except TypeError as e: 

237 raise TypeError("invalid type; expected timedelta, string, bytes, int or float") from e 

238 

239 if not match: 

240 raise DurationError() 

241 

242 kw = match.groupdict() 

243 sign = -1 if kw.pop("sign", "+") == "-" else 1 

244 if kw.get("microseconds"): 

245 kw["microseconds"] = kw["microseconds"].ljust(6, "0") 

246 

247 if kw.get("seconds") and kw.get("microseconds") and kw["seconds"].startswith("-"): 

248 kw["microseconds"] = "-" + kw["microseconds"] 

249 

250 kw_ = {k: float(v) for k, v in kw.items() if v is not None} 

251 

252 return sign * timedelta(**kw_)