Coverage for /usr/local/lib/python3.12/site-packages/prefect/_internal/concurrency/api.py: 41%

112 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1""" 

2Primary developer-facing API for concurrency management. 

3""" 

4 

5import abc 1a

6import asyncio 1a

7import concurrent.futures 1a

8import contextlib 1a

9from collections.abc import Awaitable, Iterable 1a

10from contextlib import AbstractContextManager 1a

11from typing import Any, Callable, Optional, Union, cast 1a

12 

13from typing_extensions import ParamSpec, TypeAlias, TypeVar 1a

14 

15from prefect._internal.concurrency.threads import ( 1a

16 WorkerThread, 

17 get_global_loop, 

18 in_global_loop, 

19) 

20from prefect._internal.concurrency.waiters import AsyncWaiter, Call, SyncWaiter 1a

21 

# Parameter specification: captures the signature of a wrapped callable so the
# arguments forwarded by `create_call` can be checked against it.
P = ParamSpec("P")
# Result type produced by a call; variance is left to the type checker.
T = TypeVar("T", infer_variance=True)
# Either flavour of future (thread-based or asyncio) resolving to `T`.
Future = Union[concurrent.futures.Future[T], asyncio.Future[T]]

# A callable taking parameters `P` that returns `T` directly or an awaitable
# of `T`.
_SyncOrAsyncCallable: TypeAlias = Callable[P, Union[T, Awaitable[T]]]

27 

28 

def create_call(
    __fn: _SyncOrAsyncCallable[P, T], *args: P.args, **kwargs: P.kwargs
) -> Call[T]:
    """
    Build a `Call` wrapping `__fn` together with its arguments.

    The positional and keyword arguments are forwarded unchanged to
    `Call.new`, and the resulting `Call` is returned.
    """
    new_call = Call[T].new(__fn, *args, **kwargs)
    return new_call

33 

34 

def cast_to_call(
    call_like: Union["_SyncOrAsyncCallable[[], T]", Call[T]],
) -> Call[T]:
    """
    Normalize a call-like object into a `Call`.

    An existing `Call` instance is handed back as-is; anything else is treated
    as a zero-argument callable and wrapped via `create_call`.
    """
    if not isinstance(call_like, Call):
        return create_call(call_like)

    # `cast` only informs the type checker; the same object is returned.
    return cast(Call[T], call_like)

42 

43 

class _base(abc.ABC):
    """
    Common interface shared by `from_async` and `from_sync`.

    The `call_soon_*` helpers are concrete: they submit a call and return it
    immediately. The `wait_for_*` methods are abstract, and the `call_in_*`
    methods raise `NotImplementedError` here; subclasses provide the variants
    appropriate for their calling context.
    """

    @staticmethod
    @abc.abstractmethod
    def wait_for_call_in_loop_thread(
        __call: Union["_SyncOrAsyncCallable[[], T]", Call[T]],
        timeout: Optional[float] = None,
        done_callbacks: Optional[Iterable[Call[Any]]] = None,
        contexts: Optional[Iterable[AbstractContextManager[Any]]] = None,
    ) -> T:
        """
        Schedule a function in the global worker thread and wait for completion.

        Args:
            __call: A zero-argument callable or an existing `Call`.
            timeout: Optional timeout applied to the call.
            done_callbacks: Optional calls registered on the waiter as done
                callbacks.
            contexts: Optional context managers that implementations enter for
                the duration of the wait.

        Returns the result of the call.
        """
        raise NotImplementedError()

    @staticmethod
    @abc.abstractmethod
    def wait_for_call_in_new_thread(
        __call: Union["_SyncOrAsyncCallable[[], T]", Call[T]],
        timeout: Optional[float] = None,
        done_callbacks: Optional[Iterable[Call[Any]]] = None,
    ) -> T:
        """
        Schedule a function in a new worker thread.

        Args:
            __call: A zero-argument callable or an existing `Call`.
            timeout: Optional timeout applied to the call.
            done_callbacks: Optional calls registered on the waiter as done
                callbacks.

        Returns the result of the call.
        """
        raise NotImplementedError()

    @staticmethod
    def call_soon_in_new_thread(
        __call: Union["_SyncOrAsyncCallable[[], T]", Call[T]],
        timeout: Optional[float] = None,
    ) -> Call[T]:
        """
        Schedule a call for execution in a new worker thread.

        The call-like argument is normalized with `cast_to_call`, given the
        requested timeout, and submitted to a `WorkerThread` created with
        `run_once=True`.

        Returns the submitted call.
        """
        call = cast_to_call(__call)
        runner = WorkerThread(run_once=True)
        call.set_timeout(timeout)
        runner.submit(call)
        return call

    @staticmethod
    def call_soon_in_loop_thread(
        __call: Union["_SyncOrAsyncCallable[[], T]", Call[T]],
        timeout: Optional[float] = None,
    ) -> Call[T]:
        """
        Schedule a call for execution in the global event loop thread.

        The call-like argument is normalized with `cast_to_call`, given the
        requested timeout, and submitted to the runner returned by
        `get_global_loop()`.

        Returns the submitted call.
        """
        call = cast_to_call(__call)
        runner = get_global_loop()
        call.set_timeout(timeout)
        runner.submit(call)
        return call

    @staticmethod
    def call_in_new_thread(
        __call: Union[Callable[[], T], Call[T]], timeout: Optional[float] = None
    ) -> T:
        """
        Run a call in a new worker thread.

        Not marked abstract; this base implementation always raises
        `NotImplementedError`.

        Returns the result of the call.
        """
        raise NotImplementedError()

    @staticmethod
    def call_in_loop_thread(
        __call: Union[Callable[[], Awaitable[T]], Call[T]],
        timeout: Optional[float] = None,
    ) -> T:
        """
        Run a call in the global event loop thread.

        Not marked abstract; this base implementation always raises
        `NotImplementedError`.

        Returns the result of the call.
        """
        raise NotImplementedError()

127 

128 

class from_async(_base):
    """
    Entry points for callers running in an async context.

    The `wait_for_*` methods are coroutines; the `call_in_*` methods return an
    awaitable obtained from the submitted call.
    """

    @staticmethod
    async def wait_for_call_in_loop_thread(
        __call: Union[Callable[[], Awaitable[T]], Call[T]],
        timeout: Optional[float] = None,
        done_callbacks: Optional[Iterable[Call[Any]]] = None,
        contexts: Optional[Iterable[AbstractContextManager[Any]]] = None,
    ) -> T:
        """
        Submit a call to the global event loop thread and await its completion.

        Done callbacks are registered on an `AsyncWaiter` before submission.
        Any `contexts` are entered, in order, around the wait and exited when
        the wait block is left.

        Returns the result of the call.
        """
        pending = cast_to_call(__call)
        watcher = AsyncWaiter(pending)
        for done_cb in done_callbacks or ():
            watcher.add_done_callback(done_cb)

        _base.call_soon_in_loop_thread(pending, timeout=timeout)

        with contextlib.ExitStack() as active_contexts:
            for manager in contexts or ():
                active_contexts.enter_context(manager)
            await watcher.wait()
            outcome = pending.result()
            return outcome

    @staticmethod
    async def wait_for_call_in_new_thread(
        __call: Union[Callable[[], T], Call[T]],
        timeout: Optional[float] = None,
        done_callbacks: Optional[Iterable[Call[Any]]] = None,
    ) -> T:
        """
        Submit a call to a new worker thread and await its completion.

        Done callbacks are registered on an `AsyncWaiter` before submission.

        Returns the result of the call.
        """
        pending = cast_to_call(__call)
        watcher = AsyncWaiter(call=pending)
        for done_cb in done_callbacks or ():
            watcher.add_done_callback(done_cb)

        _base.call_soon_in_new_thread(pending, timeout=timeout)
        await watcher.wait()
        return pending.result()

    @staticmethod
    def call_in_new_thread(
        __call: Union[Callable[[], T], Call[T]], timeout: Optional[float] = None
    ) -> Awaitable[T]:
        """
        Submit a call to a new worker thread without a waiter.

        Returns the awaitable produced by the submitted call's `aresult()`.
        """
        return _base.call_soon_in_new_thread(__call, timeout=timeout).aresult()

    @staticmethod
    def call_in_loop_thread(
        __call: Union[Callable[[], Awaitable[T]], Call[T]],
        timeout: Optional[float] = None,
    ) -> Awaitable[T]:
        """
        Submit a call to the global event loop thread without a waiter.

        Returns the awaitable produced by the submitted call's `aresult()`.
        """
        return _base.call_soon_in_loop_thread(__call, timeout=timeout).aresult()

176 

177 

class from_sync(_base):
    """
    Entry points for callers running in a synchronous context.

    Every method blocks the calling thread until the call has finished, then
    returns its result (see `call_in_loop_thread` for the one exception when
    already running in the global loop thread).
    """

    @staticmethod
    def wait_for_call_in_loop_thread(
        __call: Union[
            Callable[[], Awaitable[T]],
            Call[T],
        ],
        timeout: Optional[float] = None,
        done_callbacks: Optional[Iterable[Call[Any]]] = None,
        contexts: Optional[Iterable[AbstractContextManager[Any]]] = None,
    ) -> T:
        """
        Submit a call to the global event loop thread and block until it is done.

        Any `contexts` are entered, in order, around the wait and exited when
        the wait block is left.

        Returns the result of the call.
        """
        call = cast_to_call(__call)
        waiter = SyncWaiter(call)
        # Callbacks are registered before the call is submitted, mirroring the
        # other `wait_for_*` helpers, so registration never depends on how
        # quickly the call finishes.
        for callback in done_callbacks or []:
            waiter.add_done_callback(callback)
        _base.call_soon_in_loop_thread(call, timeout=timeout)
        with contextlib.ExitStack() as stack:
            for context in contexts or []:
                stack.enter_context(context)
            waiter.wait()
            return call.result()

    @staticmethod
    def wait_for_call_in_new_thread(
        __call: Union[Callable[[], T], Call[T]],
        timeout: Optional[float] = None,
        done_callbacks: Optional[Iterable[Call[Any]]] = None,
    ) -> T:
        """
        Submit a call to a new worker thread and block until it is done.

        Done callbacks are registered on a `SyncWaiter` before submission.

        Returns the result of the call.
        """
        call = cast_to_call(__call)
        waiter = SyncWaiter(call=call)
        for callback in done_callbacks or []:
            waiter.add_done_callback(callback)
        _base.call_soon_in_new_thread(call, timeout=timeout)
        waiter.wait()
        return call.result()

    @staticmethod
    def call_in_new_thread(
        __call: Union["_SyncOrAsyncCallable[[], T]", Call[T]],
        timeout: Optional[float] = None,
    ) -> T:
        """
        Submit a call to a new worker thread and return `call.result()`.
        """
        call = _base.call_soon_in_new_thread(__call, timeout=timeout)
        return call.result()

    @staticmethod
    def call_in_loop_thread(
        __call: Union["_SyncOrAsyncCallable[[], T]", Call[T]],
        timeout: Optional[float] = None,
    ) -> Union[Awaitable[T], T]:
        """
        Run a call in the global event loop thread and return its result.

        When already running in the global loop thread the call is invoked
        directly instead of being submitted, and whatever `call()` returns is
        handed back to the caller.
        """
        if in_global_loop():
            # Avoid deadlock where the call is submitted to the loop then the loop is
            # blocked waiting for the call
            call = cast_to_call(__call)
            # Honor an explicitly requested timeout on this path as well. When
            # none is given, any timeout already set on a passed-in `Call` is
            # left untouched, as before.
            if timeout is not None:
                call.set_timeout(timeout)
            return call()

        call = _base.call_soon_in_loop_thread(__call, timeout=timeout)
        return call.result()