Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/events/clients.py: 47%

119 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1import abc 1a

2from textwrap import dedent 1a

3from types import TracebackType 1a

4from typing import ( 1a

5 TYPE_CHECKING, 

6 Any, 

7 ClassVar, 

8 Dict, 

9 List, 

10 Optional, 

11 Tuple, 

12 Type, 

13 Union, 

14) 

15from uuid import UUID 1a

16 

17import httpx 1a

18from typing_extensions import Self, TypeAlias 1a

19 

20from prefect.client.base import PrefectHttpxAsyncClient 1a

21from prefect.logging import get_logger 1a

22from prefect.server.events import messaging 1a

23from prefect.server.events.schemas.events import ( 1a

24 Event, 

25 ReceivedEvent, 

26 ResourceSpecification, 

27) 

28 

if TYPE_CHECKING:
    # Only needed by static type checkers; the `logger` annotation below
    # refers to it as a string, so no runtime import is required.
    import logging

# Module-level logger, obtained from Prefect's `get_logger` for this module.
logger: "logging.Logger" = get_logger(__name__)

# Value type of the label dictionaries accepted as `resource` / `related`
# criteria by the assertion helpers below: one string or a list of strings.
LabelValue: TypeAlias = Union[str, List[str]]

35 

36 

class EventsClient(abc.ABC):
    """Abstract base for Prefect Events clients.

    Subclasses must implement `emit`. The async context-manager hooks defined
    here perform no setup or teardown; subclasses may override them.
    """

    @abc.abstractmethod
    async def emit(self, event: Event) -> Optional[Event]:
        """Emit a single event. Concrete clients must override this."""

    async def __aenter__(self) -> Self:
        """Enter the client's context and hand back the client itself."""
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[Exception]],
        exc_val: Optional[Exception],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """Leave the client's context; returns None, so nothing is suppressed."""
        return None

53 

54 

class NullEventsClient(EventsClient):
    """An Events client that discards every event it is given; for testing."""

    async def emit(self, event: Event) -> None:
        """Accept `event` and do nothing with it."""
        return None

60 

61 

class AssertingEventsClient(EventsClient):
    """An implementation of the Prefect Events client that records all events sent
    to it for inspection during tests.

    Every instance registers itself in the class-level `all` list (and as
    `last`) when constructed. Events are captured once the instance has been
    entered as an async context manager, and the captured list is kept after
    exit so that tests can make assertions about it.
    """

    # The most recently constructed client, or None after `reset()`.
    last: ClassVar[Optional["AssertingEventsClient"]] = None
    # Every client constructed since the last `reset()`.
    all: ClassVar[List["AssertingEventsClient"]] = []

    args: tuple[Any, ...]
    kwargs: dict[str, Any]
    # Only set once the client has been entered as a context manager.
    events: list[Event]

    def __init__(self, *args: Any, **kwargs: Any):
        """Register this client and remember the arguments it was built with."""
        AssertingEventsClient.last = self
        AssertingEventsClient.all.append(self)
        self.args = args
        self.kwargs = kwargs

    @classmethod
    def reset(cls) -> None:
        """Reset all captured instances and their events. Use this between tests."""
        # Assign on the base class, mirroring `__init__`: rebinding on a
        # subclass would shadow the registry that `__init__` appends to.
        AssertingEventsClient.last = None
        AssertingEventsClient.all = []

    async def emit(self, event: Event) -> Event:
        """Record `event` and return it unchanged.

        Raises:
            TypeError: if this client has never been entered as a context
                manager (it has no `events` list yet).
        """
        if not hasattr(self, "events"):
            raise TypeError(
                "Events may only be emitted while this client is being used as a "
                "context manager"
            )
        self.events.append(event)
        return event

    async def __aenter__(self) -> Self:
        """Start capturing with a fresh, empty `events` list."""
        self.events = []
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[Exception]],
        exc_val: Optional[Exception],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """Leave the context without clearing what was captured."""
        # retain the events list so that assertions may be made
        return None

    @classmethod
    def emitted_events_count(cls) -> int:
        """Return the total number of events captured across all clients."""
        # A client that was constructed but never entered has no `events`
        # attribute; it captured nothing, so it contributes zero.
        return sum(len(getattr(client, "events", ())) for client in cls.all)

    @classmethod
    def assert_emitted_event_count(cls, count: int) -> None:
        """Assert that the given number of events were emitted."""
        total_num_events = cls.emitted_events_count()
        assert total_num_events == count, (
            f"The number of emitted events did not match the expected count: {total_num_events=} != {count=}"
        )

    @classmethod
    def assert_emitted_event_with(
        cls,
        event: Optional[str] = None,
        resource: Optional[Dict[str, LabelValue]] = None,
        related: Optional[List[Dict[str, LabelValue]]] = None,
        payload: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Assert that an event was emitted containing the given properties.

        Criteria that are None (or empty) are not checked.

        Args:
            event: the exact event name to look for.
            resource: labels the event's resource must match.
            related: label sets; each one must match at least one of the
                event's related resources.
            payload: key/value pairs that must all be present in the event's
                payload.

        Raises:
            AssertionError: if no client was created, or no captured event
                satisfies every given criterion.
        """
        assert cls.last is not None and cls.all, "No event client was created"

        # Newest-first within each client. Clients that were never entered
        # have no `events` attribute and contribute nothing.
        emitted_events = [
            captured
            for client in cls.all
            for captured in reversed(getattr(client, "events", ()))
        ]

        resource_spec = (
            ResourceSpecification.model_validate(resource) if resource else None
        )
        related_specs = (
            [
                ResourceSpecification.model_validate(related_resource)
                for related_resource in related
            ]
            if related
            else None
        )

        # (expected, received) pairs describing why each candidate was
        # rejected; rendered into the assertion message below.
        mismatch_reasons: List[Tuple[str, str]] = []

        def event_matches(emitted_event: Event) -> bool:
            if event is not None and emitted_event.event != event:
                mismatch_reasons.append((f"{event=}", f"{emitted_event.event=}"))
                return False

            if resource_spec and not resource_spec.matches(emitted_event.resource):
                mismatch_reasons.append((f"{resource=}", f"{emitted_event.resource=}"))
                return False

            if related_specs:
                for related_spec in related_specs:
                    if not any(
                        related_spec.matches(related_resource)
                        for related_resource in emitted_event.related
                    ):
                        mismatch_reasons.append(
                            (f"{related=}", f"{emitted_event.related=}")
                        )
                        return False

            if payload and any(
                emitted_event.payload.get(k) != v for k, v in payload.items()
            ):
                mismatch_reasons.append((f"{payload=}", f"{emitted_event.payload=}"))
                return False

            return True

        # The message is only evaluated on failure, i.e. after `any(...)` has
        # run `event_matches` over every event and filled `mismatch_reasons`.
        assert any(
            event_matches(emitted_event) for emitted_event in emitted_events
        ), f"""An event was not emitted matching the following criteria:
    {event=}
    {resource=}
    {related=}
    {payload=}

# of captured events: {len(emitted_events)}
{
            chr(10).join(
                dedent(f'''
                    Expected:
                        {expected}
                    Received:
                        {received}
                ''')
                for expected, received in mismatch_reasons
            )
        }
"""

    @classmethod
    def assert_no_emitted_event_with(
        cls,
        event: Optional[str] = None,
        resource: Optional[Dict[str, LabelValue]] = None,
        related: Optional[List[Dict[str, LabelValue]]] = None,
        payload: Optional[Dict[str, Any]] = None,
    ) -> None:
        """Assert that no captured event matches the given criteria.

        Implemented as the negation of `assert_emitted_event_with`, so it also
        passes when no client was ever created.

        Raises:
            AssertionError: if a matching event was captured.
        """
        try:
            cls.assert_emitted_event_with(event, resource, related, payload)
        except AssertionError:
            return
        else:
            assert False, "An event was emitted matching the given criteria"

212 

213 

class PrefectServerEventsClient(EventsClient):
    """Events client that publishes events through `messaging`'s event publisher.

    The publisher exists only between `__aenter__` and `__aexit__`, so `emit`
    must be called inside `async with`.
    """

    # Set by `__aenter__` and deleted by `__aexit__`.
    _publisher: messaging.EventPublisher

    async def __aenter__(self) -> Self:
        """Create an event publisher, enter its context, and keep the result."""
        publisher = messaging.create_event_publisher()
        self._publisher = await publisher.__aenter__()
        return self

    async def __aexit__(
        self,
        exc_type: Optional[Type[Exception]],
        exc_val: Optional[Exception],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """Exit the publisher's context and drop the reference to it."""
        try:
            await self._publisher.__aexit__(exc_type, exc_val, exc_tb)
        finally:
            # Forget the publisher even if its shutdown raised, so a later
            # `emit` fails with the explicit TypeError instead of publishing
            # through a publisher whose exit already failed.
            del self._publisher
        return None

    async def emit(self, event: Event) -> ReceivedEvent:
        """Convert `event` with `event.receive()`, publish it, and return it.

        Raises:
            TypeError: if called outside of the client's async context.
        """
        if not hasattr(self, "_publisher"):
            raise TypeError(
                "Events may only be emitted while this client is being used as a "
                "context manager"
            )

        received_event = event.receive()
        await self._publisher.publish_event(received_event)
        return received_event

242 

243 

class PrefectServerEventsAPIClient:
    """HTTP client that calls the Prefect server API in-process.

    Requests are sent to the application returned by `create_app()` through an
    `httpx.ASGITransport`, using the base URL `http://prefect-in-memory/api`.
    Use it as an async context manager so the underlying HTTP client is
    entered and exited.
    """

    _http_client: PrefectHttpxAsyncClient

    def __init__(self, additional_headers: Optional[dict[str, str]] = None):
        """Build the in-process HTTP client.

        Args:
            additional_headers: extra headers to send with every request.
                Defaults to no extra headers.
        """
        # Imported here rather than at module level, as in the original code
        # (presumably to avoid an import cycle - confirm before moving).
        from prefect.server.api.server import create_app

        # create_app caches application instances, and invoking it with no arguments
        # will point it to the currently running server instance
        api_app = create_app()

        self._http_client = PrefectHttpxAsyncClient(
            transport=httpx.ASGITransport(app=api_app, raise_app_exceptions=False),
            # Copy so the caller's mapping is never shared with the client.
            headers={**(additional_headers or {})},
            base_url="http://prefect-in-memory/api",
            enable_csrf_support=False,
            raise_on_all_errors=False,
        )

    async def __aenter__(self) -> Self:
        """Enter the underlying HTTP client's context and return this client."""
        await self._http_client.__aenter__()
        return self

    async def __aexit__(self, *args: Any) -> None:
        """Exit the underlying HTTP client's context with the same arguments."""
        await self._http_client.__aexit__(*args)

    async def pause_automation(self, automation_id: UUID) -> httpx.Response:
        """PATCH `/automations/{automation_id}` with `enabled` set to False."""
        return await self._http_client.patch(
            f"/automations/{automation_id}", json={"enabled": False}
        )

    async def resume_automation(self, automation_id: UUID) -> httpx.Response:
        """PATCH `/automations/{automation_id}` with `enabled` set to True."""
        return await self._http_client.patch(
            f"/automations/{automation_id}", json={"enabled": True}
        )

277 )