Coverage for /usr/local/lib/python3.12/site-packages/prefect/concurrency/services.py: 31%
66 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1import asyncio 1a
2from collections.abc import AsyncGenerator 1a
3from contextlib import asynccontextmanager 1a
4from typing import TYPE_CHECKING, Literal, Optional 1a
6import httpx 1a
7from starlette import status 1a
8from typing_extensions import TypeAlias, Unpack 1a
10from prefect._internal.concurrency import logger 1a
11from prefect._internal.concurrency.services import FutureQueueService 1a
12from prefect.client.orchestration import get_client 1a
13from prefect.utilities.timeout import timeout_async 1a
15if TYPE_CHECKING: 15 ↛ 16line 15 didn't jump to line 16 because the condition on line 15 was never true1a
16 from prefect.client.orchestration import PrefectClient
17 from prefect.client.schemas.objects import ConcurrencyLeaseHolder
19_Item: TypeAlias = tuple[ 1a
20 int, Literal["concurrency", "rate_limit"], Optional[float], Optional[int]
21]
23_ItemWithLease: TypeAlias = tuple[ 1a
24 int,
25 Literal["concurrency", "rate_limit"],
26 Optional[float],
27 Optional[int],
28 float,
29 bool,
30 Optional["ConcurrencyLeaseHolder"],
31]
class ConcurrencySlotAcquisitionService(
    FutureQueueService[Unpack[_Item], httpx.Response]
):
    """A service that acquires concurrency or rate-limit slots (without leases).

    The frozenset of limit names is forwarded to ``FutureQueueService.__init__``
    (presumably as the per-instance key, as with the lease-based service);
    queued work items have the ``_Item`` shape, matching ``acquire``'s
    positional parameters.

    Args:
        concurrency_limit_names: A frozenset of concurrency limit names to
            acquire slots from.
    """

    def __init__(self, concurrency_limit_names: frozenset[str]):
        super().__init__(concurrency_limit_names)
        # Declared only; assigned in `_lifespan`. Quoted because PrefectClient
        # is imported solely under TYPE_CHECKING.
        self._client: "PrefectClient"
        # Sorted so requests always list the names in a deterministic order.
        self.concurrency_limit_names: list[str] = sorted(concurrency_limit_names)

    @asynccontextmanager
    async def _lifespan(self) -> AsyncGenerator[None, None]:
        """Open a client via ``get_client()`` and expose it as ``self._client``
        while this context is active."""
        async with get_client() as client:
            self._client = client
            yield

    async def acquire(
        self,
        slots: int,
        mode: Literal["concurrency", "rate_limit"],
        timeout_seconds: Optional[float] = None,
        max_retries: Optional[int] = None,
    ) -> httpx.Response:
        """Acquire concurrency slots, retrying while the server answers 423 LOCKED.

        The whole attempt, including every retry and back-off sleep, runs inside
        ``timeout_async(seconds=timeout_seconds)``. If both ``timeout_seconds``
        and ``max_retries`` are ``None``, it retries for as long as the server
        keeps responding 423.

        Args:
            slots: Number of slots to acquire.
            mode: Either "concurrency" or "rate_limit".
            timeout_seconds: Optional timeout for the entire acquisition attempt.
            max_retries: Maximum number of retries on 423 LOCKED responses;
                ``None`` means unlimited, ``0`` means fail on the first 423.

        Returns:
            The HTTP response from ``increment_concurrency_slots``.

        Raises:
            httpx.HTTPStatusError: For any error status other than 423, or for a
                423 once ``max_retries`` retries have been used up.
            KeyError: If a 423 response has no ``Retry-After`` header.
            ValueError: If ``Retry-After`` is not a number of seconds.
            Whatever ``timeout_async`` raises on expiry (presumably TimeoutError).
        """
        with timeout_async(seconds=timeout_seconds):
            while True:
                try:
                    return await self._client.increment_concurrency_slots(
                        names=self.concurrency_limit_names,
                        slots=slots,
                        mode=mode,
                    )
                except httpx.HTTPStatusError as exc:
                    if exc.response.status_code != status.HTTP_423_LOCKED:
                        raise

                    # Retry budget exhausted: surface the 423 itself.
                    if max_retries is not None and max_retries <= 0:
                        raise

                    # Back off for the delay the server asks for before retrying.
                    retry_after = float(exc.response.headers["Retry-After"])
                    logger.debug(
                        f"Unable to acquire concurrency slot. Retrying in {retry_after} second(s)."
                    )
                    await asyncio.sleep(retry_after)
                    if max_retries is not None:
                        max_retries -= 1
class ConcurrencySlotAcquisitionWithLeaseService(
    FutureQueueService[Unpack[_ItemWithLease], httpx.Response]
):
    """A service that acquires concurrency slots with leases.

    This service serializes acquisition attempts for a given set of limit names,
    preventing thundering herd issues when many tasks try to acquire slots simultaneously.
    Each unique set of limit names gets its own singleton service instance.

    Args:
        concurrency_limit_names: A frozenset of concurrency limit names to acquire slots from.
    """

    def __init__(self, concurrency_limit_names: frozenset[str]):
        super().__init__(concurrency_limit_names)
        # Declared only; assigned in `_lifespan`. Quoted because PrefectClient
        # is imported solely under TYPE_CHECKING.
        self._client: "PrefectClient"
        # Sorted so requests always list the names in a deterministic order.
        self.concurrency_limit_names: list[str] = sorted(concurrency_limit_names)

    @asynccontextmanager
    async def _lifespan(self) -> AsyncGenerator[None, None]:
        """Open a client via ``get_client()`` and expose it as ``self._client``
        while this context is active."""
        async with get_client() as client:
            self._client = client
            yield

    async def acquire(
        self,
        slots: int,
        mode: Literal["concurrency", "rate_limit"],
        timeout_seconds: Optional[float] = None,
        max_retries: Optional[int] = None,
        lease_duration: float = 300,
        strict: bool = False,
        holder: Optional["ConcurrencyLeaseHolder"] = None,
    ) -> httpx.Response:
        """Acquire concurrency slots with a lease, with retry logic for 423 responses.

        The whole attempt, including every retry and back-off sleep, runs inside
        ``timeout_async(seconds=timeout_seconds)``. If both ``timeout_seconds``
        and ``max_retries`` are ``None``, it retries for as long as the server
        keeps responding 423.

        Args:
            slots: Number of slots to acquire
            mode: Either "concurrency" or "rate_limit"
            timeout_seconds: Optional timeout for the entire acquisition attempt
            max_retries: Maximum number of retries on 423 LOCKED responses;
                ``None`` means unlimited, ``0`` means fail on the first 423.
            lease_duration: Duration of the lease in seconds
            strict: Not used by this method (it is not sent to the server).
                Accepted so the queued ``_ItemWithLease`` tuple matches this
                signature; presumably the caller applies it to the response.
            holder: Optional holder information for the lease

        Returns:
            HTTP response from the server

        Raises:
            httpx.HTTPStatusError: If the server returns an error other than
                423 LOCKED, or returns 423 after ``max_retries`` retries have
                been used up.
            KeyError: If a 423 response has no ``Retry-After`` header.
            ValueError: If ``Retry-After`` is not a number of seconds.
            TimeoutError: If acquisition times out
        """
        with timeout_async(seconds=timeout_seconds):
            while True:
                try:
                    return await self._client.increment_concurrency_slots_with_lease(
                        names=self.concurrency_limit_names,
                        slots=slots,
                        mode=mode,
                        lease_duration=lease_duration,
                        holder=holder,
                    )
                except httpx.HTTPStatusError as exc:
                    if exc.response.status_code != status.HTTP_423_LOCKED:
                        raise

                    # Retry budget exhausted: surface the 423 itself.
                    if max_retries is not None and max_retries <= 0:
                        raise

                    # Back off for the delay the server asks for before retrying.
                    retry_after = float(exc.response.headers["Retry-After"])
                    logger.debug(
                        f"Unable to acquire concurrency slot with lease for {self.concurrency_limit_names}. Retrying in {retry_after} second(s)."
                    )
                    await asyncio.sleep(retry_after)
                    if max_retries is not None:
                        max_retries -= 1