Coverage for /usr/local/lib/python3.12/site-packages/prefect/_internal/compatibility/async_dispatch.py: 54%

49 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1import asyncio 1a

2import inspect 1a

3from collections.abc import Coroutine 1a

4from functools import wraps 1a

5from typing import TYPE_CHECKING, Any, Callable, TypeVar, Union, cast 1a

6 

7from typing_extensions import ParamSpec 1a

8 

9if TYPE_CHECKING: 9 ↛ 10line 9 didn't jump to line 10 because the condition on line 9 was never true1a

10 from prefect.flows import Flow 

11 from prefect.tasks import Task 

12 

13R = TypeVar("R") 1a

14P = ParamSpec("P") 1a

15 

16 

def is_in_async_context() -> bool:
    """
    Decide whether the caller is running in an async context.

    Resolution order:
    - If a Prefect run context is available, the answer comes from the
      ``isasync`` attribute of the context's ``task`` (preferred) or,
      when that is missing or falsy, its ``flow``. If the selected object
      has no ``isasync`` attribute, the result is True.
    - If no run context is available (``MissingContextError``), the
      result is True when an asyncio event loop is running in the current
      thread and False otherwise.

    Returns:
        True when the caller should be treated as async, False otherwise.
    """
    # Imported inside the function, as in the original implementation.
    from prefect.context import get_run_context
    from prefect.exceptions import MissingContextError

    try:
        active_ctx = get_run_context()
        # Prefer the task; fall back to the flow when the task is absent/falsy.
        owner = getattr(active_ctx, "task", None) or getattr(
            active_ctx, "flow", None
        )
        return getattr(owner, "isasync", True)
    except MissingContextError:
        # Not in an execution context: make a best-effort decision based on
        # whether an event loop is currently running.
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            return False
        return True

43 

44 

def _is_acceptable_callable(
    obj: Union[
        Callable[P, R], "Flow[P, R]", "Task[P, R]", "classmethod[type[Any], P, R]"
    ],
) -> bool:
    """
    Report whether ``obj`` is usable as the async side of a dispatch.

    An object is accepted when any of the following holds, checked in order:
    - ``obj`` itself is a coroutine function;
    - ``obj`` has a truthy ``fn`` attribute that is a coroutine function
      (this is how tasks and flows are recognised);
    - ``obj`` is a ``classmethod`` whose wrapped function is a coroutine
      function.

    Returns:
        True if one of the checks above succeeds, False otherwise.
    """
    is_coro_fn = inspect.iscoroutinefunction

    if is_coro_fn(obj):
        return True

    # Tasks and flows are detected through their `fn` attribute instead of an
    # isinstance check, because importing `Task` or `Flow` here would create a
    # circular import.
    wrapped_fn = getattr(obj, "fn", None)
    if wrapped_fn and is_coro_fn(wrapped_fn):
        return True

    # A classmethod object is judged by the function it wraps.
    return isinstance(obj, classmethod) and is_coro_fn(obj.__func__)

62 

63 

def async_dispatch(
    async_impl: Union[
        Callable[P, Coroutine[Any, Any, R]],
        "classmethod[type[Any], P, Coroutine[Any, Any, R]]",
    ],
) -> Callable[[Callable[P, R]], Callable[P, Union[R, Coroutine[Any, Any, R]]]]:
    """
    Create a decorator that routes calls to a sync or an async implementation.

    The decorated (sync) function is replaced by a wrapper. On each call the
    wrapper removes an optional ``_sync`` keyword argument from the call:
    when it is not None its truthiness decides whether the sync function
    runs; otherwise the sync function runs only when
    ``is_in_async_context()`` returns False. The chosen implementation is
    called with the remaining arguments and its return value is returned
    as-is (for the async implementation, whatever calling it returns).

    The wrapper also exposes the async implementation as its ``aio``
    attribute.

    Args:
        async_impl: The async implementation to dispatch to when in an async
            context. A ``classmethod`` wrapping an async function is
            accepted; its underlying function is used.

    Returns:
        A decorator that takes the sync implementation and returns the
        dispatching wrapper.

    Raises:
        TypeError: If ``async_impl`` is not accepted by
            ``_is_acceptable_callable``.
    """
    if not _is_acceptable_callable(async_impl):
        raise TypeError("async_impl must be an async function")

    # Dispatch to the function behind a classmethod rather than the
    # classmethod object itself.
    async_target = (
        cast(Callable[P, Coroutine[Any, Any, R]], async_impl.__func__)
        if isinstance(async_impl, classmethod)
        else async_impl
    )

    def decorator(
        sync_fn: Callable[P, R],
    ) -> Callable[P, Union[R, Coroutine[Any, Any, R]]]:
        @wraps(sync_fn)
        def wrapper(
            *args: P.args,
            **kwargs: P.kwargs,
        ) -> Union[R, Coroutine[Any, Any, R]]:
            # `_sync` is a control flag for the wrapper only; it is removed
            # so it never reaches either implementation.
            forced = kwargs.pop("_sync", None)
            if forced is None:
                run_sync = not is_in_async_context()
            else:
                run_sync = bool(forced)

            if run_sync:
                return sync_fn(*args, **kwargs)
            return async_target(*args, **kwargs)

        # Add the .aio attribute for compatibility with existing code that
        # expects it (e.g., CLI commands, tests that mock .aio)
        wrapper.aio = async_target  # type: ignore
        return wrapper

    return decorator