Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/events/ordering/db.py: 0%

69 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1from collections import defaultdict 

2from contextlib import asynccontextmanager 

3from typing import ( 

4 TYPE_CHECKING, 

5 List, 

6 Mapping, 

7 MutableMapping, 

8 Union, 

9) 

10from uuid import UUID 

11 

12import sqlalchemy as sa 

13from cachetools import TTLCache 

14 

15import prefect.types._datetime 

16from prefect.logging import get_logger 

17from prefect.server.database import PrefectDBInterface, db_injector 

18from prefect.server.events.ordering import ( 

19 MAX_DEPTH_OF_PRECEDING_EVENT, 

20 PRECEDING_EVENT_LOOKBACK, 

21 SEEN_EXPIRATION, 

22 EventArrivedEarly, 

23 MaxDepthExceeded, 

24 event_handler, 

25) 

26from prefect.server.events.ordering import CausalOrdering as _CausalOrdering 

27from prefect.server.events.schemas.events import Event, ReceivedEvent 

28 

if TYPE_CHECKING:
    # Only needed for the string annotation on `logger` below, so it is imported
    # for static type checkers alone and never executed at runtime.
    import logging

# Module-level logger, obtained from Prefect's `get_logger` with this module's name.
logger: "logging.Logger" = get_logger(__name__)

33 

34 

class CausalOrdering(_CausalOrdering):
    """Causal ordering of events within a named scope, with followers kept in the DB.

    Two kinds of state are tracked:

    * "seen" events live in an in-process ``TTLCache`` per scope (see
      ``_seen_events``); entries expire after ``SEEN_EXPIRATION``.
    * "followers" (events that arrived before the event they follow) are stored
      as rows of ``db.AutomationEventFollower`` until their leader arrives or
      they are collected by ``get_lost_followers``.
    """

    # Class-level and therefore shared by every instance: maps a scope name to
    # the cache of event IDs already processed for that scope.  A cache is
    # created lazily the first time a scope is looked up.
    _seen_events: Mapping[str, MutableMapping[UUID, bool]] = defaultdict(
        lambda: TTLCache(maxsize=10000, ttl=SEEN_EXPIRATION.total_seconds())
    )

    scope: str

    def __init__(self, scope: str):
        """Create an ordering helper bound to ``scope``.

        Args:
            scope: Name used to partition both the seen-event cache and the
                follower rows in the database.
        """
        self.scope = scope

    async def event_has_been_seen(self, event: Union[UUID, Event]) -> bool:
        """Return whether the given event (or event ID) is in this scope's seen cache.

        Args:
            event: Either an ``Event`` (its ``id`` is used) or a bare event ID.

        Returns:
            ``True`` if the ID was recorded via ``record_event_as_seen`` and has
            not yet expired from the cache, otherwise ``False``.
        """
        event_id = event.id if isinstance(event, Event) else event
        return self._seen_events[self.scope].get(event_id, False)

    async def record_event_as_seen(self, event: ReceivedEvent) -> None:
        """Mark ``event`` as seen in this scope's cache."""
        self._seen_events[self.scope][event.id] = True

    @db_injector
    async def record_follower(
        self, db: PrefectDBInterface, event: ReceivedEvent
    ) -> None:
        """Remember that this event is waiting on another event to arrive.

        Inserts one ``AutomationEventFollower`` row linking ``event`` to the
        event it follows.  ``db`` is supplied by the ``db_injector`` decorator.

        Args:
            event: The follower event; ``event.follows`` must be set.
        """
        assert event.follows

        async with db.session_context(begin_transaction=True) as session:
            await session.execute(
                sa.insert(db.AutomationEventFollower).values(
                    scope=self.scope,
                    leader_event_id=event.follows,
                    follower_event_id=event.id,
                    received=event.received,
                    follower=event,
                )
            )

    @db_injector
    async def forget_follower(
        self, db: PrefectDBInterface, follower: ReceivedEvent
    ) -> None:
        """Forget that this event is waiting on another event to arrive.

        Deletes the ``AutomationEventFollower`` rows for ``follower`` in this
        scope.  ``db`` is supplied by the ``db_injector`` decorator.

        Args:
            follower: The follower event; ``follower.follows`` must be set.
        """
        assert follower.follows

        async with db.session_context(begin_transaction=True) as session:
            await session.execute(
                sa.delete(db.AutomationEventFollower).where(
                    db.AutomationEventFollower.scope == self.scope,
                    db.AutomationEventFollower.follower_event_id == follower.id,
                )
            )

    @db_injector
    async def get_followers(
        self, db: PrefectDBInterface, leader: ReceivedEvent
    ) -> List[ReceivedEvent]:
        """Return events that were waiting on this leader event to arrive.

        Args:
            leader: The event whose recorded followers should be fetched.

        Returns:
            The stored follower events for ``leader`` in this scope, sorted by
            their ``occurred`` timestamp (oldest first).
        """
        async with db.session_context() as session:
            query = sa.select(db.AutomationEventFollower.follower).where(
                db.AutomationEventFollower.scope == self.scope,
                db.AutomationEventFollower.leader_event_id == leader.id,
            )
            result = await session.execute(query)
            followers = result.scalars().all()
            return sorted(followers, key=lambda e: e.occurred)

    @db_injector
    async def get_lost_followers(self, db: PrefectDBInterface) -> List[ReceivedEvent]:
        """Return, and stop tracking, events whose leader event never arrived.

        A follower counts as lost once it was received more than
        ``PRECEDING_EVENT_LOOKBACK`` ago.  The matching rows are deleted in the
        same transaction in which they are read.

        Returns:
            The lost follower events for this scope, sorted by their
            ``occurred`` timestamp (oldest first).
        """
        earlier = prefect.types._datetime.now("UTC") - PRECEDING_EVENT_LOOKBACK

        async with db.session_context(begin_transaction=True) as session:
            query = sa.select(db.AutomationEventFollower.follower).where(
                db.AutomationEventFollower.scope == self.scope,
                db.AutomationEventFollower.received < earlier,
            )
            result = await session.execute(query)
            followers = result.scalars().all()

            # forget these followers, since they are never going to see their
            # leader event
            await session.execute(
                sa.delete(db.AutomationEventFollower).where(
                    db.AutomationEventFollower.scope == self.scope,
                    db.AutomationEventFollower.received < earlier,
                )
            )

            return sorted(followers, key=lambda e: e.occurred)

    @asynccontextmanager
    async def preceding_event_confirmed(
        self, handler: event_handler, event: ReceivedEvent, depth: int = 0
    ):
        """Guard the processing of ``event`` so that it runs after the event it follows.

        Events may optionally declare that they logically follow another event, so
        that we can preserve important event orderings in the face of unreliable
        delivery and ordering of messages from the queues.

        On entry: if ``event`` follows an event that has not been seen and
        ``event`` was received less than ``PRECEDING_EVENT_LOOKBACK`` ago, it is
        stored as a follower and ``EventArrivedEarly`` is raised.  If it is older
        than that, it is allowed through without its leader.

        On normal exit of the ``async with`` body: ``event`` is recorded as seen,
        every stored follower of ``event`` is passed to ``handler`` in order of
        occurrence, and ``event``'s own follower record (if any) is removed.
        None of this happens if the body raises.

        Args:
            handler: Awaitable callback invoked as ``handler(follower, depth + 1)``
                for each event that was waiting on ``event``.
            event (ReceivedEvent): The event to be processed. This object should
                include metadata indicating if and what event it follows.
            depth (int, optional): The current recursion depth, used to prevent
                infinite recursion due to cyclic dependencies between events.
                Defaults to 0.

        Raises:
            MaxDepthExceeded: If ``depth`` is greater than
                ``MAX_DEPTH_OF_PRECEDING_EVENT``.
            EventArrivedEarly: If the current event shouldn't be processed yet.
        """
        if depth > MAX_DEPTH_OF_PRECEDING_EVENT:
            # No exception is being handled at this point, so log with `error`;
            # `exception` would attach an empty "NoneType: None" traceback.
            logger.error(
                "Event %r (%s) for %r has exceeded the maximum recursion depth of %s",
                event.event,
                event.id,
                event.resource.id,
                MAX_DEPTH_OF_PRECEDING_EVENT,
            )
            raise MaxDepthExceeded(event)

        if event.follows:
            if not await self.event_has_been_seen(event.follows):
                age = prefect.types._datetime.now("UTC") - event.received
                if age < PRECEDING_EVENT_LOOKBACK:
                    logger.debug(
                        "Event %r (%s) for %r arrived before the event it follows %s",
                        event.event,
                        event.id,
                        event.resource.id,
                        event.follows,
                    )

                    # record this follower for safe-keeping
                    await self.record_follower(event)
                    raise EventArrivedEarly(event)

        yield

        await self.record_event_as_seen(event)

        # we have just processed an event that other events were waiting on, so
        # let's react to them now in the order they occurred
        for waiter in await self.get_followers(event):
            await handler(waiter, depth + 1)

        # if this event was itself waiting on something, let's consider it as
        # resolved now that it has been processed
        if event.follows:
            await self.forget_follower(event)