Coverage for /usr/local/lib/python3.12/site-packages/prefect/concurrency/v1/services.py: 0%
44 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1import asyncio
2from collections.abc import AsyncGenerator
3from contextlib import asynccontextmanager
4from json import JSONDecodeError
5from typing import TYPE_CHECKING, Optional
6from uuid import UUID
8import httpx
9from starlette import status
10from typing_extensions import Unpack
12from prefect._internal.concurrency import logger
13from prefect._internal.concurrency.services import FutureQueueService
14from prefect.client.orchestration import get_client
15from prefect.utilities.timeout import timeout_async
17if TYPE_CHECKING:
18 from prefect.client.orchestration import PrefectClient
class ConcurrencySlotAcquisitionServiceError(Exception):
    """Signal that acquiring concurrency slots failed.

    Raised by the slot acquisition service when the request for slots
    cannot be completed.
    """
25class ConcurrencySlotAcquisitionService(
26 FutureQueueService[Unpack[tuple[UUID, Optional[float]]], httpx.Response]
27):
28 def __init__(self, concurrency_limit_names: frozenset[str]) -> None:
29 super().__init__(concurrency_limit_names)
30 self._client: PrefectClient
31 self.concurrency_limit_names: list[str] = sorted(list(concurrency_limit_names))
33 @asynccontextmanager
34 async def _lifespan(self) -> AsyncGenerator[None, None]:
35 async with get_client() as client:
36 self._client = client
37 yield
39 async def acquire(
40 self, task_run_id: UUID, timeout_seconds: Optional[float] = None
41 ) -> httpx.Response:
42 with timeout_async(seconds=timeout_seconds):
43 while True:
44 try:
45 return await self._client.increment_v1_concurrency_slots(
46 task_run_id=task_run_id,
47 names=self.concurrency_limit_names,
48 )
49 except httpx.HTTPStatusError as exc:
50 if not exc.response.status_code == status.HTTP_423_LOCKED:
51 raise
53 retry_after = exc.response.headers.get("Retry-After")
54 if retry_after:
55 retry_after = float(retry_after)
56 await asyncio.sleep(retry_after)
57 else:
58 # We received a 423 but no Retry-After header. This
59 # should indicate that the server told us to abort
60 # because the concurrency limit is set to 0, i.e.
61 # effectively disabled.
62 try:
63 reason = exc.response.json()["detail"]
64 except (JSONDecodeError, KeyError):
65 logger.error(
66 "Failed to parse response from concurrency limit 423 Locked response: %s",
67 exc.response.content,
68 )
69 reason = "Concurrency limit is locked (server did not specify the reason)"
70 raise ConcurrencySlotAcquisitionServiceError(reason) from exc