Coverage for /usr/local/lib/python3.12/site-packages/prefect/client/orchestration/_events/client.py: 48%

21 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1from typing import TYPE_CHECKING 1a

2 

3from prefect.client.orchestration.base import BaseAsyncClient, BaseClient 1a

4from prefect.client.schemas.events import EventPage 1a

5 

6if TYPE_CHECKING: 6 ↛ 7line 6 didn't jump to line 7 because the condition on line 6 was never true1a

7 from prefect.events.filters import EventFilter 

8 

9 

class EventClient(BaseClient):
    """synchronous client methods for reading pages of events from the API."""

    def read_events(
        self,
        filter: "EventFilter | None" = None,
        limit: int = 100,
    ) -> EventPage:
        """
        query historical events from the API.

        sends a POST to ``/events/filter`` with the serialized filter and the
        page size, then validates the JSON response body into an EventPage.

        args:
            filter: optional filter criteria to narrow down events; when it is
                None, a null filter is sent in the request body
            limit: maximum number of events to return per page (default 100)

        returns:
            EventPage containing events, total count, and next page link
        """
        # Compare against None explicitly: a plain truthiness test would
        # silently discard a filter object that happens to evaluate as falsy
        # and send an unfiltered query instead.
        filter_payload = (
            filter.model_dump(mode="json") if filter is not None else None
        )
        # NOTE(review): unlike read_events_page, there is no explicit
        # raise_for_status() here -- presumably self.request already surfaces
        # HTTP error statuses; confirm against BaseClient.
        response = self.request(
            "POST",
            "/events/filter",
            json={
                "filter": filter_payload,
                "limit": limit,
            },
        )
        return EventPage.model_validate(response.json())

    def read_events_page(self, next_page_url: str) -> EventPage:
        """
        retrieve the next page of events using a next_page URL.

        args:
            next_page_url: the next_page URL from a previous EventPage response

        returns:
            EventPage containing the next page of events
        """
        # The URL is coerced with str() before the GET, so URL-like objects
        # taken straight from a previous page are accepted as well as strings.
        response = self._client.get(str(next_page_url))
        # Surface an error status as an exception before parsing the body.
        response.raise_for_status()
        return EventPage.model_validate(response.json())

49 

50 

class EventAsyncClient(BaseAsyncClient):
    """asynchronous client methods for reading pages of events from the API."""

    async def read_events(
        self,
        filter: "EventFilter | None" = None,
        limit: int = 100,
    ) -> EventPage:
        """
        query historical events from the API.

        awaits a POST to ``/events/filter`` with the serialized filter and the
        page size, then validates the JSON response body into an EventPage.

        args:
            filter: optional filter criteria to narrow down events; when it is
                None, a null filter is sent in the request body
            limit: maximum number of events to return per page (default 100)

        returns:
            EventPage containing events, total count, and next page link
        """
        # Compare against None explicitly: a plain truthiness test would
        # silently discard a filter object that happens to evaluate as falsy
        # and send an unfiltered query instead.
        filter_payload = (
            filter.model_dump(mode="json") if filter is not None else None
        )
        # NOTE(review): unlike read_events_page, there is no explicit
        # raise_for_status() here -- presumably self.request already surfaces
        # HTTP error statuses; confirm against BaseAsyncClient.
        response = await self.request(
            "POST",
            "/events/filter",
            json={
                "filter": filter_payload,
                "limit": limit,
            },
        )
        return EventPage.model_validate(response.json())

    async def read_events_page(self, next_page_url: str) -> EventPage:
        """
        retrieve the next page of events using a next_page URL.

        args:
            next_page_url: the next_page URL from a previous EventPage response

        returns:
            EventPage containing the next page of events
        """
        # The URL is coerced with str() before the GET, so URL-like objects
        # taken straight from a previous page are accepted as well as strings.
        response = await self._client.get(str(next_page_url))
        # Surface an error status as an exception before parsing the body.
        response.raise_for_status()
        return EventPage.model_validate(response.json())