Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/utilities/messaging/__init__.py: 75%
92 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1import abc 1a
2from contextlib import asynccontextmanager, AbstractAsyncContextManager 1a
3from dataclasses import dataclass 1a
4import importlib 1a
5from types import TracebackType 1a
6from typing import ( 1a
7 TYPE_CHECKING,
8 Any,
9 Callable,
10 Optional,
11 Protocol,
12 Type,
13 TypeVar,
14 Union,
15 runtime_checkable,
16)
17from collections.abc import AsyncGenerator, Awaitable, Iterable, Mapping 1a
18from typing_extensions import Self 1a
20from prefect.settings import PREFECT_MESSAGING_CACHE, PREFECT_MESSAGING_BROKER 1a
21from prefect.logging import get_logger 1a
23if TYPE_CHECKING: 23 ↛ 24line 23 didn't jump to line 24 because the condition on line 23 was never true1a
24 import logging
26logger: "logging.Logger" = get_logger(__name__) 1a
29M = TypeVar("M", bound="Message", covariant=True) 1a
32class Message(Protocol): 1a
33 """
34 A protocol representing a message sent to a message broker.
35 """
37 @property 1a
38 def data(self) -> Union[str, bytes]: ... 38 ↛ exitline 38 didn't return from function 'data' because 1a
40 @property 1a
41 def attributes(self) -> Mapping[str, Any]: ... 41 ↛ exitline 41 didn't return from function 'attributes' because 1a
44class Cache(abc.ABC): 1a
45 @abc.abstractmethod 1a
46 async def clear_recently_seen_messages(self) -> None: ... 46 ↛ exitline 46 didn't return from function 'clear_recently_seen_messages' because 1a
48 @abc.abstractmethod 1a
49 async def without_duplicates( 49 ↛ exitline 49 didn't return from function 'without_duplicates' because 1a
50 self, attribute: str, messages: Iterable[M]
51 ) -> list[M]: ...
53 @abc.abstractmethod 1a
54 async def forget_duplicates( 54 ↛ exitline 54 didn't return from function 'forget_duplicates' because 1a
55 self, attribute: str, messages: Iterable[Message]
56 ) -> None: ...
59class Publisher(AbstractAsyncContextManager["Publisher"], abc.ABC): 1a
60 def __init__( 60 ↛ exitline 60 didn't return from function '__init__' because 1a
61 self,
62 topic: str,
63 cache: Optional[Cache] = None,
64 deduplicate_by: Optional[str] = None,
65 ) -> None: ...
67 @abc.abstractmethod 1a
68 async def publish_data( 68 ↛ exitline 68 didn't return from function 'publish_data' because 1a
69 self, data: bytes, attributes: Mapping[str, str]
70 ) -> None: ...
72 @abc.abstractmethod 1a
73 async def __aenter__(self) -> Self: ... 73 ↛ exitline 73 didn't return from function '__aenter__' because 1a
75 @abc.abstractmethod 1a
76 async def __aexit__( 76 ↛ exitline 76 didn't return from function '__aexit__' because 1a
77 self,
78 exc_type: Optional[Type[BaseException]],
79 exc_val: Optional[BaseException],
80 exc_tb: Optional[TracebackType],
81 ) -> None: ...
84@dataclass 1a
85class CapturedMessage: 1a
86 data: bytes 1a
87 attributes: Mapping[str, str] 1a
90class CapturingPublisher(Publisher): 1a
91 messages: list[CapturedMessage] = [] 1a
92 deduplicate_by: Optional[str] 1a
94 def __init__( 1a
95 self,
96 topic: str,
97 cache: Optional[Cache] = None,
98 deduplicate_by: Optional[str] = None,
99 ) -> None:
100 self.topic = topic
101 self.cache: Cache = cache or create_cache()
102 self.deduplicate_by = deduplicate_by
104 async def __aenter__(self) -> Self: 1a
105 return self
107 async def __aexit__( 1a
108 self,
109 exc_type: Optional[Type[BaseException]],
110 exc_val: Optional[BaseException],
111 exc_tb: Optional[TracebackType],
112 ) -> None:
113 pass
115 async def publish_data(self, data: bytes, attributes: Mapping[str, str]) -> None: 1a
116 to_publish = [CapturedMessage(data, attributes)]
118 if self.deduplicate_by:
119 to_publish = await self.cache.without_duplicates(
120 self.deduplicate_by, to_publish
121 )
123 self.messages.extend(to_publish)
126MessageHandler = Callable[[Message], Awaitable[None]] 1a
129class StopConsumer(Exception): 1a
130 """
131 Exception to raise to stop a consumer.
132 """
134 def __init__(self, ack: bool = False): 1a
135 self.ack = ack
138class Consumer(abc.ABC): 1a
139 """
140 Abstract base class for consumers that receive messages from a message broker and
141 call a handler function for each message received.
142 """
144 def __init__(self, topic: str, **kwargs: Any) -> None: 1a
145 self.topic = topic
147 @abc.abstractmethod 1a
148 async def run(self, handler: MessageHandler) -> None: 1a
149 """Runs the consumer (indefinitely)"""
150 ...
153@runtime_checkable 1a
154class CacheModule(Protocol): 1a
155 Cache: type[Cache] 1a
158def create_cache() -> Cache: 1a
159 """
160 Creates a new cache with the applications default settings.
162 Returns:
163 a new Cache instance
164 """
165 module = importlib.import_module(PREFECT_MESSAGING_CACHE.value()) 1bcdefg
166 assert isinstance(module, CacheModule) 1bcdefg
168 return module.Cache() 1bcdefg
171@runtime_checkable 1a
172class BrokerModule(Protocol): 1a
173 Publisher: type[Publisher] 1a
174 Consumer: type[Consumer] 1a
175 ephemeral_subscription: Callable[ 1a
176 [str], AbstractAsyncContextManager[Mapping[str, Any]]
177 ]
179 # Used for testing: a context manager that breaks the topic in a way that raises
180 # a ValueError("oops") when attempting to publish a message.
181 break_topic: Callable[[], AbstractAsyncContextManager[None]] 1a
184def create_publisher( 1a
185 topic: str, cache: Optional[Cache] = None, deduplicate_by: Optional[str] = None
186) -> Publisher:
187 """
188 Creates a new publisher with the applications default settings.
189 Args:
190 topic: the topic to publish to
191 Returns:
192 a new Consumer instance
193 """
194 cache = cache or create_cache() 1bcdefg
196 module = importlib.import_module(PREFECT_MESSAGING_BROKER.value()) 1bcdefg
197 assert isinstance(module, BrokerModule) 1bcdefg
198 return module.Publisher(topic, cache, deduplicate_by=deduplicate_by) 1bcdefg
201@asynccontextmanager 1a
202async def ephemeral_subscription(topic: str) -> AsyncGenerator[Mapping[str, Any], Any]: 1a
203 """
204 Creates an ephemeral subscription to the given source, removing it when the context
205 exits.
206 """
207 module = importlib.import_module(PREFECT_MESSAGING_BROKER.value())
208 assert isinstance(module, BrokerModule)
209 async with module.ephemeral_subscription(topic) as consumer_create_kwargs:
210 yield consumer_create_kwargs
213def create_consumer(topic: str, **kwargs: Any) -> Consumer: 1a
214 """
215 Creates a new consumer with the applications default settings.
216 Args:
217 topic: the topic to consume from
218 Returns:
219 a new Consumer instance
220 """
221 module = importlib.import_module(PREFECT_MESSAGING_BROKER.value()) 1h
222 assert isinstance(module, BrokerModule) 1h
223 return module.Consumer(topic, **kwargs) 1h