Coverage for /usr/local/lib/python3.12/site-packages/prefect/_internal/concurrency/event_loop.py: 39%
32 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1"""
2Thread-safe utilities for working with asynchronous event loops.
3"""
5import asyncio 1a
6import concurrent.futures 1a
7import functools 1a
8from collections.abc import Coroutine 1a
9from typing import Any, Callable, Optional, TypeVar 1a
11from typing_extensions import ParamSpec 1a
13P = ParamSpec("P") 1a
14T = TypeVar("T") 1a
def get_running_loop() -> Optional[asyncio.AbstractEventLoop]:
    """
    Return the event loop that is currently running, if there is one.

    Gives back `None` instead of raising when no loop is running.
    """
    try:
        active_loop = asyncio.get_running_loop()
    except RuntimeError:
        # asyncio reports "no running event loop" by raising RuntimeError.
        return None
    else:
        return active_loop
def call_soon_in_loop(
    __loop: asyncio.AbstractEventLoop,
    __fn: Callable[P, T],
    *args: P.args,
    **kwargs: P.kwargs,
) -> concurrent.futures.Future[T]:
    """
    Run a synchronous call in an event loop's thread from another thread.

    This function is non-blocking and safe to call from an asynchronous context.

    Returns a future that can be used to retrieve the result of the call. If the
    returned future is cancelled before the event loop gets to run the call, the
    call is skipped entirely.
    """
    future: concurrent.futures.Future[T] = concurrent.futures.Future()

    @functools.wraps(__fn)
    def wrapper() -> None:
        # Move the future to the running state before doing any work. This returns
        # `False` when the caller already cancelled the future; in that case setting
        # a result or exception would raise `InvalidStateError` inside the loop
        # callback, so the call is skipped. Once running, the future can no longer
        # be cancelled out from under the call.
        if not future.set_running_or_notify_cancel():
            return

        try:
            result = __fn(*args, **kwargs)
        except BaseException as exc:
            future.set_exception(exc)
            # Non-`Exception` errors (e.g. `KeyboardInterrupt`, `SystemExit`) are
            # recorded on the future and also propagated to the event loop.
            if not isinstance(exc, Exception):
                raise
        else:
            future.set_result(result)

    # `call_soon...` returns a `Handle` object which doesn't provide access to the
    # result of the call. We wrap the call with a future to facilitate retrieval.
    if __loop is get_running_loop():
        # Already in the loop's thread: the non-threadsafe variant is sufficient.
        __loop.call_soon(wrapper)
    else:
        __loop.call_soon_threadsafe(wrapper)

    return future
async def run_coroutine_in_loop_from_async(
    __loop: asyncio.AbstractEventLoop, __coro: Coroutine[Any, Any, T]
) -> T:
    """
    Await a coroutine in the given event loop from an asynchronous context.

    When the given loop is the one currently running, the coroutine is awaited
    directly. Otherwise it is submitted to the given loop and its result is
    awaited from the current context.

    Returns the result of the coroutine.
    """
    if get_running_loop() is __loop:
        # Same loop as the caller: no cross-loop hand-off is needed.
        return await __coro

    # Submit to the other loop, which yields a `concurrent.futures.Future`, then
    # wrap that future so it can be awaited here.
    remote_future = asyncio.run_coroutine_threadsafe(__coro, __loop)
    return await asyncio.wrap_future(remote_future)