Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/events.py: 41%

120 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1import base64 1a

2from typing import TYPE_CHECKING, List, Optional 1a

3 

4from fastapi import Response, WebSocket 1a

5from fastapi.exceptions import HTTPException 1a

6from fastapi.param_functions import Depends, Path 1a

7from fastapi.params import Body, Query 1a

8from sqlalchemy.ext.asyncio import AsyncSession 1a

9from starlette.requests import Request 1a

10from starlette.status import WS_1002_PROTOCOL_ERROR 1a

11 

12from prefect._internal.compatibility.starlette import status 1a

13from prefect.logging import get_logger 1a

14from prefect.server.api.dependencies import is_ephemeral_request 1a

15from prefect.server.database import PrefectDBInterface, provide_database_interface 1a

16from prefect.server.events import messaging, stream 1a

17from prefect.server.events.counting import ( 1a

18 Countable, 

19 InvalidEventCountParameters, 

20 TimeUnit, 

21) 

22from prefect.server.events.filters import EventFilter, EventOrder 1a

23from prefect.server.events.models.automations import automations_session 1a

24from prefect.server.events.pipeline import EventsPipeline 1a

25from prefect.server.events.schemas.events import Event, EventCount, EventPage 1a

26from prefect.server.events.storage import ( 1a

27 INTERACTIVE_PAGE_SIZE, 

28 InvalidTokenError, 

29 database, 

30) 

31from prefect.server.utilities import subscriptions 1a

32from prefect.server.utilities.server import PrefectRouter 1a

33from prefect.settings import ( 1a

34 PREFECT_EVENTS_MAXIMUM_WEBSOCKET_BACKFILL, 

35 PREFECT_EVENTS_WEBSOCKET_BACKFILL_PAGE_SIZE, 

36) 

37 

if TYPE_CHECKING:
    # Only needed for the string annotation on `logger` below; this branch is
    # skipped at runtime because TYPE_CHECKING is False outside type checkers.
    import logging

# Module-level logger, obtained from `get_logger` using this module's name.
logger: "logging.Logger" = get_logger(__name__)


# Router that the route decorators in this module register on, created with
# the "/events" prefix and the "Events" tag.
router: PrefectRouter = PrefectRouter(prefix="/events", tags=["Events"])

45 

46 

@router.post("", status_code=status.HTTP_204_NO_CONTENT, response_class=Response)
async def create_events(
    events: List[Event],
    ephemeral_request: bool = Depends(is_ephemeral_request),
) -> None:
    """
    Record a batch of Events.

    For more information, see https://docs.prefect.io/v3/concepts/events.
    """
    # The docstring above is left untouched on purpose: it belongs to an HTTP
    # route and may be surfaced in the generated API description.
    if not ephemeral_request:
        # Regular requests: call receive() on every event and publish the
        # resulting batch through the messaging layer.
        stamped = [incoming.receive() for incoming in events]
        await messaging.publish(stamped)
        return

    # Ephemeral requests skip messaging and feed the events straight into a
    # freshly constructed EventsPipeline.
    await EventsPipeline().process_events(events)

62 

63 

@router.websocket("/in")
async def stream_events_in(websocket: WebSocket) -> None:
    """Open a WebSocket to stream incoming Events

    Accepts the socket, then treats every text frame as the JSON form of one
    ``Event``: the frame is validated, ``receive()`` is called on the result,
    and that value is handed to the event publisher.
    """
    await websocket.accept()

    try:
        async with messaging.create_event_publisher() as publisher:
            async for raw_event in websocket.iter_text():
                received = Event.model_validate_json(raw_event).receive()
                await publisher.publish_event(received)
    except subscriptions.NORMAL_DISCONNECT_EXCEPTIONS:  # pragma: no cover
        # A client going away, cleanly or not, is not an error for this endpoint.
        pass

79 

80 

@router.websocket("/out")
async def stream_workspace_events_out(
    websocket: WebSocket,
) -> None:
    """Open a WebSocket to stream Events

    Flow, as implemented below:

    1. The socket is handed to ``subscriptions.accept_prefect_socket``; if that
       returns a falsy value the handler stops immediately.
    2. The client must then send one JSON object whose ``"type"`` is
       ``"filter"``, with the filter under ``"filter"`` and an optional
       ``"backfill"`` key (treated as ``True`` when absent). Anything else, or
       a filter that fails validation, closes the socket with
       ``WS_1002_PROTOCOL_ERROR``.
    3. Matching events are sent as ``{"type": "event", "event": {...}}``
       messages: first the stored backfill (if requested), then the live
       stream, skipping live events that were already sent during backfill.
    """
    websocket = await subscriptions.accept_prefect_socket(
        websocket,
    )
    if not websocket:
        return

    try:
        # After authentication, the next message is expected to be a filter message, any
        # other type of message will close the connection.
        message = await websocket.receive_json()

        # The payload is client-controlled: it may not be a JSON object at all,
        # or may be an object without a "type" key. Both are protocol errors and
        # must close the socket cleanly instead of raising TypeError/KeyError.
        if not isinstance(message, dict) or message.get("type") != "filter":
            return await websocket.close(
                WS_1002_PROTOCOL_ERROR, reason="Expected 'filter' message"
            )

        wants_backfill = message.get("backfill", True)

        try:
            # A missing "filter" key raises KeyError here and is reported the
            # same way as a filter that fails validation.
            event_filter = EventFilter.model_validate(message["filter"])
        except Exception as e:
            return await websocket.close(
                WS_1002_PROTOCOL_ERROR, reason=f"Invalid filter: {e}"
            )

        # Clamp the requested time range to the configured maximum backfill and
        # force ascending order for the queries and stream below.
        event_filter.occurred.clamp(PREFECT_EVENTS_MAXIMUM_WEBSOCKET_BACKFILL.value())
        event_filter.order = EventOrder.ASC

        # subscribe to the ongoing event stream first so we don't miss events...
        async with stream.events(event_filter) as event_stream:
            # ...then if the user wants, backfill up to the last 1k events...
            if wants_backfill:
                # IDs already sent from storage, so the live stream can skip them.
                backfilled_ids = set()

                async with automations_session() as session:
                    backfill, _, next_page = await database.query_events(
                        session=session,
                        filter=event_filter,
                        page_size=PREFECT_EVENTS_WEBSOCKET_BACKFILL_PAGE_SIZE.value(),
                    )

                    # Walk the stored pages until one comes back empty or there
                    # is no token for a further page.
                    while backfill:
                        for event in backfill:
                            backfilled_ids.add(event.id)
                            await websocket.send_json(
                                {
                                    "type": "event",
                                    "event": event.model_dump(mode="json"),
                                }
                            )

                        if not next_page:
                            break

                        backfill, _, next_page = await database.query_next_page(
                            session=session,
                            page_token=next_page,
                        )

            # ...before resuming the ongoing stream of events
            async for event in event_stream:
                if not event:
                    # A falsy item carries no event; use it as an opportunity to
                    # check the client is still there, and stop if it is not.
                    if await subscriptions.still_connected(websocket):
                        continue
                    break

                if wants_backfill and event.id in backfilled_ids:
                    # Already delivered during backfill; drop the ID so the set
                    # shrinks as duplicates are consumed.
                    backfilled_ids.remove(event.id)
                    continue

                await websocket.send_json(
                    {"type": "event", "event": event.model_dump(mode="json")}
                )

    except subscriptions.NORMAL_DISCONNECT_EXCEPTIONS:  # pragma: no cover
        pass  # it's fine if a client disconnects either normally or abnormally

    return None

164 

165 

def verified_page_token(
    page_token: str = Query(..., alias="page-token"),
) -> str:
    """Decode the base64 ``page-token`` query parameter.

    Returns:
        The decoded token text.

    Raises:
        HTTPException: with status 403 when the value cannot be decoded, or
            when it decodes to an empty string.
    """
    try:
        raw_bytes = page_token.encode()
        decoded = base64.b64decode(raw_bytes).decode()
    except Exception:
        raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)

    if decoded:
        return decoded

    # An empty token is rejected the same way as an undecodable one.
    raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)

178 

179 

@router.post(
    "/filter",
)
async def read_events(
    request: Request,
    filter: Optional[EventFilter] = Body(
        None,
        description=(
            "Additional optional filter criteria to narrow down the set of Events"
        ),
    ),
    limit: int = Body(
        INTERACTIVE_PAGE_SIZE,
        ge=0,
        le=INTERACTIVE_PAGE_SIZE,
        embed=True,
        description="The number of events to return with each page",
    ),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> EventPage:
    """
    Queries for Events matching the given filter criteria in the given Account. Returns
    the first page of results, and the URL to request the next page (if there are more
    results).
    """
    # Fall back to a default EventFilter when the body carries no filter.
    criteria = filter if filter else EventFilter()

    async with db.session_context() as session:
        first_page = await database.query_events(
            session=session,
            filter=criteria,
            page_size=limit,
        )
        events, total, next_token = first_page

        # `next_link` is None when there is no token for a further page.
        next_link = generate_next_page_link(request, next_token)
        return EventPage(events=events, total=total, next_page=next_link)

218 

219 

@router.get(
    "/filter/next",
)
async def read_account_events_page(
    request: Request,
    page_token: str = Depends(verified_page_token),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> EventPage:
    """
    Returns the next page of Events for a previous query against the given Account, and
    the URL to request the next page (if there are more results).
    """
    async with db.session_context() as session:
        try:
            following_page = await database.query_next_page(
                session=session, page_token=page_token
            )
        except InvalidTokenError:
            # A token the storage layer rejects gets the same 403 that
            # `verified_page_token` uses for undecodable tokens.
            raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)

        events, total, next_token = following_page

        # `next_link` is None when there is no token for a further page.
        next_link = generate_next_page_link(request, next_token)
        return EventPage(events=events, total=total, next_page=next_link)

245 

246 

def generate_next_page_link(
    request: Request,
    page_token: Optional[str],
) -> Optional[str]:
    """Build the absolute URL a client should call to fetch the next page.

    Args:
        request: The current request; only its ``base_url`` is read.
        page_token: Token identifying the next page, or a falsy value when
            there are no further results.

    Returns:
        ``None`` when ``page_token`` is falsy; otherwise a URL for
        ``api/events/filter/next`` whose ``page-token`` query parameter holds
        the base64-encoded token.
    """
    from urllib.parse import quote

    if not page_token:
        return None

    encoded_token = base64.b64encode(page_token.encode()).decode()

    # The standard base64 alphabet includes "+", which form-style query-string
    # parsing turns into a space, corrupting the token on the way back in.
    # Percent-encode it; "/" and "=" survive query parsing unchanged and are
    # left as-is so links for tokens without "+" look exactly as before.
    query_value = quote(encoded_token, safe="/=")

    return f"{request.base_url}api/events/filter/next?page-token={query_value}"

259 

260 

@router.post(
    "/count-by/{countable}",
)
async def count_account_events(
    filter: EventFilter,
    countable: Countable = Path(...),
    time_unit: TimeUnit = Body(default=TimeUnit.day),
    time_interval: float = Body(default=1.0, ge=0.01),
    db: PrefectDBInterface = Depends(provide_database_interface),
) -> List[EventCount]:
    """
    Returns distinct objects and the count of events associated with them.  Objects
    that can be counted include the day the event occurred, the type of event, or
    the IDs of the resources associated with the event.
    """
    # Thin wrapper: open a session and delegate to the shared helper, which
    # also translates invalid count parameters into an HTTP 422.
    async with db.session_context() as session:
        counts = await handle_event_count_request(
            session=session,
            filter=filter,
            countable=countable,
            time_unit=time_unit,
            time_interval=time_interval,
        )
        return counts

284 

285 

async def handle_event_count_request(
    session: AsyncSession,
    filter: EventFilter,
    countable: Countable,
    time_unit: TimeUnit,
    time_interval: float,
) -> List[EventCount]:
    """Run an event count query and map invalid parameters to an HTTP error.

    All arguments are forwarded unchanged to ``database.count_events``.

    Raises:
        HTTPException: with status 422 and the original message as detail when
            ``database.count_events`` raises ``InvalidEventCountParameters``.
    """
    try:
        counts = await database.count_events(
            session=session,
            filter=filter,
            countable=countable,
            time_unit=time_unit,
            time_interval=time_interval,
        )
    except InvalidEventCountParameters as invalid:
        raise HTTPException(
            status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
            detail=invalid.message,
        )

    return counts