Coverage for /usr/local/lib/python3.12/site-packages/prefect/client/schemas/schedules.py: 25%

162 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1""" 

2Schedule schemas 

3""" 

4 

5import datetime 1a

6from typing import TYPE_CHECKING, Annotated, Any, ClassVar, Optional, Union 1a

7from zoneinfo import ZoneInfo 1a

8 

9import dateutil 1a

10import dateutil.rrule 1a

11import dateutil.tz 1a

12from pydantic import AfterValidator, ConfigDict, Field, field_validator, model_validator 1a

13from typing_extensions import TypeAlias, TypeGuard 1a

14 

15from prefect._internal.schemas.bases import PrefectBaseModel 1a

16from prefect._internal.schemas.validators import ( 1a

17 default_anchor_date, 

18 default_timezone, 

19 validate_cron_string, 

20 validate_rrule_string, 

21) 

22from prefect.types._datetime import Date, DateTime, now 1a

23 

# Iteration cap; not referenced in this module — presumably consumed by
# schedule-expansion code elsewhere (confirm).
MAX_ITERATIONS = 1_000
# Sized for roughly one year's worth of RDATEs plus a safety buffer.
MAX_RRULE_LENGTH = 6_500

27 

28 

def is_valid_timezone(v: str) -> bool:
    """
    Return True if `v` is one of the names in pytz's `all_timezones_set`.

    The real `pytz` package is used when `HAS_PYTZ` is set (presumably meaning
    pytz is importable); otherwise Prefect's vendored `prefect._internal.pytz`
    module supplies the set. This list differs slightly from the one used to
    validate cron and interval schedule timezones.
    """
    from prefect._internal.pytz import HAS_PYTZ

    if HAS_PYTZ:
        import pytz as tz_database
    else:
        from prefect._internal import pytz as tz_database

    known_zones = tz_database.all_timezones_set
    return v in known_zones

44 

45 

class IntervalSchedule(PrefectBaseModel):
    """
    A schedule formed by adding `interval` increments to an `anchor_date`. If no
    `anchor_date` is supplied, the current UTC time is used. If a
    timezone-naive datetime is provided for `anchor_date`, it is assumed to be
    in the schedule's timezone (or UTC). Even if supplied with an IANA timezone,
    anchor dates are always stored as UTC offsets, so a `timezone` can be
    provided to determine localization behaviors like DST boundary handling. If
    none is provided it will be inferred from the anchor date.

    NOTE: If the `IntervalSchedule` `anchor_date` or `timezone` is provided in a
    DST-observing timezone, then the schedule will adjust itself appropriately.
    Intervals greater than 24 hours will follow DST conventions, while intervals
    of less than 24 hours will follow UTC intervals. For example, an hourly
    schedule will fire every UTC hour, even across DST boundaries. When clocks
    are set back, this will result in two runs that *appear* to both be
    scheduled for 1am local time, even though they are an hour apart in UTC
    time. For longer intervals, like a daily schedule, the interval schedule
    will adjust for DST boundaries so that the clock-hour remains constant. This
    means that a daily schedule that always fires at 9am will observe DST and
    continue to fire at 9am in the local time zone.

    Args:
        interval (datetime.timedelta): an interval to schedule on
        anchor_date (DateTime, optional): an anchor date to schedule increments against;
            if not provided, the current timestamp will be used
        timezone (str, optional): a valid timezone string
    """

    # NOTE(review): the class docstring above doubles as this model's
    # JSON-schema description in pydantic, and field declaration order
    # determines schema/serialization order — edit either with care.

    # Unknown keys are rejected rather than ignored.
    model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")

    # Must be strictly positive (`gt=timedelta(0)`).
    interval: datetime.timedelta = Field(gt=datetime.timedelta(0))
    # Post-processed by `default_anchor_date` after type validation; when
    # omitted, defaults to the current time in UTC.
    anchor_date: Annotated[DateTime, AfterValidator(default_anchor_date)] = Field(  # pyright: ignore[reportAssignmentType] DateTime is split into two types depending on Python version
        default_factory=lambda: now("UTC"),
        examples=["2020-01-01T00:00:00Z"],
    )
    # Optional; finalized by the model validator below.
    timezone: Optional[str] = Field(default=None, examples=["America/New_York"])

    @model_validator(mode="after")
    def validate_timezone(self):
        # Runs after all fields are validated, so the dumped model (including
        # the validated `anchor_date`) is available to `default_timezone`. Per
        # the class docstring, a missing timezone is inferred from the anchor
        # date — presumably that inference lives in `default_timezone` (confirm).
        self.timezone = default_timezone(self.timezone, self.model_dump())
        return self

    if TYPE_CHECKING:
        # The model accepts str or datetime values for `anchor_date`; this
        # signature exists only for static type checkers and is never executed.
        def __init__(
            self,
            /,
            interval: datetime.timedelta,
            anchor_date: Optional[Union[DateTime, datetime.datetime, str]] = None,
            timezone: Optional[str] = None,
        ) -> None: ...

98 

99 

class CronSchedule(PrefectBaseModel):
    """
    Cron schedule

    NOTE: If the timezone is a DST-observing one, then the schedule will adjust
    itself appropriately. Cron's rules for DST are based on schedule times, not
    intervals. This means that an hourly cron schedule will fire on every new
    schedule hour, not every elapsed hour; for example, when clocks are set back
    this will result in a two-hour pause as the schedule will fire *the first
    time* 1am is reached and *the first time* 2am is reached, 120 minutes later.
    Longer schedules, such as one that fires at 9am every morning, will
    automatically adjust for DST.

    Args:
        cron (str): a valid cron string
        timezone (str): a valid timezone string in IANA tzdata format (for example,
            America/New_York).
        day_or (bool, optional): Control how croniter handles `day` and `day_of_week`
            entries. Defaults to True, matching cron which connects those values using
            OR. If the switch is set to False, the values are connected using AND. This
            behaves like fcron and enables you to e.g. define a job that executes each
            2nd friday of a month by setting the days of month and the weekday.

    """

    # NOTE(review): the class docstring above doubles as this model's
    # JSON-schema description in pydantic, and field declaration order
    # determines schema/serialization order — edit either with care.
    # `timezone` is declared Optional even though the docstring lists it as str.

    # Unknown keys are rejected rather than ignored.
    model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")

    # `default=...` (Ellipsis) marks the field as required in pydantic.
    cron: str = Field(default=..., examples=["0 0 * * *"])
    timezone: Optional[str] = Field(default=None, examples=["America/New_York"])
    day_or: bool = Field(
        default=True,
        description=(
            "Control croniter behavior for handling day and day_of_week entries."
        ),
    )

    @field_validator("timezone")
    @classmethod
    def valid_timezone(cls, v: Optional[str]) -> Optional[str]:
        # Delegates to the shared `default_timezone` helper (the interval
        # schedule uses it too). NOTE(review): unless the base model enables
        # `validate_default`, this does not run when `timezone` is omitted.
        return default_timezone(v)

    @field_validator("cron")
    @classmethod
    def valid_cron_string(cls, v: str) -> str:
        # Cron syntax checking is delegated to `validate_cron_string`.
        return validate_cron_string(v)

145 

146 

# Fallback start date passed as `dtstart` to `dateutil.rrule.rrulestr` in
# `RRuleSchedule.to_rrule`; per dateutil it only applies to rule strings that
# do not specify their own DTSTART.
DEFAULT_ANCHOR_DATE = Date(2020, 1, 1)

148 

149 

def _rrule_dt(
    rrule: dateutil.rrule.rrule, name: str = "_dtstart"
) -> Optional[datetime.datetime]:
    """
    Read a datetime held on a private attribute of `rrule`.

    Defaults to `_dtstart`; pass e.g. "_until" to read another attribute.
    Returns None when the attribute is missing (or is itself None).
    """
    try:
        found = getattr(rrule, name)
    except AttributeError:
        found = None
    return found

154 

155 

def _rrule(
    rruleset: dateutil.rrule.rruleset, name: str = "_rrule"
) -> list[dateutil.rrule.rrule]:
    """
    Return the rules stored on a private attribute of `rruleset`.

    Defaults to `_rrule`; pass "_exrule" to read the exclusion rules instead.
    A fresh empty list is returned when the attribute does not exist.
    """
    try:
        return getattr(rruleset, name)
    except AttributeError:
        return []

160 

161 

def _rdates(
    rrule: dateutil.rrule.rruleset, name: str = "_rdate"
) -> list[datetime.datetime]:
    """
    Return the explicit datetimes stored on a private attribute of an rruleset.

    Defaults to `_rdate`; pass "_exdate" to read the excluded dates instead.
    A fresh empty list is returned when the attribute does not exist.
    """
    try:
        dates = getattr(rrule, name)
    except AttributeError:
        dates = []
    return dates

166 

167 

def _tzinfo_name(
    tzinfo: datetime.tzinfo, dt: Optional[datetime.datetime]
) -> Optional[str]:
    """
    Return the best available name for `tzinfo`, preferring its IANA key.

    `tzinfo.tzname(dt)` yields abbreviations such as "EDT" or "PDT". Many of
    these are not IANA names, so `RRuleSchedule` timezone validation rejects
    them. Others, such as "EST", are valid but denote fixed offsets that never
    observe DST. `zoneinfo.ZoneInfo` exposes the IANA name as `key`, and pytz
    timezones expose it as `zone`, so those are used when present.

    Other tzinfo implementations (e.g. `dateutil.tz.tzfile`, `datetime.timezone`)
    fall back to `tzname(dt)`, exactly as before.
    """
    for attr in ("key", "zone"):
        candidate = getattr(tzinfo, attr, None)
        if isinstance(candidate, str) and candidate:
            return candidate
    return tzinfo.tzname(dt)


class RRuleSchedule(PrefectBaseModel):
    """
    RRule schedule, based on the iCalendar standard
    ([RFC 5545](https://datatracker.ietf.org/doc/html/rfc5545)) as
    implemented in `dateutil.rrule`.

    RRules are appropriate for any kind of calendar-date manipulation, including
    irregular intervals, repetition, exclusions, week day or day-of-month
    adjustments, and more.

    Note that as a calendar-oriented standard, `RRuleSchedules` are sensitive to
    the initial timezone provided. A 9am daily schedule with a daylight saving
    time-aware start date will maintain a local 9am time through DST boundaries;
    a 9am daily schedule with a UTC start date will maintain a 9am UTC time.

    Args:
        rrule (str): a valid RRule string
        timezone (str, optional): a valid timezone string
    """

    # Unknown keys are rejected rather than ignored.
    model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")

    rrule: str
    # `validate_default=True` ensures `valid_timezone` also runs on the default.
    timezone: Optional[str] = Field(
        default="UTC", examples=["America/New_York"], validate_default=True
    )

    @field_validator("rrule")
    @classmethod
    def validate_rrule_str(cls, v: str) -> str:
        return validate_rrule_string(v)

    @classmethod
    def from_rrule(
        cls, rrule: Union[dateutil.rrule.rrule, dateutil.rrule.rruleset]
    ) -> "RRuleSchedule":
        """
        Build an `RRuleSchedule` from a `dateutil` rrule or rruleset.

        The schedule timezone comes from the DTSTART timezone, using its IANA
        name when the tzinfo exposes one. "UTC" is used when DTSTART is naive
        or absent.

        Raises:
            ValueError: if an rruleset's rules carry more than one DTSTART
                timezone, or more than one distinct DTSTART instant.
        """
        if isinstance(rrule, dateutil.rrule.rrule):
            dtstart = _rrule_dt(rrule)
            if dtstart and dtstart.tzinfo is not None:
                timezone = _tzinfo_name(dtstart.tzinfo, dtstart)
            else:
                timezone = "UTC"
            return RRuleSchedule(rrule=str(rrule), timezone=timezone)

        rrules = _rrule(rrule)
        dtstarts = [dts for rr in rrules if (dts := _rrule_dt(rr)) is not None]
        unique_dstarts = set(d.astimezone(ZoneInfo("UTC")) for d in dtstarts)
        unique_timezones = set(d.tzinfo for d in dtstarts if d.tzinfo is not None)

        if len(unique_timezones) > 1:
            raise ValueError(
                f"rruleset has too many dtstart timezones: {unique_timezones}"
            )

        if len(unique_dstarts) > 1:
            raise ValueError(f"rruleset has too many dtstarts: {unique_dstarts}")

        if unique_dstarts and unique_timezones:
            [unique_tz] = unique_timezones
            timezone = _tzinfo_name(unique_tz, dtstarts[0])
        else:
            timezone = "UTC"

        # Re-serialize the set line by line. Exclusion rules reuse the rrule
        # string form with the RRULE keyword swapped for EXRULE.
        rruleset_string = ""
        if rrules:
            rruleset_string += "\n".join(str(r) for r in rrules)
        if exrule := _rrule(rrule, "_exrule"):
            rruleset_string += "\n" if rruleset_string else ""
            rruleset_string += "\n".join(str(r) for r in exrule).replace(
                "RRULE", "EXRULE"
            )
        # RDATE/EXDATE values are written as wall-clock times with a literal
        # "Z"; `to_rrule` re-tags them with the schedule timezone on load.
        if rdates := _rdates(rrule):
            rruleset_string += "\n" if rruleset_string else ""
            rruleset_string += "RDATE:" + ",".join(
                rd.strftime("%Y%m%dT%H%M%SZ") for rd in rdates
            )
        if exdates := _rdates(rrule, "_exdate"):
            rruleset_string += "\n" if rruleset_string else ""
            rruleset_string += "EXDATE:" + ",".join(
                exd.strftime("%Y%m%dT%H%M%SZ") for exd in exdates
            )
        return RRuleSchedule(rrule=rruleset_string, timezone=timezone)

    def to_rrule(self) -> Union[dateutil.rrule.rrule, dateutil.rrule.rruleset]:
        """
        Parse `self.rrule` and localize it to `self.timezone`.

        Since rrule doesn't properly serialize/deserialize timezones, every
        dtstart, until, RDATE and EXDATE is re-tagged with the schedule's
        timezone via `replace(tzinfo=...)`. This keeps the wall-clock time and
        discards any tzinfo parsed from the string. Rule strings without a
        DTSTART start from `DEFAULT_ANCHOR_DATE`.
        """
        rrule = dateutil.rrule.rrulestr(
            self.rrule,
            dtstart=DEFAULT_ANCHOR_DATE,
            cache=True,
        )
        timezone = dateutil.tz.gettz(self.timezone)
        if isinstance(rrule, dateutil.rrule.rrule):
            dtstart = _rrule_dt(rrule)
            assert dtstart is not None
            kwargs: dict[str, Any] = dict(dtstart=dtstart.replace(tzinfo=timezone))
            if until := _rrule_dt(rrule, "_until"):
                kwargs.update(
                    until=until.replace(tzinfo=timezone),
                )
            return rrule.replace(**kwargs)  # pyright: ignore[reportUnknownVariableType, reportUnknownMemberType] missing type hints

        # update rrules
        localized_rrules: list[dateutil.rrule.rrule] = []
        for rr in _rrule(rrule):
            dtstart = _rrule_dt(rr)
            assert dtstart is not None
            kwargs: dict[str, Any] = dict(dtstart=dtstart.replace(tzinfo=timezone))
            if until := _rrule_dt(rr, "_until"):
                kwargs.update(until=until.replace(tzinfo=timezone))
            localized_rrules.append(rr.replace(**kwargs))  # pyright: ignore[reportUnknownArgumentType, reportUnknownMemberType] missing type hints
        setattr(rrule, "_rrule", localized_rrules)

        # update exrules (these are individual rrule objects, not rrulesets)
        localized_exrules: list[dateutil.rrule.rrule] = []
        for exr in _rrule(rrule, "_exrule"):
            dtstart = _rrule_dt(exr)
            assert dtstart is not None
            kwargs = dict(dtstart=dtstart.replace(tzinfo=timezone))
            if until := _rrule_dt(exr, "_until"):
                kwargs.update(until=until.replace(tzinfo=timezone))
            localized_exrules.append(exr.replace(**kwargs))  # pyright: ignore[reportUnknownArgumentType, reportUnknownMemberType] missing type hints
        setattr(rrule, "_exrule", localized_exrules)

        # update rdates
        localized_rdates: list[datetime.datetime] = []
        for rd in _rdates(rrule):
            localized_rdates.append(rd.replace(tzinfo=timezone))
        setattr(rrule, "_rdate", localized_rdates)

        # update exdates
        localized_exdates: list[datetime.datetime] = []
        for exd in _rdates(rrule, "_exdate"):
            localized_exdates.append(exd.replace(tzinfo=timezone))
        setattr(rrule, "_exdate", localized_exdates)

        return rrule

    @field_validator("timezone")
    @classmethod
    def valid_timezone(cls, v: Optional[str]) -> str:
        """
        Validate that the provided timezone is a valid IANA timezone.

        `None` is normalized to "UTC". Unfortunately this list is slightly
        different from the list of valid timezones we use for cron and interval
        timezone validation.
        """
        if v is None:
            return "UTC"

        if is_valid_timezone(v):
            return v

        raise ValueError(f'Invalid timezone: "{v}"')

322 

323 

class NoSchedule(PrefectBaseModel):
    # Schedule model that declares no fields of its own; `extra="forbid"` makes
    # validation reject any key not declared by the model (or its base class,
    # which is not visible here).
    # NOTE: documented with a comment rather than a docstring because pydantic
    # uses a model's docstring as its JSON-schema description.
    model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")

326 

327 

# Union of every concrete schedule model defined in this module; member order
# is kept stable because pydantic considers union members in declaration order.
SCHEDULE_TYPES: TypeAlias = Union[
    IntervalSchedule,
    CronSchedule,
    RRuleSchedule,
    NoSchedule,
]

331 

332 

def is_schedule_type(obj: Any) -> TypeGuard[SCHEDULE_TYPES]:
    """Return True when `obj` is an instance of one of the schedule models."""
    schedule_classes = (IntervalSchedule, CronSchedule, RRuleSchedule, NoSchedule)
    return isinstance(obj, schedule_classes)

335 

336 

def construct_schedule(
    interval: Optional[Union[int, float, datetime.timedelta]] = None,
    anchor_date: Optional[Union[datetime.datetime, str]] = None,
    cron: Optional[str] = None,
    rrule: Optional[str] = None,
    timezone: Optional[str] = None,
) -> SCHEDULE_TYPES:
    """
    Build a single schedule object from the provided arguments.

    Args:
        interval: An interval on which to schedule runs. Accepts either a number
            or a timedelta object. If a number is given, it will be interpreted as seconds.
        anchor_date: The start date for an interval schedule; the current time
            is used when it is not given.
        cron: A cron schedule for runs.
        rrule: An rrule schedule of when to execute runs of this flow.
        timezone: A timezone to use for the schedule. Defaults to UTC.

    Returns:
        An `IntervalSchedule`, `CronSchedule`, or `RRuleSchedule`, depending on
        which of `interval`, `cron`, or `rrule` was supplied.

    Raises:
        ValueError: if more than one of `interval`/`cron`/`rrule` is given, if
            `anchor_date` is given without an interval, if `timezone` is given
            without any schedule, or if no schedule could be built.
    """
    supplied = [arg for arg in (interval, cron, rrule) if arg is not None]
    if len(supplied) > 1:
        raise ValueError("Only one of interval, cron, or rrule can be provided.")

    if anchor_date and not interval:
        raise ValueError(
            "An anchor date can only be provided with an interval schedule"
        )

    if timezone and not any((interval, cron, rrule)):
        raise ValueError(
            "A timezone can only be provided with interval, cron, or rrule"
        )

    # Branch selection uses truthiness rather than `is not None`, so e.g. an
    # interval of 0 or an empty cron/rrule string is treated as absent here.
    if interval:
        # Bare numbers are interpreted as a count of seconds.
        delta = (
            datetime.timedelta(seconds=interval)
            if isinstance(interval, (int, float))
            else interval
        )
        return IntervalSchedule(
            interval=delta, anchor_date=anchor_date or now(), timezone=timezone
        )
    if cron:
        return CronSchedule(cron=cron, timezone=timezone)
    if rrule:
        return RRuleSchedule(rrule=rrule, timezone=timezone)

    raise ValueError("Either interval, cron, or rrule must be provided")