Coverage for /usr/local/lib/python3.12/site-packages/prefect/client/orchestration/base.py: 44%
26 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1from __future__ import annotations 1a
3from typing import TYPE_CHECKING, Any, Literal 1a
5from typing_extensions import TypeAlias 1a
7if TYPE_CHECKING: 7 ↛ 8line 7 didn't jump to line 8 because the condition on line 7 was never true1a
8 from httpx import AsyncClient, Client, Response
10 from prefect.client.base import ServerType
11 from prefect.client.orchestration.routes import ServerRoutes
13HTTP_METHODS: TypeAlias = Literal["GET", "POST", "PUT", "DELETE", "PATCH"] 1a
16class BaseClient: 1a
17 server_type: "ServerType" 1a
19 def __init__(self, client: "Client"): 1a
20 self._client = client
22 def request( 1a
23 self,
24 method: HTTP_METHODS,
25 path: "ServerRoutes",
26 params: dict[str, Any] | None = None,
27 path_params: dict[str, Any] | None = None,
28 **kwargs: Any,
29 ) -> "Response":
30 if path_params:
31 path = path.format(**path_params) # type: ignore
32 request = self._client.build_request(method, path, params=params, **kwargs)
33 return self._client.send(request)
36class BaseAsyncClient: 1a
37 server_type: "ServerType" 1a
39 def __init__(self, client: "AsyncClient"): 1a
40 self._client = client
42 async def request( 1a
43 self,
44 method: HTTP_METHODS,
45 path: "ServerRoutes",
46 params: dict[str, Any] | None = None,
47 path_params: dict[str, Any] | None = None,
48 **kwargs: Any,
49 ) -> "Response":
50 if path_params:
51 path = path.format(**path_params) # type: ignore
52 request = self._client.build_request(method, path, params=params, **kwargs)
53 return await self._client.send(request)