Coverage for /usr/local/lib/python3.12/site-packages/prefect/client/utilities.py: 30%
46 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1"""
2Utilities for working with clients.
3"""
5# This module must not import from `prefect.client` when it is imported to avoid
6# circular imports for decorators such as `inject_client` which are widely used.
8from collections.abc import Coroutine 1a
9from functools import wraps 1a
10from typing import TYPE_CHECKING, Any, Callable, Optional, Union 1a
12from typing_extensions import Concatenate, ParamSpec, TypeGuard, TypeVar 1a
14if TYPE_CHECKING: 14 ↛ 15line 14 didn't jump to line 15 because the condition on line 14 was never true1a
15 from prefect.client.orchestration import PrefectClient, SyncPrefectClient
17P = ParamSpec("P") 1a
18R = TypeVar("R", infer_variance=True) 1a
21def _current_async_client( 1a
22 client: Union["PrefectClient", "SyncPrefectClient"],
23) -> TypeGuard["PrefectClient"]:
24 """Determine if the client is a PrefectClient instance attached to the current loop"""
25 from prefect._internal.concurrency.event_loop import get_running_loop
27 # Only a PrefectClient will have a _loop attribute that is the current loop
28 return getattr(client, "_loop", None) == get_running_loop()
31def get_or_create_client( 1a
32 client: Optional["PrefectClient"] = None,
33) -> tuple["PrefectClient", bool]:
34 """
35 Returns provided client, infers a client from context if available, or creates a new client.
37 Args:
38 - client (PrefectClient, optional): an optional client to use
40 Returns:
41 - tuple: a tuple of the client and a boolean indicating if the client was inferred from context
42 """
43 if client is not None:
44 return client, True
46 from prefect.context import AsyncClientContext, FlowRunContext, TaskRunContext
48 async_client_context = AsyncClientContext.get()
49 flow_run_context = FlowRunContext.get()
50 task_run_context = TaskRunContext.get()
52 for context in (async_client_context, flow_run_context, task_run_context):
53 if context is None:
54 continue
55 if _current_async_client(context_client := context.client):
56 return context_client, True
58 from prefect.client.orchestration import get_client as get_httpx_client
60 return get_httpx_client(), False
63def client_injector( 1a
64 func: Callable[Concatenate["PrefectClient", P], Coroutine[Any, Any, R]],
65) -> Callable[P, Coroutine[Any, Any, R]]:
66 @wraps(func) 1a
67 async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R: 1a
68 client, _ = get_or_create_client()
69 return await func(client, *args, **kwargs)
71 return wrapper 1a
74def inject_client( 1a
75 fn: Callable[P, Coroutine[Any, Any, R]],
76) -> Callable[P, Coroutine[Any, Any, R]]:
77 """
78 Simple helper to provide a context managed client to an asynchronous function.
80 The decorated function _must_ take a `client` kwarg and if a client is passed when
81 called it will be used instead of creating a new one, but it will not be context
82 managed as it is assumed that the caller is managing the context.
83 """
85 @wraps(fn) 1a
86 async def with_injected_client(*args: P.args, **kwargs: P.kwargs) -> R: 1a
87 given = kwargs.pop("client", None)
88 if TYPE_CHECKING:
89 assert given is None or isinstance(given, PrefectClient)
90 client, inferred = get_or_create_client(given)
91 if not inferred:
92 context = client
93 else:
94 from prefect.utilities.asyncutils import asyncnullcontext
96 context = asyncnullcontext(client)
97 async with context as new_client:
98 kwargs |= {"client": new_client}
99 return await fn(*args, **kwargs)
101 return with_injected_client 1a