Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/events/clients.py: 40%
119 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1import abc 1a
2from textwrap import dedent 1a
3from types import TracebackType 1a
4from typing import ( 1a
5 TYPE_CHECKING,
6 Any,
7 ClassVar,
8 Dict,
9 List,
10 Optional,
11 Tuple,
12 Type,
13 Union,
14)
15from uuid import UUID 1a
17import httpx 1a
18from typing_extensions import Self, TypeAlias 1a
20from prefect.client.base import PrefectHttpxAsyncClient 1a
21from prefect.logging import get_logger 1a
22from prefect.server.events import messaging 1a
23from prefect.server.events.schemas.events import ( 1a
24 Event,
25 ReceivedEvent,
26 ResourceSpecification,
27)
29if TYPE_CHECKING: 29 ↛ 30line 29 didn't jump to line 30 because the condition on line 29 was never true1a
30 import logging
32logger: "logging.Logger" = get_logger(__name__) 1a
34LabelValue: TypeAlias = Union[str, List[str]] 1a
37class EventsClient(abc.ABC): 1a
38 """The abstract interface for a Prefect Events client"""
40 @abc.abstractmethod 1a
41 async def emit(self, event: Event) -> Optional[Event]: ... 41 ↛ exitline 41 didn't return from function 'emit' because 1a
43 async def __aenter__(self) -> Self: 1a
44 return self
46 async def __aexit__( 1a
47 self,
48 exc_type: Optional[Type[Exception]],
49 exc_val: Optional[Exception],
50 exc_tb: Optional[TracebackType],
51 ) -> None:
52 return None
55class NullEventsClient(EventsClient): 1a
56 """A no-op implementation of the Prefect Events client for testing"""
58 async def emit(self, event: Event) -> None: 1a
59 pass
62class AssertingEventsClient(EventsClient): 1a
63 """An implementation of the Prefect Events client that records all events sent
64 to it for inspection during tests."""
66 last: ClassVar[Optional["AssertingEventsClient"]] = None 1a
67 all: ClassVar[List["AssertingEventsClient"]] = [] 1a
69 args: tuple[Any, ...] 1a
70 kwargs: dict[str, Any] 1a
71 events: list[Event] 1a
73 def __init__(self, *args: Any, **kwargs: Any): 1a
74 AssertingEventsClient.last = self
75 AssertingEventsClient.all.append(self)
76 self.args = args
77 self.kwargs = kwargs
79 @classmethod 1a
80 def reset(cls) -> None: 1a
81 """Reset all captured instances and their events. For use this between tests"""
82 cls.last = None
83 cls.all = []
85 async def emit(self, event: Event) -> Event: 1a
86 if not hasattr(self, "events"):
87 raise TypeError(
88 "Events may only be emitted while this client is being used as a "
89 "context manager"
90 )
91 self.events.append(event)
92 return event
94 async def __aenter__(self) -> Self: 1a
95 self.events = []
96 return self
98 async def __aexit__( 1a
99 self,
100 exc_type: Optional[Type[Exception]],
101 exc_val: Optional[Exception],
102 exc_tb: Optional[TracebackType],
103 ) -> None:
104 # retain the events list so that assertions may be made
105 return None
107 @classmethod 1a
108 def emitted_events_count(cls) -> int: 1a
109 return sum(len(client.events) for client in cls.all)
111 @classmethod 1a
112 def assert_emitted_event_count(cls, count: int) -> None: 1a
113 """Assert that the given number of events were emitted."""
114 total_num_events = cls.emitted_events_count()
115 assert total_num_events == count, (
116 f"The number of emitted events did not match the expected count: {total_num_events=} != {count=}"
117 )
119 @classmethod 1a
120 def assert_emitted_event_with( 1a
121 cls,
122 event: Optional[str] = None,
123 resource: Optional[Dict[str, LabelValue]] = None,
124 related: Optional[List[Dict[str, LabelValue]]] = None,
125 payload: Optional[Dict[str, Any]] = None,
126 ) -> None:
127 """Assert that an event was emitted containing the given properties."""
128 assert cls.last is not None and cls.all, "No event client was created"
130 emitted_events = [
131 event for client in cls.all for event in reversed(client.events)
132 ]
134 resource_spec = (
135 ResourceSpecification.model_validate(resource) if resource else None
136 )
137 related_specs = (
138 [
139 ResourceSpecification.model_validate(related_resource)
140 for related_resource in related
141 ]
142 if related
143 else None
144 )
146 mismatch_reasons: List[Tuple[str, str]] = []
148 def event_matches(emitted_event: Event) -> bool:
149 if event is not None and emitted_event.event != event:
150 mismatch_reasons.append((f"{event=}", f"{emitted_event.event=}"))
151 return False
153 if resource_spec and not resource_spec.matches(emitted_event.resource):
154 mismatch_reasons.append((f"{resource=}", f"{emitted_event.resource=}"))
155 return False
157 if related_specs:
158 for related_spec in related_specs:
159 if not any(
160 related_spec.matches(related_resource)
161 for related_resource in emitted_event.related
162 ):
163 mismatch_reasons.append(
164 (f"{related=}", f"{emitted_event.related=}")
165 )
166 return False
168 if payload and any(
169 emitted_event.payload.get(k) != v for k, v in payload.items()
170 ):
171 mismatch_reasons.append((f"{payload=}", f"{emitted_event.payload=}"))
172 return False
174 return True
176 assert any(
177 event_matches(emitted_event) for emitted_event in emitted_events
178 ), f"""An event was not emitted matching the following criteria:
179 {event=}
180 {resource=}
181 {related=}
182 {payload=}
184# of captured events: {len(emitted_events)}
185{
186 chr(10).join(
187 dedent(f'''
188 Expected:
189 {expected}
190 Received:
191 {received}
192 ''')
193 for expected, received in mismatch_reasons
194 )
195 }
196"""
198 @classmethod 1a
199 def assert_no_emitted_event_with( 1a
200 cls,
201 event: Optional[str] = None,
202 resource: Optional[Dict[str, LabelValue]] = None,
203 related: Optional[List[Dict[str, LabelValue]]] = None,
204 payload: Optional[Dict[str, Any]] = None,
205 ) -> None:
206 try:
207 cls.assert_emitted_event_with(event, resource, related, payload)
208 except AssertionError:
209 return
210 else:
211 assert False, "An event was emitted matching the given criteria"
214class PrefectServerEventsClient(EventsClient): 1a
215 _publisher: messaging.EventPublisher 1a
217 async def __aenter__(self) -> Self: 1a
218 publisher = messaging.create_event_publisher()
219 self._publisher = await publisher.__aenter__()
220 return self
222 async def __aexit__( 1a
223 self,
224 exc_type: Optional[Type[Exception]],
225 exc_val: Optional[Exception],
226 exc_tb: Optional[TracebackType],
227 ) -> None:
228 await self._publisher.__aexit__(exc_type, exc_val, exc_tb)
229 del self._publisher
230 return None
232 async def emit(self, event: Event) -> ReceivedEvent: 1a
233 if not hasattr(self, "_publisher"):
234 raise TypeError(
235 "Events may only be emitted while this client is being used as a "
236 "context manager"
237 )
239 received_event = event.receive()
240 await self._publisher.publish_event(received_event)
241 return received_event
244class PrefectServerEventsAPIClient: 1a
245 _http_client: PrefectHttpxAsyncClient 1a
247 def __init__(self, additional_headers: dict[str, str] = {}): 1a
248 from prefect.server.api.server import create_app
250 # create_app caches application instances, and invoking it with no arguments
251 # will point it to the the currently running server instance
252 api_app = create_app()
254 self._http_client = PrefectHttpxAsyncClient(
255 transport=httpx.ASGITransport(app=api_app, raise_app_exceptions=False),
256 headers={**additional_headers},
257 base_url="http://prefect-in-memory/api",
258 enable_csrf_support=False,
259 raise_on_all_errors=False,
260 )
262 async def __aenter__(self) -> Self: 1a
263 await self._http_client.__aenter__()
264 return self
266 async def __aexit__(self, *args: Any) -> None: 1a
267 await self._http_client.__aexit__(*args)
269 async def pause_automation(self, automation_id: UUID) -> httpx.Response: 1a
270 return await self._http_client.patch(
271 f"/automations/{automation_id}", json={"enabled": False}
272 )
274 async def resume_automation(self, automation_id: UUID) -> httpx.Response: 1a
275 return await self._http_client.patch(
276 f"/automations/{automation_id}", json={"enabled": True}
277 )