Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/events/storage/__init__.py: 39%
32 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1from base64 import b64decode, b64encode 1a
2import json 1a
3from typing import TYPE_CHECKING, List, Optional, Tuple 1a
4from prefect.server.events.counting import TimeUnit 1a
5from prefect.server.events.schemas.events import EventCount 1a
8if TYPE_CHECKING: # pragma: no cover 8 ↛ 9line 8 didn't jump to line 9 because the condition on line 8 was never true1a
9 from prefect.server.events.filters import EventFilter
12INTERACTIVE_PAGE_SIZE: int = 50 1a
class InvalidTokenError(ValueError):
    """Signals a page token that could not be turned back into query parameters."""
def to_page_token(
    filter: "EventFilter", count: int, page_size: int, current_offset: int
) -> Optional[str]:
    """Build the opaque token that addresses the page after the current one.

    Returns ``None`` when ``current_offset + page_size`` already reaches
    ``count`` (there is no further page). Otherwise returns a base64-encoded
    JSON document holding the serialized filter, the total count, the page
    size, and the offset at which the next page starts.
    """
    next_offset = current_offset + page_size
    if next_offset >= count:
        # Nothing lies beyond this page, so there is no next page to point at.
        return None

    payload = {
        "filter": filter.model_dump(mode="json"),
        "count": count,
        "page_size": page_size,
        "offset": next_offset,
    }
    serialized = json.dumps(payload)
    return b64encode(serialized.encode()).decode()
def from_page_token(page_token: str) -> Tuple["EventFilter", int, int, int]:
    """Decode a page token produced by ``to_page_token``.

    Args:
        page_token: Base64-encoded JSON carrying ``filter``, ``count``,
            ``page_size`` and ``offset`` entries.

    Returns:
        A ``(filter, count, page_size, offset)`` tuple, with the filter entry
        validated into an ``EventFilter``.

    Raises:
        InvalidTokenError: If the token cannot be base64/JSON-decoded, or the
            decoded document is not a mapping containing all four entries.
    """
    # Imported at call time; the module only imports EventFilter under
    # TYPE_CHECKING (presumably to avoid an import cycle -- confirm).
    from prefect.server.events.filters import EventFilter

    try:
        parameters = json.loads(b64decode(page_token))
    except Exception as exc:
        # If we can't parse the page token, this likely indicates that something
        # was wrong with the query parameters (perhaps truncated or otherwise
        # manipulated).  Treat this as a request for nothing.
        raise InvalidTokenError("Unable to parse page token") from exc

    try:
        raw_filter = parameters["filter"]
        count = parameters["count"]
        page_size = parameters["page_size"]
        offset = parameters["offset"]
    except (KeyError, TypeError) as exc:
        # The token decoded to valid JSON but not to the document that
        # to_page_token writes (wrong top-level type or a missing entry).
        # Report it the same way as an undecodable token rather than leaking
        # a bare KeyError/TypeError to the caller.
        raise InvalidTokenError("Unable to parse page token") from exc

    return (
        EventFilter.model_validate(raw_filter),
        count,
        page_size,
        offset,
    )
def process_time_based_counts(
    filter: "EventFilter",
    time_unit: TimeUnit,
    time_interval: float,
    counts: List[EventCount],
) -> List[EventCount]:
    """
    Shared post-processing of time-based counts for the event backends.

    Two adjustments are applied to the raw counts:

    1. Every interval between the filter's ``occurred.since`` and
       ``occurred.until`` gets an entry, with a count of 0 where no raw count
       was supplied.
    2. The emitted start/end times are the boundaries of each interval, not
       the actual earliest/latest occurred time of the events in it.
    """
    spans = time_unit.get_interval_spans(
        filter.occurred.since, filter.occurred.until, time_interval
    )

    # The generator's first item is an integer rather than a span; it is the
    # offset subtracted from each count's value to locate its bucket below.
    first_span_number = next(spans)
    assert isinstance(first_span_number, int)

    # One zero-count bucket per remaining (start, end) span, in order.
    buckets: List[EventCount] = []
    for position, (span_start, span_end) in enumerate(spans):
        buckets.append(
            EventCount(
                value=str(position),
                count=0,
                label=span_start.isoformat(),
                start_time=span_start,
                end_time=span_end,
            )
        )

    # Overlay the observed counts onto their buckets.
    for observed in counts:
        position = int(float(observed.value)) - first_span_number
        buckets[position].count = observed.count

    return buckets
class QueryRangeTooLarge(Exception):
    """Exception type for an event query whose range is too large.

    NOTE(review): the visible part of this module never raises it; the limit
    and the raising code presumably live in the storage backends -- confirm.
    """