Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/events/ordering/__init__.py: 71%
55 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1"""
2Manages the partial causal ordering of events for a particular consumer. This module
3maintains a buffer of events to be processed, aiming to process them in the order they
4occurred causally.
5"""
7import abc 1a
8from datetime import timedelta 1a
9import importlib 1a
10from typing import ( 1a
11 TYPE_CHECKING,
12 AsyncContextManager,
13 List,
14 Protocol,
15 Union,
16 runtime_checkable,
17)
18from uuid import UUID 1a
19from prefect.logging import get_logger 1a
20from prefect.server.events.schemas.events import Event, ReceivedEvent 1a
21from prefect.settings import get_current_settings 1a
23if TYPE_CHECKING: 23 ↛ 24line 23 didn't jump to line 24 because the condition on line 23 was never true1a
24 import logging
26logger: "logging.Logger" = get_logger(__name__) 1a
28# How long we'll retain preceding events (to aid with ordering)
29PRECEDING_EVENT_LOOKBACK = timedelta(minutes=15) 1a
31# How long we'll retain events we've processed (to prevent re-processing an event)
32PROCESSED_EVENT_LOOKBACK = timedelta(minutes=30) 1a
34# How long we'll remember that we've seen an event
35SEEN_EXPIRATION = max(PRECEDING_EVENT_LOOKBACK, PROCESSED_EVENT_LOOKBACK) 1a
37# How deep we'll allow the recursion to go when processing events
38MAX_DEPTH_OF_PRECEDING_EVENT = 20 1a
41@runtime_checkable 1a
42class CausalOrderingModule(Protocol): 1a
43 CausalOrdering: type["CausalOrdering"] 1a
46class EventArrivedEarly(Exception): 1a
47 def __init__(self, event: ReceivedEvent): 1a
48 self.event = event
51class MaxDepthExceeded(Exception): 1a
52 def __init__(self, event: ReceivedEvent): 1a
53 self.event = event
56class event_handler(Protocol): 1a
57 async def __call__( 57 ↛ exitline 57 didn't return from function '__call__' because 1a
58 self, event: ReceivedEvent, depth: int = 0
59 ) -> None: ... # pragma: no cover
62class CausalOrdering(abc.ABC): 1a
63 def __init__(self, scope: str): 1a
64 self.scope = scope
66 @abc.abstractmethod 1a
67 async def event_has_been_seen(self, event: Union[UUID, Event]) -> bool: ... 67 ↛ exitline 67 didn't return from function 'event_has_been_seen' because 1a
69 @abc.abstractmethod 1a
70 async def record_event_as_seen(self, event: ReceivedEvent) -> None: ... 70 ↛ exitline 70 didn't return from function 'record_event_as_seen' because 1a
72 @abc.abstractmethod 1a
73 async def record_follower(self, event: ReceivedEvent) -> None: ... 73 ↛ exitline 73 didn't return from function 'record_follower' because 1a
75 @abc.abstractmethod 1a
76 async def forget_follower(self, follower: ReceivedEvent) -> None: ... 76 ↛ exitline 76 didn't return from function 'forget_follower' because 1a
78 @abc.abstractmethod 1a
79 async def get_followers(self, leader: ReceivedEvent) -> List[ReceivedEvent]: ... 79 ↛ exitline 79 didn't return from function 'get_followers' because 1a
81 @abc.abstractmethod 1a
82 async def get_lost_followers(self) -> List[ReceivedEvent]: ... 82 ↛ exitline 82 didn't return from function 'get_lost_followers' because 1a
84 @abc.abstractmethod 1a
85 def preceding_event_confirmed( 85 ↛ exitline 85 didn't return from function 'preceding_event_confirmed' because 1a
86 self, handler: event_handler, event: ReceivedEvent, depth: int = 0
87 ) -> AsyncContextManager[None]: ...
90def get_triggers_causal_ordering() -> CausalOrdering: 1a
91 import_path = get_current_settings().server.events.causal_ordering 1b
92 causal_ordering_module = importlib.import_module(import_path) 1b
94 if not isinstance(causal_ordering_module, CausalOrderingModule): 94 ↛ 95line 94 didn't jump to line 95 because the condition on line 94 was never true1b
95 raise ValueError(
96 f"Module at {import_path} does not export a CausalOrdering class. Please check your server.events.causal_ordering setting."
97 )
99 return causal_ordering_module.CausalOrdering(scope="triggers") 1b
102def get_task_run_recorder_causal_ordering() -> CausalOrdering: 1a
103 import_path = get_current_settings().server.events.causal_ordering
104 causal_ordering_module = importlib.import_module(import_path)
106 if not isinstance(causal_ordering_module, CausalOrderingModule):
107 raise ValueError(
108 f"Module at {import_path} does not export a CausalOrdering class. Please check your server.events.causal_ordering setting."
109 )
111 return causal_ordering_module.CausalOrdering(scope="task-run-recorder")