Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/events/storage/__init__.py: 39%

32 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1from base64 import b64decode, b64encode 1a

2import json 1a

3from typing import TYPE_CHECKING, List, Optional, Tuple 1a

4from prefect.server.events.counting import TimeUnit 1a

5from prefect.server.events.schemas.events import EventCount 1a

6 

7 

8if TYPE_CHECKING: # pragma: no cover 8 ↛ 9line 8 didn't jump to line 9 because the condition on line 8 was never true1a

9 from prefect.server.events.filters import EventFilter 

10 

11 

12INTERACTIVE_PAGE_SIZE: int = 50 1a

13 

14 

15class InvalidTokenError(ValueError): 1a

16 pass 1a

17 

18 

19def to_page_token( 1a

20 filter: "EventFilter", count: int, page_size: int, current_offset: int 

21) -> Optional[str]: 

22 if current_offset + page_size >= count: 

23 return None 

24 

25 return b64encode( 

26 json.dumps( 

27 { 

28 "filter": filter.model_dump(mode="json"), 

29 "count": count, 

30 "page_size": page_size, 

31 "offset": current_offset + page_size, 

32 } 

33 ).encode() 

34 ).decode() 

35 

36 

37def from_page_token(page_token: str) -> Tuple["EventFilter", int, int, int]: 1a

38 from prefect.server.events.filters import EventFilter 

39 

40 try: 

41 parameters = json.loads(b64decode(page_token)) 

42 except Exception: 

43 # If we can't parse the page token, this likely indicates that something was 

44 # wrong with the query parameters (perhaps truncated or otherwise manipulated). 

45 # Treat this as a request for nothing 

46 raise InvalidTokenError("Unable to parse page token") 

47 

48 return ( 

49 EventFilter.model_validate(parameters["filter"]), 

50 parameters["count"], 

51 parameters["page_size"], 

52 parameters["offset"], 

53 ) 

54 

55 

56def process_time_based_counts( 1a

57 filter: "EventFilter", 

58 time_unit: TimeUnit, 

59 time_interval: float, 

60 counts: List[EventCount], 

61) -> List[EventCount]: 

62 """ 

63 Common logic for processing time-based counts across different event backends. 

64 

65 When doing time-based counting we want to do two things: 

66 

67 1. Backfill any missing intervals with 0 counts. 

68 2. Update the start/end times that are emitted to match the beginning and 

69 end of the intervals rather than having them reflect the true max/min 

70 occurred time of the events themselves. 

71 """ 

72 

73 span_generator = time_unit.get_interval_spans( 

74 filter.occurred.since, filter.occurred.until, time_interval 

75 ) 

76 

77 spans_since_pivot = next(span_generator) 

78 assert isinstance(spans_since_pivot, int) 

79 

80 backfilled_counts = [ 

81 EventCount( 

82 value=str(i), 

83 count=0, 

84 label=start_time.isoformat(), 

85 start_time=start_time, 

86 end_time=end_time, 

87 ) 

88 for i, (start_time, end_time) in enumerate(span_generator) 

89 ] 

90 

91 for count in counts: 

92 index = int(float(count.value)) - spans_since_pivot 

93 backfilled_counts[index].count = count.count 

94 

95 return backfilled_counts 

96 

97 

98class QueryRangeTooLarge(Exception): 1a

99 pass 1a