Coverage for /usr/local/lib/python3.12/site-packages/prefect/_internal/concurrency/api.py: 38%
112 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1"""
2Primary developer-facing API for concurrency management.
3"""
5import abc 1a
6import asyncio 1a
7import concurrent.futures 1a
8import contextlib 1a
9from collections.abc import Awaitable, Iterable 1a
10from contextlib import AbstractContextManager 1a
11from typing import Any, Callable, Optional, Union, cast 1a
13from typing_extensions import ParamSpec, TypeAlias, TypeVar 1a
15from prefect._internal.concurrency.threads import ( 1a
16 WorkerThread,
17 get_global_loop,
18 in_global_loop,
19)
20from prefect._internal.concurrency.waiters import AsyncWaiter, Call, SyncWaiter 1a
22P = ParamSpec("P") 1a
23T = TypeVar("T", infer_variance=True) 1a
24Future = Union[concurrent.futures.Future[T], asyncio.Future[T]] 1a
26_SyncOrAsyncCallable: TypeAlias = Callable[P, Union[T, Awaitable[T]]] 1a
def create_call(
    __fn: _SyncOrAsyncCallable[P, T], *args: P.args, **kwargs: P.kwargs
) -> Call[T]:
    """
    Build a `Call` wrapping `__fn` together with its arguments.

    The callable and every positional and keyword argument are forwarded
    unchanged to `Call[T].new`, and the object it produces is returned.
    """
    new_call = Call[T].new(__fn, *args, **kwargs)
    return new_call
def cast_to_call(
    call_like: Union["_SyncOrAsyncCallable[[], T]", Call[T]],
) -> Call[T]:
    """
    Normalize a call-like object into a `Call`.

    An existing `Call` instance is handed back as-is (only re-typed for the
    type checker). Anything else is treated as a zero-argument callable and
    wrapped with `create_call`.
    """
    if not isinstance(call_like, Call):
        return create_call(call_like)
    return cast(Call[T], call_like)
class _base(abc.ABC):
    """
    Shared scheduling interface for the sync and async entry points.

    The `call_soon_*` helpers are implemented here. The `wait_for_call_*`
    methods are abstract, and the `call_in_*` methods raise
    `NotImplementedError` unless a subclass overrides them.
    """

    @staticmethod
    @abc.abstractmethod
    def wait_for_call_in_loop_thread(
        __call: Union["_SyncOrAsyncCallable[[], Any]", Call[T]],
        timeout: Optional[float] = None,
        done_callbacks: Optional[Iterable[Call[Any]]] = None,
    ) -> T:
        """
        Submit a call to the global worker thread and wait until it finishes.

        Must be overridden by subclasses. The result of the call is returned.
        """
        raise NotImplementedError()

    @staticmethod
    @abc.abstractmethod
    def wait_for_call_in_new_thread(
        __call: Union["_SyncOrAsyncCallable[[], T]", Call[T]],
        timeout: Optional[float] = None,
        done_callbacks: Optional[Iterable[Call[Any]]] = None,
    ) -> T:
        """
        Submit a call to a new worker thread.

        Must be overridden by subclasses. The result of the call is returned.
        """
        raise NotImplementedError()

    @staticmethod
    def call_soon_in_new_thread(
        __call: Union["_SyncOrAsyncCallable[[], T]", Call[T]],
        timeout: Optional[float] = None,
    ) -> Call[T]:
        """
        Submit a call to a freshly created worker thread without waiting.

        The call-like argument is normalized with `cast_to_call`, and a
        `WorkerThread(run_once=True)` is created to receive it. The
        submitted `Call` is returned.
        """
        submitted = cast_to_call(__call)
        worker = WorkerThread(run_once=True)
        # The timeout is applied to the call before it is handed over.
        submitted.set_timeout(timeout)
        worker.submit(submitted)
        return submitted

    @staticmethod
    def call_soon_in_loop_thread(
        __call: Union["_SyncOrAsyncCallable[[], T]", Call[T]],
        timeout: Optional[float] = None,
    ) -> Call[T]:
        """
        Submit a call to the global event loop thread without waiting.

        The call-like argument is normalized with `cast_to_call` and handed
        to the object returned by `get_global_loop()`. The submitted `Call`
        is returned.
        """
        submitted = cast_to_call(__call)
        loop_thread = get_global_loop()
        # The timeout is applied to the call before it is handed over.
        submitted.set_timeout(timeout)
        loop_thread.submit(submitted)
        return submitted

    @staticmethod
    def call_in_new_thread(
        __call: Union[Callable[[], T], Call[T]], timeout: Optional[float] = None
    ) -> T:
        """
        Execute a call in a new worker thread.

        Not implemented on the base class. The result of the call is
        returned by overriding implementations.
        """
        raise NotImplementedError()

    @staticmethod
    def call_in_loop_thread(
        __call: Union[Callable[[], Awaitable[T]], Call[T]],
        timeout: Optional[float] = None,
    ) -> T:
        """
        Execute a call in the global event loop thread.

        Not implemented on the base class. The result of the call is
        returned by overriding implementations.
        """
        raise NotImplementedError()
class from_async(_base):
    """
    Scheduling entry points for callers running in async code.

    Waiting is done by awaiting an `AsyncWaiter`, and the `call_in_*`
    methods return the awaitable produced by `Call.aresult()`.
    """

    @staticmethod
    async def wait_for_call_in_loop_thread(
        __call: Union[Callable[[], Awaitable[T]], Call[T]],
        timeout: Optional[float] = None,
        done_callbacks: Optional[Iterable[Call[Any]]] = None,
        contexts: Optional[Iterable[AbstractContextManager[Any]]] = None,
    ) -> T:
        """
        Submit a call to the global event loop thread and await completion.

        Done callbacks are registered on the waiter before the call is
        submitted. Each context manager in `contexts` is entered before
        waiting and stays active until the result has been retrieved.

        Returns the result of the call.
        """
        pending = cast_to_call(__call)
        async_waiter = AsyncWaiter(pending)
        for done_callback in done_callbacks or ():
            async_waiter.add_done_callback(done_callback)

        _base.call_soon_in_loop_thread(pending, timeout=timeout)

        with contextlib.ExitStack() as active_contexts:
            for manager in contexts or ():
                active_contexts.enter_context(manager)
            await async_waiter.wait()
            return pending.result()

    @staticmethod
    async def wait_for_call_in_new_thread(
        __call: Union[Callable[[], T], Call[T]],
        timeout: Optional[float] = None,
        done_callbacks: Optional[Iterable[Call[Any]]] = None,
    ) -> T:
        """
        Submit a call to a new worker thread and await completion.

        Done callbacks are registered on the waiter before the call is
        submitted.

        Returns the result of the call.
        """
        pending = cast_to_call(__call)
        async_waiter = AsyncWaiter(call=pending)
        for done_callback in done_callbacks or ():
            async_waiter.add_done_callback(done_callback)

        _base.call_soon_in_new_thread(pending, timeout=timeout)
        await async_waiter.wait()
        return pending.result()

    @staticmethod
    def call_in_new_thread(
        __call: Union[Callable[[], T], Call[T]], timeout: Optional[float] = None
    ) -> Awaitable[T]:
        """
        Submit a call to a new worker thread.

        Returns the awaitable from `aresult()` on the submitted call.
        """
        return _base.call_soon_in_new_thread(__call, timeout=timeout).aresult()

    @staticmethod
    def call_in_loop_thread(
        __call: Union[Callable[[], Awaitable[T]], Call[T]],
        timeout: Optional[float] = None,
    ) -> Awaitable[T]:
        """
        Submit a call to the global event loop thread.

        Returns the awaitable from `aresult()` on the submitted call.
        """
        return _base.call_soon_in_loop_thread(__call, timeout=timeout).aresult()
class from_sync(_base):
    """
    Scheduling entry points for callers running in synchronous code.

    Waiting is done through a `SyncWaiter`, and the `call_in_*` methods
    return the value of `Call.result()` directly.
    """

    @staticmethod
    def wait_for_call_in_loop_thread(
        __call: Union[
            Callable[[], Awaitable[T]],
            Call[T],
        ],
        timeout: Optional[float] = None,
        done_callbacks: Optional[Iterable[Call[T]]] = None,
        contexts: Optional[Iterable[AbstractContextManager[Any]]] = None,
    ) -> T:
        """
        Submit a call to the global event loop thread and wait for it.

        Each context manager in `contexts` is entered before waiting and
        stays active until the result has been retrieved.

        Returns the result of the call.
        """
        pending = cast_to_call(__call)
        sync_waiter = SyncWaiter(pending)
        _base.call_soon_in_loop_thread(pending, timeout=timeout)

        # NOTE(review): callbacks are registered after the call is submitted
        # here, while the other waiters in this module register them first.
        # Confirm the ordering is intentional.
        for done_callback in done_callbacks or ():
            sync_waiter.add_done_callback(done_callback)

        with contextlib.ExitStack() as active_contexts:
            for manager in contexts or ():
                active_contexts.enter_context(manager)
            sync_waiter.wait()
            return pending.result()

    @staticmethod
    def wait_for_call_in_new_thread(
        __call: Union[Callable[[], T], Call[T]],
        timeout: Optional[float] = None,
        done_callbacks: Optional[Iterable[Call[T]]] = None,
    ) -> T:
        """
        Submit a call to a new worker thread and wait for it.

        Done callbacks are registered on the waiter before the call is
        submitted.

        Returns the result of the call.
        """
        pending = cast_to_call(__call)
        sync_waiter = SyncWaiter(call=pending)
        for done_callback in done_callbacks or ():
            sync_waiter.add_done_callback(done_callback)

        _base.call_soon_in_new_thread(pending, timeout=timeout)
        sync_waiter.wait()
        return pending.result()

    @staticmethod
    def call_in_new_thread(
        __call: Union["_SyncOrAsyncCallable[[], T]", Call[T]],
        timeout: Optional[float] = None,
    ) -> T:
        """
        Submit a call to a new worker thread.

        Returns the value of `result()` on the submitted call.
        """
        return _base.call_soon_in_new_thread(__call, timeout=timeout).result()

    @staticmethod
    def call_in_loop_thread(
        __call: Union["_SyncOrAsyncCallable[[], T]", Call[T]],
        timeout: Optional[float] = None,
    ) -> Union[Awaitable[T], T]:
        """
        Run a call in the global event loop thread.

        When not already in the global loop, the call is submitted there and
        the value of `result()` on it is returned. When `in_global_loop()`
        is true, the call is invoked directly and its return value is handed
        back.
        """
        if not in_global_loop():
            submitted = _base.call_soon_in_loop_thread(__call, timeout=timeout)
            return submitted.result()

        # Avoid deadlock where the call is submitted to the loop then the loop
        # is blocked waiting for the call.
        # NOTE(review): `timeout` is not applied on this path.
        direct = cast_to_call(__call)
        return direct()