Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/utilities/messaging/__init__.py: 75%

92 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1import abc 1a

2from contextlib import asynccontextmanager, AbstractAsyncContextManager 1a

3from dataclasses import dataclass 1a

4import importlib 1a

5from types import TracebackType 1a

6from typing import ( 1a

7 TYPE_CHECKING, 

8 Any, 

9 Callable, 

10 Optional, 

11 Protocol, 

12 Type, 

13 TypeVar, 

14 Union, 

15 runtime_checkable, 

16) 

17from collections.abc import AsyncGenerator, Awaitable, Iterable, Mapping 1a

18from typing_extensions import Self 1a

19 

20from prefect.settings import PREFECT_MESSAGING_CACHE, PREFECT_MESSAGING_BROKER 1a

21from prefect.logging import get_logger 1a

22 

if TYPE_CHECKING:
    # Needed only for the string annotation on ``logger`` below; this branch
    # is never taken at runtime.
    import logging

# Module-level logger, named after this module.
logger: "logging.Logger" = get_logger(__name__)


# Type variable bound to ``Message``; ``Cache.without_duplicates`` uses it so
# the returned list carries the same message type as the iterable passed in.
M = TypeVar("M", bound="Message", covariant=True)

30 

31 

class Message(Protocol):
    """
    Structural type for a message sent to a message broker.

    Any object exposing read-only ``data`` and ``attributes`` properties with
    the types below satisfies this protocol; no inheritance is required.
    """

    @property
    def data(self) -> Union[str, bytes]:
        """The message payload, as text or raw bytes."""
        ...

    @property
    def attributes(self) -> Mapping[str, Any]:
        """String-keyed attributes that accompany the payload."""
        ...

42 

43 

class Cache(abc.ABC):
    """
    Abstract interface for the message-deduplication cache used by publishers.

    Concrete implementations are loaded from the module named by the
    ``PREFECT_MESSAGING_CACHE`` setting (see ``create_cache``). All three
    operations are coroutines and must be implemented by subclasses.
    """

    @abc.abstractmethod
    async def clear_recently_seen_messages(self) -> None:
        """
        Reset the cache's record of recently seen messages.

        NOTE(review): exact semantics are defined by the implementation.
        """
        ...

    @abc.abstractmethod
    async def without_duplicates(
        self, attribute: str, messages: Iterable[M]
    ) -> list[M]:
        """
        Filter ``messages`` down to the ones that should still be published.

        Args:
            attribute: name of the message attribute used for deduplication
            messages: the candidate messages

        Returns:
            a list of the same message type as ``messages``; presumably those
            whose ``attribute`` value has not been seen before — confirm
            against the concrete implementation
        """
        ...

    @abc.abstractmethod
    async def forget_duplicates(
        self, attribute: str, messages: Iterable[Message]
    ) -> None:
        """
        Drop deduplication state for ``messages``.

        Args:
            attribute: name of the message attribute used for deduplication
            messages: the messages to forget

        NOTE(review): presumably the inverse of ``without_duplicates`` so the
        same messages can be published again — confirm.
        """
        ...

57 

58 

class Publisher(AbstractAsyncContextManager["Publisher"], abc.ABC):
    """
    Abstract base class for publishers that send messages to a topic.

    A publisher is used as an async context manager; subclasses must provide
    ``__aenter__``, ``__aexit__`` and ``publish_data``.
    """

    def __init__(
        self,
        topic: str,
        cache: Optional[Cache] = None,
        deduplicate_by: Optional[str] = None,
    ) -> None:
        """
        Declare the constructor signature shared by all publishers.

        The base implementation intentionally does nothing; subclasses store
        whatever they need.

        Args:
            topic: the topic to publish to
            cache: optional deduplication cache
            deduplicate_by: optional name of the message attribute to
                deduplicate on
        """
        ...

    @abc.abstractmethod
    async def publish_data(
        self, data: bytes, attributes: Mapping[str, str]
    ) -> None:
        """
        Publish one message.

        Args:
            data: the raw message payload
            attributes: string attributes to attach to the message
        """
        ...

    @abc.abstractmethod
    async def __aenter__(self) -> Self:
        """Enter the publisher's context and return the publisher itself."""
        ...

    @abc.abstractmethod
    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """Leave the publisher's context; receives the active exception, if any."""
        ...

82 

83 

@dataclass
class CapturedMessage:
    """A message recorded in memory by ``CapturingPublisher.publish_data``."""

    # Raw payload that was handed to ``publish_data``.
    data: bytes
    # Attributes that were handed to ``publish_data`` alongside the payload.
    attributes: Mapping[str, str]

88 

89 

class CapturingPublisher(Publisher):
    """
    A ``Publisher`` that records messages in memory instead of sending them.

    Each published message is wrapped in a ``CapturedMessage`` and appended to
    ``messages``.

    NOTE(review): ``messages`` is a class-level list, so every instance (and
    any subclass that does not rebind it) appends to the same list. This looks
    intentional — it lets callers inspect ``CapturingPublisher.messages``
    without holding on to the publisher instance — confirm before changing it.
    """

    # Shared across all instances; see the class docstring.
    messages: list[CapturedMessage] = []
    # Name of the message attribute to deduplicate on, or None to disable.
    deduplicate_by: Optional[str]

    def __init__(
        self,
        topic: str,
        cache: Optional[Cache] = None,
        deduplicate_by: Optional[str] = None,
    ) -> None:
        """
        Args:
            topic: the topic this publisher nominally publishes to
            cache: deduplication cache; when omitted, one is created with
                ``create_cache()``
            deduplicate_by: name of the message attribute to deduplicate on;
                when falsy, no deduplication is performed
        """
        self.topic = topic
        # Compare against None explicitly so that a caller-supplied cache is
        # never silently replaced just because it happens to be falsy (e.g.
        # an implementation that defines ``__len__`` or ``__bool__``).
        self.cache: Cache = cache if cache is not None else create_cache()
        self.deduplicate_by = deduplicate_by

    async def __aenter__(self) -> Self:
        """Nothing to set up; return the publisher itself."""
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """Nothing to tear down; exceptions are not suppressed."""
        pass

    async def publish_data(self, data: bytes, attributes: Mapping[str, str]) -> None:
        """
        Record a message, skipping it if the cache reports it as a duplicate.

        Args:
            data: the raw message payload
            attributes: string attributes to attach to the message
        """
        to_publish = [CapturedMessage(data, attributes)]

        if self.deduplicate_by:
            # The cache returns only the messages that should still go out.
            to_publish = await self.cache.without_duplicates(
                self.deduplicate_by, to_publish
            )

        self.messages.extend(to_publish)

124 

125 

# Signature of the coroutine function a ``Consumer`` calls for each message:
# it takes the received ``Message`` and returns an awaitable of ``None``.
MessageHandler = Callable[[Message], Awaitable[None]]

127 

128 

class StopConsumer(Exception):
    """
    Raised to make a consumer stop.

    Attributes:
        ack: flag carried along with the stop request; defaults to ``False``.
            NOTE(review): presumably tells the consumer whether to acknowledge
            the message being handled when it stops — confirm against the
            consumer implementations.
    """

    def __init__(self, ack: bool = False):
        # ``Exception.__init__`` is deliberately not invoked here, matching
        # the existing behavior of ``args``/``str()`` for this exception.
        self.ack = ack

136 

137 

class Consumer(abc.ABC):
    """
    Abstract base class for message-broker consumers.

    A consumer receives messages from a message broker and invokes a handler
    function once for every message received.
    """

    def __init__(self, topic: str, **kwargs: Any) -> None:
        """
        Args:
            topic: the topic to consume from
            **kwargs: accepted so that broker-specific subclasses can take
                extra options; ignored by this base class
        """
        self.topic = topic

    @abc.abstractmethod
    async def run(self, handler: MessageHandler) -> None:
        """Runs the consumer (indefinitely), passing each message to ``handler``."""
        ...

151 

152 

@runtime_checkable
class CacheModule(Protocol):
    """
    Shape required of the module named by ``PREFECT_MESSAGING_CACHE``.

    Being runtime-checkable, ``isinstance(module, CacheModule)`` verifies that
    the imported module exposes a ``Cache`` attribute.
    """

    # The concrete ``Cache`` class provided by the module.
    Cache: type[Cache]

156 

157 

def create_cache() -> Cache:
    """
    Build a cache using the application's default settings.

    The module path is read from the ``PREFECT_MESSAGING_CACHE`` setting, the
    module is imported, and its ``Cache`` class is instantiated with no
    arguments.

    Returns:
        a new Cache instance
    """
    cache_module = importlib.import_module(PREFECT_MESSAGING_CACHE.value())
    # Confirms the module exposes ``Cache`` and narrows its type for checkers.
    assert isinstance(cache_module, CacheModule)

    cache_class = cache_module.Cache
    return cache_class()

169 

170 

@runtime_checkable
class BrokerModule(Protocol):
    """
    Shape required of the module named by ``PREFECT_MESSAGING_BROKER``.

    Being runtime-checkable, ``isinstance(module, BrokerModule)`` verifies
    that the imported module exposes every attribute listed here.
    """

    # Concrete publisher and consumer classes provided by the broker.
    Publisher: type[Publisher]
    Consumer: type[Consumer]

    # Given a topic, returns an async context manager that yields a mapping;
    # the module-level ``ephemeral_subscription`` wrapper delegates to this.
    ephemeral_subscription: Callable[
        [str], AbstractAsyncContextManager[Mapping[str, Any]]
    ]

    # Used for testing: a context manager that breaks the topic in a way that raises
    # a ValueError("oops") when attempting to publish a message.
    break_topic: Callable[[], AbstractAsyncContextManager[None]]

182 

183 

def create_publisher(
    topic: str, cache: Optional[Cache] = None, deduplicate_by: Optional[str] = None
) -> Publisher:
    """
    Creates a new publisher with the application's default settings.

    The broker module path is read from the ``PREFECT_MESSAGING_BROKER``
    setting, the module is imported, and its ``Publisher`` class is
    instantiated.

    Args:
        topic: the topic to publish to
        cache: the deduplication cache to hand to the publisher; when omitted,
            one is created with ``create_cache()``
        deduplicate_by: forwarded to the publisher unchanged; the name of the
            message attribute to deduplicate on, if any

    Returns:
        a new Publisher instance
    """
    # Compare against None explicitly so that a caller-supplied cache is never
    # silently replaced just because it happens to be falsy.
    if cache is None:
        cache = create_cache()

    module = importlib.import_module(PREFECT_MESSAGING_BROKER.value())
    # Confirms the module exposes the broker attributes and narrows its type.
    assert isinstance(module, BrokerModule)
    return module.Publisher(topic, cache, deduplicate_by=deduplicate_by)

199 

200 

@asynccontextmanager
async def ephemeral_subscription(topic: str) -> AsyncGenerator[Mapping[str, Any], Any]:
    """
    Open an ephemeral subscription to ``topic`` for the duration of the context.

    Delegates to the ``ephemeral_subscription`` context manager of the broker
    module named by the ``PREFECT_MESSAGING_BROKER`` setting; the subscription
    is removed when the context exits.

    Yields:
        the mapping produced by the broker's context manager; presumably the
        keyword arguments to pass on when creating a consumer for this
        subscription — confirm against the broker implementation
    """
    broker = importlib.import_module(PREFECT_MESSAGING_BROKER.value())
    assert isinstance(broker, BrokerModule)

    subscription = broker.ephemeral_subscription(topic)
    async with subscription as create_kwargs:
        yield create_kwargs

211 

212 

def create_consumer(topic: str, **kwargs: Any) -> Consumer:
    """
    Build a consumer using the application's default settings.

    The broker module path is read from the ``PREFECT_MESSAGING_BROKER``
    setting, the module is imported, and its ``Consumer`` class is
    instantiated.

    Args:
        topic: the topic to consume from
        **kwargs: passed through unchanged to the broker's ``Consumer``

    Returns:
        a new Consumer instance
    """
    broker = importlib.import_module(PREFECT_MESSAGING_BROKER.value())
    # Confirms the module exposes the broker attributes and narrows its type.
    assert isinstance(broker, BrokerModule)

    consumer_class = broker.Consumer
    return consumer_class(topic, **kwargs)