Coverage for opt/mealie/lib/python3.12/site-packages/mealie/schema/_mealie/datetime_parse.py: 23%
138 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-25 15:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-25 15:48 +0000
1"""
2From Pydantic V1: https://github.com/pydantic/pydantic/blob/abcf81ec104d2da70894ac0402ae11a7186c5e47/pydantic/datetime_parse.py
3"""
5import re 1a
6from datetime import UTC, date, datetime, time, timedelta, timezone 1a
8date_expr = r"(?P<year>\d{4})-(?P<month>\d{1,2})-(?P<day>\d{1,2})" 1a
9time_expr = ( 1a
10 r"(?P<hour>\d{1,2}):(?P<minute>\d{1,2})"
11 r"(?::(?P<second>\d{1,2})(?:\.(?P<microsecond>\d{1,6})\d{0,6})?)?"
12 r"(?P<tzinfo>Z|[+-]\d{2}(?::?\d{2})?)?$"
13)
15date_re = re.compile(f"{date_expr}$") 1a
16time_re = re.compile(time_expr) 1a
17datetime_re = re.compile(f"{date_expr}[T ]{time_expr}") 1a
19standard_duration_re = re.compile( 1a
20 r"^"
21 r"(?:(?P<days>-?\d+) (days?, )?)?"
22 r"((?:(?P<hours>-?\d+):)(?=\d+:\d+))?"
23 r"(?:(?P<minutes>-?\d+):)?"
24 r"(?P<seconds>-?\d+)"
25 r"(?:\.(?P<microseconds>\d{1,6})\d{0,6})?"
26 r"$"
27)
29# Support the sections of ISO 8601 date representation that are accepted by timedelta
30iso8601_duration_re = re.compile( 1a
31 r"^(?P<sign>[-+]?)"
32 r"P"
33 r"(?:(?P<days>\d+(.\d+)?)D)?"
34 r"(?:T"
35 r"(?:(?P<hours>\d+(.\d+)?)H)?"
36 r"(?:(?P<minutes>\d+(.\d+)?)M)?"
37 r"(?:(?P<seconds>\d+(.\d+)?)S)?"
38 r")?"
39 r"$"
40)
42EPOCH = datetime(1970, 1, 1, tzinfo=UTC) 1a
43# if greater than this, the number is in ms, if less than or equal it's in seconds
44# (in seconds this is 11th October 2603, in ms it's 20th August 1970)
45MS_WATERSHED = int(2e10) 1a
46# slightly more than datetime.max in ns - (datetime.max - EPOCH).total_seconds() * 1e9
47MAX_NUMBER = int(3e20) 1a
50class DateError(ValueError): 1a
51 def __init__(self, *args: object) -> None: 1a
52 super().__init__("invalid date format")
55class TimeError(ValueError): 1a
56 def __init__(self, *args: object) -> None: 1a
57 super().__init__("invalid time format")
60class DateTimeError(ValueError): 1a
61 def __init__(self, *args: object) -> None: 1a
62 super().__init__("invalid datetime format") 1bcdefghijklmno
65class DurationError(ValueError): 1a
66 def __init__(self, *args: object) -> None: 1a
67 super().__init__("invalid duration format")
70def get_numeric(value: str | bytes | int | float, native_expected_type: str) -> None | int | float: 1a
71 if isinstance(value, int | float): 71 ↛ 72line 71 didn't jump to line 72 because the condition on line 71 was never true1bcdefghijklmno
72 return value
73 try: 1bcdefghijklmno
74 return float(value) 1bcdefghijklmno
75 except ValueError: 1bcdefghijklmno
76 return None 1bcdefghijklmno
77 except TypeError as e:
78 raise TypeError(f"invalid type; expected {native_expected_type}, string, bytes, int or float") from e
81def from_unix_seconds(seconds: int | float) -> datetime: 1a
82 if seconds > MAX_NUMBER:
83 return datetime.max.replace(tzinfo=UTC)
84 elif seconds < -MAX_NUMBER:
85 return datetime.min.replace(tzinfo=UTC)
87 while abs(seconds) > MS_WATERSHED:
88 seconds /= 1000
89 dt = EPOCH + timedelta(seconds=seconds)
90 return dt.replace(tzinfo=UTC)
93def _parse_timezone(value: str | None, error: type[Exception]) -> None | int | timezone: 1a
94 if value == "Z":
95 return UTC
96 elif value is not None:
97 offset_mins = int(value[-2:]) if len(value) > 3 else 0
98 offset = 60 * int(value[1:3]) + offset_mins
99 if value[0] == "-":
100 offset = -offset
101 try:
102 return timezone(timedelta(minutes=offset))
103 except ValueError as e:
104 raise error() from e
105 else:
106 return None
109def parse_date(value: date | str | bytes | int | float) -> date: 1a
110 """
111 Parse a date/int/float/string and return a datetime.date.
113 Raise ValueError if the input is well formatted but not a valid date.
114 Raise ValueError if the input isn't well formatted.
115 """
116 if isinstance(value, date):
117 if isinstance(value, datetime):
118 return value.date()
119 else:
120 return value
122 number = get_numeric(value, "date")
123 if number is not None:
124 return from_unix_seconds(number).date()
126 if isinstance(value, bytes):
127 value = value.decode()
129 match = date_re.match(value) # type: ignore
130 if match is None:
131 raise DateError()
133 kw = {k: int(v) for k, v in match.groupdict().items()}
135 try:
136 return date(**kw)
137 except ValueError as e:
138 raise DateError() from e
141def parse_time(value: time | str | bytes | int | float) -> time: 1a
142 """
143 Parse a time/string and return a datetime.time.
145 Raise ValueError if the input is well formatted but not a valid time.
146 Raise ValueError if the input isn't well formatted, in particular if it contains an offset.
147 """
148 if isinstance(value, time):
149 return value
151 number = get_numeric(value, "time")
152 if number is not None:
153 if number >= 86400:
154 # doesn't make sense since the time time loop back around to 0
155 raise TimeError()
156 return (datetime.min.replace(tzinfo=UTC) + timedelta(seconds=number)).time()
158 if isinstance(value, bytes):
159 value = value.decode()
161 match = time_re.match(value) # type: ignore
162 if match is None:
163 raise TimeError()
165 kw = match.groupdict()
166 if kw["microsecond"]:
167 kw["microsecond"] = kw["microsecond"].ljust(6, "0")
169 tzinfo = _parse_timezone(kw.pop("tzinfo"), TimeError)
170 kw_: dict[str, None | int | timezone] = {k: int(v) for k, v in kw.items() if v is not None}
171 kw_["tzinfo"] = tzinfo
173 try:
174 return time(**kw_) # type: ignore
175 except ValueError as e:
176 raise TimeError() from e
179def parse_datetime(value: datetime | str | bytes | int | float) -> datetime: 1a
180 """
181 Parse a datetime/int/float/string and return a datetime.datetime.
183 This function supports time zone offsets. When the input contains one,
184 the output uses a timezone with a fixed offset from UTC.
186 Raise ValueError if the input is well formatted but not a valid datetime.
187 Raise ValueError if the input isn't well formatted.
188 """
189 if isinstance(value, datetime): 189 ↛ 190line 189 didn't jump to line 190 because the condition on line 189 was never true1bcdefghijklmno
190 return value
192 number = get_numeric(value, "datetime") 1bcdefghijklmno
193 if number is not None: 193 ↛ 194line 193 didn't jump to line 194 because the condition on line 193 was never true1bcdefghijklmno
194 return from_unix_seconds(number)
196 if isinstance(value, bytes): 196 ↛ 197line 196 didn't jump to line 197 because the condition on line 196 was never true1bcdefghijklmno
197 value = value.decode()
199 match = datetime_re.match(value) # type: ignore 1bcdefghijklmno
200 if match is None: 200 ↛ 203line 200 didn't jump to line 203 because the condition on line 200 was always true1bcdefghijklmno
201 raise DateTimeError() 1bcdefghijklmno
203 kw = match.groupdict()
204 if kw["microsecond"]:
205 kw["microsecond"] = kw["microsecond"].ljust(6, "0")
207 tzinfo = _parse_timezone(kw.pop("tzinfo"), DateTimeError)
208 kw_: dict[str, None | int | timezone] = {k: int(v) for k, v in kw.items() if v is not None}
209 kw_["tzinfo"] = tzinfo
211 try:
212 return datetime(**kw_) # type: ignore # noqa DTZ001
213 except ValueError as e:
214 raise DateTimeError() from e
217def parse_duration(value: str | bytes | int | float) -> timedelta: 1a
218 """
219 Parse a duration int/float/string and return a datetime.timedelta.
221 The preferred format for durations in Django is '%d %H:%M:%S.%f'.
223 Also supports ISO 8601 representation.
224 """
225 if isinstance(value, timedelta):
226 return value
228 if isinstance(value, int | float):
229 # below code requires a string
230 value = f"{value:f}"
231 elif isinstance(value, bytes):
232 value = value.decode()
234 try:
235 match = standard_duration_re.match(value) or iso8601_duration_re.match(value)
236 except TypeError as e:
237 raise TypeError("invalid type; expected timedelta, string, bytes, int or float") from e
239 if not match:
240 raise DurationError()
242 kw = match.groupdict()
243 sign = -1 if kw.pop("sign", "+") == "-" else 1
244 if kw.get("microseconds"):
245 kw["microseconds"] = kw["microseconds"].ljust(6, "0")
247 if kw.get("seconds") and kw.get("microseconds") and kw["seconds"].startswith("-"):
248 kw["microseconds"] = "-" + kw["microseconds"]
250 kw_ = {k: float(v) for k, v in kw.items() if v is not None}
252 return sign * timedelta(**kw_)