Coverage for /usr/local/lib/python3.12/site-packages/prefect/events/subscribers.py: 28%
95 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1"""
2Flow run subscriber that interleaves events and logs from a flow run
3"""
5from __future__ import annotations 1a
7import asyncio 1a
8from types import TracebackType 1a
9from typing import TYPE_CHECKING, Any, Optional, Union 1a
10from uuid import UUID 1a
12from typing_extensions import Self 1a
14from prefect.client.schemas.filters import LogFilter, LogFilterFlowRunId 1a
15from prefect.client.schemas.objects import TERMINAL_STATES, Log, StateType 1a
16from prefect.events import Event 1a
17from prefect.events.clients import get_events_subscriber 1a
18from prefect.events.filters import EventAnyResourceFilter, EventFilter 1a
19from prefect.logging.clients import get_logs_subscriber 1a
21if TYPE_CHECKING: 21 ↛ 22line 21 didn't jump to line 22 because the condition on line 21 was never true1a
22 from prefect.events.clients import PrefectEventSubscriber
23 from prefect.logging.clients import PrefectLogsSubscriber
class FlowRunSubscriber:
    """
    Subscribes to both events and logs for a specific flow run, yielding them
    in an interleaved stream.

    This subscriber combines the event stream and log stream for a flow run into
    a single async iterator. Once an event for the flow run reports a state type
    contained in `TERMINAL_STATES`, the event subscription stops; iteration then
    continues only while further items keep arriving within `straggler_timeout`
    seconds of each other, so that straggler logs are still delivered.

    Failures inside either underlying stream are treated as best-effort: the
    affected stream simply ends (the failure is logged at debug level) and the
    other stream keeps being served.

    Example:
        ```python
        from prefect.events.subscribers import FlowRunSubscriber

        async with FlowRunSubscriber(flow_run_id=my_flow_run_id) as subscriber:
            async for item in subscriber:
                if isinstance(item, Event):
                    print(f"Event: {item.event}")
                else:  # isinstance(item, Log)
                    print(f"Log: {item.message}")
        ```
    """

    # ID of the flow run being followed
    _flow_run_id: UUID
    # Shared hand-off queue; `None` is the "this consumer has finished" sentinel
    _queue: asyncio.Queue[Union[Log, Event, None]]
    # Background consumer tasks started by `__aenter__`
    _tasks: list[asyncio.Task[None]]
    # Set by the event consumer once a terminal state event has been seen
    _flow_completed: bool
    # Idle time (seconds) tolerated between items after the flow has completed
    _straggler_timeout: int
    # Passed through to both subscriber factories
    _reconnection_attempts: int
    _log_filter: LogFilter
    _event_filter: EventFilter
    # Both subscribers are `None` until `__aenter__` creates them
    _logs_subscriber: PrefectLogsSubscriber | Any
    _events_subscriber: PrefectEventSubscriber | Any
    # Number of `None` sentinels consumed so far by `__anext__`
    _sentinels_received: int

    def __init__(
        self,
        flow_run_id: UUID,
        straggler_timeout: int = 3,
        reconnection_attempts: int = 10,
    ):
        """
        Args:
            flow_run_id: The ID of the flow run to follow
            straggler_timeout: After a terminal event, how long (in seconds) to wait
                for each additional item before stopping
            reconnection_attempts: Number of times to attempt reconnection if
                the websocket connection is lost
        """
        self._flow_run_id = flow_run_id
        self._straggler_timeout = straggler_timeout
        self._reconnection_attempts = reconnection_attempts
        self._queue = asyncio.Queue()
        self._tasks = []
        self._flow_completed = False
        self._sentinels_received = 0

        self._log_filter = LogFilter(flow_run_id=LogFilterFlowRunId(any_=[flow_run_id]))
        self._event_filter = EventFilter(
            any_resource=EventAnyResourceFilter(id=[f"prefect.flow-run.{flow_run_id}"])
        )

        self._logs_subscriber = None
        self._events_subscriber = None

    async def __aenter__(self) -> Self:
        """Open both subscriptions and start the background consumer tasks.

        If the events subscriber fails to open, the already-opened logs
        subscriber is closed again before the error is re-raised, so a failed
        entry does not leave a subscription behind.
        """
        self._logs_subscriber = get_logs_subscriber(
            filter=self._log_filter, reconnection_attempts=self._reconnection_attempts
        )
        self._events_subscriber = get_events_subscriber(
            filter=self._event_filter, reconnection_attempts=self._reconnection_attempts
        )

        await self._logs_subscriber.__aenter__()
        try:
            await self._events_subscriber.__aenter__()
        except BaseException as exc:
            # `async with` never calls our __aexit__ when __aenter__ raises, so
            # the logs subscriber has to be released here.
            await self._logs_subscriber.__aexit__(type(exc), exc, exc.__traceback__)
            raise

        self._tasks = [
            asyncio.create_task(self._consume_logs()),
            asyncio.create_task(self._consume_events()),
        ]

        return self

    async def __aexit__(
        self,
        exc_type: Optional[type[BaseException]],
        exc_val: Optional[BaseException],
        exc_tb: Optional[TracebackType],
    ) -> None:
        """Cancel the consumer tasks and close both subscriptions.

        The events subscriber is closed even if closing the logs subscriber
        raises; in that case the logs subscriber's error still propagates.
        """
        for task in self._tasks:
            task.cancel()

        # return_exceptions=True: cancellation of the consumers is expected here
        # and must not abort the rest of the cleanup.
        await asyncio.gather(*self._tasks, return_exceptions=True)

        try:
            if self._logs_subscriber is not None:
                await self._logs_subscriber.__aexit__(exc_type, exc_val, exc_tb)
        finally:
            if self._events_subscriber is not None:
                await self._events_subscriber.__aexit__(exc_type, exc_val, exc_tb)

    def __aiter__(self) -> Self:
        """Return self as an async iterator"""
        return self

    async def __anext__(self) -> Union[Log, Event]:
        """Get the next log or event from the interleaved stream.

        Raises:
            StopAsyncIteration: when every consumer task has delivered its
                sentinel, or when the flow has completed and no further item
                arrived within `straggler_timeout` seconds.
        """
        while self._sentinels_received < len(self._tasks):
            if self._flow_completed:
                try:
                    item = await asyncio.wait_for(
                        self._queue.get(), timeout=self._straggler_timeout
                    )
                except asyncio.TimeoutError:
                    # The timeout is the normal end of the straggler window,
                    # not an error worth chaining onto the stop signal.
                    raise StopAsyncIteration from None
            else:
                item = await self._queue.get()

            if item is None:
                # One consumer finished; keep draining until all have.
                self._sentinels_received += 1
                continue

            return item

        raise StopAsyncIteration

    async def _consume_logs(self) -> None:
        """Background task to consume logs and put them in the queue"""
        try:
            async for log in self._logs_subscriber:
                await self._queue.put(log)
        except Exception:
            # Best-effort: a broken log stream ends this consumer only.
            # Cancellation is deliberately not caught so it propagates.
            self._log_stream_failure("log")
        finally:
            # The queue is created without a maxsize, so this cannot fail with
            # QueueFull and needs no await point while being cancelled.
            self._queue.put_nowait(None)

    async def _consume_events(self) -> None:
        """Background task to consume events and put them in the queue.

        Stops consuming after forwarding the first terminal state event for
        the followed flow run, and flags the flow as completed.
        """
        try:
            async for event in self._events_subscriber:
                await self._queue.put(event)

                if self._is_terminal_event(event):
                    self._flow_completed = True
                    break
        except Exception:
            # Best-effort: a broken event stream ends this consumer only.
            self._log_stream_failure("event")
        finally:
            # See _consume_logs: the unbounded queue makes put_nowait safe here.
            self._queue.put_nowait(None)

    def _is_terminal_event(self, event: Event) -> bool:
        """Return True if `event` reports a terminal state for the followed flow run."""
        if event.resource.id != f"prefect.flow-run.{self._flow_run_id}":
            return False

        # Get state type from event resource or, failing that, the payload
        state_type_str = event.resource.get("prefect.state-type")
        if not state_type_str:
            validated_state = event.payload.get("validated_state")
            # Guard against a null / non-mapping value: calling .get on it would
            # raise and end the whole event stream.
            if isinstance(validated_state, dict):
                state_type_str = validated_state.get("type")

        if not state_type_str:
            return False

        try:
            return StateType(state_type_str) in TERMINAL_STATES
        except ValueError:
            # Unknown state type string: not something we can classify
            return False

    def _log_stream_failure(self, stream: str) -> None:
        """Record, at debug level, the exception that ended the named stream.

        Must be called from inside an `except` block so the active exception
        is attached to the log record.
        """
        import logging  # local import: keeps the module's import block untouched

        logging.getLogger(__name__).debug(
            "Flow run %s: %s stream ended with an error",
            self._flow_run_id,
            stream,
            exc_info=True,
        )