Coverage for /usr/local/lib/python3.12/site-packages/prefect/concurrency/v1/services.py: 0%

44 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1import asyncio 

2from collections.abc import AsyncGenerator 

3from contextlib import asynccontextmanager 

4from json import JSONDecodeError 

5from typing import TYPE_CHECKING, Optional 

6from uuid import UUID 

7 

8import httpx 

9from starlette import status 

10from typing_extensions import Unpack 

11 

12from prefect._internal.concurrency import logger 

13from prefect._internal.concurrency.services import FutureQueueService 

14from prefect.client.orchestration import get_client 

15from prefect.utilities.timeout import timeout_async 

16 

17if TYPE_CHECKING: 

18 from prefect.client.orchestration import PrefectClient 

19 

20 

class ConcurrencySlotAcquisitionServiceError(Exception):
    """Signal that concurrency slots could not be acquired.

    Raised by ``ConcurrencySlotAcquisitionService.acquire`` when the server
    answers ``423 Locked`` without a ``Retry-After`` header. The exception
    message is the ``detail`` field of the response body when it can be
    parsed, otherwise a generic fallback text.
    """

23 

24 

class ConcurrencySlotAcquisitionService(
    FutureQueueService[Unpack[tuple[UUID, Optional[float]]], httpx.Response]
):
    """Service that acquires v1 concurrency slots for a fixed set of limits.

    The generic parameters mirror ``acquire``: its arguments are a
    ``(task_run_id, timeout_seconds)`` pair and its result is the
    ``httpx.Response`` of the slot-increment request.

    NOTE(review): ``acquire`` is presumably invoked by ``FutureQueueService``
    for each queued item, inside ``_lifespan`` -- confirm against the base
    class.
    """

    def __init__(self, concurrency_limit_names: frozenset[str]) -> None:
        """Create a service bound to ``concurrency_limit_names``.

        Args:
            concurrency_limit_names: Names of the concurrency limits whose
                slots this service requests. The frozenset is forwarded
                unchanged to the base-class initializer.
        """
        super().__init__(concurrency_limit_names)
        # Assigned in `_lifespan`; declared here for type checkers only.
        self._client: PrefectClient
        # Sorted so the names are always sent in a deterministic order.
        self.concurrency_limit_names: list[str] = sorted(concurrency_limit_names)

    @asynccontextmanager
    async def _lifespan(self) -> AsyncGenerator[None, None]:
        """Hold an open client for as long as the context is active.

        The client from ``get_client()`` is stored on ``self._client`` and its
        context is exited when this context manager exits.
        """
        async with get_client() as client:
            self._client = client
            yield

    @staticmethod
    def _lock_reason(response: httpx.Response) -> str:
        """Return the ``detail`` text of a 423 response, or a fallback message."""
        try:
            return response.json()["detail"]
        except (JSONDecodeError, UnicodeDecodeError, KeyError, TypeError):
            # TypeError: the body is valid JSON but not an object (list,
            # string, null). UnicodeDecodeError: the body is not decodable
            # text. Both mean "no usable reason", same as malformed JSON or
            # a missing key.
            logger.error(
                "Failed to parse response from concurrency limit 423 Locked response: %s",
                response.content,
            )
            return "Concurrency limit is locked (server did not specify the reason)"

    async def acquire(
        self, task_run_id: UUID, timeout_seconds: Optional[float] = None
    ) -> httpx.Response:
        """Request slots for ``task_run_id``, retrying while the limits are locked.

        Args:
            task_run_id: ID of the task run the slots are requested for.
            timeout_seconds: Passed to ``timeout_async`` as the overall time
                budget for the retry loop. NOTE(review): ``None`` presumably
                means no timeout -- confirm against ``timeout_async``.

        Returns:
            The response of the successful slot-increment request.

        Raises:
            httpx.HTTPStatusError: For any error status other than 423.
            ConcurrencySlotAcquisitionServiceError: For a 423 response that
                carries no ``Retry-After`` header.
        """
        with timeout_async(seconds=timeout_seconds):
            while True:
                try:
                    return await self._client.increment_v1_concurrency_slots(
                        task_run_id=task_run_id,
                        names=self.concurrency_limit_names,
                    )
                except httpx.HTTPStatusError as exc:
                    if exc.response.status_code != status.HTTP_423_LOCKED:
                        raise

                    retry_after = exc.response.headers.get("Retry-After")
                    if not retry_after:
                        # We received a 423 but no Retry-After header. This
                        # should indicate that the server told us to abort
                        # because the concurrency limit is set to 0, i.e.
                        # effectively disabled.
                        raise ConcurrencySlotAcquisitionServiceError(
                            self._lock_reason(exc.response)
                        ) from exc

                    # Wait as long as the server asked, then try again.
                    await asyncio.sleep(float(retry_after))