Coverage for /usr/local/lib/python3.12/site-packages/prefect/_waiters.py: 24%

108 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1from __future__ import annotations 1a

2 

3import asyncio 1a

4import atexit 1a

5import threading 1a

6import uuid 1a

7from typing import ( 1a

8 TYPE_CHECKING, 

9 Callable, 

10) 

11 

12import anyio 1a

13from cachetools import TTLCache 1a

14from typing_extensions import Self 1a

15 

16from prefect._internal.concurrency.api import create_call, from_async, from_sync 1a

17from prefect._internal.concurrency.threads import get_global_loop 1a

18from prefect.client.schemas.objects import ( 1a

19 TERMINAL_STATES, 

20) 

21from prefect.events.clients import get_events_subscriber 1a

22from prefect.events.filters import EventFilter, EventNameFilter 1a

23from prefect.logging import get_logger 1a

24 

25if TYPE_CHECKING: 25 ↛ 26line 25 didn't jump to line 26 because the condition on line 25 was never true1a

26 import logging 

27 

28 

class FlowRunWaiter:
    """
    A service used for waiting for a flow run to finish.

    This service listens for flow run events and provides a way to wait for a specific
    flow run to finish. This is useful for waiting for a flow run to finish before
    continuing execution.

    The service is a singleton and must be started before use. The service will
    automatically start when the first instance is created. A single websocket
    connection is used to listen for flow run events.

    The service can be used to wait for a flow run to finish by calling
    `FlowRunWaiter.wait_for_flow_run` with the flow run ID to wait for. The method
    will return when the flow run has finished or the timeout has elapsed.

    The service will automatically stop when the Python process exits or when the
    global loop thread is stopped.

    Example:
    ```python
    import asyncio
    from uuid import uuid4

    from prefect import flow
    from prefect.flow_engine import run_flow_async
    from prefect.flow_runs import FlowRunWaiter


    @flow
    async def test_flow():
        await asyncio.sleep(5)
        print("Done!")


    async def main():
        flow_run_id = uuid4()
        asyncio.create_task(run_flow_async(flow=test_flow, flow_run_id=flow_run_id))

        await FlowRunWaiter.wait_for_flow_run(flow_run_id)
        print("Flow run finished")


    if __name__ == "__main__":
        asyncio.run(main())
    ```
    """

    # Process-wide singleton and the lock guarding its lazy creation.
    _instance: Self | None = None
    _instance_lock = threading.Lock()

    def __init__(self):
        self.logger: "logging.Logger" = get_logger("FlowRunWaiter")
        self._consumer_task: asyncio.Task[None] | None = None
        # Bounded, expiring record of flow runs already seen in a terminal state,
        # so late callers can return immediately instead of waiting.
        self._observed_completed_flow_runs: TTLCache[uuid.UUID, bool] = TTLCache(
            maxsize=10000, ttl=600
        )
        # Per-flow-run wake-up primitives; both dicts are guarded by
        # `_completion_events_lock`.
        self._completion_events: dict[uuid.UUID, asyncio.Event] = {}
        self._completion_callbacks: dict[uuid.UUID, Callable[[], None]] = {}
        self._loop: asyncio.AbstractEventLoop | None = None
        self._observed_completed_flow_runs_lock = threading.Lock()
        self._completion_events_lock = threading.Lock()
        self._started = False

    def start(self) -> None:
        """
        Start the FlowRunWaiter service.

        Does nothing if the service is already started. Otherwise schedules the
        event consumer on the global loop and registers `stop` both as a
        shutdown call of the global loop thread and as an `atexit` hook.

        Raises:
            RuntimeError: If the currently running loop is not the global loop.
        """
        if self._started:
            return
        self.logger.debug("Starting FlowRunWaiter")
        loop_thread = get_global_loop()

        if asyncio.get_running_loop() != loop_thread.loop:
            raise RuntimeError("FlowRunWaiter must run on the global loop thread.")

        self._loop = loop_thread.loop
        if TYPE_CHECKING:
            assert self._loop is not None

        consumer_started = asyncio.Event()
        self._consumer_task = self._loop.create_task(
            self._consume_events(consumer_started)
        )
        # NOTE(review): the returned future is discarded, so this call does not
        # block `start` until the consumer is actually subscribed.
        asyncio.run_coroutine_threadsafe(consumer_started.wait(), self._loop)

        loop_thread.add_shutdown_call(create_call(self.stop))
        atexit.register(self.stop)
        self._started = True

    async def _consume_events(self, consumer_started: asyncio.Event):
        """
        Subscribe to terminal flow run events and wake up anything waiting on them.

        For every received event the flow run ID is recorded as completed, the
        matching `asyncio.Event` (if any) is set, and the matching done-callback
        (if any) is removed and invoked exactly once.

        Args:
            consumer_started: Event that is set once the subscription is open.
        """
        async with get_events_subscriber(
            filter=EventFilter(
                event=EventNameFilter(
                    name=[
                        f"prefect.flow-run.{state.name.title()}"
                        for state in TERMINAL_STATES
                    ],
                )
            )
        ) as subscriber:
            consumer_started.set()
            async for event in subscriber:
                try:
                    resource_id = event.resource["prefect.resource.id"]
                    self.logger.debug(f"Received event: {resource_id}")
                    flow_run_id = uuid.UUID(
                        resource_id.replace("prefect.flow-run.", "")
                    )

                    with self._observed_completed_flow_runs_lock:
                        # Cache the flow run ID for a short period of time to avoid
                        # unnecessary waits
                        self._observed_completed_flow_runs[flow_run_id] = True
                    with self._completion_events_lock:
                        # Wake up a coroutine waiting on this flow run, if any.
                        finished_event = self._completion_events.get(flow_run_id)
                        if finished_event is not None:
                            finished_event.set()
                        # Pop (rather than just read) the callback so the dict
                        # does not grow forever and the callback fires only once.
                        callback = self._completion_callbacks.pop(flow_run_id, None)
                    if callback is not None:
                        # Invoked outside the lock so the callback may safely
                        # call back into the waiter.
                        callback()
                except Exception as exc:
                    # Keep consuming: one bad event or callback must not kill
                    # the subscription loop.
                    self.logger.error(f"Error processing event: {exc}")

    def stop(self) -> None:
        """
        Stop the FlowRunWaiter service.

        Cancels the consumer task (if one exists), clears the class-level
        singleton reference and marks the service as not started.
        """
        self.logger.debug("Stopping FlowRunWaiter")
        if self._consumer_task:
            self._consumer_task.cancel()
            self._consumer_task = None
        self.__class__._instance = None
        self._started = False

    @classmethod
    async def wait_for_flow_run(
        cls, flow_run_id: uuid.UUID, timeout: float | None = None
    ) -> None:
        """
        Wait for a flow run to finish.

        Note this relies on a websocket connection to receive events from the server
        and will not work with an ephemeral server.

        Args:
            flow_run_id: The ID of the flow run to wait for.
            timeout: The maximum time to wait for the flow run to
                finish. Defaults to None.
        """
        instance = cls.instance()
        with instance._observed_completed_flow_runs_lock:
            if flow_run_id in instance._observed_completed_flow_runs:
                return

        # Need to create event in loop thread to ensure it can be set
        # from the loop thread
        finished_event = await from_async.wait_for_call_in_loop_thread(
            create_call(asyncio.Event)
        )
        with instance._completion_events_lock:
            # Cache the event for the flow run ID so the consumer can set it
            # when the event is received
            # NOTE(review): only one event is stored per flow run ID, so a second
            # concurrent waiter on the same ID replaces the first one's event.
            instance._completion_events[flow_run_id] = finished_event

        try:
            # Now check one more time whether the flow run arrived before we start to
            # wait on it, in case it came in while we were setting up the event above.
            with instance._observed_completed_flow_runs_lock:
                if flow_run_id in instance._observed_completed_flow_runs:
                    return

            with anyio.move_on_after(delay=timeout):
                await from_async.wait_for_call_in_loop_thread(
                    create_call(finished_event.wait)
                )
        finally:
            with instance._completion_events_lock:
                # Remove the event from the cache after it has been waited on
                instance._completion_events.pop(flow_run_id, None)

    @classmethod
    def add_done_callback(
        cls, flow_run_id: uuid.UUID, callback: Callable[[], None]
    ) -> None:
        """
        Add a callback to be called when a flow run finishes.

        If the flow run has already been observed as finished, the callback is
        invoked immediately in the calling thread. Otherwise it is registered
        and invoked by the event consumer when the terminal event arrives. The
        callback is always invoked without any of the waiter's locks held.

        Args:
            flow_run_id: The ID of the flow run to wait for.
            callback: The callback to call when the flow run finishes.
        """
        instance = cls.instance()
        with instance._observed_completed_flow_runs_lock:
            already_finished = flow_run_id in instance._observed_completed_flow_runs
        if already_finished:
            callback()
            return

        with instance._completion_events_lock:
            # Register the callback so the consumer can invoke it when the
            # terminal event for this flow run is received
            instance._completion_callbacks[flow_run_id] = callback

        # Check one more time: the terminal event may have arrived between the
        # first check and the registration above, in which case the consumer
        # may never see this callback.
        with instance._observed_completed_flow_runs_lock:
            finished_meanwhile = flow_run_id in instance._observed_completed_flow_runs
        if not finished_meanwhile:
            return

        with instance._completion_events_lock:
            # If the consumer already popped (and ran) the callback this is None,
            # which prevents a double invocation.
            pending = instance._completion_callbacks.pop(flow_run_id, None)
        if pending is not None:
            pending()

    @classmethod
    def instance(cls) -> Self:
        """
        Get the singleton instance of FlowRunWaiter.

        The instance is created and started on first use.
        """
        with cls._instance_lock:
            if cls._instance is None:
                cls._instance = cls._new_instance()
            return cls._instance

    @classmethod
    def _new_instance(cls):
        """
        Create a new instance and start it on the global loop thread.

        When called from the global loop thread, `start` runs directly; from any
        other thread it is submitted to the loop thread and awaited via
        `.result()`.
        """
        instance = cls()

        if threading.get_ident() == get_global_loop().thread.ident:
            instance.start()
        else:
            from_sync.call_soon_in_loop_thread(create_call(instance.start)).result()

        return instance