Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/events/counting.py: 22%
121 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1from __future__ import annotations 1a
3import datetime 1a
4import math 1a
5from datetime import timedelta 1a
6from typing import TYPE_CHECKING, Generator 1a
7from zoneinfo import ZoneInfo 1a
9import sqlalchemy as sa 1a
10from sqlalchemy.sql.selectable import Select 1a
12from prefect.server.database import PrefectDBInterface, provide_database_interface 1a
13from prefect.types import DateTime 1a
14from prefect.types._datetime import Duration, end_of_period, now 1a
15from prefect.utilities.collections import AutoEnum 1a
17if TYPE_CHECKING: 17 ↛ 18line 17 didn't jump to line 18 because the condition on line 17 was never true1a
18 from prefect.server.events.filters import EventFilter
21# The earliest possible event.occurred date in any Prefect environment is
22# 2024-04-04, so we use the Monday before that as our pivot date.
23PIVOT_DATETIME = datetime.datetime(2024, 4, 1, tzinfo=ZoneInfo("UTC")) 1a
class InvalidEventCountParameters(ValueError):
    """Raised when the given parameters are invalid for counting events."""

    def __init__(self, message: str):
        # Keep the message on the instance so API handlers can surface it
        # without parsing the exception's args.
        self.message = message
        super().__init__(message)
class TimeUnit(AutoEnum):
    """A unit of time used to bucket events when counting them over a range."""

    week = AutoEnum.auto()
    day = AutoEnum.auto()
    hour = AutoEnum.auto()
    minute = AutoEnum.auto()
    second = AutoEnum.auto()

    def as_timedelta(self, interval: float) -> Duration:
        """Return the width of one bucket: `interval` multiples of this unit."""
        if self is TimeUnit.week:
            return Duration(days=7 * interval)
        if self is TimeUnit.day:
            return Duration(days=1 * interval)
        if self is TimeUnit.hour:
            return Duration(hours=1 * interval)
        if self is TimeUnit.minute:
            return Duration(minutes=1 * interval)
        if self is TimeUnit.second:
            return Duration(seconds=1 * interval)
        raise NotImplementedError()

    def validate_buckets(
        self,
        start_datetime: datetime.datetime,
        end_datetime: datetime.datetime,
        interval: float,
    ) -> None:
        """Raise InvalidEventCountParameters if the interval is too small or
        the requested range/interval combination yields too many buckets."""
        MAX_ALLOWED_BUCKETS = 1000

        bucket_width = self.as_timedelta(interval)
        utc_start = start_datetime.astimezone(ZoneInfo("UTC"))
        utc_end = end_datetime.astimezone(ZoneInfo("UTC"))

        if interval < 0.01:
            raise InvalidEventCountParameters("The minimum interval is 0.01")

        number_of_buckets = math.ceil((utc_end - utc_start) / bucket_width)
        if number_of_buckets > MAX_ALLOWED_BUCKETS:
            raise InvalidEventCountParameters(
                f"The given interval would create {number_of_buckets} buckets, "
                "which is too many. Please increase the interval or reduce the "
                f"time range to produce {MAX_ALLOWED_BUCKETS} buckets or fewer."
            )

    def get_interval_spans(
        self,
        start_datetime: datetime.datetime,
        end_datetime: datetime.datetime,
        interval: float,
    ) -> Generator[int | tuple[datetime.datetime, datetime.datetime], None, None]:
        """Divide the given range of dates into evenly-sized spans of interval units"""
        self.validate_buckets(start_datetime, end_datetime, interval)

        # All time since PIVOT_DATETIME is partitioned into fixed
        # `bucket_width`-sized buckets.  Locate the bucket containing
        # `start_datetime`, then walk forward one bucket at a time until we
        # pass `end_datetime`.
        bucket_width = self.as_timedelta(interval)
        utc_start = start_datetime.astimezone(ZoneInfo("UTC"))
        utc_end = end_datetime.astimezone(ZoneInfo("UTC"))

        # Don't produce buckets beyond the current period when the caller
        # asked for a range ending in the future.
        if utc_end > now("UTC"):
            utc_end = end_of_period(now("UTC"), self.value)

        first_span_index = math.floor((utc_start - PIVOT_DATETIME) / bucket_width)

        # The first item yielded is the absolute index of the initial bucket;
        # every subsequent item is a (start, end) pair for one bucket.
        yield first_span_index

        span_start = PIVOT_DATETIME + bucket_width * first_span_index
        while span_start < utc_end:
            span_end = span_start + bucket_width
            # End each span one microsecond before the next begins so that
            # consecutive spans never overlap.
            yield (span_start, span_end - timedelta(microseconds=1))
            span_start = span_end

    def database_value_expression(self, time_interval: float) -> sa.Cast[str]:
        """Returns the SQL expression to place an event in a time bucket"""
        db = provide_database_interface()
        bucket_width = self.as_timedelta(time_interval)

        if db.dialect.name == "postgresql":
            # date_bin can do the bucketing for us:
            # https://www.postgresql.org/docs/14/functions-datetime.html#FUNCTIONS-DATETIME-BIN
            binned = sa.func.date_bin(bucket_width, db.Event.occurred, PIVOT_DATETIME)
            bucket_index = sa.func.floor(
                sa.extract("epoch", binned - PIVOT_DATETIME)
                / bucket_width.total_seconds()
            )
            return sa.cast(bucket_index, sa.Text)

        if db.dialect.name == "sqlite":
            # SQLite has no date_bin, so derive the bucket index from the
            # number of whole seconds between the event and the pivot date.
            pivot_timestamp = sa.func.strftime(
                "%s", PIVOT_DATETIME.strftime("%Y-%m-%d %H:%M:%S")
            )
            event_timestamp = sa.func.strftime("%s", db.Event.occurred)
            seconds_since_pivot = event_timestamp - pivot_timestamp
            # Divide by the bucket size in seconds and floor to get the index
            bucket_index = sa.func.floor(
                sa.cast(seconds_since_pivot, sa.Integer) / bucket_width.total_seconds()
            )
            return sa.cast(bucket_index, sa.Text)

        raise NotImplementedError(f"Dialect {db.dialect.name} is not supported.")

    def database_label_expression(
        self, db: PrefectDBInterface, time_interval: float
    ) -> sa.Function[str]:
        """Returns the SQL expression to label a time bucket"""
        bucket_width = self.as_timedelta(time_interval)

        if db.dialect.name == "postgresql":
            # date_bin gives us the bucket's start instant directly:
            # https://www.postgresql.org/docs/14/functions-datetime.html#FUNCTIONS-DATETIME-BIN
            return sa.func.to_char(
                sa.func.date_bin(bucket_width, db.Event.occurred, PIVOT_DATETIME),
                'YYYY-MM-DD"T"HH24:MI:SSTZH:TZM',
            )

        if db.dialect.name == "sqlite":
            # Without date_bin, round the event's epoch seconds down to the
            # nearest multiple of the bucket size and format that instant.
            seconds_since_epoch = sa.func.strftime("%s", db.Event.occurred)
            bucket_size = bucket_width.total_seconds()
            bucket_start_epoch = sa.func.cast(
                (sa.cast(seconds_since_epoch, sa.Integer) / bucket_size) * bucket_size,
                sa.Integer,
            )
            return sa.func.strftime(
                "%Y-%m-%dT%H:%M:%SZ", sa.func.datetime(bucket_start_epoch, "unixepoch")
            )

        raise NotImplementedError(f"Dialect {db.dialect.name} is not supported.")
class Countable(AutoEnum):
    """The dimensions along which matching events may be counted."""

    day = AutoEnum.auto()  # `day` will be translated into an equivalent `time`
    time = AutoEnum.auto()
    event = AutoEnum.auto()
    resource = AutoEnum.auto()

    # Implementations for storage backend

    def get_database_query(
        self,
        filter: "EventFilter",
        time_unit: TimeUnit,
        time_interval: float,
    ) -> Select[tuple[str, str, DateTime, DateTime, int]]:
        """Build the SELECT producing one (value, label, count, start, end)
        row per distinct value of this countable for the filtered events."""
        db = provide_database_interface()

        value_column = self._database_value_expression(
            db,
            time_unit=time_unit,
            time_interval=time_interval,
        ).label("value")
        label_column = self._database_label_expression(
            db,
            time_unit=time_unit,
            time_interval=time_interval,
        ).label("label")

        # Innermost SELECT: the matching events grouped into buckets.  A
        # value may appear under several labels here, because the label of
        # the thing referred to might have changed over time.
        fundamental_counts = (
            sa.select(
                value_column,
                label_column,
                sa.func.max(db.Event.occurred).label("latest"),
                sa.func.min(db.Event.occurred).label("oldest"),
                sa.func.count().label("count"),
            )
            .where(sa.and_(*filter.build_where_clauses()))
            .group_by("value", "label")
        )

        fundamental_counts = fundamental_counts.select_from(db.Event)

        # Intermediate SELECT: reproject each bucket with the most recent
        # label observed for its value.
        fundamental = fundamental_counts.subquery("fundamental_counts")
        latest_label = sa.func.first_value(fundamental.c.label).over(
            partition_by=fundamental.c.value,
            order_by=sa.desc(fundamental.c.latest),
        )
        with_latest_labels = (
            sa.select(
                fundamental.c.value,
                latest_label.label("label"),
                fundamental.c.latest,
                fundamental.c.oldest,
                fundamental.c.count,
            )
            .select_from(fundamental)
            .subquery("with_latest_labels")
        )

        # Final SELECT: re-sum using the latest labels so each value yields
        # exactly one row, and apply the final ordering.
        count = sa.func.sum(with_latest_labels.c.count).label("count")

        reaggregated = (
            sa.select(
                with_latest_labels.c.value.label("value"),
                with_latest_labels.c.label.label("label"),
                count,
                sa.func.min(with_latest_labels.c.oldest).label("start_time"),
                sa.func.max(with_latest_labels.c.latest).label("end_time"),
            )
            .select_from(with_latest_labels)
            .group_by(with_latest_labels.c.value, with_latest_labels.c.label)
        )

        # Time-based countables come back chronologically; all others by
        # descending count, with ties broken alphabetically by label.
        if self in (Countable.day, Countable.time):
            return reaggregated.order_by(
                sa.asc("start_time"),
            )
        return reaggregated.order_by(
            sa.desc(count),
            sa.asc(with_latest_labels.c.label),
        )

    def _database_value_expression(
        self,
        db: PrefectDBInterface,
        time_unit: TimeUnit,
        time_interval: float,
    ):
        """SQL expression for the raw value each event is grouped under."""
        if self is Countable.day:
            # The legacy `day` Countable is just a special case of the `time` one
            return TimeUnit.day.database_value_expression(1)
        if self is Countable.time:
            return time_unit.database_value_expression(time_interval)
        if self is Countable.event:
            return db.Event.event
        if self is Countable.resource:
            return db.Event.resource_id
        raise NotImplementedError()

    def _database_label_expression(
        self,
        db: PrefectDBInterface,
        time_unit: TimeUnit,
        time_interval: float,
    ):
        """SQL expression for the human-readable label of each grouped value."""
        if self is Countable.day:
            # The legacy `day` Countable is just a special case of the `time` one
            return TimeUnit.day.database_label_expression(db, 1)
        if self is Countable.time:
            return time_unit.database_label_expression(db, time_interval)
        if self is Countable.event:
            return db.Event.event
        if self is Countable.resource:
            # Prefer the resource's name labels, falling back to its id
            return sa.func.coalesce(
                db.Event.resource["prefect.resource.name"].astext,
                db.Event.resource["prefect.name"].astext,
                db.Event.resource_id,
            )
        raise NotImplementedError()