Coverage for /usr/local/lib/python3.12/site-packages/prefect/concurrency/services.py: 31%

66 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1import asyncio 1a

2from collections.abc import AsyncGenerator 1a

3from contextlib import asynccontextmanager 1a

4from typing import TYPE_CHECKING, Literal, Optional 1a

5 

6import httpx 1a

7from starlette import status 1a

8from typing_extensions import TypeAlias, Unpack 1a

9 

10from prefect._internal.concurrency import logger 1a

11from prefect._internal.concurrency.services import FutureQueueService 1a

12from prefect.client.orchestration import get_client 1a

13from prefect.utilities.timeout import timeout_async 1a

14 

15if TYPE_CHECKING: 15 ↛ 16line 15 didn't jump to line 16 because the condition on line 15 was never true1a

16 from prefect.client.orchestration import PrefectClient 

17 from prefect.client.schemas.objects import ConcurrencyLeaseHolder 

18 

19_Item: TypeAlias = tuple[ 1a

20 int, Literal["concurrency", "rate_limit"], Optional[float], Optional[int] 

21] 

22 

23_ItemWithLease: TypeAlias = tuple[ 1a

24 int, 

25 Literal["concurrency", "rate_limit"], 

26 Optional[float], 

27 Optional[int], 

28 float, 

29 bool, 

30 Optional["ConcurrencyLeaseHolder"], 

31] 

32 

33 

class ConcurrencySlotAcquisitionService(
    FutureQueueService[Unpack[_Item], httpx.Response]
):
    """A service that acquires concurrency slots for a fixed set of limit names.

    Requests that the server answers with 423 LOCKED are retried after the
    delay given in the response's ``Retry-After`` header.

    Args:
        concurrency_limit_names: A frozenset of concurrency limit names to
            acquire slots from. Stored on the instance as a sorted list.
    """

    def __init__(self, concurrency_limit_names: frozenset[str]):
        super().__init__(concurrency_limit_names)
        # Declared here, assigned in `_lifespan`.
        self._client: PrefectClient
        self.concurrency_limit_names: list[str] = sorted(concurrency_limit_names)

    @asynccontextmanager
    async def _lifespan(self) -> AsyncGenerator[None, None]:
        """Open a client and expose it as ``self._client`` until the context exits."""
        async with get_client() as active_client:
            self._client = active_client
            yield

    async def acquire(
        self,
        slots: int,
        mode: Literal["concurrency", "rate_limit"],
        timeout_seconds: Optional[float] = None,
        max_retries: Optional[int] = None,
    ) -> httpx.Response:
        """Acquire concurrency slots, retrying while the server responds 423 LOCKED.

        Args:
            slots: Number of slots to acquire
            mode: Either "concurrency" or "rate_limit"
            timeout_seconds: Passed to ``timeout_async`` around the whole
                retry loop; ``None`` is forwarded unchanged
            max_retries: Maximum number of retries on 423 LOCKED responses;
                ``None`` means the retry count is never checked

        Returns:
            HTTP response from the server

        Raises:
            httpx.HTTPStatusError: If the server returns an error other than
                423 LOCKED, or returns 423 LOCKED with no retries left
        """
        with timeout_async(seconds=timeout_seconds):
            while True:
                try:
                    response = await self._client.increment_concurrency_slots(
                        names=self.concurrency_limit_names,
                        slots=slots,
                        mode=mode,
                    )
                except httpx.HTTPStatusError as exc:
                    # Only a 423 LOCKED answer is retryable; anything else
                    # propagates untouched.
                    if exc.response.status_code != status.HTTP_423_LOCKED:
                        raise

                    retries_exhausted = max_retries is not None and max_retries <= 0
                    if retries_exhausted:
                        raise exc

                    # The wait between attempts comes from the response header.
                    retry_after = float(exc.response.headers["Retry-After"])
                    logger.debug(
                        f"Unable to acquire concurrency slot. Retrying in {retry_after} second(s)."
                    )
                    await asyncio.sleep(retry_after)
                    if max_retries is not None:
                        max_retries -= 1
                else:
                    return response

76 

77 

class ConcurrencySlotAcquisitionWithLeaseService(
    FutureQueueService[Unpack[_ItemWithLease], httpx.Response]
):
    """A service that acquires concurrency slots with leases.

    Acquisition attempts for a given set of limit names are serialized through
    this service, which prevents thundering herd issues when many tasks try to
    acquire slots at the same time. Each unique set of limit names gets its own
    singleton service instance.

    Args:
        concurrency_limit_names: A frozenset of concurrency limit names to
            acquire slots from. Stored on the instance as a sorted list.
    """

    def __init__(self, concurrency_limit_names: frozenset[str]):
        super().__init__(concurrency_limit_names)
        # Declared here, assigned in `_lifespan`.
        self._client: PrefectClient
        self.concurrency_limit_names: list[str] = sorted(concurrency_limit_names)

    @asynccontextmanager
    async def _lifespan(self) -> AsyncGenerator[None, None]:
        """Open a client and expose it as ``self._client`` until the context exits."""
        async with get_client() as active_client:
            self._client = active_client
            yield

    async def acquire(
        self,
        slots: int,
        mode: Literal["concurrency", "rate_limit"],
        timeout_seconds: Optional[float] = None,
        max_retries: Optional[int] = None,
        lease_duration: float = 300,
        strict: bool = False,
        holder: Optional["ConcurrencyLeaseHolder"] = None,
    ) -> httpx.Response:
        """Acquire concurrency slots with a lease, retrying on 423 LOCKED responses.

        Args:
            slots: Number of slots to acquire
            mode: Either "concurrency" or "rate_limit"
            timeout_seconds: Optional timeout for the entire acquisition attempt
            max_retries: Maximum number of retries on 423 LOCKED responses;
                ``None`` means the retry count is never checked
            lease_duration: Duration of the lease in seconds
            strict: Whether to raise errors for missing limits. This method
                does not read the flag or forward it to the client call;
                presumably the caller enforces it.
            holder: Optional holder information for the lease

        Returns:
            HTTP response from the server

        Raises:
            httpx.HTTPStatusError: If the server returns an error other than
                423 LOCKED, or returns 423 LOCKED with no retries left
            TimeoutError: If acquisition times out
        """
        with timeout_async(seconds=timeout_seconds):
            while True:
                try:
                    response = await self._client.increment_concurrency_slots_with_lease(
                        names=self.concurrency_limit_names,
                        slots=slots,
                        mode=mode,
                        lease_duration=lease_duration,
                        holder=holder,
                    )
                except httpx.HTTPStatusError as exc:
                    # Only a 423 LOCKED answer is retryable; anything else
                    # propagates untouched.
                    is_locked = exc.response.status_code == status.HTTP_423_LOCKED
                    if not is_locked:
                        raise

                    retries_exhausted = max_retries is not None and max_retries <= 0
                    if retries_exhausted:
                        raise exc

                    # The wait between attempts comes from the response header.
                    retry_after = float(exc.response.headers["Retry-After"])
                    logger.debug(
                        f"Unable to acquire concurrency slot with lease for {self.concurrency_limit_names}. Retrying in {retry_after} second(s)."
                    )
                    await asyncio.sleep(retry_after)
                    if max_retries is not None:
                        max_retries -= 1
                else:
                    return response