Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/events/counting.py: 77%

121 statements  

coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

from __future__ import annotations

import datetime
import math
from datetime import timedelta
from typing import TYPE_CHECKING, Generator
from zoneinfo import ZoneInfo

import sqlalchemy as sa
from sqlalchemy.sql.selectable import Select

from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.types import DateTime
from prefect.types._datetime import Duration, end_of_period, now
from prefect.utilities.collections import AutoEnum

if TYPE_CHECKING:
    from prefect.server.events.filters import EventFilter


# The earliest possible event.occurred date in any Prefect environment is
# 2024-04-04, so we use the Monday before that as our pivot date.
PIVOT_DATETIME = datetime.datetime(2024, 4, 1, tzinfo=ZoneInfo("UTC"))
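
# A quick sanity check of the pivot (illustrative only, not part of the
# original module): 2024-04-01 falls on a Monday, so week-sized buckets
# anchored at PIVOT_DATETIME begin on Mondays.
#
#     >>> PIVOT_DATETIME.weekday()  # Monday == 0
#     0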


class InvalidEventCountParameters(ValueError):
    """Raised when the given parameters are invalid for counting events."""

    def __init__(self, message: str):
        super().__init__(message)
        self.message = message


class TimeUnit(AutoEnum):
    week = AutoEnum.auto()
    day = AutoEnum.auto()
    hour = AutoEnum.auto()
    minute = AutoEnum.auto()
    second = AutoEnum.auto()

    def as_timedelta(self, interval: float) -> Duration:
        if self == self.week:
            return Duration(days=7 * interval)
        elif self == self.day:
            return Duration(days=1 * interval)
        elif self == self.hour:
            return Duration(hours=1 * interval)
        elif self == self.minute:
            return Duration(minutes=1 * interval)
        elif self == self.second:
            return Duration(seconds=1 * interval)
        else:
            raise NotImplementedError()
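
    # Illustrative only (not part of the original module): fractional
    # intervals scale the base unit, so, assuming Duration behaves like a
    # datetime.timedelta:
    #
    #     >>> TimeUnit.hour.as_timedelta(1.5) == timedelta(minutes=90)
    #     True
    #     >>> TimeUnit.week.as_timedelta(2) == timedelta(days=14)
    #     True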

    def validate_buckets(
        self,
        start_datetime: datetime.datetime,
        end_datetime: datetime.datetime,
        interval: float,
    ) -> None:
        MAX_ALLOWED_BUCKETS = 1000

        delta = self.as_timedelta(interval)
        start_in_utc = start_datetime.astimezone(ZoneInfo("UTC"))
        end_in_utc = end_datetime.astimezone(ZoneInfo("UTC"))

        if interval < 0.01:
            raise InvalidEventCountParameters("The minimum interval is 0.01")

        number_of_buckets = math.ceil((end_in_utc - start_in_utc) / delta)
        if number_of_buckets > MAX_ALLOWED_BUCKETS:
            raise InvalidEventCountParameters(
                f"The given interval would create {number_of_buckets} buckets, "
                "which is too many. Please increase the interval or reduce the "
                f"time range to produce {MAX_ALLOWED_BUCKETS} buckets or fewer."
            )
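
    # Worked example (illustrative only): a 31-day range counted with
    # TimeUnit.hour and interval=1 produces math.ceil(744 / 1) = 744 buckets,
    # which is under the limit; the same range with TimeUnit.minute would
    # produce 44,640 buckets and raise InvalidEventCountParameters.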

    def get_interval_spans(
        self,
        start_datetime: datetime.datetime,
        end_datetime: datetime.datetime,
        interval: float,
    ) -> Generator[int | tuple[datetime.datetime, datetime.datetime], None, None]:
        """Divide the given range of dates into evenly-sized spans of interval units"""
        self.validate_buckets(start_datetime, end_datetime, interval)

        # Our universe began on PIVOT_DATETIME and all time after that is
        # divided into `delta`-sized buckets. We want to find the bucket that
        # contains `start_datetime`, then all of the buckets that come after
        # it, up to the bucket that contains `end_datetime`.

        delta = self.as_timedelta(interval)
        start_in_utc = start_datetime.astimezone(ZoneInfo("UTC"))
        end_in_utc = end_datetime.astimezone(ZoneInfo("UTC"))

        if end_in_utc > now("UTC"):
            end_in_utc = end_of_period(now("UTC"), self.value)

        first_span_index = math.floor((start_in_utc - PIVOT_DATETIME) / delta)

        yield first_span_index

        span_start = PIVOT_DATETIME + delta * first_span_index

        while span_start < end_in_utc:
            next_span_start = span_start + delta
            yield (span_start, next_span_start - timedelta(microseconds=1))
            span_start = next_span_start
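
    # Illustrative sketch (not part of the original module): the first item
    # yielded is the global index of the bucket containing the start, and
    # each following item is an inclusive (start, end) span:
    #
    #     spans = TimeUnit.day.get_interval_spans(
    #         datetime.datetime(2024, 4, 2, 12, tzinfo=ZoneInfo("UTC")),
    #         datetime.datetime(2024, 4, 4, tzinfo=ZoneInfo("UTC")),
    #         1,
    #     )
    #     next(spans)  # -> 1, the second day-sized bucket after the pivot
    #     next(spans)  # -> (2024-04-02 00:00:00+00:00,
    #                  #     2024-04-02 23:59:59.999999+00:00)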

    def database_value_expression(self, time_interval: float) -> sa.Cast[str]:
        """Returns the SQL expression to place an event in a time bucket"""
        # The date_bin function can do the bucketing for us:
        # https://www.postgresql.org/docs/14/functions-datetime.html#FUNCTIONS-DATETIME-BIN
        db = provide_database_interface()
        delta = self.as_timedelta(time_interval)
        if db.dialect.name == "postgresql":
            return sa.cast(
                sa.func.floor(
                    sa.extract(
                        "epoch",
                        (
                            sa.func.date_bin(delta, db.Event.occurred, PIVOT_DATETIME)
                            - PIVOT_DATETIME
                        ),
                    )
                    / delta.total_seconds(),
                ),
                sa.Text,
            )
        elif db.dialect.name == "sqlite":
            # Convert the pivot date and event date to strings formatted as
            # seconds since the epoch
            pivot_timestamp = sa.func.strftime(
                "%s", PIVOT_DATETIME.strftime("%Y-%m-%d %H:%M:%S")
            )
            event_timestamp = sa.func.strftime("%s", db.Event.occurred)
            seconds_since_pivot = event_timestamp - pivot_timestamp
            # Calculate the bucket index by dividing by the interval in
            # seconds and flooring the result
            bucket_index = sa.func.floor(
                sa.cast(seconds_since_pivot, sa.Integer) / delta.total_seconds()
            )
            return sa.cast(bucket_index, sa.Text)
        else:
            raise NotImplementedError(f"Dialect {db.dialect.name} is not supported.")
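
    # Worked example (illustrative only): for TimeUnit.hour with
    # time_interval=1, an event at 2024-04-01 05:30:00 UTC is 19,800 seconds
    # past the pivot, so both dialect branches compute
    # floor(19800 / 3600) = 5, i.e. the bucket value '5'.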

    def database_label_expression(
        self, db: PrefectDBInterface, time_interval: float
    ) -> sa.Function[str]:
        """Returns the SQL expression to label a time bucket"""
        time_delta = self.as_timedelta(time_interval)
        if db.dialect.name == "postgresql":
            # The date_bin function can do the bucketing for us:
            # https://www.postgresql.org/docs/14/functions-datetime.html#FUNCTIONS-DATETIME-BIN
            return sa.func.to_char(
                sa.func.date_bin(time_delta, db.Event.occurred, PIVOT_DATETIME),
                'YYYY-MM-DD"T"HH24:MI:SSTZH:TZM',
            )
        elif db.dialect.name == "sqlite":
            # We can't use date_bin in SQLite, so we have to do the bucketing manually
            seconds_since_epoch = sa.func.strftime("%s", db.Event.occurred)
            # Convert the total seconds of the timedelta to a constant in SQL
            bucket_size = time_delta.total_seconds()
            # Perform integer division and multiplication to find the bucket
            # start epoch using SQL functions
            bucket_start_epoch = sa.func.cast(
                (sa.cast(seconds_since_epoch, sa.Integer) / bucket_size) * bucket_size,
                sa.Integer,
            )
            bucket_datetime = sa.func.strftime(
                "%Y-%m-%dT%H:%M:%SZ", sa.func.datetime(bucket_start_epoch, "unixepoch")
            )
            return bucket_datetime
        else:
            raise NotImplementedError(f"Dialect {db.dialect.name} is not supported.")
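
    # Illustrative only: with hour-sized buckets, every event occurring
    # between 10:00:00 and 10:59:59 UTC on a given day is labeled with the
    # bucket start, e.g. '2024-04-01T10:00:00Z' on SQLite (the PostgreSQL
    # branch renders the same instant with an explicit +00:00 offset).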


class Countable(AutoEnum):
    day = AutoEnum.auto()  # `day` will be translated into an equivalent `time`
    time = AutoEnum.auto()
    event = AutoEnum.auto()
    resource = AutoEnum.auto()

    # Implementations for storage backend

    def get_database_query(
        self,
        filter: "EventFilter",
        time_unit: TimeUnit,
        time_interval: float,
    ) -> Select[tuple[str, str, DateTime, DateTime, int]]:
        db = provide_database_interface()
        # The innermost SELECT pulls the matching events and groups them by
        # their buckets. At this point, there may be duplicate buckets for
        # each value, since the label of the thing referred to might have
        # changed.
        fundamental_counts = (
            sa.select(
                (
                    self._database_value_expression(
                        db,
                        time_unit=time_unit,
                        time_interval=time_interval,
                    ).label("value")
                ),
                (
                    self._database_label_expression(
                        db,
                        time_unit=time_unit,
                        time_interval=time_interval,
                    ).label("label")
                ),
                sa.func.max(db.Event.occurred).label("latest"),
                sa.func.min(db.Event.occurred).label("oldest"),
                sa.func.count().label("count"),
            )
            .where(sa.and_(*filter.build_where_clauses()))
            .group_by("value", "label")
        )

        fundamental_counts = fundamental_counts.select_from(db.Event)

        # An intermediate SELECT takes the fundamental counts and reprojects
        # them with the most recent label observed for each bucket.
        fundamental = fundamental_counts.subquery("fundamental_counts")
        with_latest_labels = (
            sa.select(
                fundamental.c.value,
                (
                    sa.func.first_value(fundamental.c.label).over(
                        partition_by=fundamental.c.value,
                        order_by=sa.desc(fundamental.c.latest),
                    )
                ).label("label"),
                fundamental.c.latest,
                fundamental.c.oldest,
                fundamental.c.count,
            )
            .select_from(fundamental)
            .subquery("with_latest_labels")
        )

        # The final SELECT re-sums with the latest labels, ensuring that we
        # get one row back for each value. This handles the final ordering
        # as well.
        count = sa.func.sum(with_latest_labels.c.count).label("count")

        reaggregated = (
            sa.select(
                with_latest_labels.c.value.label("value"),
                with_latest_labels.c.label.label("label"),
                count,
                sa.func.min(with_latest_labels.c.oldest).label("start_time"),
                sa.func.max(with_latest_labels.c.latest).label("end_time"),
            )
            .select_from(with_latest_labels)
            .group_by(with_latest_labels.c.value, with_latest_labels.c.label)
        )

        if self in (self.day, self.time):
            reaggregated = reaggregated.order_by(
                sa.asc("start_time"),
            )
        else:
            reaggregated = reaggregated.order_by(
                sa.desc(count),
                sa.asc(with_latest_labels.c.label),
            )

        return reaggregated
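
    # Hypothetical usage sketch (not part of the original module): counting
    # events per day, assuming an EventFilter instance `event_filter` and an
    # async SQLAlchemy `session` are already available:
    #
    #     query = Countable.time.get_database_query(
    #         filter=event_filter, time_unit=TimeUnit.day, time_interval=1.0
    #     )
    #     result = await session.execute(query)
    #     rows = result.all()  # (value, label, count, start_time, end_time)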

    def _database_value_expression(
        self,
        db: PrefectDBInterface,
        time_unit: TimeUnit,
        time_interval: float,
    ):
        if self == self.day:
            # The legacy `day` Countable is just a special case of the `time` one
            return TimeUnit.day.database_value_expression(1)
        elif self == self.time:
            return time_unit.database_value_expression(time_interval)
        elif self == self.event:
            return db.Event.event
        elif self == self.resource:
            return db.Event.resource_id
        else:
            raise NotImplementedError()
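
    # Illustrative only: Countable.day ignores the requested unit and
    # interval, so counting by `day` is equivalent to counting by `time`
    # with time_unit=TimeUnit.day and time_interval=1.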

    def _database_label_expression(
        self,
        db: PrefectDBInterface,
        time_unit: TimeUnit,
        time_interval: float,
    ):
        if self == self.day:
            # The legacy `day` Countable is just a special case of the `time` one
            return TimeUnit.day.database_label_expression(db, 1)
        elif self == self.time:
            return time_unit.database_label_expression(db, time_interval)
        elif self == self.event:
            return db.Event.event
        elif self == self.resource:
            return sa.func.coalesce(
                db.Event.resource["prefect.resource.name"].astext,
                db.Event.resource["prefect.name"].astext,
                db.Event.resource_id,
            )
        else:
            raise NotImplementedError()
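
    # Illustrative only: for Countable.resource, the label falls back through
    # the resource's `prefect.resource.name`, then `prefect.name`, and
    # finally the raw resource_id when neither name label is present.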