Coverage for /usr/local/lib/python3.12/site-packages/prefect/_waiters.py: 24%
108 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1from __future__ import annotations 1a
3import asyncio 1a
4import atexit 1a
5import threading 1a
6import uuid 1a
7from typing import ( 1a
8 TYPE_CHECKING,
9 Callable,
10)
12import anyio 1a
13from cachetools import TTLCache 1a
14from typing_extensions import Self 1a
16from prefect._internal.concurrency.api import create_call, from_async, from_sync 1a
17from prefect._internal.concurrency.threads import get_global_loop 1a
18from prefect.client.schemas.objects import ( 1a
19 TERMINAL_STATES,
20)
21from prefect.events.clients import get_events_subscriber 1a
22from prefect.events.filters import EventFilter, EventNameFilter 1a
23from prefect.logging import get_logger 1a
25if TYPE_CHECKING: 25 ↛ 26line 25 didn't jump to line 26 because the condition on line 25 was never true1a
26 import logging
class FlowRunWaiter:
    """
    A service used for waiting for a flow run to finish.

    This service listens for flow run events and provides a way to wait for a specific
    flow run to finish. This is useful for waiting for a flow run to finish before
    continuing execution.

    The service is a singleton and must be started before use. The service will
    automatically start when the first instance is created. A single websocket
    connection is used to listen for flow run events.

    The service can be used to wait for a flow run to finish by calling
    `FlowRunWaiter.wait_for_flow_run` with the flow run ID to wait for. The method
    will return when the flow run has finished or the timeout has elapsed.

    The service will automatically stop when the Python process exits or when the
    global loop thread is stopped.

    Example:
    ```python
    import asyncio
    from uuid import uuid4

    from prefect import flow
    from prefect.flow_engine import run_flow_async
    from prefect.flow_runs import FlowRunWaiter


    @flow
    async def test_flow():
        await asyncio.sleep(5)
        print("Done!")


    async def main():
        flow_run_id = uuid4()
        asyncio.create_task(run_flow_async(flow=test_flow, flow_run_id=flow_run_id))

        await FlowRunWaiter.wait_for_flow_run(flow_run_id)
        print("Flow run finished")


    if __name__ == "__main__":
        asyncio.run(main())
    ```
    """

    # Process-wide singleton and the lock guarding its lazy creation.
    _instance: Self | None = None
    _instance_lock = threading.Lock()

    def __init__(self):
        self.logger: "logging.Logger" = get_logger("FlowRunWaiter")
        self._consumer_task: asyncio.Task[None] | None = None
        # Recently finished flow runs; lets late callers return immediately
        # instead of waiting for an event that has already been delivered.
        self._observed_completed_flow_runs: TTLCache[uuid.UUID, bool] = TTLCache(
            maxsize=10000, ttl=600
        )
        # Per-flow-run wake-up primitives registered by waiters.
        self._completion_events: dict[uuid.UUID, asyncio.Event] = {}
        self._completion_callbacks: dict[uuid.UUID, Callable[[], None]] = {}
        self._loop: asyncio.AbstractEventLoop | None = None
        self._observed_completed_flow_runs_lock = threading.Lock()
        # Guards both `_completion_events` and `_completion_callbacks`.
        self._completion_events_lock = threading.Lock()
        self._started = False

    def start(self) -> None:
        """
        Start the FlowRunWaiter service.

        Does nothing if the service is already started.

        Raises:
            RuntimeError: If the running event loop is not the global loop.
        """
        if self._started:
            return
        self.logger.debug("Starting FlowRunWaiter")
        loop_thread = get_global_loop()

        if asyncio.get_running_loop() != loop_thread.loop:
            raise RuntimeError("FlowRunWaiter must run on the global loop thread.")

        self._loop = loop_thread.loop
        if TYPE_CHECKING:
            assert self._loop is not None

        consumer_started = asyncio.Event()
        self._consumer_task = self._loop.create_task(
            self._consume_events(consumer_started)
        )
        # The wait is only scheduled; its future is discarded, so `start` itself
        # returns without blocking until the subscriber is connected.
        asyncio.run_coroutine_threadsafe(consumer_started.wait(), self._loop)

        # Tear the consumer down on loop shutdown and on interpreter exit.
        loop_thread.add_shutdown_call(create_call(self.stop))
        atexit.register(self.stop)
        self._started = True

    async def _consume_events(self, consumer_started: asyncio.Event):
        """
        Subscribe to terminal flow run events and record each one as it arrives.

        Args:
            consumer_started: Set once the event subscription has been opened.
        """
        async with get_events_subscriber(
            filter=EventFilter(
                event=EventNameFilter(
                    name=[
                        f"prefect.flow-run.{state.name.title()}"
                        for state in TERMINAL_STATES
                    ],
                )
            )
        ) as subscriber:
            consumer_started.set()
            async for event in subscriber:
                try:
                    self.logger.debug(
                        f"Received event: {event.resource['prefect.resource.id']}"
                    )
                    flow_run_id = uuid.UUID(
                        event.resource["prefect.resource.id"].replace(
                            "prefect.flow-run.", ""
                        )
                    )
                    self._record_completion(flow_run_id)
                except Exception as exc:
                    # Keep consuming after a bad event or a failing callback, but
                    # retain the traceback so the failure can be diagnosed.
                    self.logger.error(
                        f"Error processing event: {exc}", exc_info=True
                    )

    def _record_completion(self, flow_run_id: uuid.UUID) -> None:
        """
        Record that a flow run finished and wake anything waiting on it.

        Args:
            flow_run_id: The ID of the flow run that reached a terminal state.
        """
        with self._observed_completed_flow_runs_lock:
            # Cache the flow run ID for a short period of time to avoid
            # unnecessary waits
            self._observed_completed_flow_runs[flow_run_id] = True

        with self._completion_events_lock:
            # Set the event for the flow run ID if one is registered so the
            # waiter can wake up the waiting coroutine
            finished_event = self._completion_events.get(flow_run_id)
            if finished_event is not None:
                finished_event.set()
            # Claim the callback so it runs at most once and does not stay in
            # the registry forever.
            callback = self._completion_callbacks.pop(flow_run_id, None)

        # Run user code outside the lock: the lock is not reentrant, so a
        # callback that registers another callback would otherwise deadlock.
        if callback is not None:
            callback()

    def stop(self) -> None:
        """
        Stop the FlowRunWaiter service.

        Cancels the consumer task if there is one and clears the singleton so
        the next call to `instance` creates and starts a fresh service.
        """
        self.logger.debug("Stopping FlowRunWaiter")
        if self._consumer_task:
            self._consumer_task.cancel()
            self._consumer_task = None
        self.__class__._instance = None
        self._started = False

    @classmethod
    async def wait_for_flow_run(
        cls, flow_run_id: uuid.UUID, timeout: float | None = None
    ) -> None:
        """
        Wait for a flow run to finish.

        Note this relies on a websocket connection to receive events from the server
        and will not work with an ephemeral server.

        Args:
            flow_run_id: The ID of the flow run to wait for.
            timeout: The maximum time to wait for the flow run to
                finish. Defaults to None.
        """
        instance = cls.instance()
        with instance._observed_completed_flow_runs_lock:
            if flow_run_id in instance._observed_completed_flow_runs:
                return

        # Need to create event in loop thread to ensure it can be set
        # from the loop thread
        finished_event = await from_async.wait_for_call_in_loop_thread(
            create_call(asyncio.Event)
        )
        with instance._completion_events_lock:
            # Cache the event for the flow run ID so the consumer can set it
            # when the event is received
            # NOTE(review): one event is stored per flow run ID, so a second
            # concurrent waiter on the same ID replaces the first waiter's event.
            instance._completion_events[flow_run_id] = finished_event

        try:
            # Now check one more time whether the flow run arrived before we start to
            # wait on it, in case it came in while we were setting up the event above.
            with instance._observed_completed_flow_runs_lock:
                if flow_run_id in instance._observed_completed_flow_runs:
                    return

            with anyio.move_on_after(delay=timeout):
                await from_async.wait_for_call_in_loop_thread(
                    create_call(finished_event.wait)
                )
        finally:
            with instance._completion_events_lock:
                # Remove the event from the cache after it has been waited on
                instance._completion_events.pop(flow_run_id, None)

    @classmethod
    def add_done_callback(
        cls, flow_run_id: uuid.UUID, callback: Callable[[], None]
    ) -> None:
        """
        Add a callback to be called when a flow run finishes.

        If the flow run has already been observed as finished, the callback is
        called immediately by this method. The callback is always invoked
        without any of the waiter's locks held.

        Args:
            flow_run_id: The ID of the flow run to wait for.
            callback: The callback to call when the flow run finishes.
        """
        instance = cls.instance()
        with instance._observed_completed_flow_runs_lock:
            already_finished = flow_run_id in instance._observed_completed_flow_runs

        if not already_finished:
            with instance._completion_events_lock:
                # Register the callback so the consumer can call it when the
                # event is received
                instance._completion_callbacks[flow_run_id] = callback

            # Check one more time in case the flow run finished while the
            # callback was being registered; otherwise it could be missed.
            with instance._observed_completed_flow_runs_lock:
                if flow_run_id not in instance._observed_completed_flow_runs:
                    return

            with instance._completion_events_lock:
                # Whoever removes the callback from the registry runs it. If it
                # is no longer ours, the consumer has already taken care of it.
                if instance._completion_callbacks.get(flow_run_id) is not callback:
                    return
                del instance._completion_callbacks[flow_run_id]

        callback()

    @classmethod
    def instance(cls) -> Self:
        """
        Get the singleton instance of FlowRunWaiter.

        The instance is created and started on first use.
        """
        with cls._instance_lock:
            if cls._instance is None:
                cls._instance = cls._new_instance()
            return cls._instance

    @classmethod
    def _new_instance(cls):
        """
        Create a new instance and start it on the global loop thread.
        """
        instance = cls()

        if threading.get_ident() == get_global_loop().thread.ident:
            # Already on the global loop thread: start directly.
            instance.start()
        else:
            # Hand the start over to the loop thread and block until it is done.
            from_sync.call_soon_in_loop_thread(create_call(instance.start)).result()

        return instance