Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/events/ordering/db.py: 0%
69 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1from collections import defaultdict
2from contextlib import asynccontextmanager
3from typing import (
4 TYPE_CHECKING,
5 List,
6 Mapping,
7 MutableMapping,
8 Union,
9)
10from uuid import UUID
12import sqlalchemy as sa
13from cachetools import TTLCache
15import prefect.types._datetime
16from prefect.logging import get_logger
17from prefect.server.database import PrefectDBInterface, db_injector
18from prefect.server.events.ordering import (
19 MAX_DEPTH_OF_PRECEDING_EVENT,
20 PRECEDING_EVENT_LOOKBACK,
21 SEEN_EXPIRATION,
22 EventArrivedEarly,
23 MaxDepthExceeded,
24 event_handler,
25)
26from prefect.server.events.ordering import CausalOrdering as _CausalOrdering
27from prefect.server.events.schemas.events import Event, ReceivedEvent
29if TYPE_CHECKING:
30 import logging
32logger: "logging.Logger" = get_logger(__name__)
class CausalOrdering(_CausalOrdering):
    """Causal ordering of events for a single scope.

    Seen event IDs are remembered in an in-process TTL cache, while events that
    are still waiting on their leader ("followers") are stored in the
    ``AutomationEventFollower`` table via the injected database interface.
    """

    # Class-level attribute, so it is shared by every instance: one TTL cache of
    # seen event IDs per scope, created lazily by the defaultdict factory.
    # Entries expire after SEEN_EXPIRATION; each cache holds at most 10000 IDs.
    _seen_events: Mapping[str, MutableMapping[UUID, bool]] = defaultdict(
        lambda: TTLCache(maxsize=10000, ttl=SEEN_EXPIRATION.total_seconds())
    )

    scope: str

    def __init__(self, scope: str):
        self.scope = scope

    async def event_has_been_seen(self, event: Union[UUID, Event]) -> bool:
        """Return True if the event (or event ID) is in this scope's seen cache"""
        event_id = event.id if isinstance(event, Event) else event
        return self._seen_events[self.scope].get(event_id, False)

    async def record_event_as_seen(self, event: ReceivedEvent) -> None:
        """Mark the event's ID as seen in this scope's cache"""
        self._seen_events[self.scope][event.id] = True

    @db_injector
    async def record_follower(
        self, db: PrefectDBInterface, event: ReceivedEvent
    ) -> None:
        """Remember that this event is waiting on another event to arrive"""
        assert event.follows

        async with db.session_context(begin_transaction=True) as session:
            await session.execute(
                sa.insert(db.AutomationEventFollower).values(
                    scope=self.scope,
                    leader_event_id=event.follows,
                    follower_event_id=event.id,
                    received=event.received,
                    follower=event,
                )
            )

    @db_injector
    async def forget_follower(
        self, db: PrefectDBInterface, follower: ReceivedEvent
    ) -> None:
        """Forget that this event is waiting on another event to arrive"""
        assert follower.follows

        async with db.session_context(begin_transaction=True) as session:
            await session.execute(
                sa.delete(db.AutomationEventFollower).where(
                    db.AutomationEventFollower.scope == self.scope,
                    db.AutomationEventFollower.follower_event_id == follower.id,
                )
            )

    @db_injector
    async def get_followers(
        self, db: PrefectDBInterface, leader: ReceivedEvent
    ) -> List[ReceivedEvent]:
        """Returns events that were waiting on this leader event to arrive,
        sorted by the time they occurred"""
        async with db.session_context() as session:
            query = sa.select(db.AutomationEventFollower.follower).where(
                db.AutomationEventFollower.scope == self.scope,
                db.AutomationEventFollower.leader_event_id == leader.id,
            )
            result = await session.execute(query)
            followers = result.scalars().all()
            return sorted(followers, key=lambda e: e.occurred)

    @db_injector
    async def get_lost_followers(self, db: PrefectDBInterface) -> List[ReceivedEvent]:
        """Returns events that were waiting on a leader event that never arrived,
        sorted by the time they occurred, and deletes their follower records"""
        earlier = prefect.types._datetime.now("UTC") - PRECEDING_EVENT_LOOKBACK

        # One set of criteria for both statements so that the SELECT and the
        # DELETE below always target the same rows.
        lost = (
            db.AutomationEventFollower.scope == self.scope,
            db.AutomationEventFollower.received < earlier,
        )

        async with db.session_context(begin_transaction=True) as session:
            query = sa.select(db.AutomationEventFollower.follower).where(*lost)
            result = await session.execute(query)
            followers = result.scalars().all()

            # forget these followers, since they are never going to see their
            # leader event
            await session.execute(sa.delete(db.AutomationEventFollower).where(*lost))

            return sorted(followers, key=lambda e: e.occurred)

    @asynccontextmanager
    async def preceding_event_confirmed(
        self, handler: event_handler, event: ReceivedEvent, depth: int = 0
    ):
        """Events may optionally declare that they logically follow another event, so that
        we can preserve important event orderings in the face of unreliable delivery and
        ordering of messages from the queues.

        This function keeps track of the ID of each event that this shard has successfully
        processed going back to the PRECEDING_EVENT_LOOKBACK period.  If an event arrives
        that must follow another one, confirm that we have recently seen and processed that
        event before proceeding.

        Args:
            handler (event_handler): Awaited as ``handler(follower, depth + 1)`` for each
                stored follower of `event` once the wrapped block has completed.
            event (ReceivedEvent): The event to be processed. This object should include metadata indicating
                if and what event it follows.
            depth (int, optional): The current recursion depth, used to prevent infinite recursion due to
                cyclic dependencies between events. Defaults to 0.

        Raises:
            MaxDepthExceeded: if `depth` is greater than MAX_DEPTH_OF_PRECEDING_EVENT.
            EventArrivedEarly: if the current event shouldn't be processed yet, i.e. the
                event it follows has not been seen and this event was received less than
                PRECEDING_EVENT_LOOKBACK ago.
        """

        if depth > MAX_DEPTH_OF_PRECEDING_EVENT:
            # logger.error rather than logger.exception: there is no active
            # exception here, so logger.exception would append a meaningless
            # "NoneType: None" traceback to the record.
            logger.error(
                "Event %r (%s) for %r has exceeded the maximum recursion depth of %s",
                event.event,
                event.id,
                event.resource.id,
                MAX_DEPTH_OF_PRECEDING_EVENT,
            )
            raise MaxDepthExceeded(event)

        if event.follows:
            if not await self.event_has_been_seen(event.follows):
                age = prefect.types._datetime.now("UTC") - event.received
                # Followers older than the lookback window fall through and are
                # processed anyway rather than being held back.
                if age < PRECEDING_EVENT_LOOKBACK:
                    logger.debug(
                        "Event %r (%s) for %r arrived before the event it follows %s",
                        event.event,
                        event.id,
                        event.resource.id,
                        event.follows,
                    )

                    # record this follower for safe-keeping
                    await self.record_follower(event)
                    raise EventArrivedEarly(event)

        # The caller's `async with` body runs here.  There is no try/finally
        # around the yield, so the bookkeeping below is skipped when that body
        # raises.
        yield

        await self.record_event_as_seen(event)

        # we have just processed an event that other events were waiting on, so let's
        # react to them now in the order they occurred
        for waiter in await self.get_followers(event):
            await handler(waiter, depth + 1)

        # if this event was itself waiting on something, let's consider it as resolved now
        # that it has been processed
        if event.follows:
            await self.forget_follower(event)