Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/api/events.py: 28%
120 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1import base64 1a
2from typing import TYPE_CHECKING, List, Optional 1a
4from fastapi import Response, WebSocket 1a
5from fastapi.exceptions import HTTPException 1a
6from fastapi.param_functions import Depends, Path 1a
7from fastapi.params import Body, Query 1a
8from sqlalchemy.ext.asyncio import AsyncSession 1a
9from starlette.requests import Request 1a
10from starlette.status import WS_1002_PROTOCOL_ERROR 1a
12from prefect._internal.compatibility.starlette import status 1a
13from prefect.logging import get_logger 1a
14from prefect.server.api.dependencies import is_ephemeral_request 1a
15from prefect.server.database import PrefectDBInterface, provide_database_interface 1a
16from prefect.server.events import messaging, stream 1a
17from prefect.server.events.counting import ( 1a
18 Countable,
19 InvalidEventCountParameters,
20 TimeUnit,
21)
22from prefect.server.events.filters import EventFilter, EventOrder 1a
23from prefect.server.events.models.automations import automations_session 1a
24from prefect.server.events.pipeline import EventsPipeline 1a
25from prefect.server.events.schemas.events import Event, EventCount, EventPage 1a
26from prefect.server.events.storage import ( 1a
27 INTERACTIVE_PAGE_SIZE,
28 InvalidTokenError,
29 database,
30)
31from prefect.server.utilities import subscriptions 1a
32from prefect.server.utilities.server import PrefectRouter 1a
33from prefect.settings import ( 1a
34 PREFECT_EVENTS_MAXIMUM_WEBSOCKET_BACKFILL,
35 PREFECT_EVENTS_WEBSOCKET_BACKFILL_PAGE_SIZE,
36)
38if TYPE_CHECKING: 38 ↛ 39line 38 didn't jump to line 39 because the condition on line 38 was never true1a
39 import logging
41logger: "logging.Logger" = get_logger(__name__) 1a
44router: PrefectRouter = PrefectRouter(prefix="/events", tags=["Events"]) 1a
47@router.post("", status_code=status.HTTP_204_NO_CONTENT, response_class=Response) 1a
48async def create_events( 1a
49 events: List[Event],
50 ephemeral_request: bool = Depends(is_ephemeral_request),
51) -> None:
52 """
53 Record a batch of Events.
55 For more information, see https://docs.prefect.io/v3/concepts/events.
56 """
57 if ephemeral_request:
58 await EventsPipeline().process_events(events)
59 else:
60 received_events = [event.receive() for event in events]
61 await messaging.publish(received_events)
64@router.websocket("/in") 1a
65async def stream_events_in(websocket: WebSocket) -> None: 1a
66 """Open a WebSocket to stream incoming Events"""
68 await websocket.accept()
70 try:
71 async with messaging.create_event_publisher() as publisher:
72 async for event_json in websocket.iter_text():
73 event = Event.model_validate_json(event_json)
74 await publisher.publish_event(event.receive())
75 except subscriptions.NORMAL_DISCONNECT_EXCEPTIONS: # pragma: no cover
76 pass # it's fine if a client disconnects either normally or abnormally
78 return None
81@router.websocket("/out") 1a
82async def stream_workspace_events_out( 1a
83 websocket: WebSocket,
84) -> None:
85 """Open a WebSocket to stream Events"""
86 websocket = await subscriptions.accept_prefect_socket(
87 websocket,
88 )
89 if not websocket:
90 return
92 try:
93 # After authentication, the next message is expected to be a filter message, any
94 # other type of message will close the connection.
95 message = await websocket.receive_json()
97 if message["type"] != "filter":
98 return await websocket.close(
99 WS_1002_PROTOCOL_ERROR, reason="Expected 'filter' message"
100 )
102 wants_backfill = message.get("backfill", True)
104 try:
105 filter = EventFilter.model_validate(message["filter"])
106 except Exception as e:
107 return await websocket.close(
108 WS_1002_PROTOCOL_ERROR, reason=f"Invalid filter: {e}"
109 )
111 filter.occurred.clamp(PREFECT_EVENTS_MAXIMUM_WEBSOCKET_BACKFILL.value())
112 filter.order = EventOrder.ASC
114 # subscribe to the ongoing event stream first so we don't miss events...
115 async with stream.events(filter) as event_stream:
116 # ...then if the user wants, backfill up to the last 1k events...
117 if wants_backfill:
118 backfilled_ids = set()
120 async with automations_session() as session:
121 backfill, _, next_page = await database.query_events(
122 session=session,
123 filter=filter,
124 page_size=PREFECT_EVENTS_WEBSOCKET_BACKFILL_PAGE_SIZE.value(),
125 )
127 while backfill:
128 for event in backfill:
129 backfilled_ids.add(event.id)
130 await websocket.send_json(
131 {
132 "type": "event",
133 "event": event.model_dump(mode="json"),
134 }
135 )
137 if not next_page:
138 break
140 backfill, _, next_page = await database.query_next_page(
141 session=session,
142 page_token=next_page,
143 )
145 # ...before resuming the ongoing stream of events
146 async for event in event_stream:
147 if not event:
148 if await subscriptions.still_connected(websocket):
149 continue
150 break
152 if wants_backfill and event.id in backfilled_ids:
153 backfilled_ids.remove(event.id)
154 continue
156 await websocket.send_json(
157 {"type": "event", "event": event.model_dump(mode="json")}
158 )
160 except subscriptions.NORMAL_DISCONNECT_EXCEPTIONS: # pragma: no cover
161 pass # it's fine if a client disconnects either normally or abnormally
163 return None
166def verified_page_token( 1a
167 page_token: str = Query(..., alias="page-token"),
168) -> str:
169 try:
170 page_token = base64.b64decode(page_token.encode()).decode()
171 except Exception:
172 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
174 if not page_token:
175 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
177 return page_token
180@router.post( 1a
181 "/filter",
182)
183async def read_events( 1a
184 request: Request,
185 filter: Optional[EventFilter] = Body(
186 None,
187 description=(
188 "Additional optional filter criteria to narrow down the set of Events"
189 ),
190 ),
191 limit: int = Body(
192 INTERACTIVE_PAGE_SIZE,
193 ge=0,
194 le=INTERACTIVE_PAGE_SIZE,
195 embed=True,
196 description="The number of events to return with each page",
197 ),
198 db: PrefectDBInterface = Depends(provide_database_interface),
199) -> EventPage:
200 """
201 Queries for Events matching the given filter criteria in the given Account. Returns
202 the first page of results, and the URL to request the next page (if there are more
203 results).
204 """
205 filter = filter or EventFilter()
206 async with db.session_context() as session:
207 events, total, next_token = await database.query_events(
208 session=session,
209 filter=filter,
210 page_size=limit,
211 )
213 return EventPage(
214 events=events,
215 total=total,
216 next_page=generate_next_page_link(request, next_token),
217 )
220@router.get( 1a
221 "/filter/next",
222)
223async def read_account_events_page( 1a
224 request: Request,
225 page_token: str = Depends(verified_page_token),
226 db: PrefectDBInterface = Depends(provide_database_interface),
227) -> EventPage:
228 """
229 Returns the next page of Events for a previous query against the given Account, and
230 the URL to request the next page (if there are more results).
231 """
232 async with db.session_context() as session:
233 try:
234 events, total, next_token = await database.query_next_page(
235 session=session, page_token=page_token
236 )
237 except InvalidTokenError:
238 raise HTTPException(status_code=status.HTTP_403_FORBIDDEN)
240 return EventPage(
241 events=events,
242 total=total,
243 next_page=generate_next_page_link(request, next_token),
244 )
247def generate_next_page_link( 1a
248 request: Request,
249 page_token: Optional[str],
250) -> Optional[str]:
251 if not page_token:
252 return None
254 next_page = (
255 f"{request.base_url}api/events/filter/next"
256 f"?page-token={base64.b64encode(page_token.encode()).decode()}"
257 )
258 return next_page
261@router.post( 1a
262 "/count-by/{countable}",
263)
264async def count_account_events( 1a
265 filter: EventFilter,
266 countable: Countable = Path(...),
267 time_unit: TimeUnit = Body(default=TimeUnit.day),
268 time_interval: float = Body(default=1.0, ge=0.01),
269 db: PrefectDBInterface = Depends(provide_database_interface),
270) -> List[EventCount]:
271 """
272 Returns distinct objects and the count of events associated with them. Objects
273 that can be counted include the day the event occurred, the type of event, or
274 the IDs of the resources associated with the event.
275 """
276 async with db.session_context() as session:
277 return await handle_event_count_request(
278 session=session,
279 filter=filter,
280 countable=countable,
281 time_unit=time_unit,
282 time_interval=time_interval,
283 )
286async def handle_event_count_request( 1a
287 session: AsyncSession,
288 filter: EventFilter,
289 countable: Countable,
290 time_unit: TimeUnit,
291 time_interval: float,
292) -> List[EventCount]:
293 try:
294 return await database.count_events(
295 session=session,
296 filter=filter,
297 countable=countable,
298 time_unit=time_unit,
299 time_interval=time_interval,
300 )
301 except InvalidEventCountParameters as exc:
302 raise HTTPException(
303 status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
304 detail=exc.message,
305 )