Coverage for /usr/local/lib/python3.12/site-packages/prefect/client/schemas/schedules.py: 25%
162 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1"""
2Schedule schemas
3"""
5import datetime 1a
6from typing import TYPE_CHECKING, Annotated, Any, ClassVar, Optional, Union 1a
7from zoneinfo import ZoneInfo 1a
9import dateutil 1a
10import dateutil.rrule 1a
11import dateutil.tz 1a
12from pydantic import AfterValidator, ConfigDict, Field, field_validator, model_validator 1a
13from typing_extensions import TypeAlias, TypeGuard 1a
15from prefect._internal.schemas.bases import PrefectBaseModel 1a
16from prefect._internal.schemas.validators import ( 1a
17 default_anchor_date,
18 default_timezone,
19 validate_cron_string,
20 validate_rrule_string,
21)
22from prefect.types._datetime import Date, DateTime, now 1a
MAX_ITERATIONS = 1_000
# Roughly one year's worth of RDATEs, plus a buffer.
MAX_RRULE_LENGTH = 6_500
def is_valid_timezone(v: str) -> bool:
    """
    Report whether `v` is a timezone name known to pytz.

    The lookup is a membership test against `pytz.all_timezones_set`. The real
    `pytz` package is used when `HAS_PYTZ` is truthy; otherwise the bundled
    `prefect._internal.pytz` module stands in for it.

    Unfortunately this set is slightly different from the list of valid
    timezones used for cron and interval timezone validation.
    """
    from prefect._internal.pytz import HAS_PYTZ

    if not HAS_PYTZ:
        from prefect._internal import pytz
    else:
        import pytz

    known_timezones = pytz.all_timezones_set
    return v in known_timezones
class IntervalSchedule(PrefectBaseModel):
    """
    A schedule that fires at `anchor_date` plus whole multiples of `interval`.

    When `anchor_date` is omitted, the current UTC time is used. A
    timezone-naive `anchor_date` is assumed to be in the schedule's timezone
    (or UTC). Anchor dates are always stored as UTC offsets, even when they
    are supplied with an IANA timezone, so a `timezone` can be given to
    control localization behavior such as DST boundary handling. When no
    `timezone` is given it is inferred from the anchor date.

    NOTE: If `anchor_date` or `timezone` is in a DST-observing timezone, the
    schedule adjusts itself accordingly:

    - Intervals of less than 24 hours follow UTC intervals. An hourly schedule
      fires every UTC hour, even across DST boundaries; when clocks are set
      back this yields two runs that *appear* to both be scheduled for 1am
      local time even though they are an hour apart in UTC.
    - Longer intervals, like a daily schedule, follow DST conventions so that
      the clock hour stays constant. A daily schedule that fires at 9am keeps
      firing at 9am in the local timezone across DST changes.

    Args:
        interval (datetime.timedelta): the interval to schedule on; must be
            greater than zero
        anchor_date (DateTime, optional): the date that increments are
            counted from; the current timestamp is used when omitted
        timezone (str, optional): a valid timezone string
    """

    model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")

    interval: datetime.timedelta = Field(gt=datetime.timedelta(seconds=0))
    anchor_date: Annotated[DateTime, AfterValidator(default_anchor_date)] = Field(  # pyright: ignore[reportAssignmentType] DateTime is split into two types depending on Python version
        examples=["2020-01-01T00:00:00Z"],
        default_factory=lambda: now("UTC"),
    )
    timezone: Optional[str] = Field(examples=["America/New_York"], default=None)

    @model_validator(mode="after")
    def validate_timezone(self):
        """Resolve `timezone` via `default_timezone`, given the dumped model values."""
        current_values = self.model_dump()
        resolved = default_timezone(self.timezone, current_values)
        self.timezone = resolved
        return self

    if TYPE_CHECKING:
        # Typing-only signature: the model accepts str or datetime values
        # for `anchor_date`, which the field annotation alone does not convey.
        def __init__(
            self,
            /,
            interval: datetime.timedelta,
            anchor_date: Optional[Union[DateTime, datetime.datetime, str]] = None,
            timezone: Optional[str] = None,
        ) -> None: ...
class CronSchedule(PrefectBaseModel):
    """
    A schedule described by a cron expression.

    NOTE: In a DST-observing timezone the schedule adjusts itself
    accordingly. Cron's DST rules are based on schedule times, not on elapsed
    intervals: an hourly cron schedule fires on every new schedule hour rather
    than every elapsed hour. For example, when clocks are set back there is a
    two-hour pause, because the schedule fires *the first time* 1am is reached
    and *the first time* 2am is reached, 120 minutes later. Longer schedules,
    such as one that fires at 9am every morning, adjust for DST automatically.

    Args:
        cron (str): a valid cron string
        timezone (str): a valid timezone string in IANA tzdata format (for
            example, America/New_York).
        day_or (bool, optional): Controls how croniter combines the `day` and
            `day_of_week` entries. Defaults to True, matching cron, which
            joins them with OR. When False they are joined with AND, which
            behaves like fcron and lets you, for example, define a job that
            runs on the 2nd Friday of each month by setting both the days of
            the month and the weekday.
    """

    model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")

    cron: str = Field(..., examples=["0 0 * * *"])
    timezone: Optional[str] = Field(examples=["America/New_York"], default=None)
    day_or: bool = Field(
        description="Control croniter behavior for handling day and day_of_week entries.",
        default=True,
    )

    @field_validator("timezone")
    @classmethod
    def valid_timezone(cls, v: Optional[str]) -> Optional[str]:
        """Pass the timezone through `default_timezone` and return its result."""
        resolved = default_timezone(v)
        return resolved

    @field_validator("cron")
    @classmethod
    def valid_cron_string(cls, v: str) -> str:
        """Pass the cron expression through `validate_cron_string` and return its result."""
        checked = validate_cron_string(v)
        return checked
# Passed as the `dtstart` argument when `RRuleSchedule.to_rrule` parses an
# rrule string.
DEFAULT_ANCHOR_DATE = Date(year=2020, month=1, day=1)
def _rrule_dt(
    rrule: dateutil.rrule.rrule, name: str = "_dtstart"
) -> Optional[datetime.datetime]:
    """Read the attribute `name` (default `_dtstart`) from `rrule`; None if it is missing."""
    value: Optional[datetime.datetime] = getattr(rrule, name, None)
    return value
def _rrule(
    rruleset: dateutil.rrule.rruleset, name: str = "_rrule"
) -> list[dateutil.rrule.rrule]:
    """Read the attribute `name` (default `_rrule`) from `rruleset`; an empty list if it is missing."""
    rules: list[dateutil.rrule.rrule] = getattr(rruleset, name, [])
    return rules
def _rdates(
    rrule: dateutil.rrule.rruleset, name: str = "_rdate"
) -> list[datetime.datetime]:
    """Read the attribute `name` (default `_rdate`) from the rruleset; an empty list if it is missing."""
    dates: list[datetime.datetime] = getattr(rrule, name, [])
    return dates
class RRuleSchedule(PrefectBaseModel):
    """
    RRule schedule, based on the iCalendar standard
    ([RFC 5545](https://datatracker.ietf.org/doc/html/rfc5545)) as
    implemented in `dateutil.rrule`.

    RRules are appropriate for any kind of calendar-date manipulation, including
    irregular intervals, repetition, exclusions, week day or day-of-month
    adjustments, and more.

    Note that as a calendar-oriented standard, `RRuleSchedules` are sensitive
    to the initial timezone provided. A 9am daily schedule with a daylight saving
    time-aware start date will maintain a local 9am time through DST boundaries;
    a 9am daily schedule with a UTC start date will maintain a 9am UTC time.

    Args:
        rrule (str): a valid RRule string
        timezone (str, optional): a valid timezone string
    """

    model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")

    rrule: str
    # `validate_default=True` makes the default go through `valid_timezone` too.
    timezone: Optional[str] = Field(
        default="UTC", examples=["America/New_York"], validate_default=True
    )

    @field_validator("rrule")
    @classmethod
    def validate_rrule_str(cls, v: str) -> str:
        """Pass the rrule string through `validate_rrule_string` and return its result."""
        return validate_rrule_string(v)

    @classmethod
    def from_rrule(
        cls, rrule: Union[dateutil.rrule.rrule, dateutil.rrule.rruleset]
    ) -> "RRuleSchedule":
        """
        Build a schedule from a dateutil `rrule` or `rruleset`.

        For a single `rrule`, the timezone is the `tzname` of its dtstart's
        tzinfo, or "UTC" when the dtstart is missing or naive.

        For an `rruleset`, the member rrules must agree on a single dtstart and
        on at most one tzinfo. The set is serialized as newline-separated
        rrule, EXRULE, RDATE and EXDATE sections, in that order, skipping
        sections that are empty.

        Args:
            rrule: the dateutil rule or rule set to convert

        Returns:
            A schedule holding the serialized rule string and timezone name.

        Raises:
            ValueError: if the rruleset's rrules have more than one distinct
                dtstart or more than one distinct dtstart tzinfo.
        """
        if isinstance(rrule, dateutil.rrule.rrule):
            dtstart = _rrule_dt(rrule)
            if dtstart and dtstart.tzinfo is not None:
                timezone = dtstart.tzinfo.tzname(dtstart)
            else:
                timezone = "UTC"
            return cls(rrule=str(rrule), timezone=timezone)

        rrules = _rrule(rrule)
        dtstarts = [dts for rr in rrules if (dts := _rrule_dt(rr)) is not None]
        # Compare dtstarts as instants (normalized to UTC), not as wall times.
        unique_dstarts = {d.astimezone(ZoneInfo("UTC")) for d in dtstarts}
        unique_timezones = {d.tzinfo for d in dtstarts if d.tzinfo is not None}

        if len(unique_timezones) > 1:
            raise ValueError(
                f"rruleset has too many dtstart timezones: {unique_timezones}"
            )

        if len(unique_dstarts) > 1:
            raise ValueError(f"rruleset has too many dtstarts: {unique_dstarts}")

        if unique_dstarts and unique_timezones:
            [unique_tz] = unique_timezones
            timezone = unique_tz.tzname(dtstarts[0])
        else:
            timezone = "UTC"

        # Each section is non-empty whenever it is present, so joining the
        # sections with newlines never produces blank lines.
        sections: list[str] = []
        if rrules:
            sections.append("\n".join(str(r) for r in rrules))
        if exrule := _rrule(rrule, "_exrule"):
            # Exclusion rules stringify as RRULE lines; relabel them.
            sections.append(
                "\n".join(str(r) for r in exrule).replace("RRULE", "EXRULE")
            )
        # NOTE: the date format below always appends a literal "Z",
        # regardless of the tzinfo carried by each datetime.
        if rdates := _rdates(rrule):
            sections.append(
                "RDATE:" + ",".join(rd.strftime("%Y%m%dT%H%M%SZ") for rd in rdates)
            )
        if exdates := _rdates(rrule, "_exdate"):
            sections.append(
                "EXDATE:"
                + ",".join(exd.strftime("%Y%m%dT%H%M%SZ") for exd in exdates)
            )
        return cls(rrule="\n".join(sections), timezone=timezone)

    def to_rrule(self) -> Union[dateutil.rrule.rrule, dateutil.rrule.rruleset]:
        """
        Parse the stored rule string into a dateutil `rrule` or `rruleset`.

        Since rrule doesn't properly serialize/deserialize timezones, the
        dates are localized here: every dtstart, until, rdate and exdate has
        its tzinfo replaced with this schedule's timezone.

        Returns:
            A localized `rrule` when the string parses to a single rule,
            otherwise the parsed `rruleset` with its members localized.
        """
        rrule = dateutil.rrule.rrulestr(
            self.rrule,
            dtstart=DEFAULT_ANCHOR_DATE,
            cache=True,
        )
        timezone = dateutil.tz.gettz(self.timezone)

        def _localize(rule: dateutil.rrule.rrule) -> dateutil.rrule.rrule:
            # Copy `rule` with dtstart (and until, if set) moved to `timezone`.
            dtstart = _rrule_dt(rule)
            assert dtstart is not None
            kwargs: dict[str, Any] = {"dtstart": dtstart.replace(tzinfo=timezone)}
            if until := _rrule_dt(rule, "_until"):
                kwargs["until"] = until.replace(tzinfo=timezone)
            return rule.replace(**kwargs)  # pyright: ignore[reportUnknownVariableType, reportUnknownMemberType] missing type hints

        if isinstance(rrule, dateutil.rrule.rrule):
            return _localize(rrule)

        # The rruleset is updated in place through its private attributes.
        localized_rrules: list[dateutil.rrule.rrule] = [
            _localize(rr) for rr in _rrule(rrule)
        ]
        setattr(rrule, "_rrule", localized_rrules)

        localized_exrules: list[dateutil.rrule.rrule] = [
            _localize(exr) for exr in _rrule(rrule, "_exrule")
        ]
        setattr(rrule, "_exrule", localized_exrules)

        localized_rdates: list[datetime.datetime] = [
            rd.replace(tzinfo=timezone) for rd in _rdates(rrule)
        ]
        setattr(rrule, "_rdate", localized_rdates)

        localized_exdates: list[datetime.datetime] = [
            exd.replace(tzinfo=timezone) for exd in _rdates(rrule, "_exdate")
        ]
        setattr(rrule, "_exdate", localized_exdates)

        return rrule

    @field_validator("timezone")
    @classmethod
    def valid_timezone(cls, v: Optional[str]) -> str:
        """
        Validate that the provided timezone is a valid IANA timezone.

        Unfortunately this list is slightly different from the list of valid
        timezones we use for cron and interval timezone validation.

        Returns:
            "UTC" when `v` is None, otherwise `v` unchanged.

        Raises:
            ValueError: if `v` is not accepted by `is_valid_timezone`.
        """
        if v is None:
            return "UTC"

        if is_valid_timezone(v):
            return v

        raise ValueError(f'Invalid timezone: "{v}"')
class NoSchedule(PrefectBaseModel):
    """A schedule model that declares no fields of its own and forbids extra ones."""

    model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")
# Any one of the schedule models: interval, cron, rrule, or the empty NoSchedule.
SCHEDULE_TYPES: TypeAlias = Union[
    IntervalSchedule,
    CronSchedule,
    RRuleSchedule,
    NoSchedule,
]
def is_schedule_type(obj: Any) -> TypeGuard[SCHEDULE_TYPES]:
    """Return True when `obj` is an instance of one of the four schedule models."""
    schedule_classes = (IntervalSchedule, CronSchedule, RRuleSchedule, NoSchedule)
    return isinstance(obj, schedule_classes)
def construct_schedule(
    interval: Optional[Union[int, float, datetime.timedelta]] = None,
    anchor_date: Optional[Union[datetime.datetime, str]] = None,
    cron: Optional[str] = None,
    rrule: Optional[str] = None,
    timezone: Optional[str] = None,
) -> SCHEDULE_TYPES:
    """
    Construct a schedule from the provided arguments.

    Exactly one of `interval`, `cron`, or `rrule` must be given. An argument
    counts as given whenever it is not None, so a falsy value such as
    `interval=0` or `cron=""` is handed to the matching schedule model and
    rejected by that model's own validation, instead of being reported as
    missing.

    Args:
        interval: An interval on which to schedule runs. Accepts either a number
            or a timedelta object. If a number is given, it will be interpreted as seconds.
        anchor_date: The start date for an interval schedule. When omitted,
            the current time is used.
        cron: A cron schedule for runs.
        rrule: An rrule schedule of when to execute runs of this flow.
        timezone: A timezone to use for the schedule. Defaults to UTC.

    Returns:
        An `IntervalSchedule`, `CronSchedule`, or `RRuleSchedule`.

    Raises:
        ValueError: if more than one of `interval`, `cron`, `rrule` is given,
            if none is given, or if `anchor_date` or `timezone` is given
            without a schedule it applies to.
    """
    num_schedules = sum(1 for entry in (interval, cron, rrule) if entry is not None)
    if num_schedules > 1:
        raise ValueError("Only one of interval, cron, or rrule can be provided.")

    if anchor_date and interval is None:
        raise ValueError(
            "An anchor date can only be provided with an interval schedule"
        )

    if timezone and num_schedules == 0:
        raise ValueError(
            "A timezone can only be provided with interval, cron, or rrule"
        )

    # Dispatch on `is not None`, matching how the arguments were counted above.
    if interval is not None:
        if isinstance(interval, (int, float)):
            interval = datetime.timedelta(seconds=interval)
        if not anchor_date:
            anchor_date = now()
        return IntervalSchedule(
            interval=interval, anchor_date=anchor_date, timezone=timezone
        )

    if cron is not None:
        return CronSchedule(cron=cron, timezone=timezone)

    if rrule is not None:
        return RRuleSchedule(rrule=rrule, timezone=timezone)

    raise ValueError("Either interval, cron, or rrule must be provided")