Coverage for /usr/local/lib/python3.12/site-packages/prefect/client/orchestration/base.py: 44%

26 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1from __future__ import annotations 1a

2 

3from typing import TYPE_CHECKING, Any, Literal 1a

4 

5from typing_extensions import TypeAlias 1a

6 

7if TYPE_CHECKING: 7 ↛ 8line 7 didn't jump to line 8 because the condition on line 7 was never true1a

8 from httpx import AsyncClient, Client, Response 

9 

10 from prefect.client.base import ServerType 

11 from prefect.client.orchestration.routes import ServerRoutes 

12 

13HTTP_METHODS: TypeAlias = Literal["GET", "POST", "PUT", "DELETE", "PATCH"] 1a

14 

15 

16class BaseClient: 1a

17 server_type: "ServerType" 1a

18 

19 def __init__(self, client: "Client"): 1a

20 self._client = client 

21 

22 def request( 1a

23 self, 

24 method: HTTP_METHODS, 

25 path: "ServerRoutes", 

26 params: dict[str, Any] | None = None, 

27 path_params: dict[str, Any] | None = None, 

28 **kwargs: Any, 

29 ) -> "Response": 

30 if path_params: 

31 path = path.format(**path_params) # type: ignore 

32 request = self._client.build_request(method, path, params=params, **kwargs) 

33 return self._client.send(request) 

34 

35 

36class BaseAsyncClient: 1a

37 server_type: "ServerType" 1a

38 

39 def __init__(self, client: "AsyncClient"): 1a

40 self._client = client 

41 

42 async def request( 1a

43 self, 

44 method: HTTP_METHODS, 

45 path: "ServerRoutes", 

46 params: dict[str, Any] | None = None, 

47 path_params: dict[str, Any] | None = None, 

48 **kwargs: Any, 

49 ) -> "Response": 

50 if path_params: 

51 path = path.format(**path_params) # type: ignore 

52 request = self._client.build_request(method, path, params=params, **kwargs) 

53 return await self._client.send(request)