Coverage for /usr/local/lib/python3.12/site-packages/prefect/client/utilities.py: 30%

46 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1""" 

2Utilities for working with clients. 

3""" 

4 

5# This module must not import from `prefect.client` when it is imported to avoid 

6# circular imports for decorators such as `inject_client` which are widely used. 

7 

8from collections.abc import Coroutine 1a

9from functools import wraps 1a

10from typing import TYPE_CHECKING, Any, Callable, Optional, Union 1a

11 

12from typing_extensions import Concatenate, ParamSpec, TypeGuard, TypeVar 1a

13 

14if TYPE_CHECKING: 14 ↛ 15line 14 didn't jump to line 15 because the condition on line 14 was never true1a

15 from prefect.client.orchestration import PrefectClient, SyncPrefectClient 

16 

17P = ParamSpec("P") 1a

18R = TypeVar("R", infer_variance=True) 1a

19 

20 

def _current_async_client(
    client: Union["PrefectClient", "SyncPrefectClient"],
) -> TypeGuard["PrefectClient"]:
    """
    Report whether `client` is tied to the event loop that is running right now.

    The check compares the client's `_loop` attribute (treated as `None` when the
    attribute is missing) with whatever `get_running_loop()` returns.

    Args:
        - client (PrefectClient | SyncPrefectClient): the client to inspect

    Returns:
        - bool: `True` when the two loop values compare equal, which type checkers
            treat as narrowing `client` to `PrefectClient`
    """
    # Imported lazily, like the other prefect imports in this module.
    from prefect._internal.concurrency.event_loop import get_running_loop

    # A missing `_loop` attribute falls back to None rather than raising.
    # NOTE(review): if `get_running_loop()` can itself return None when no loop
    # is running, a client without `_loop` would also compare equal - confirm.
    client_loop = getattr(client, "_loop", None)
    return client_loop == get_running_loop()

29 

30 

def get_or_create_client(
    client: Optional["PrefectClient"] = None,
) -> tuple["PrefectClient", bool]:
    """
    Resolve the client to use: the one passed in, one found on an active context,
    or a newly created one.

    Args:
        - client (PrefectClient, optional): a client to use as-is

    Returns:
        - tuple: the client, plus a flag that is `True` when the client already
            existed (it was passed in by the caller or taken from a context) and
            `False` when it was created by this call
    """
    # An explicitly supplied client short-circuits every other lookup.
    if client is not None:
        return client, True

    # Deferred import: this module avoids importing prefect internals at load time.
    from prefect.context import AsyncClientContext, FlowRunContext, TaskRunContext

    # All three contexts are fetched up front, then checked in this priority order.
    candidate_contexts = (
        AsyncClientContext.get(),
        FlowRunContext.get(),
        TaskRunContext.get(),
    )
    for ctx in candidate_contexts:
        if ctx is not None:
            existing = ctx.client
            # Only reuse a context client that passes the current-loop check.
            if _current_async_client(existing):
                return existing, True

    from prefect.client.orchestration import get_client as get_httpx_client

    # No reusable client was found, so build a new one and flag it as such.
    return get_httpx_client(), False

61 

62 

def client_injector(
    func: Callable[Concatenate["PrefectClient", P], Coroutine[Any, Any, R]],
) -> Callable[P, Coroutine[Any, Any, R]]:
    """
    Decorator that supplies a client as the first positional argument of an
    asynchronous function.

    On every call the client is obtained from `get_or_create_client()` with no
    arguments; the accompanying flag is discarded and the client is not entered
    as a context manager here.

    Args:
        - func: the coroutine function whose first parameter receives the client

    Returns:
        - the wrapped coroutine function, callable without the client argument
    """

    @wraps(func)
    async def wrapper(*args: P.args, **kwargs: P.kwargs) -> R:
        # Index 0 is the client; index 1 (the flag) is not needed here.
        resolved = get_or_create_client()
        return await func(resolved[0], *args, **kwargs)

    return wrapper

72 

73 

def inject_client(
    fn: Callable[P, Coroutine[Any, Any, R]],
) -> Callable[P, Coroutine[Any, Any, R]]:
    """
    Decorator that passes a `client` keyword argument to an asynchronous function.

    The decorated function _must_ accept a `client` kwarg. Any `client` kwarg given
    by the caller is removed from the call and handed to `get_or_create_client`.
    When that lookup reports the client as already existing, the client is wrapped
    in `asyncnullcontext`; otherwise the client itself is entered with `async with`
    for the duration of the call.
    """

    @wraps(fn)
    async def with_injected_client(*args: P.args, **kwargs: P.kwargs) -> R:
        # Remove any caller-provided client so it can be re-injected below.
        supplied = kwargs.pop("client", None)
        if TYPE_CHECKING:
            assert supplied is None or isinstance(supplied, PrefectClient)

        resolved, already_existed = get_or_create_client(supplied)
        if already_existed:
            # Deferred import, as with the other prefect imports in this module.
            from prefect.utilities.asyncutils import asyncnullcontext

            client_cm = asyncnullcontext(resolved)
        else:
            # A newly created client is entered directly as the context manager.
            client_cm = resolved

        async with client_cm as active_client:
            kwargs |= {"client": active_client}
            return await fn(*args, **kwargs)

    return with_injected_client