Coverage for /usr/local/lib/python3.12/site-packages/prefect/events/subscribers.py: 28%

95 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1""" 

2Flow run subscriber that interleaves events and logs from a flow run 

3""" 

4 

5from __future__ import annotations 1a

6 

7import asyncio 1a

8from types import TracebackType 1a

9from typing import TYPE_CHECKING, Any, Optional, Union 1a

10from uuid import UUID 1a

11 

12from typing_extensions import Self 1a

13 

14from prefect.client.schemas.filters import LogFilter, LogFilterFlowRunId 1a

15from prefect.client.schemas.objects import TERMINAL_STATES, Log, StateType 1a

16from prefect.events import Event 1a

17from prefect.events.clients import get_events_subscriber 1a

18from prefect.events.filters import EventAnyResourceFilter, EventFilter 1a

19from prefect.logging.clients import get_logs_subscriber 1a

20 

21if TYPE_CHECKING: 21 ↛ 22line 21 didn't jump to line 22 because the condition on line 21 was never true1a

22 from prefect.events.clients import PrefectEventSubscriber 

23 from prefect.logging.clients import PrefectLogsSubscriber 

24 

25 

class FlowRunSubscriber:
    """
    Subscribes to both events and logs for a specific flow run, yielding them
    in an interleaved stream.

    This subscriber combines the event stream and log stream for a flow run into
    a single async iterator. When an event reporting a terminal state for the
    flow run is received (any state type in `TERMINAL_STATES`, e.g. Completed,
    Failed, or Crashed), the event subscription stops but log subscription
    continues for a configurable timeout to catch any straggler logs.

    Example:
        ```python
        from prefect.events import Event
        from prefect.events.subscribers import FlowRunSubscriber

        async with FlowRunSubscriber(flow_run_id=my_flow_run_id) as subscriber:
            async for item in subscriber:
                if isinstance(item, Event):
                    print(f"Event: {item.event}")
                else:  # isinstance(item, Log)
                    print(f"Log: {item.message}")
        ```
    """

    _flow_run_id: UUID
    # Merged stream of logs and events; each consumer task pushes a single
    # `None` sentinel when it stops.
    _queue: asyncio.Queue[Union[Log, Event, None]]
    _tasks: list[asyncio.Task[None]]
    # Set once a terminal state event for the flow run has been observed.
    _flow_completed: bool
    _straggler_timeout: int
    _reconnection_attempts: int
    _log_filter: LogFilter
    _event_filter: EventFilter
    _logs_subscriber: PrefectLogsSubscriber | Any
    _events_subscriber: PrefectEventSubscriber | Any
    # Number of `None` sentinels seen so far by `__anext__`.
    _sentinels_received: int

    def __init__(
        self,
        flow_run_id: UUID,
        straggler_timeout: int = 3,
        reconnection_attempts: int = 10,
    ):
        """
        Args:
            flow_run_id: The ID of the flow run to follow
            straggler_timeout: After a terminal event, how long (in seconds) to wait
                for each additional item before stopping
            reconnection_attempts: Number of times to attempt reconnection if
                the websocket connection is lost
        """
        self._flow_run_id = flow_run_id
        self._straggler_timeout = straggler_timeout
        self._reconnection_attempts = reconnection_attempts
        self._queue = asyncio.Queue()
        self._tasks = []
        self._flow_completed = False
        self._sentinels_received = 0

        self._log_filter = LogFilter(flow_run_id=LogFilterFlowRunId(any_=[flow_run_id]))
        self._event_filter = EventFilter(
            any_resource=EventAnyResourceFilter(id=[f"prefect.flow-run.{flow_run_id}"])
        )

        # Created in `__aenter__`; `None` means "not currently open".
        self._logs_subscriber = None
        self._events_subscriber = None

    async def __aenter__(self) -> Self:
        """Open both subscriptions and start the background consumer tasks.

        If the events subscription fails to open, the already-open logs
        subscription is closed again before the error is re-raised.
        """
        self._logs_subscriber = get_logs_subscriber(
            filter=self._log_filter, reconnection_attempts=self._reconnection_attempts
        )
        self._events_subscriber = get_events_subscriber(
            filter=self._event_filter, reconnection_attempts=self._reconnection_attempts
        )

        await self._logs_subscriber.__aenter__()
        try:
            await self._events_subscriber.__aenter__()
        except BaseException as exc:
            # `async with` never calls `__aexit__` when `__aenter__` raises, so
            # the logs subscription has to be released here or it would leak.
            logs_subscriber = self._logs_subscriber
            self._logs_subscriber = None
            self._events_subscriber = None
            await logs_subscriber.__aexit__(type(exc), exc, exc.__traceback__)
            raise

        self._tasks = [
            asyncio.create_task(self._consume_logs()),
            asyncio.create_task(self._consume_events()),
        ]

        return self

    async def __aexit__(
        self,
        exc_type: Optional[type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """Cancel the consumer tasks and close both subscriptions.

        Safe to call when the subscriptions were never opened. The events
        subscription is closed even if closing the logs subscription raises.
        """
        for task in self._tasks:
            task.cancel()

        if self._tasks:
            # return_exceptions=True: cancellation of the consumers is expected
            # here and must not surface to the caller.
            await asyncio.gather(*self._tasks, return_exceptions=True)

        try:
            if self._logs_subscriber is not None:
                await self._logs_subscriber.__aexit__(exc_type, exc_val, exc_tb)
        finally:
            if self._events_subscriber is not None:
                await self._events_subscriber.__aexit__(exc_type, exc_val, exc_tb)

    def __aiter__(self) -> Self:
        """Return self as an async iterator"""
        return self

    async def __anext__(self) -> Union[Log, Event]:
        """Get the next log or event from the interleaved stream.

        Raises:
            StopAsyncIteration: when every consumer task has delivered its
                sentinel, or when the flow run has reached a terminal state and
                no further item arrives within the straggler timeout.
        """
        while self._sentinels_received < len(self._tasks):
            if self._flow_completed:
                # Only stragglers are expected now, so bound the wait.
                try:
                    item = await asyncio.wait_for(
                        self._queue.get(), timeout=self._straggler_timeout
                    )
                except asyncio.TimeoutError:
                    raise StopAsyncIteration from None
            else:
                item = await self._queue.get()

            if item is None:
                # One consumer finished; keep draining until all have.
                self._sentinels_received += 1
                continue

            return item

        raise StopAsyncIteration

    @staticmethod
    def _log_stream_failure(stream: str) -> None:
        """Record an unexpected consumer failure without interrupting iteration."""
        # Function-scope import keeps this class self-contained; must be called
        # from inside an `except` block so `exc_info=True` picks up the error.
        import logging

        logging.getLogger(__name__).debug(
            "FlowRunSubscriber %s stream stopped unexpectedly", stream, exc_info=True
        )

    def _is_terminal_event(self, event: Event) -> bool:
        """Return True if `event` reports a terminal state for this flow run."""
        if event.resource.id != f"prefect.flow-run.{self._flow_run_id}":
            return False

        # Get state type from event resource or payload
        state_type_str = event.resource.get("prefect.state-type")
        if not state_type_str and "validated_state" in event.payload:
            validated_state = event.payload["validated_state"]
            # A null or non-mapping `validated_state` is ignored rather than
            # being allowed to abort the whole event consumer.
            if isinstance(validated_state, dict):
                state_type_str = validated_state.get("type")

        if not state_type_str:
            return False

        try:
            return StateType(state_type_str) in TERMINAL_STATES
        except ValueError:
            # Unknown state type string: not something we can classify.
            return False

    async def _consume_logs(self) -> None:
        """Background task to consume logs and put them in the queue"""
        try:
            async for log in self._logs_subscriber:
                await self._queue.put(log)
        except Exception:
            # Best effort: a broken log stream ends this consumer but must not
            # break iteration. Cancellation is not an `Exception` and propagates.
            self._log_stream_failure("logs")
        finally:
            # The queue is unbounded, so this never raises QueueFull, and it
            # avoids awaiting while the task is being cancelled.
            self._queue.put_nowait(None)

    async def _consume_events(self) -> None:
        """Background task to consume events and put them in the queue"""
        try:
            async for event in self._events_subscriber:
                await self._queue.put(event)

                # Stop following events once the flow run reaches a terminal
                # state; `__anext__` then switches to the straggler timeout.
                if self._is_terminal_event(event):
                    self._flow_completed = True
                    break
        except Exception:
            # Best effort: see `_consume_logs`.
            self._log_stream_failure("events")
        finally:
            self._queue.put_nowait(None)