Coverage for /usr/local/lib/python3.12/site-packages/prefect/_internal/compatibility/async_dispatch.py: 54%
49 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1import asyncio 1a
2import inspect 1a
3from collections.abc import Coroutine 1a
4from functools import wraps 1a
5from typing import TYPE_CHECKING, Any, Callable, TypeVar, Union, cast 1a
7from typing_extensions import ParamSpec 1a
9if TYPE_CHECKING: 9 ↛ 10line 9 didn't jump to line 10 because the condition on line 9 was never true1a
10 from prefect.flows import Flow
11 from prefect.tasks import Task
13R = TypeVar("R") 1a
14P = ParamSpec("P") 1a
def is_in_async_context() -> bool:
    """
    Report whether the caller should be treated as running in an async context.

    Resolution order:
        1. If a Prefect run context is available, use the ``isasync``
           attribute of the context's ``task``, falling back to its ``flow``
           when the task is missing or falsy. If neither object provides
           ``isasync``, the answer defaults to True.
        2. If no run context is available, the answer is True exactly when
           an asyncio event loop is running in the current thread.

    Returns:
        True for an async context, False otherwise.
    """
    # Imported at call time rather than at module import time.
    from prefect.context import get_run_context
    from prefect.exceptions import MissingContextError

    try:
        context = get_run_context()
        # Prefer the task; only look at the flow when no task is present.
        owner = getattr(context, "task", None) or getattr(context, "flow", None)
        return getattr(owner, "isasync", True)
    except MissingContextError:
        # Not inside an execution context: make a best-effort decision
        # based on whether an event loop is currently running.
        try:
            asyncio.get_running_loop()
        except RuntimeError:
            return False
        else:
            return True
def _is_acceptable_callable(
    obj: Union[
        Callable[P, R], "Flow[P, R]", "Task[P, R]", "classmethod[type[Any], P, R]"
    ],
) -> bool:
    """
    Decide whether ``obj`` can serve as an async implementation.

    ``obj`` is accepted when any of the following holds:
        - it is itself a coroutine function;
        - it exposes a truthy ``fn`` attribute that is a coroutine function
          (the shape of a task or flow);
        - it is a ``classmethod`` wrapping a coroutine function.

    Args:
        obj: The candidate callable, task, flow, or classmethod.

    Returns:
        True if ``obj`` matches one of the accepted shapes, False otherwise.
    """
    if inspect.iscoroutinefunction(obj):
        return True

    # Tasks and flows are detected by duck typing on `.fn`; importing
    # `Task` or `Flow` here would create a circular import.
    inner = getattr(obj, "fn", None)
    if inner and inspect.iscoroutinefunction(inner):
        return True

    if isinstance(obj, classmethod):
        return inspect.iscoroutinefunction(obj.__func__)
    return False
def async_dispatch(
    async_impl: Union[
        Callable[P, Coroutine[Any, Any, R]],
        "classmethod[type[Any], P, Coroutine[Any, Any, R]]",
    ],
) -> Callable[[Callable[P, R]], Callable[P, Union[R, Coroutine[Any, Any, R]]]]:
    """
    Build a decorator that routes calls to a sync or async implementation.

    The decorated sync function is replaced by a wrapper. On each call the
    wrapper removes an optional ``_sync`` keyword argument from ``kwargs``:
    when it is not None, its truthiness forces the sync (truthy) or async
    (falsy) implementation; otherwise the choice is made by
    ``is_in_async_context()``. The chosen implementation is called with the
    remaining arguments and its result is returned as-is, so the async path
    returns whatever ``async_impl`` returns without awaiting it.

    Args:
        async_impl: The async implementation to dispatch to when in an async
            context. A ``classmethod`` is unwrapped to its underlying
            function before use.

    Returns:
        A decorator that takes the sync implementation and returns the
        dispatching wrapper. The wrapper exposes the async implementation
        as its ``aio`` attribute.

    Raises:
        TypeError: If ``async_impl`` is not an acceptable async callable.
    """
    if not _is_acceptable_callable(async_impl):
        raise TypeError("async_impl must be an async function")

    # Unwrap a classmethod so the wrapper calls the underlying function.
    if isinstance(async_impl, classmethod):
        async_target = cast(Callable[P, Coroutine[Any, Any, R]], async_impl.__func__)
    else:
        async_target = async_impl

    def decorator(
        sync_fn: Callable[P, R],
    ) -> Callable[P, Union[R, Coroutine[Any, Any, R]]]:
        @wraps(sync_fn)
        def wrapper(
            *args: P.args,
            **kwargs: P.kwargs,
        ) -> Union[R, Coroutine[Any, Any, R]]:
            # `_sync` is consumed here and never forwarded to either
            # implementation.
            override = kwargs.pop("_sync", None)
            if override is None:
                run_sync = not is_in_async_context()
            else:
                run_sync = bool(override)

            if run_sync:
                return sync_fn(*args, **kwargs)
            return async_target(*args, **kwargs)

        # Add the .aio attribute for compatibility with existing code that
        # expects it (e.g., CLI commands, tests that mock .aio)
        wrapper.aio = async_target  # type: ignore
        return wrapper

    return decorator