Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/events/ordering/memory.py: 13% of 161 statements
from __future__ import annotations

import asyncio
import logging
from contextlib import asynccontextmanager
from datetime import datetime, timedelta
from typing import Any, AsyncGenerator
from uuid import UUID

import anyio
from cachetools import TTLCache

import prefect.types._datetime
from prefect.logging import get_logger
from prefect.server.events.ordering import (
    MAX_DEPTH_OF_PRECEDING_EVENT,
    PRECEDING_EVENT_LOOKBACK,
    SEEN_EXPIRATION,
    EventArrivedEarly,
    MaxDepthExceeded,
    event_handler,
)
from prefect.server.events.ordering import CausalOrdering as _CausalOrdering
from prefect.server.events.schemas.events import Event, ReceivedEvent

logger: logging.Logger = get_logger(__name__)

# How long we'll wait for an in-flight event to be processed for follower handling
IN_FLIGHT_EVENT_TIMEOUT = timedelta(seconds=8)


class EventBeingProcessed(Exception):
    """Indicates that an event is currently being processed and should not be
    processed again until that attempt finishes. This may happen due to concurrent
    processing."""

    def __init__(self, event: ReceivedEvent):
        self.event = event


class CausalOrdering(_CausalOrdering):
    # Class-level storage for different scopes
    _instances: dict[str, "CausalOrdering"] = {}
    _locks: dict[str, asyncio.Lock] = {}

    def __new__(cls, scope: str) -> "CausalOrdering":
        if scope not in cls._instances:
            cls._instances[scope] = super().__new__(cls)
        return cls._instances[scope]

    def __init__(self, scope: str):
        # Only initialize once per scope
        if hasattr(self, "_initialized") and self._initialized:
            return

        self.scope: str = scope
        self._processing_events: set[UUID] = set()
        self._seen_events: TTLCache[UUID, bool] = TTLCache(
            maxsize=10000, ttl=SEEN_EXPIRATION.total_seconds()
        )
        self._followers: dict[UUID, set[UUID]] = {}  # leader_id -> set of follower_ids
        self._events: dict[UUID, ReceivedEvent] = {}  # event_id -> event
        self._waitlist: dict[UUID, datetime] = {}  # event_id -> received_time

        # Each scope gets its own lock
        if scope not in self.__class__._locks:
            self.__class__._locks[scope] = asyncio.Lock()
        self._lock = self.__class__._locks[scope]

        self._initialized = True
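
    # Usage sketch (illustrative; the scope names below are arbitrary examples):
    # because __new__ memoizes instances by scope and __init__ returns early after
    # the first call, constructing with the same scope always yields the same
    # shared object, while different scopes stay isolated:
    #
    #     a = CausalOrdering("task-run-recorder")
    #     b = CausalOrdering("task-run-recorder")
    #     assert a is b
    #     assert a is not CausalOrdering("triggers")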

    def clear(self) -> None:
        """Clear all data for this scope."""
        self._processing_events.clear()
        self._seen_events.clear()
        self._followers.clear()
        self._events.clear()
        self._waitlist.clear()

    @classmethod
    def clear_all_scopes(cls) -> None:
        """Clear all data for all scopes - useful for testing."""
        for instance in cls._instances.values():
            instance.clear()
        cls._instances.clear()
        cls._locks.clear()

    async def record_event_as_processing(self, event: ReceivedEvent) -> bool:
        """Record that an event is being processed, returning False if already processing."""
        async with self._lock:
            if event.id in self._processing_events:
                return False
            self._processing_events.add(event.id)
            return True

    async def event_has_started_processing(self, event: UUID | Event) -> bool:
        event_id = event.id if isinstance(event, Event) else event
        async with self._lock:
            return event_id in self._processing_events

    async def forget_event_is_processing(self, event: ReceivedEvent) -> None:
        async with self._lock:
            self._processing_events.discard(event.id)

    async def event_has_been_seen(self, event: UUID | Event) -> bool:
        event_id = event.id if isinstance(event, Event) else event
        async with self._lock:
            return event_id in self._seen_events

    async def record_event_as_seen(self, event: ReceivedEvent) -> None:
        async with self._lock:
            self._seen_events[event.id] = True

    async def record_follower(self, event: ReceivedEvent) -> None:
        """Remember that this event is waiting on another event to arrive."""
        assert event.follows

        async with self._lock:
            self._events[event.id] = event
            if event.follows not in self._followers:
                self._followers[event.follows] = set()
            self._followers[event.follows].add(event.id)
            self._waitlist[event.id] = event.received

    async def forget_follower(self, follower: ReceivedEvent) -> None:
        """Forget that this event is waiting on another event to arrive."""
        assert follower.follows

        async with self._lock:
            self._waitlist.pop(follower.id, None)
            if follower.follows in self._followers:
                self._followers[follower.follows].discard(follower.id)
                if not self._followers[follower.follows]:
                    del self._followers[follower.follows]
            self._events.pop(follower.id, None)

    async def get_followers(self, leader: ReceivedEvent) -> list[ReceivedEvent]:
        """Returns events that were waiting on this leader event to arrive."""
        async with self._lock:
            follower_ids = self._followers.get(leader.id, set()).copy()

        follower_events: list[ReceivedEvent] = []
        for follower_id in follower_ids:
            if follower_id in self._events:
                follower_events.append(self._events[follower_id])

        # Sort by occurred time to maintain causal order
        return sorted(follower_events, key=lambda f: f.occurred)

    async def followers_by_id(self, follower_ids: list[UUID]) -> list[ReceivedEvent]:
        """Returns the events with the given IDs, in the order they occurred."""
        async with self._lock:
            follower_events = [
                self._events[fid] for fid in follower_ids if fid in self._events
            ]

        return sorted(follower_events, key=lambda f: f.occurred)

    async def get_lost_followers(self) -> list[ReceivedEvent]:
        """Returns events that were waiting on a leader event that never arrived."""
        cutoff_time = prefect.types._datetime.now("UTC") - PRECEDING_EVENT_LOOKBACK

        async with self._lock:
            lost_ids = [
                event_id
                for event_id, received_time in self._waitlist.items()
                if received_time <= cutoff_time
            ]

            # Remove lost followers from our tracking
            lost_events: list[ReceivedEvent] = []
            for event_id in lost_ids:
                if event_id in self._events:
                    event = self._events[event_id]
                    lost_events.append(event)

                    # Clean up tracking for this lost event
                    if event.follows and event.follows in self._followers:
                        self._followers[event.follows].discard(event_id)
                        if not self._followers[event.follows]:
                            del self._followers[event.follows]

                    del self._events[event_id]
                    del self._waitlist[event_id]

        return sorted(lost_events, key=lambda f: f.occurred)
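
    # Illustrative sketch: a periodic maintenance task could drain followers whose
    # leaders never arrived within PRECEDING_EVENT_LOOKBACK and hand them to a
    # hypothetical `handler` anyway, so they are not silently dropped:
    #
    #     for lost in await ordering.get_lost_followers():
    #         await handler(lost)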

    @asynccontextmanager
    async def event_is_processing(
        self, event: ReceivedEvent
    ) -> AsyncGenerator[None, None]:
        """Mark an event as being processed for the duration of its lifespan through
        the ordering system."""
        if not await self.record_event_as_processing(event):
            self._log(event, "is already being processed")
            raise EventBeingProcessed(event)

        try:
            yield
            await self.record_event_as_seen(event)
        finally:
            await self.forget_event_is_processing(event)

    async def wait_for_leader(self, event: ReceivedEvent) -> None:
        """Given an event, wait for its leader to be processed before proceeding, or
        raise EventArrivedEarly if we would wait too long in this attempt."""
        # If this event doesn't follow anything (meaningfully), it's ready to go now
        if not event.follows or event.follows == event.id:
            return

        # If this is an old event, we won't have accurate bookkeeping for its leader
        # so we're just going to send it
        age = prefect.types._datetime.now("UTC") - event.received
        if age >= PRECEDING_EVENT_LOOKBACK:
            return

        # If the leader has already been seen, keep on trucking
        if await self.event_has_been_seen(event.follows):
            return

        # Check if the leader is currently being processed, and if so, poll until it's
        # done being processed as a quicker alternative to sitting on the waitlist
        if await self.event_has_started_processing(event.follows):
            try:
                with anyio.fail_after(IN_FLIGHT_EVENT_TIMEOUT.total_seconds()):
                    while not await self.event_has_been_seen(event.follows):
                        await asyncio.sleep(0.25)
                    return
            except asyncio.TimeoutError:
                self._log(
                    event,
                    "timed out waiting for its in-flight leader %s, will treat as lost",
                    event.follows,
                )

        # Otherwise, we'll stop processing now and sit on the waitlist until the leader
        # eventually comes through the system
        self._log(event, "arrived before the event it follows %s", event.follows)

        await self.record_follower(event)
        raise EventArrivedEarly(event)

    @asynccontextmanager
    async def preceding_event_confirmed(
        self,
        handler: event_handler,
        event: ReceivedEvent,
        depth: int = 0,
    ) -> AsyncGenerator[None, None]:
        """
        Events may optionally declare that they logically follow another event, so that
        we can preserve important event orderings in the face of unreliable delivery and
        ordering of messages from the queues.

        This function keeps track of the ID of each event that this shard has
        successfully processed going back to the PRECEDING_EVENT_LOOKBACK period. If an
        event arrives that must follow another one, confirm that we have recently seen
        and processed that event before proceeding.

        Args:
            handler: The function to call when an out-of-order event is
                ready to be processed
            event: The event to be processed. This object should include
                metadata indicating if and what event it follows.
            depth: The current recursion depth, used to prevent infinite
                recursion due to cyclic dependencies between events. Defaults to 0.

        Raises EventArrivedEarly if the current event shouldn't be processed yet.
        """
        if depth > MAX_DEPTH_OF_PRECEDING_EVENT:
            # There is either a cyclic set of events or a chain
            # of events that is too long
            self._log(
                event,
                "has exceeded the maximum recursion depth %s",
                MAX_DEPTH_OF_PRECEDING_EVENT,
            )
            raise MaxDepthExceeded(event)

        async with self.event_is_processing(event):
            await self.wait_for_leader(event)
            yield

        # We have just processed an event that other events may have been waiting
        # on, so let's react to them now in the order they occurred
        try:
            for waiter in await self.get_followers(event):
                await handler(waiter, depth=depth + 1)
        except MaxDepthExceeded:
            # We'll only process the first MAX_DEPTH_OF_PRECEDING_EVENT followers.
            # If we hit this limit, we'll just log and move on.
            self._log(
                event,
                "reached its max depth of %s followers processed.",
                MAX_DEPTH_OF_PRECEDING_EVENT,
            )

        # If this event was itself waiting on a leader, let's consider it as
        # resolved now that it has been processed
        if event.follows and event.follows != event.id:
            await self.forget_follower(event)

    def _log(self, event: ReceivedEvent, message: str, *args: Any) -> None:
        logger.info(
            "Event %r (%s) for %r " + message,
            event.event,
            event.id,
            event.resource.id,
            *args,
            extra={
                "event_id": event.id,
                "follows": event.follows,
                "resource_id": event.resource.id,
            },
        )
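

# ---------------------------------------------------------------------------
# Illustrative usage sketch (an assumption-laden example, not part of the module
# above): it shows a follower event arriving before its leader, being parked via
# EventArrivedEarly, and then being replayed once the leader is processed. It
# assumes ReceivedEvent accepts the keyword arguments that the code above reads
# (id, event, occurred, received, resource, follows); the event names, resource
# id, and scope are made up for the example.
if __name__ == "__main__":
    from uuid import uuid4

    async def _demo() -> None:
        ordering = CausalOrdering(scope="demo")
        now = prefect.types._datetime.now("UTC")
        leader_id = uuid4()

        leader = ReceivedEvent(
            id=leader_id,
            event="example.leader",
            occurred=now,
            received=now,
            resource={"prefect.resource.id": "example.resource"},
        )
        follower = ReceivedEvent(
            id=uuid4(),
            event="example.follower",
            occurred=now + timedelta(seconds=1),
            received=now + timedelta(seconds=1),
            resource={"prefect.resource.id": "example.resource"},
            follows=leader_id,
        )

        async def handle(event: ReceivedEvent, depth: int = 0) -> None:
            async with ordering.preceding_event_confirmed(handle, event, depth=depth):
                print(f"processed {event.event} ({event.id})")

        # The follower arrives first: wait_for_leader parks it on the waitlist and
        # raises EventArrivedEarly so the caller can retry it later.
        try:
            await handle(follower)
        except EventArrivedEarly:
            print("follower arrived early; it is now waiting for its leader")

        # Processing the leader replays the waiting follower afterwards, in
        # occurred order.
        await handle(leader)

    asyncio.run(_demo())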