Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/events/clients.py: 40%

119 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1import abc 1a

2from textwrap import dedent 1a

3from types import TracebackType 1a

4from typing import ( 1a

5 TYPE_CHECKING, 

6 Any, 

7 ClassVar, 

8 Dict, 

9 List, 

10 Optional, 

11 Tuple, 

12 Type, 

13 Union, 

14) 

15from uuid import UUID 1a

16 

17import httpx 1a

18from typing_extensions import Self, TypeAlias 1a

19 

20from prefect.client.base import PrefectHttpxAsyncClient 1a

21from prefect.logging import get_logger 1a

22from prefect.server.events import messaging 1a

23from prefect.server.events.schemas.events import ( 1a

24 Event, 

25 ReceivedEvent, 

26 ResourceSpecification, 

27) 

28 

29if TYPE_CHECKING: 29 ↛ 30line 29 didn't jump to line 30 because the condition on line 29 was never true1a

30 import logging 

31 

# Module-level logger. The annotation is a string because `logging` is only
# imported under the TYPE_CHECKING guard above.
logger: "logging.Logger" = get_logger(__name__)

# A label value is either a single string or a list of strings.
LabelValue: TypeAlias = Union[str, List[str]]

35 

36 

class EventsClient(abc.ABC):
    """Abstract base class for Prefect Events clients.

    Concrete clients must implement `emit`. The async context-manager hooks
    defined here do nothing, so a subclass only overrides them when it has
    something to set up or tear down.
    """

    @abc.abstractmethod
    async def emit(self, event: Event) -> Optional[Event]:
        """Emit a single event; every concrete client provides this."""
        ...

    async def __aenter__(self) -> Self:
        """Enter the client's context and hand back the client itself."""
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[Exception]],
        exc_val: Optional[Exception],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """Leave the client's context; returns None, so exceptions propagate."""
        return

53 

54 

class NullEventsClient(EventsClient):
    """Events client that does nothing with the events it receives (for tests)."""

    async def emit(self, event: Event) -> None:
        """Accept `event` and discard it."""
        return None

60 

61 

class AssertingEventsClient(EventsClient):
    """An implementation of the Prefect Events client that records all events
    sent to it for inspection during tests.

    Every instance registers itself on the class when constructed (`last` and
    `all`). Its `events` list is created when the instance is entered as an
    async context manager and is kept after exit. The class-level assertion
    helpers look across every registered instance.
    """

    # Most recently constructed client, or None after `reset()`.
    last: ClassVar[Optional["AssertingEventsClient"]] = None
    # Every client constructed since the last `reset()`.
    all: ClassVar[List["AssertingEventsClient"]] = []

    # Constructor arguments, stored as given.
    args: tuple[Any, ...]
    kwargs: dict[str, Any]
    # Created in `__aenter__`; absent on a client that was never entered.
    events: list[Event]

    def __init__(self, *args: Any, **kwargs: Any):
        """Register this client on the class and remember its arguments."""
        AssertingEventsClient.last = self
        AssertingEventsClient.all.append(self)
        self.args = args
        self.kwargs = kwargs

    @classmethod
    def reset(cls) -> None:
        """Reset all captured instances and their events. For use between tests."""
        cls.last = None
        cls.all = []

    async def emit(self, event: Event) -> Event:
        """Record `event` and return it unchanged.

        Raises:
            TypeError: if the client has not been entered as a context manager.
        """
        if not hasattr(self, "events"):
            raise TypeError(
                "Events may only be emitted while this client is being used as a "
                "context manager"
            )
        self.events.append(event)
        return event

    async def __aenter__(self) -> Self:
        """Start a fresh, empty list of captured events."""
        self.events = []
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[Exception]],
        exc_val: Optional[Exception],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """Leave the context without clearing anything."""
        # retain the events list so that assertions may be made
        return None

    @classmethod
    def _captured_events(cls) -> List[Event]:
        """Gather events from every registered client, newest first per client.

        A client that was constructed but never entered has no `events`
        attribute; it contributes nothing rather than raising AttributeError.
        """
        return [
            captured
            for client in cls.all
            for captured in reversed(getattr(client, "events", []))
        ]

    @staticmethod
    def _describe_mismatches(mismatch_reasons: List[Tuple[str, str]]) -> str:
        """Render the (expected, received) pairs for the failure message."""
        return "\n".join(
            dedent(
                f"""
                Expected:
                    {expected}
                Received:
                    {received}
                """
            )
            for expected, received in mismatch_reasons
        )

    @classmethod
    def emitted_events_count(cls) -> int:
        """Return the total number of events captured by all registered clients."""
        return len(cls._captured_events())

    @classmethod
    def assert_emitted_event_count(cls, count: int) -> None:
        """Assert that the given number of events were emitted."""
        total_num_events = cls.emitted_events_count()
        assert total_num_events == count, (
            f"The number of emitted events did not match the expected count: {total_num_events=} != {count=}"
        )

    @classmethod
    def assert_emitted_event_with(
        cls,
        event: Optional[str] = None,
        resource: Optional[Dict[str, LabelValue]] = None,
        related: Optional[List[Dict[str, LabelValue]]] = None,
        payload: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Assert that an event was emitted containing the given properties.

        Args:
            event: exact event name to match; None skips the check.
            resource: labels validated into a `ResourceSpecification` that the
                event's resource must match; None or empty skips the check.
            related: one label dict per specification; each one must match at
                least one of the event's related resources.
            payload: key/value pairs that must all be present in the event's
                payload with equal values.

        Raises:
            AssertionError: if no client was created or no captured event
                satisfies every supplied criterion.
        """
        assert cls.last is not None and cls.all, "No event client was created"

        emitted_events = cls._captured_events()

        resource_spec = (
            ResourceSpecification.model_validate(resource) if resource else None
        )
        related_specs = (
            [
                ResourceSpecification.model_validate(related_resource)
                for related_resource in related
            ]
            if related
            else None
        )

        # Filled in by `event_matches` with the first failed criterion of each
        # rejected event, for use in the failure message.
        mismatch_reasons: List[Tuple[str, str]] = []

        def event_matches(emitted_event: Event) -> bool:
            if event is not None and emitted_event.event != event:
                mismatch_reasons.append((f"{event=}", f"{emitted_event.event=}"))
                return False

            if resource_spec and not resource_spec.matches(emitted_event.resource):
                mismatch_reasons.append((f"{resource=}", f"{emitted_event.resource=}"))
                return False

            if related_specs:
                for related_spec in related_specs:
                    if not any(
                        related_spec.matches(related_resource)
                        for related_resource in emitted_event.related
                    ):
                        mismatch_reasons.append(
                            (f"{related=}", f"{emitted_event.related=}")
                        )
                        return False

            if payload and any(
                emitted_event.payload.get(k) != v for k, v in payload.items()
            ):
                mismatch_reasons.append((f"{payload=}", f"{emitted_event.payload=}"))
                return False

            return True

        found = any(event_matches(candidate) for candidate in emitted_events)

        # The message expression is evaluated only on failure, i.e. after
        # `event_matches` has finished populating `mismatch_reasons`.
        assert found, (
            "An event was not emitted matching the following criteria:\n"
            f"    {event=}\n"
            f"    {resource=}\n"
            f"    {related=}\n"
            f"    {payload=}\n"
            "\n"
            f"# of captured events: {len(emitted_events)}\n"
            f"{cls._describe_mismatches(mismatch_reasons)}\n"
        )

    @classmethod
    def assert_no_emitted_event_with(
        cls,
        event: Optional[str] = None,
        resource: Optional[Dict[str, LabelValue]] = None,
        related: Optional[List[Dict[str, LabelValue]]] = None,
        payload: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Assert that no captured event matches the given properties.

        Takes the same arguments as `assert_emitted_event_with`. Passes when
        that assertion fails, which includes the case where no client was
        created at all.
        """
        try:
            cls.assert_emitted_event_with(event, resource, related, payload)
        except AssertionError:
            return
        else:
            assert False, "An event was emitted matching the given criteria"

212 

213 

class PrefectServerEventsClient(EventsClient):
    """Events client that publishes events through `messaging`'s event publisher.

    The publisher is created on entry and removed on exit, so `emit` may only
    be called inside `async with`.
    """

    # Present only while the client is entered; `emit` checks for it.
    _publisher: messaging.EventPublisher

    async def __aenter__(self) -> Self:
        """Create an event publisher, enter it, and keep it for `emit`."""
        publisher = messaging.create_event_publisher()
        self._publisher = await publisher.__aenter__()
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[Exception]],
        exc_val: Optional[Exception],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """Exit the publisher and forget it."""
        try:
            await self._publisher.__aexit__(exc_type, exc_val, exc_tb)
        finally:
            # Drop the reference even when closing the publisher fails, so a
            # later `emit` hits the context-manager guard instead of reusing
            # a publisher that has already been exited.
            del self._publisher
        return None

    async def emit(self, event: Event) -> ReceivedEvent:
        """Convert `event` with `event.receive()`, publish it, and return it.

        Raises:
            TypeError: if the client is not currently entered as a context
                manager.
        """
        if not hasattr(self, "_publisher"):
            raise TypeError(
                "Events may only be emitted while this client is being used as a "
                "context manager"
            )

        received_event = event.receive()
        await self._publisher.publish_event(received_event)
        return received_event

242 

243 

class PrefectServerEventsAPIClient:
    """HTTP client that calls the server's automations API in-process.

    Requests go through an ASGI transport to the application returned by
    `create_app()`. Error responses are returned to the caller rather than
    raised (`raise_on_all_errors=False`).
    """

    _http_client: PrefectHttpxAsyncClient

    def __init__(self, additional_headers: Optional[dict[str, str]] = None):
        """Build the in-memory HTTP client.

        Args:
            additional_headers: extra headers sent with every request. The
                mapping is copied, so later changes to it have no effect.
        """
        # NOTE(review): imported locally, presumably to avoid a circular
        # import with the server package -- confirm.
        from prefect.server.api.server import create_app

        # create_app caches application instances, and invoking it with no arguments
        # will point it to the currently running server instance
        api_app = create_app()

        self._http_client = PrefectHttpxAsyncClient(
            transport=httpx.ASGITransport(app=api_app, raise_app_exceptions=False),
            headers={**(additional_headers or {})},
            base_url="http://prefect-in-memory/api",
            enable_csrf_support=False,
            raise_on_all_errors=False,
        )

    async def __aenter__(self) -> Self:
        """Enter the underlying HTTP client and return this wrapper."""
        await self._http_client.__aenter__()
        return self

    async def __aexit__(self, *args: Any) -> None:
        """Exit the underlying HTTP client, forwarding the exception details."""
        await self._http_client.__aexit__(*args)

    async def _set_automation_enabled(
        self, automation_id: UUID, enabled: bool
    ) -> httpx.Response:
        """PATCH the automation's `enabled` flag and return the raw response."""
        return await self._http_client.patch(
            f"/automations/{automation_id}", json={"enabled": enabled}
        )

    async def pause_automation(self, automation_id: UUID) -> httpx.Response:
        """Disable the automation with the given ID."""
        return await self._set_automation_enabled(automation_id, False)

    async def resume_automation(self, automation_id: UUID) -> httpx.Response:
        """Enable the automation with the given ID."""
        return await self._set_automation_enabled(automation_id, True)

277 )