Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/events/clients.py: 47%
119 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1import abc 1a
2from textwrap import dedent 1a
3from types import TracebackType 1a
4from typing import ( 1a
5 TYPE_CHECKING,
6 Any,
7 ClassVar,
8 Dict,
9 List,
10 Optional,
11 Tuple,
12 Type,
13 Union,
14)
15from uuid import UUID 1a
17import httpx 1a
18from typing_extensions import Self, TypeAlias 1a
20from prefect.client.base import PrefectHttpxAsyncClient 1a
21from prefect.logging import get_logger 1a
22from prefect.server.events import messaging 1a
23from prefect.server.events.schemas.events import ( 1a
24 Event,
25 ReceivedEvent,
26 ResourceSpecification,
27)
29if TYPE_CHECKING: 29 ↛ 30line 29 didn't jump to line 30 because the condition on line 29 was never true1a
30 import logging
32logger: "logging.Logger" = get_logger(__name__) 1a
34LabelValue: TypeAlias = Union[str, List[str]] 1a
37class EventsClient(abc.ABC): 1a
38 """The abstract interface for a Prefect Events client"""
40 @abc.abstractmethod 1a
41 async def emit(self, event: Event) -> Optional[Event]: ... 41 ↛ exitline 41 didn't return from function 'emit' because 1a
43 async def __aenter__(self) -> Self: 1a
44 return self
46 async def __aexit__( 1a
47 self,
48 exc_type: Optional[Type[Exception]],
49 exc_val: Optional[Exception],
50 exc_tb: Optional[TracebackType],
51 ) -> None:
52 return None
55class NullEventsClient(EventsClient): 1a
56 """A no-op implementation of the Prefect Events client for testing"""
58 async def emit(self, event: Event) -> None: 1a
59 pass
62class AssertingEventsClient(EventsClient): 1a
63 """An implementation of the Prefect Events client that records all events sent
64 to it for inspection during tests."""
66 last: ClassVar[Optional["AssertingEventsClient"]] = None 1a
67 all: ClassVar[List["AssertingEventsClient"]] = [] 1a
69 args: tuple[Any, ...] 1a
70 kwargs: dict[str, Any] 1a
71 events: list[Event] 1a
73 def __init__(self, *args: Any, **kwargs: Any): 1a
74 AssertingEventsClient.last = self
75 AssertingEventsClient.all.append(self)
76 self.args = args
77 self.kwargs = kwargs
79 @classmethod 1a
80 def reset(cls) -> None: 1a
81 """Reset all captured instances and their events. For use this between tests"""
82 cls.last = None
83 cls.all = []
85 async def emit(self, event: Event) -> Event: 1a
86 if not hasattr(self, "events"):
87 raise TypeError(
88 "Events may only be emitted while this client is being used as a "
89 "context manager"
90 )
91 self.events.append(event)
92 return event
94 async def __aenter__(self) -> Self: 1a
95 self.events = []
96 return self
98 async def __aexit__( 1a
99 self,
100 exc_type: Optional[Type[Exception]],
101 exc_val: Optional[Exception],
102 exc_tb: Optional[TracebackType],
103 ) -> None:
104 # retain the events list so that assertions may be made
105 return None
107 @classmethod 1a
108 def emitted_events_count(cls) -> int: 1a
109 return sum(len(client.events) for client in cls.all)
111 @classmethod 1a
112 def assert_emitted_event_count(cls, count: int) -> None: 1a
113 """Assert that the given number of events were emitted."""
114 total_num_events = cls.emitted_events_count()
115 assert total_num_events == count, (
116 f"The number of emitted events did not match the expected count: {total_num_events=} != {count=}"
117 )
119 @classmethod 1a
120 def assert_emitted_event_with( 1a
121 cls,
122 event: Optional[str] = None,
123 resource: Optional[Dict[str, LabelValue]] = None,
124 related: Optional[List[Dict[str, LabelValue]]] = None,
125 payload: Optional[Dict[str, Any]] = None,
126 ) -> None:
127 """Assert that an event was emitted containing the given properties."""
128 assert cls.last is not None and cls.all, "No event client was created"
130 emitted_events = [
131 event for client in cls.all for event in reversed(client.events)
132 ]
134 resource_spec = (
135 ResourceSpecification.model_validate(resource) if resource else None
136 )
137 related_specs = (
138 [
139 ResourceSpecification.model_validate(related_resource)
140 for related_resource in related
141 ]
142 if related
143 else None
144 )
146 mismatch_reasons: List[Tuple[str, str]] = []
148 def event_matches(emitted_event: Event) -> bool:
149 if event is not None and emitted_event.event != event:
150 mismatch_reasons.append((f"{event=}", f"{emitted_event.event=}"))
151 return False
153 if resource_spec and not resource_spec.matches(emitted_event.resource):
154 mismatch_reasons.append((f"{resource=}", f"{emitted_event.resource=}"))
155 return False
157 if related_specs:
158 for related_spec in related_specs:
159 if not any(
160 related_spec.matches(related_resource)
161 for related_resource in emitted_event.related
162 ):
163 mismatch_reasons.append(
164 (f"{related=}", f"{emitted_event.related=}")
165 )
166 return False
168 if payload and any(
169 emitted_event.payload.get(k) != v for k, v in payload.items()
170 ):
171 mismatch_reasons.append((f"{payload=}", f"{emitted_event.payload=}"))
172 return False
174 return True
176 assert any(
177 event_matches(emitted_event) for emitted_event in emitted_events
178 ), f"""An event was not emitted matching the following criteria:
179 {event=}
180 {resource=}
181 {related=}
182 {payload=}
184# of captured events: {len(emitted_events)}
185{
186 chr(10).join(
187 dedent(f'''
188 Expected:
189 {expected}
190 Received:
191 {received}
192 ''')
193 for expected, received in mismatch_reasons
194 )
195 }
196"""
198 @classmethod 1a
199 def assert_no_emitted_event_with( 1a
200 cls,
201 event: Optional[str] = None,
202 resource: Optional[Dict[str, LabelValue]] = None,
203 related: Optional[List[Dict[str, LabelValue]]] = None,
204 payload: Optional[Dict[str, Any]] = None,
205 ) -> None:
206 try:
207 cls.assert_emitted_event_with(event, resource, related, payload)
208 except AssertionError:
209 return
210 else:
211 assert False, "An event was emitted matching the given criteria"
214class PrefectServerEventsClient(EventsClient): 1a
215 _publisher: messaging.EventPublisher 1a
217 async def __aenter__(self) -> Self: 1a
218 publisher = messaging.create_event_publisher() 1bcdefg
219 self._publisher = await publisher.__aenter__() 1bcdefg
220 return self 1bcdefg
222 async def __aexit__( 1a
223 self,
224 exc_type: Optional[Type[Exception]],
225 exc_val: Optional[Exception],
226 exc_tb: Optional[TracebackType],
227 ) -> None:
228 await self._publisher.__aexit__(exc_type, exc_val, exc_tb) 1bcdefg
229 del self._publisher 1bcdefg
230 return None 1bcdefg
232 async def emit(self, event: Event) -> ReceivedEvent: 1a
233 if not hasattr(self, "_publisher"): 233 ↛ 234line 233 didn't jump to line 234 because the condition on line 233 was never true1bcdefg
234 raise TypeError(
235 "Events may only be emitted while this client is being used as a "
236 "context manager"
237 )
239 received_event = event.receive() 1bcdefg
240 await self._publisher.publish_event(received_event) 1bcdefg
241 return received_event 1bcdefg
244class PrefectServerEventsAPIClient: 1a
245 _http_client: PrefectHttpxAsyncClient 1a
247 def __init__(self, additional_headers: dict[str, str] = {}): 1a
248 from prefect.server.api.server import create_app
250 # create_app caches application instances, and invoking it with no arguments
251 # will point it to the the currently running server instance
252 api_app = create_app()
254 self._http_client = PrefectHttpxAsyncClient(
255 transport=httpx.ASGITransport(app=api_app, raise_app_exceptions=False),
256 headers={**additional_headers},
257 base_url="http://prefect-in-memory/api",
258 enable_csrf_support=False,
259 raise_on_all_errors=False,
260 )
262 async def __aenter__(self) -> Self: 1a
263 await self._http_client.__aenter__()
264 return self
266 async def __aexit__(self, *args: Any) -> None: 1a
267 await self._http_client.__aexit__(*args)
269 async def pause_automation(self, automation_id: UUID) -> httpx.Response: 1a
270 return await self._http_client.patch(
271 f"/automations/{automation_id}", json={"enabled": False}
272 )
274 async def resume_automation(self, automation_id: UUID) -> httpx.Response: 1a
275 return await self._http_client.patch(
276 f"/automations/{automation_id}", json={"enabled": True}
277 )