Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/events/ordering/__init__.py: 71%

55 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1""" 

2Manages the partial causal ordering of events for a particular consumer. This module 

3maintains a buffer of events to be processed, aiming to process them in the order they 

4occurred causally. 

5""" 

6 

7import abc 1a

8from datetime import timedelta 1a

9import importlib 1a

10from typing import ( 1a

11 TYPE_CHECKING, 

12 AsyncContextManager, 

13 List, 

14 Protocol, 

15 Union, 

16 runtime_checkable, 

17) 

18from uuid import UUID 1a

19from prefect.logging import get_logger 1a

20from prefect.server.events.schemas.events import Event, ReceivedEvent 1a

21from prefect.settings import get_current_settings 1a

22 

if TYPE_CHECKING:
    # Needed only so the quoted annotation on ``logger`` resolves for type
    # checkers; TYPE_CHECKING is False at runtime, so this import is skipped.
    import logging

# Module-level logger, named after this module.
logger: "logging.Logger" = get_logger(__name__)

27 

# Retention window for preceding events (kept around to aid with ordering).
PRECEDING_EVENT_LOOKBACK = timedelta(minutes=15)

# Retention window for events that were already processed (so the same event
# is not processed a second time).
PROCESSED_EVENT_LOOKBACK = timedelta(minutes=30)

# How long an event is remembered as "seen": the longer of the two windows
# above.
SEEN_EXPIRATION = max(PRECEDING_EVENT_LOOKBACK, PROCESSED_EVENT_LOOKBACK)

# Upper bound on how deep the recursion may go when processing events.
MAX_DEPTH_OF_PRECEDING_EVENT = 20

39 

40 

@runtime_checkable
class CausalOrderingModule(Protocol):
    """Structural type for a module that exposes a ``CausalOrdering`` attribute.

    The protocol is ``runtime_checkable`` so that an imported module object can
    be validated with ``isinstance``; that check only verifies that the
    attribute exists, not what it holds.
    """

    # The ordering implementation class the module is expected to export.
    CausalOrdering: type["CausalOrdering"]

44 

45 

class EventArrivedEarly(Exception):
    """Exception that carries the event it was raised for.

    NOTE(review): judging by the name, this signals that ``event`` was received
    before the event that causally precedes it; the raise sites are outside
    this module, so confirm there.

    Attributes:
        event: The event passed to the constructor.
    """

    def __init__(self, event: ReceivedEvent):
        # Forward the event to ``Exception`` so that ``args`` is populated.
        # With empty ``args``, copying or unpickling the exception re-invokes
        # ``__init__`` without arguments and fails with a TypeError, and
        # ``str(exc)`` is blank.
        super().__init__(event)
        self.event = event

49 

50 

class MaxDepthExceeded(Exception):
    """Exception that carries the event it was raised for.

    NOTE(review): judging by the name, this signals that processing ``event``
    went deeper than the allowed recursion depth; the raise sites are outside
    this module, so confirm there.

    Attributes:
        event: The event passed to the constructor.
    """

    def __init__(self, event: ReceivedEvent):
        # Forward the event to ``Exception`` so that ``args`` is populated.
        # With empty ``args``, copying or unpickling the exception re-invokes
        # ``__init__`` without arguments and fails with a TypeError, and
        # ``str(exc)`` is blank.
        super().__init__(event)
        self.event = event

54 

55 

class event_handler(Protocol):
    """Call signature of the coroutine used to handle a single event.

    A conforming callable accepts the event plus an optional ``depth`` (0 by
    default) and returns nothing.

    NOTE(review): ``depth`` presumably tracks how far the processing recursion
    has gone; confirm against the implementations.
    """

    async def __call__(
        self,
        event: ReceivedEvent,
        depth: int = 0,
    ) -> None: ...  # pragma: no cover

60 

61 

class CausalOrdering(abc.ABC):
    """Abstract base class for causal-ordering bookkeeping within one scope.

    Every method other than ``__init__`` is abstract; this class only defines
    the interface. The method descriptions below are derived from the names
    and signatures, so treat them as the presumed contract and confirm the
    details against a concrete implementation.
    """

    def __init__(self, scope: str):
        """Store ``scope`` on the instance."""
        self.scope = scope

    @abc.abstractmethod
    async def event_has_been_seen(self, event: Union[UUID, Event]) -> bool:
        """Report whether ``event`` (an event or a UUID) has been seen."""

    @abc.abstractmethod
    async def record_event_as_seen(self, event: ReceivedEvent) -> None:
        """Record that ``event`` has been seen."""

    @abc.abstractmethod
    async def record_follower(self, event: ReceivedEvent) -> None:
        """Record ``event`` as a follower (presumably of its preceding event)."""

    @abc.abstractmethod
    async def forget_follower(self, follower: ReceivedEvent) -> None:
        """Drop the record of ``follower``."""

    @abc.abstractmethod
    async def get_followers(self, leader: ReceivedEvent) -> List[ReceivedEvent]:
        """Return the events recorded as followers of ``leader``."""

    @abc.abstractmethod
    async def get_lost_followers(self) -> List[ReceivedEvent]:
        """Return the followers considered lost.

        NOTE(review): what makes a follower "lost" is decided by the
        implementation; confirm there.
        """

    @abc.abstractmethod
    def preceding_event_confirmed(
        self,
        handler: event_handler,
        event: ReceivedEvent,
        depth: int = 0,
    ) -> AsyncContextManager[None]:
        """Return an async context manager for handling ``event``.

        Not a coroutine: the context manager is returned directly.
        """

88 

89 

def _load_causal_ordering(scope: str) -> CausalOrdering:
    """Build the configured ``CausalOrdering`` implementation for ``scope``.

    Reads the module path from the ``server.events.causal_ordering`` setting,
    imports that module, and instantiates its ``CausalOrdering`` attribute.

    Args:
        scope: Passed through as the ``scope`` keyword of the implementation.

    Returns:
        The object produced by calling the module's ``CausalOrdering``.

    Raises:
        ValueError: If the imported module has no ``CausalOrdering`` attribute.
    """
    import_path = get_current_settings().server.events.causal_ordering
    causal_ordering_module = importlib.import_module(import_path)

    # The protocol check only confirms the attribute is present on the module.
    if not isinstance(causal_ordering_module, CausalOrderingModule):
        raise ValueError(
            f"Module at {import_path} does not export a CausalOrdering class. Please check your server.events.causal_ordering setting."
        )

    return causal_ordering_module.CausalOrdering(scope=scope)


def get_triggers_causal_ordering() -> CausalOrdering:
    """Return the configured causal ordering with scope ``"triggers"``.

    Raises:
        ValueError: If the configured module has no ``CausalOrdering`` attribute.
    """
    return _load_causal_ordering("triggers")


def get_task_run_recorder_causal_ordering() -> CausalOrdering:
    """Return the configured causal ordering with scope ``"task-run-recorder"``.

    Raises:
        ValueError: If the configured module has no ``CausalOrdering`` attribute.
    """
    return _load_causal_ordering("task-run-recorder")