Coverage for /usr/local/lib/python3.12/site-packages/prefect/_internal/concurrency/event_loop.py: 39%

32 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1""" 

2Thread-safe utilities for working with asynchronous event loops. 

3""" 

4 

5import asyncio 1a

6import concurrent.futures 1a

7import functools 1a

8from collections.abc import Coroutine 1a

9from typing import Any, Callable, Optional, TypeVar 1a

10 

11from typing_extensions import ParamSpec 1a

12 

13P = ParamSpec("P") 1a

14T = TypeVar("T") 1a

15 

16 

17def get_running_loop() -> Optional[asyncio.AbstractEventLoop]: 1a

18 """ 

19 Get the current running loop. 

20 

21 Returns `None` if there is no running loop. 

22 """ 

23 try: 1abcde

24 return asyncio.get_running_loop() 1abcde

25 except RuntimeError: 1a

26 return None 1a

27 

28 

29def call_soon_in_loop( 1a

30 __loop: asyncio.AbstractEventLoop, 

31 __fn: Callable[P, T], 

32 *args: P.args, 

33 **kwargs: P.kwargs, 

34) -> concurrent.futures.Future[T]: 

35 """ 

36 Run a synchronous call in an event loop's thread from another thread. 

37 

38 This function is non-blocking and safe to call from an asynchronous context. 

39 

40 Returns a future that can be used to retrieve the result of the call. 

41 """ 

42 future: concurrent.futures.Future[T] = concurrent.futures.Future() 

43 

44 @functools.wraps(__fn) 

45 def wrapper() -> None: 

46 try: 

47 result = __fn(*args, **kwargs) 

48 except BaseException as exc: 

49 future.set_exception(exc) 

50 if not isinstance(exc, Exception): 

51 raise 

52 else: 

53 future.set_result(result) 

54 

55 # `call_soon...` returns a `Handle` object which doesn't provide access to the 

56 # result of the call. We wrap the call with a future to facilitate retrieval. 

57 if __loop is get_running_loop(): 

58 __loop.call_soon(wrapper) 

59 else: 

60 __loop.call_soon_threadsafe(wrapper) 

61 

62 return future 

63 

64 

65async def run_coroutine_in_loop_from_async( 1a

66 __loop: asyncio.AbstractEventLoop, __coro: Coroutine[Any, Any, T] 

67) -> T: 

68 """ 

69 Run an asynchronous call in an event loop from an asynchronous context. 

70 

71 Returns an awaitable that returns the result of the coroutine. 

72 """ 

73 if __loop is get_running_loop(): 

74 return await __coro 

75 else: 

76 return await asyncio.wrap_future( 

77 asyncio.run_coroutine_threadsafe(__coro, __loop) 

78 )