Coverage for /usr/local/lib/python3.12/site-packages/prefect/client/orchestration/_events/client.py: 48%
21 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1from typing import TYPE_CHECKING 1a
3from prefect.client.orchestration.base import BaseAsyncClient, BaseClient 1a
4from prefect.client.schemas.events import EventPage 1a
6if TYPE_CHECKING: 6 ↛ 7line 6 didn't jump to line 7 because the condition on line 6 was never true1a
7 from prefect.events.filters import EventFilter
10class EventClient(BaseClient): 1a
11 def read_events( 1a
12 self,
13 filter: "EventFilter | None" = None,
14 limit: int = 100,
15 ) -> EventPage:
16 """
17 query historical events from the API.
19 args:
20 filter: optional filter criteria to narrow down events
21 limit: maximum number of events to return per page (default 100)
23 returns:
24 EventPage containing events, total count, and next page link
25 """
26 response = self.request(
27 "POST",
28 "/events/filter",
29 json={
30 "filter": filter.model_dump(mode="json") if filter else None,
31 "limit": limit,
32 },
33 )
34 return EventPage.model_validate(response.json())
36 def read_events_page(self, next_page_url: str) -> EventPage: 1a
37 """
38 retrieve the next page of events using a next_page URL.
40 args:
41 next_page_url: the next_page URL from a previous EventPage response
43 returns:
44 EventPage containing the next page of events
45 """
46 response = self._client.get(str(next_page_url))
47 response.raise_for_status()
48 return EventPage.model_validate(response.json())
51class EventAsyncClient(BaseAsyncClient): 1a
52 async def read_events( 1a
53 self,
54 filter: "EventFilter | None" = None,
55 limit: int = 100,
56 ) -> EventPage:
57 """
58 query historical events from the API.
60 args:
61 filter: optional filter criteria to narrow down events
62 limit: maximum number of events to return per page (default 100)
64 returns:
65 EventPage containing events, total count, and next page link
66 """
67 response = await self.request(
68 "POST",
69 "/events/filter",
70 json={
71 "filter": filter.model_dump(mode="json") if filter else None,
72 "limit": limit,
73 },
74 )
75 return EventPage.model_validate(response.json())
77 async def read_events_page(self, next_page_url: str) -> EventPage: 1a
78 """
79 retrieve the next page of events using a next_page URL.
81 args:
82 next_page_url: the next_page URL from a previous EventPage response
84 returns:
85 EventPage containing the next page of events
86 """
87 response = await self._client.get(str(next_page_url))
88 response.raise_for_status()
89 return EventPage.model_validate(response.json())