Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/events/counting.py: 77%
121 statements
coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

from __future__ import annotations

import datetime
import math
from datetime import timedelta
from typing import TYPE_CHECKING, Generator
from zoneinfo import ZoneInfo

import sqlalchemy as sa
from sqlalchemy.sql.selectable import Select

from prefect.server.database import PrefectDBInterface, provide_database_interface
from prefect.types import DateTime
from prefect.types._datetime import Duration, end_of_period, now
from prefect.utilities.collections import AutoEnum

if TYPE_CHECKING:
    from prefect.server.events.filters import EventFilter


# The earliest possible event.occurred date in any Prefect environment is
# 2024-04-04, so we use the Monday before that as our pivot date.
PIVOT_DATETIME = datetime.datetime(2024, 4, 1, tzinfo=ZoneInfo("UTC"))


class InvalidEventCountParameters(ValueError):
    """Raised when the given parameters are invalid for counting events."""

    def __init__(self, message: str):
        super().__init__(message)
        self.message = message


class TimeUnit(AutoEnum):
    week = AutoEnum.auto()
    day = AutoEnum.auto()
    hour = AutoEnum.auto()
    minute = AutoEnum.auto()
    second = AutoEnum.auto()

    def as_timedelta(self, interval: float) -> Duration:
        if self == self.week:
            return Duration(days=7 * interval)
        elif self == self.day:
            return Duration(days=1 * interval)
        elif self == self.hour:
            return Duration(hours=1 * interval)
        elif self == self.minute:
            return Duration(minutes=1 * interval)
        elif self == self.second:
            return Duration(seconds=1 * interval)
        else:
            raise NotImplementedError()
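
    # Illustrative examples (not part of the original module): each unit
    # scales linearly with the (possibly fractional) interval, e.g.
    #
    #   TimeUnit.hour.as_timedelta(2)     -> Duration(hours=2)
    #   TimeUnit.second.as_timedelta(0.5) -> Duration(seconds=0.5)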

    def validate_buckets(
        self,
        start_datetime: datetime.datetime,
        end_datetime: datetime.datetime,
        interval: float,
    ) -> None:
        MAX_ALLOWED_BUCKETS = 1000

        delta = self.as_timedelta(interval)
        start_in_utc = start_datetime.astimezone(ZoneInfo("UTC"))
        end_in_utc = end_datetime.astimezone(ZoneInfo("UTC"))

        if interval < 0.01:
            raise InvalidEventCountParameters("The minimum interval is 0.01")

        number_of_buckets = math.ceil((end_in_utc - start_in_utc) / delta)
        if number_of_buckets > MAX_ALLOWED_BUCKETS:
            raise InvalidEventCountParameters(
                f"The given interval would create {number_of_buckets} buckets, "
                "which is too many. Please increase the interval or reduce the "
                f"time range to produce {MAX_ALLOWED_BUCKETS} buckets or fewer."
            )
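
    # Worked example (illustrative): a 24-hour range in one-hour buckets gives
    # math.ceil(timedelta(hours=24) / timedelta(hours=1)) == 24 buckets, which
    # is fine; the same range in 0.01-second buckets would give 8,640,000
    # buckets and raise InvalidEventCountParameters.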

    def get_interval_spans(
        self,
        start_datetime: datetime.datetime,
        end_datetime: datetime.datetime,
        interval: float,
    ) -> Generator[int | tuple[datetime.datetime, datetime.datetime], None, None]:
        """Divide the given range of dates into evenly-sized spans of interval units"""
        self.validate_buckets(start_datetime, end_datetime, interval)

        # Our universe began on PIVOT_DATETIME and all time after that is
        # divided into `delta`-sized buckets. We want to find the bucket that
        # contains `start_datetime`, then all of the buckets that come after
        # it, up to and including the bucket that contains `end_datetime`.

        delta = self.as_timedelta(interval)
        start_in_utc = start_datetime.astimezone(ZoneInfo("UTC"))
        end_in_utc = end_datetime.astimezone(ZoneInfo("UTC"))

        if end_in_utc > now("UTC"):
            end_in_utc = end_of_period(now("UTC"), self.value)

        first_span_index = math.floor((start_in_utc - PIVOT_DATETIME) / delta)

        yield first_span_index

        span_start = PIVOT_DATETIME + delta * first_span_index

        while span_start < end_in_utc:
            next_span_start = span_start + delta
            yield (span_start, next_span_start - timedelta(microseconds=1))
            span_start = next_span_start
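
    # Worked example (illustrative): with TimeUnit.day and interval=1.0, a
    # start of 2024-04-03T12:00Z and an end of 2024-04-04T06:00Z give a first
    # span index of floor(2.5 days / 1 day) == 2, so the generator yields:
    #
    #   2
    #   (2024-04-03T00:00:00Z, 2024-04-03T23:59:59.999999Z)
    #   (2024-04-04T00:00:00Z, 2024-04-04T23:59:59.999999Z)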

    def database_value_expression(self, time_interval: float) -> sa.Cast[str]:
        """Returns the SQL expression to place an event in a time bucket"""
        # The date_bin function can do the bucketing for us:
        # https://www.postgresql.org/docs/14/functions-datetime.html#FUNCTIONS-DATETIME-BIN
        db = provide_database_interface()
        delta = self.as_timedelta(time_interval)
        if db.dialect.name == "postgresql":
            return sa.cast(
                sa.func.floor(
                    sa.extract(
                        "epoch",
                        (
                            sa.func.date_bin(delta, db.Event.occurred, PIVOT_DATETIME)
                            - PIVOT_DATETIME
                        ),
                    )
                    / delta.total_seconds(),
                ),
                sa.Text,
            )
        elif db.dialect.name == "sqlite":
            # Convert the pivot date and the event date to strings formatted
            # as seconds since the epoch
            pivot_timestamp = sa.func.strftime(
                "%s", PIVOT_DATETIME.strftime("%Y-%m-%d %H:%M:%S")
            )
            event_timestamp = sa.func.strftime("%s", db.Event.occurred)
            seconds_since_pivot = event_timestamp - pivot_timestamp
            # Calculate the bucket index by dividing by the interval in
            # seconds and flooring the result
            bucket_index = sa.func.floor(
                sa.cast(seconds_since_pivot, sa.Integer) / delta.total_seconds()
            )
            return sa.cast(bucket_index, sa.Text)
        else:
            raise NotImplementedError(f"Dialect {db.dialect.name} is not supported.")
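
    # Worked example (illustrative): an event occurring at 2024-04-03T06:00Z
    # with one-day buckets is 194,400 seconds past PIVOT_DATETIME, so both
    # dialects compute floor(194400 / 86400) == 2 and return the text '2';
    # this matches the index that get_interval_spans yields for that bucket.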

    def database_label_expression(
        self, db: PrefectDBInterface, time_interval: float
    ) -> sa.Function[str]:
        """Returns the SQL expression to label a time bucket"""
        time_delta = self.as_timedelta(time_interval)
        if db.dialect.name == "postgresql":
            # The date_bin function can do the bucketing for us:
            # https://www.postgresql.org/docs/14/functions-datetime.html#FUNCTIONS-DATETIME-BIN
            return sa.func.to_char(
                sa.func.date_bin(time_delta, db.Event.occurred, PIVOT_DATETIME),
                'YYYY-MM-DD"T"HH24:MI:SSTZH:TZM',
            )
        elif db.dialect.name == "sqlite":
            # We can't use date_bin in SQLite, so we have to do the bucketing manually
            seconds_since_epoch = sa.func.strftime("%s", db.Event.occurred)
            # Convert the total seconds of the timedelta to a constant in SQL
            bucket_size = time_delta.total_seconds()
            # Perform integer division and multiplication to find the bucket
            # start epoch using SQL functions
            bucket_start_epoch = sa.func.cast(
                (sa.cast(seconds_since_epoch, sa.Integer) / bucket_size) * bucket_size,
                sa.Integer,
            )
            bucket_datetime = sa.func.strftime(
                "%Y-%m-%dT%H:%M:%SZ", sa.func.datetime(bucket_start_epoch, "unixepoch")
            )
            return bucket_datetime
        else:
            raise NotImplementedError(f"Dialect {db.dialect.name} is not supported.")
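
    # Illustrative example (assuming a UTC session timezone): on PostgreSQL,
    # an event at 2024-04-03T06:00Z with one-day buckets is binned by date_bin
    # to 2024-04-03T00:00:00Z, which the to_char format above renders as the
    # label '2024-04-03T00:00:00+00:00'.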


class Countable(AutoEnum):
    day = AutoEnum.auto()  # `day` will be translated into an equivalent `time`
    time = AutoEnum.auto()
    event = AutoEnum.auto()
    resource = AutoEnum.auto()

    # Implementations for storage backend

    def get_database_query(
        self,
        filter: "EventFilter",
        time_unit: TimeUnit,
        time_interval: float,
    ) -> Select[tuple[str, str, int, DateTime, DateTime]]:
        db = provide_database_interface()
        # The innermost SELECT pulls the matching events and groups them up by
        # their buckets. At this point, there may be duplicate buckets for
        # each value, since the label of the thing referred to might have
        # changed.
        fundamental_counts = (
            sa.select(
                (
                    self._database_value_expression(
                        db,
                        time_unit=time_unit,
                        time_interval=time_interval,
                    ).label("value")
                ),
                (
                    self._database_label_expression(
                        db,
                        time_unit=time_unit,
                        time_interval=time_interval,
                    ).label("label")
                ),
                sa.func.max(db.Event.occurred).label("latest"),
                sa.func.min(db.Event.occurred).label("oldest"),
                sa.func.count().label("count"),
            )
            .where(sa.and_(*filter.build_where_clauses()))
            .group_by("value", "label")
        )

        fundamental_counts = fundamental_counts.select_from(db.Event)

        # An intermediate SELECT takes the fundamental counts and reprojects
        # them, replacing each row's label with the most recently observed
        # label for that value.
        fundamental = fundamental_counts.subquery("fundamental_counts")
        with_latest_labels = (
            sa.select(
                fundamental.c.value,
                (
                    sa.func.first_value(fundamental.c.label).over(
                        partition_by=fundamental.c.value,
                        order_by=sa.desc(fundamental.c.latest),
                    )
                ).label("label"),
                fundamental.c.latest,
                fundamental.c.oldest,
                fundamental.c.count,
            )
            .select_from(fundamental)
            .subquery("with_latest_labels")
        )

        # The final SELECT re-sums with the latest labels, ensuring that we
        # get one row back for each value. This handles the final ordering as
        # well.
        count = sa.func.sum(with_latest_labels.c.count).label("count")

        reaggregated = (
            sa.select(
                with_latest_labels.c.value.label("value"),
                with_latest_labels.c.label.label("label"),
                count,
                sa.func.min(with_latest_labels.c.oldest).label("start_time"),
                sa.func.max(with_latest_labels.c.latest).label("end_time"),
            )
            .select_from(with_latest_labels)
            .group_by(with_latest_labels.c.value, with_latest_labels.c.label)
        )

        if self in (self.day, self.time):
            reaggregated = reaggregated.order_by(
                sa.asc("start_time"),
            )
        else:
            reaggregated = reaggregated.order_by(
                sa.desc(count),
                sa.asc(with_latest_labels.c.label),
            )

        return reaggregated
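
    # Usage sketch (hypothetical caller, not part of the original module): the
    # resulting Select is executed through a session, and each row carries
    # (value, label, count, start_time, end_time), e.g.
    #
    #   query = Countable.event.get_database_query(
    #       filter=event_filter,  # a prefect.server.events.filters.EventFilter
    #       time_unit=TimeUnit.day,
    #       time_interval=1.0,
    #   )
    #   result = await session.execute(query)
    #   for value, label, count, start_time, end_time in result.all():
    #       ...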

    def _database_value_expression(
        self,
        db: PrefectDBInterface,
        time_unit: TimeUnit,
        time_interval: float,
    ):
        if self == self.day:
            # The legacy `day` Countable is just a special case of the `time` one
            return TimeUnit.day.database_value_expression(1)
        elif self == self.time:
            return time_unit.database_value_expression(time_interval)
        elif self == self.event:
            return db.Event.event
        elif self == self.resource:
            return db.Event.resource_id
        else:
            raise NotImplementedError()

    def _database_label_expression(
        self,
        db: PrefectDBInterface,
        time_unit: TimeUnit,
        time_interval: float,
    ):
        if self == self.day:
            # The legacy `day` Countable is just a special case of the `time` one
            return TimeUnit.day.database_label_expression(db, 1)
        elif self == self.time:
            return time_unit.database_label_expression(db, time_interval)
        elif self == self.event:
            return db.Event.event
        elif self == self.resource:
            return sa.func.coalesce(
                db.Event.resource["prefect.resource.name"].astext,
                db.Event.resource["prefect.name"].astext,
                db.Event.resource_id,
            )
        else:
            raise NotImplementedError()