Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/events/ordering/memory.py: 44%
161 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1from __future__ import annotations
3import asyncio
4import logging
5from contextlib import asynccontextmanager
6from datetime import datetime, timedelta
7from typing import Any, AsyncGenerator
8from uuid import UUID
10import anyio
11from cachetools import TTLCache
13import prefect.types._datetime
14from prefect.logging import get_logger
15from prefect.server.events.ordering import (
16 MAX_DEPTH_OF_PRECEDING_EVENT,
17 PRECEDING_EVENT_LOOKBACK,
18 SEEN_EXPIRATION,
19 EventArrivedEarly,
20 MaxDepthExceeded,
21 event_handler,
22)
23from prefect.server.events.ordering import CausalOrdering as _CausalOrdering
24from prefect.server.events.schemas.events import Event, ReceivedEvent
# Module-level logger, named after this module via Prefect's logging helper.
logger: logging.Logger = get_logger(__name__)

# How long we'll wait for an in-flight event to be processed for follower handling
IN_FLIGHT_EVENT_TIMEOUT = timedelta(seconds=8)
class EventBeingProcessed(Exception):
    """Raised when an event is already mid-flight through processing and must not
    be handled again until that pass finishes; this can happen when the same
    event is delivered to concurrent consumers."""

    def __init__(self, event: ReceivedEvent) -> None:
        # Forward the event to Exception so ``args``/``str()`` match the
        # conventional single-argument exception shape.
        super().__init__(event)
        self.event = event
class CausalOrdering(_CausalOrdering):
    """In-memory implementation of causal event ordering.

    Instances are per-scope singletons: constructing ``CausalOrdering(scope)``
    with the same scope always returns the same object, so every consumer in a
    scope shares one ``asyncio.Lock`` and one set of bookkeeping structures
    (in-flight events, recently-seen events, and the follower waitlist).
    """

    # Class-level storage for different scopes (scope name -> singleton instance)
    _instances: dict[str, "CausalOrdering"] = {}
    _locks: dict[str, asyncio.Lock] = {}

    def __new__(cls, scope: str) -> "CausalOrdering":
        # Per-scope singleton: reuse any existing instance for this scope
        if scope not in cls._instances:
            cls._instances[scope] = super().__new__(cls)
        return cls._instances[scope]

    def __init__(self, scope: str):
        # Only initialize once per scope (__init__ runs on every construction,
        # even when __new__ returned an already-initialized instance)
        if hasattr(self, "_initialized") and self._initialized:
            return

        self.scope: str = scope
        # IDs of events currently being processed (in-flight)
        self._processing_events: set[UUID] = set()
        # IDs of recently processed events; entries expire after SEEN_EXPIRATION
        self._seen_events: TTLCache[UUID, bool] = TTLCache(
            maxsize=10000, ttl=SEEN_EXPIRATION.total_seconds()
        )
        self._followers: dict[UUID, set[UUID]] = {}  # leader_id -> set of follower_ids
        self._events: dict[UUID, ReceivedEvent] = {}  # event_id -> event
        self._waitlist: dict[UUID, datetime] = {}  # event_id -> received_time

        # Each scope gets its own lock
        if scope not in self.__class__._locks:
            self.__class__._locks[scope] = asyncio.Lock()
        self._lock: asyncio.Lock = self.__class__._locks[scope]

        self._initialized: bool = True

    def clear(self) -> None:
        """Clear all data for this scope."""
        # NOTE(review): mutates shared state without holding self._lock;
        # appears intended for synchronous test teardown rather than
        # concurrent use — confirm callers.
        self._processing_events.clear()
        self._seen_events.clear()
        self._followers.clear()
        self._events.clear()
        self._waitlist.clear()

    @classmethod
    def clear_all_scopes(cls) -> None:
        """Clear all data for all scopes - useful for testing."""
        for instance in cls._instances.values():
            instance.clear()
        # Drop the singletons and their locks so the next construction
        # starts completely fresh
        cls._instances.clear()
        cls._locks.clear()

    async def record_event_as_processing(self, event: ReceivedEvent) -> bool:
        """Record that an event is being processed, returning False if already processing."""
        async with self._lock:
            if event.id in self._processing_events:
                return False
            self._processing_events.add(event.id)
            return True

    async def event_has_started_processing(self, event: UUID | Event) -> bool:
        """Return True if the given event (or event ID) is currently in-flight."""
        event_id = event.id if isinstance(event, Event) else event
        async with self._lock:
            return event_id in self._processing_events

    async def forget_event_is_processing(self, event: ReceivedEvent) -> None:
        """Remove the event from the set of in-flight events."""
        async with self._lock:
            self._processing_events.discard(event.id)

    async def event_has_been_seen(self, event: UUID | Event) -> bool:
        """Return True if the event finished processing within SEEN_EXPIRATION."""
        event_id = event.id if isinstance(event, Event) else event
        async with self._lock:
            return event_id in self._seen_events

    async def record_event_as_seen(self, event: ReceivedEvent) -> None:
        """Mark the event as fully processed; the entry expires via the TTL cache."""
        async with self._lock:
            self._seen_events[event.id] = True

    async def record_follower(self, event: ReceivedEvent) -> None:
        """Remember that this event is waiting on another event to arrive."""
        assert event.follows

        async with self._lock:
            self._events[event.id] = event
            if event.follows not in self._followers:
                self._followers[event.follows] = set()
            self._followers[event.follows].add(event.id)
            self._waitlist[event.id] = event.received

    async def forget_follower(self, follower: ReceivedEvent) -> None:
        """Forget that this event is waiting on another event to arrive."""
        assert follower.follows

        async with self._lock:
            self._waitlist.pop(follower.id, None)
            if follower.follows in self._followers:
                self._followers[follower.follows].discard(follower.id)
                # Drop the leader's entry entirely once it has no followers left
                if not self._followers[follower.follows]:
                    del self._followers[follower.follows]
            self._events.pop(follower.id, None)

    async def get_followers(self, leader: ReceivedEvent) -> list[ReceivedEvent]:
        """Returns events that were waiting on this leader event to arrive."""
        async with self._lock:
            # Copy so we don't hand out a live view of the mutable set
            follower_ids = self._followers.get(leader.id, set()).copy()

            follower_events: list[ReceivedEvent] = []
            for follower_id in follower_ids:
                if follower_id in self._events:
                    follower_events.append(self._events[follower_id])

            # Sort by occurred time to maintain causal order
            return sorted(follower_events, key=lambda f: f.occurred)

    async def followers_by_id(self, follower_ids: list[UUID]) -> list[ReceivedEvent]:
        """Returns the events with the given IDs, in the order they occurred."""
        async with self._lock:
            follower_events = [
                self._events[fid] for fid in follower_ids if fid in self._events
            ]

        return sorted(follower_events, key=lambda f: f.occurred)

    async def get_lost_followers(self) -> list[ReceivedEvent]:
        """Returns events that were waiting on a leader event that never arrived."""
        # Anything waiting longer than the lookback window is considered lost
        cutoff_time = prefect.types._datetime.now("UTC") - PRECEDING_EVENT_LOOKBACK

        async with self._lock:
            lost_ids = [
                event_id
                for event_id, received_time in self._waitlist.items()
                if received_time <= cutoff_time
            ]

            # Remove lost followers from our tracking
            lost_events: list[ReceivedEvent] = []
            for event_id in lost_ids:
                if event_id in self._events:
                    event = self._events[event_id]
                    lost_events.append(event)

                    # Clean up tracking for this lost event
                    if event.follows and event.follows in self._followers:
                        self._followers[event.follows].discard(event_id)
                        if not self._followers[event.follows]:
                            del self._followers[event.follows]

                    del self._events[event_id]
                    del self._waitlist[event_id]

            return sorted(lost_events, key=lambda f: f.occurred)

    @asynccontextmanager
    async def event_is_processing(
        self, event: ReceivedEvent
    ) -> AsyncGenerator[None, None]:
        """Mark an event as being processed for the duration of its lifespan through
        the ordering system.

        Raises:
            EventBeingProcessed: if the event is already in-flight elsewhere.
        """
        if not await self.record_event_as_processing(event):
            self._log(event, "is already being processed")
            raise EventBeingProcessed(event)

        try:
            yield
            # Only record as seen when the body completed without raising
            await self.record_event_as_seen(event)
        finally:
            await self.forget_event_is_processing(event)

    async def wait_for_leader(self, event: ReceivedEvent) -> None:
        """Given an event, wait for its leader to be processed before proceeding, or
        raise EventArrivedEarly if we would wait too long in this attempt."""
        # If this event doesn't follow anything (meaningfully), it's ready to go now
        if not event.follows or event.follows == event.id:
            return

        # If this is an old event, we won't have accurate bookkeeping for its leader
        # so we're just going to send it
        age = prefect.types._datetime.now("UTC") - event.received
        if age >= PRECEDING_EVENT_LOOKBACK:
            return

        # If the leader has already been seen, keep on trucking
        if await self.event_has_been_seen(event.follows):
            return

        # Check if the leader is currently being processed, and if so, poll until it's
        # done being processed as a quicker alternative to sitting on the waitlist
        if await self.event_has_started_processing(event.follows):
            try:
                # anyio.fail_after raises TimeoutError (== asyncio.TimeoutError
                # on this runtime) if the leader isn't seen in time
                with anyio.fail_after(IN_FLIGHT_EVENT_TIMEOUT.total_seconds()):
                    while not await self.event_has_been_seen(event.follows):
                        await asyncio.sleep(0.25)
                    return
            except asyncio.TimeoutError:
                self._log(
                    event,
                    "timed out waiting for its in-flight leader %s, will treat as lost",
                    event.follows,
                )

        # Otherwise, we'll stop processing now and sit on the waitlist until the leader
        # eventually comes through the system
        self._log(event, "arrived before the event it follows %s", event.follows)
        await self.record_follower(event)
        raise EventArrivedEarly(event)

    @asynccontextmanager
    async def preceding_event_confirmed(
        self,
        handler: event_handler,
        event: ReceivedEvent,
        depth: int = 0,
    ) -> AsyncGenerator[None, None]:
        """
        Events may optionally declare that they logically follow another event, so that
        we can preserve important event orderings in the face of unreliable delivery and
        ordering of messages from the queues.

        This function keeps track of the ID of each event that this shard has
        successfully processed going back to the PRECEDING_EVENT_LOOKBACK period. If an
        event arrives that must follow another one, confirm that we have recently seen
        and processed that event before proceeding.

        Args:
            handler: The function to call when an out-of-order event is
                ready to be processed
            event: The event to be processed. This object should include
                metadata indicating if and what event it follows.
            depth: The current recursion depth, used to prevent infinite
                recursion due to cyclic dependencies between events. Defaults to 0.

        Raises EventArrivedEarly if the current event shouldn't be processed yet.
        """
        if depth > MAX_DEPTH_OF_PRECEDING_EVENT:
            # There is either a cyclic set of events or a chain
            # of events that is too long
            self._log(
                event,
                "has exceeded the maximum recursion depth %s",
                MAX_DEPTH_OF_PRECEDING_EVENT,
            )
            raise MaxDepthExceeded(event)

        async with self.event_is_processing(event):
            await self.wait_for_leader(event)
            yield

        # We have just processed an event that other events may have been waiting
        # on, so let's react to them now in the order they occurred
        try:
            for waiter in await self.get_followers(event):
                await handler(waiter, depth=depth + 1)
        except MaxDepthExceeded:
            # We'll only process the first MAX_DEPTH_OF_PRECEDING_EVENT followers.
            # If we hit this limit, we'll just log and move on.
            self._log(
                event,
                "reached its max depth of %s followers processed.",
                MAX_DEPTH_OF_PRECEDING_EVENT,
            )

        # If this event was itself waiting on a leader, let's consider it as
        # resolved now that it has been processed
        if event.follows and event.follows != event.id:
            await self.forget_follower(event)

    def _log(self, event: ReceivedEvent, message: str, *args: Any) -> None:
        # Uniform log line: prefixes every message with the event name, ID,
        # and resource ID, and attaches them as structured `extra` fields
        logger.info(
            "Event %r (%s) for %r " + message,
            event.event,
            event.id,
            event.resource.id,
            *args,
            extra={
                "event_id": event.id,
                "follows": event.follows,
                "resource_id": event.resource.id,
            },
        )