Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/events/ordering/memory.py: 13%

161 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1from __future__ import annotations 

2 

3import asyncio 

4import logging 

5from contextlib import asynccontextmanager 

6from datetime import datetime, timedelta 

7from typing import Any, AsyncGenerator 

8from uuid import UUID 

9 

10import anyio 

11from cachetools import TTLCache 

12 

13import prefect.types._datetime 

14from prefect.logging import get_logger 

15from prefect.server.events.ordering import ( 

16 MAX_DEPTH_OF_PRECEDING_EVENT, 

17 PRECEDING_EVENT_LOOKBACK, 

18 SEEN_EXPIRATION, 

19 EventArrivedEarly, 

20 MaxDepthExceeded, 

21 event_handler, 

22) 

23from prefect.server.events.ordering import CausalOrdering as _CausalOrdering 

24from prefect.server.events.schemas.events import Event, ReceivedEvent 

25 

26logger: logging.Logger = get_logger(__name__) 

27 

28# How long we'll wait for an in-flight event to be processed for follower handling 

29IN_FLIGHT_EVENT_TIMEOUT = timedelta(seconds=8) 

30 

31 

class EventBeingProcessed(Exception):
    """Raised when an event is already in-flight and must not be handled again
    until the current attempt finishes (e.g. under concurrent consumers)."""

    def __init__(self, event: ReceivedEvent):
        # Keep the offending event around so callers can inspect/retry it.
        self.event = event

38 

39 

class CausalOrdering(_CausalOrdering):
    # Per-scope singleton registry: each scope string maps to exactly one
    # CausalOrdering instance so all consumers of a scope share bookkeeping.
    _instances: dict[str, "CausalOrdering"] = {}
    # One asyncio.Lock per scope, shared by that scope's singleton instance.
    _locks: dict[str, asyncio.Lock] = {}

    def __new__(cls, scope: str) -> "CausalOrdering":
        # Return the existing instance for this scope, creating it on first use.
        # NOTE: __init__ will still be re-run on every construction; it guards
        # itself with an _initialized flag.
        if scope not in cls._instances:
            cls._instances[scope] = super().__new__(cls)
        return cls._instances[scope]

49 

    def __init__(self, scope: str):
        # __new__ may hand back a previously-built singleton, in which case
        # __init__ runs again; bail out so live state isn't wiped mid-use.
        if hasattr(self, "_initialized") and self._initialized:
            return

        self.scope: str = scope
        # IDs of events currently being handled (guards against concurrent
        # double-processing of the same event).
        self._processing_events: set[UUID] = set()
        # Recently finished event IDs; entries auto-expire after SEEN_EXPIRATION.
        self._seen_events: TTLCache[UUID, bool] = TTLCache(
            maxsize=10000, ttl=SEEN_EXPIRATION.total_seconds()
        )
        self._followers: dict[UUID, set[UUID]] = {}  # leader_id -> set of follower_ids
        self._events: dict[UUID, ReceivedEvent] = {}  # event_id -> event
        self._waitlist: dict[UUID, datetime] = {}  # event_id -> received_time

        # Each scope gets its own lock
        if scope not in self.__class__._locks:
            self.__class__._locks[scope] = asyncio.Lock()
        self._lock = self.__class__._locks[scope]

        self._initialized = True

70 

71 def clear(self) -> None: 

72 """Clear all data for this scope.""" 

73 self._processing_events.clear() 

74 self._seen_events.clear() 

75 self._followers.clear() 

76 self._events.clear() 

77 self._waitlist.clear() 

78 

79 @classmethod 

80 def clear_all_scopes(cls) -> None: 

81 """Clear all data for all scopes - useful for testing.""" 

82 for instance in cls._instances.values(): 

83 instance.clear() 

84 cls._instances.clear() 

85 cls._locks.clear() 

86 

87 async def record_event_as_processing(self, event: ReceivedEvent) -> bool: 

88 """Record that an event is being processed, returning False if already processing.""" 

89 async with self._lock: 

90 if event.id in self._processing_events: 

91 return False 

92 self._processing_events.add(event.id) 

93 return True 

94 

95 async def event_has_started_processing(self, event: UUID | Event) -> bool: 

96 event_id = event.id if isinstance(event, Event) else event 

97 async with self._lock: 

98 return event_id in self._processing_events 

99 

100 async def forget_event_is_processing(self, event: ReceivedEvent) -> None: 

101 async with self._lock: 

102 self._processing_events.discard(event.id) 

103 

104 async def event_has_been_seen(self, event: UUID | Event) -> bool: 

105 event_id = event.id if isinstance(event, Event) else event 

106 async with self._lock: 

107 return event_id in self._seen_events 

108 

109 async def record_event_as_seen(self, event: ReceivedEvent) -> None: 

110 async with self._lock: 

111 self._seen_events[event.id] = True 

112 

113 async def record_follower(self, event: ReceivedEvent) -> None: 

114 """Remember that this event is waiting on another event to arrive.""" 

115 assert event.follows 

116 

117 async with self._lock: 

118 self._events[event.id] = event 

119 if event.follows not in self._followers: 

120 self._followers[event.follows] = set() 

121 self._followers[event.follows].add(event.id) 

122 self._waitlist[event.id] = event.received 

123 

124 async def forget_follower(self, follower: ReceivedEvent) -> None: 

125 """Forget that this event is waiting on another event to arrive.""" 

126 assert follower.follows 

127 

128 async with self._lock: 

129 self._waitlist.pop(follower.id, None) 

130 if follower.follows in self._followers: 

131 self._followers[follower.follows].discard(follower.id) 

132 if not self._followers[follower.follows]: 

133 del self._followers[follower.follows] 

134 self._events.pop(follower.id, None) 

135 

136 async def get_followers(self, leader: ReceivedEvent) -> list[ReceivedEvent]: 

137 """Returns events that were waiting on this leader event to arrive.""" 

138 async with self._lock: 

139 follower_ids = self._followers.get(leader.id, set()).copy() 

140 

141 follower_events: list[ReceivedEvent] = [] 

142 for follower_id in follower_ids: 

143 if follower_id in self._events: 

144 follower_events.append(self._events[follower_id]) 

145 

146 # Sort by occurred time to maintain causal order 

147 return sorted(follower_events, key=lambda f: f.occurred) 

148 

149 async def followers_by_id(self, follower_ids: list[UUID]) -> list[ReceivedEvent]: 

150 """Returns the events with the given IDs, in the order they occurred.""" 

151 async with self._lock: 

152 follower_events = [ 

153 self._events[fid] for fid in follower_ids if fid in self._events 

154 ] 

155 

156 return sorted(follower_events, key=lambda f: f.occurred) 

157 

158 async def get_lost_followers(self) -> list[ReceivedEvent]: 

159 """Returns events that were waiting on a leader event that never arrived.""" 

160 cutoff_time = prefect.types._datetime.now("UTC") - PRECEDING_EVENT_LOOKBACK 1a

161 

162 async with self._lock: 1a

163 lost_ids = [ 1a

164 event_id 

165 for event_id, received_time in self._waitlist.items() 

166 if received_time <= cutoff_time 

167 ] 

168 

169 # Remove lost followers from our tracking 

170 lost_events: list[ReceivedEvent] = [] 1a

171 for event_id in lost_ids: 171 ↛ 172line 171 didn't jump to line 172 because the loop on line 171 never started1a

172 if event_id in self._events: 

173 event = self._events[event_id] 

174 lost_events.append(event) 

175 

176 # Clean up tracking for this lost event 

177 if event.follows and event.follows in self._followers: 

178 self._followers[event.follows].discard(event_id) 

179 if not self._followers[event.follows]: 

180 del self._followers[event.follows] 

181 

182 del self._events[event_id] 

183 del self._waitlist[event_id] 

184 

185 return sorted(lost_events, key=lambda f: f.occurred) 1a

186 

187 @asynccontextmanager 

188 async def event_is_processing( 

189 self, event: ReceivedEvent 

190 ) -> AsyncGenerator[None, None]: 

191 """Mark an event as being processed for the duration of its lifespan through 

192 the ordering system.""" 

193 if not await self.record_event_as_processing(event): 

194 self._log(event, "is already being processed") 

195 raise EventBeingProcessed(event) 

196 

197 try: 

198 yield 

199 await self.record_event_as_seen(event) 

200 finally: 

201 await self.forget_event_is_processing(event) 

202 

203 async def wait_for_leader(self, event: ReceivedEvent) -> None: 

204 """Given an event, wait for its leader to be processed before proceeding, or 

205 raise EventArrivedEarly if we would wait too long in this attempt.""" 

206 # If this event doesn't follow anything (meaningfully), it's ready to go now 

207 if not event.follows or event.follows == event.id: 

208 return 

209 

210 # If this is an old event, we won't have accurate bookkeeping for its leader 

211 # so we're just going to send it 

212 age = prefect.types._datetime.now("UTC") - event.received 

213 if age >= PRECEDING_EVENT_LOOKBACK: 

214 return 

215 

216 # If the leader has already been seen, keep on trucking 

217 if await self.event_has_been_seen(event.follows): 

218 return 

219 

220 # Check if the leader is currently being processed, and if so, poll until it's 

221 # done being processed as a quicker alternative to sitting on the waitlist 

222 if await self.event_has_started_processing(event.follows): 

223 try: 

224 with anyio.fail_after(IN_FLIGHT_EVENT_TIMEOUT.total_seconds()): 

225 while not await self.event_has_been_seen(event.follows): 

226 await asyncio.sleep(0.25) 

227 return 

228 except asyncio.TimeoutError: 

229 self._log( 

230 event, 

231 "timed out waiting for its in-flight leader %s, will treat as lost", 

232 event.follows, 

233 ) 

234 

235 # Otherwise, we'll stop processing now and sit on the waitlist until the leader 

236 # eventually comes through the system 

237 self._log(event, "arrived before the event it follows %s", event.follows) 

238 

239 await self.record_follower(event) 

240 raise EventArrivedEarly(event) 

241 

    @asynccontextmanager
    async def preceding_event_confirmed(
        self,
        handler: event_handler,
        event: ReceivedEvent,
        depth: int = 0,
    ) -> AsyncGenerator[None, None]:
        """
        Events may optionally declare that they logically follow another event, so that
        we can preserve important event orderings in the face of unreliable delivery and
        ordering of messages from the queues.

        This function keeps track of the ID of each event that this shard has
        successfully processed going back to the PRECEDING_EVENT_LOOKBACK period. If an
        event arrives that must follow another one, confirm that we have recently seen
        and processed that event before proceeding.

        Args:
            handler: The function to call when an out-of-order event is
                ready to be processed
            event: The event to be processed. This object should include
                metadata indicating if and what event it follows.
            depth: The current recursion depth, used to prevent infinite
                recursion due to cyclic dependencies between events. Defaults to 0.

        Raises EventArrivedEarly if the current event shouldn't be processed yet.
        """
        if depth > MAX_DEPTH_OF_PRECEDING_EVENT:
            # There is either a cyclic set of events or a chain
            # of events that is too long
            self._log(
                event,
                "has exceeded the maximum recursion depth %s",
                MAX_DEPTH_OF_PRECEDING_EVENT,
            )
            raise MaxDepthExceeded(event)

        # Holding the "processing" claim for the caller's entire body; on a clean
        # exit the event is also recorded as seen (see event_is_processing).
        async with self.event_is_processing(event):
            await self.wait_for_leader(event)
            yield

        # We have just processed an event that other events may have been waiting
        # on, so let's react to them now in the order they occurred
        try:
            for waiter in await self.get_followers(event):
                # depth+1 bounds recursion through chains of followers
                await handler(waiter, depth=depth + 1)
        except MaxDepthExceeded:
            # We'll only process the first MAX_DEPTH_OF_PRECEDING_EVENT followers.
            # If we hit this limit, we'll just log and move on.
            self._log(
                event,
                "reached its max depth of %s followers processed.",
                MAX_DEPTH_OF_PRECEDING_EVENT,
            )

        # If this event was itself waiting on a leader, let's consider it as
        # resolved now that it has been processed
        if event.follows and event.follows != event.id:
            await self.forget_follower(event)

302 def _log(self, event: ReceivedEvent, message: str, *args: Any) -> None: 

303 logger.info( 

304 "Event %r (%s) for %r " + message, 

305 event.event, 

306 event.id, 

307 event.resource.id, 

308 *args, 

309 extra={ 

310 "event_id": event.id, 

311 "follows": event.follows, 

312 "resource_id": event.resource.id, 

313 }, 

314 )