Coverage for /usr/local/lib/python3.12/site-packages/prefect/concurrency/v1/_asyncio.py: 0%

29 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1import asyncio 

2from typing import Optional 

3from uuid import UUID 

4 

5import httpx 

6 

7from prefect.client.orchestration import get_client 

8from prefect.client.schemas.responses import MinimalConcurrencyLimitResponse 

9from prefect.utilities.asyncutils import sync_compatible 

10 

11from .services import ConcurrencySlotAcquisitionService 

12 

13 

14class ConcurrencySlotAcquisitionError(Exception): 

15 """Raised when an unhandlable occurs while acquiring concurrency slots.""" 

16 

17 

18class AcquireConcurrencySlotTimeoutError(TimeoutError): 

19 """Raised when acquiring a concurrency slot times out.""" 

20 

21 

22@sync_compatible 

23async def acquire_concurrency_slots( 

24 names: list[str], 

25 task_run_id: UUID, 

26 timeout_seconds: Optional[float] = None, 

27) -> list[MinimalConcurrencyLimitResponse]: 

28 service = ConcurrencySlotAcquisitionService.instance(frozenset(names)) 

29 future = service.send((task_run_id, timeout_seconds)) 

30 try: 

31 response = await asyncio.wrap_future(future) 

32 except TimeoutError as timeout: 

33 raise AcquireConcurrencySlotTimeoutError( 

34 f"Attempt to acquire concurrency limits timed out after {timeout_seconds} second(s)" 

35 ) from timeout 

36 except Exception as exc: 

37 raise ConcurrencySlotAcquisitionError( 

38 f"Unable to acquire concurrency limits {names!r}" 

39 ) from exc 

40 else: 

41 return _response_to_concurrency_limit_response(response) 

42 

43 

44@sync_compatible 

45async def release_concurrency_slots( 

46 names: list[str], task_run_id: UUID, occupancy_seconds: float 

47) -> list[MinimalConcurrencyLimitResponse]: 

48 async with get_client() as client: 

49 response = await client.decrement_v1_concurrency_slots( 

50 names=names, 

51 task_run_id=task_run_id, 

52 occupancy_seconds=occupancy_seconds, 

53 ) 

54 return _response_to_concurrency_limit_response(response) 

55 

56 

57def _response_to_concurrency_limit_response( 

58 response: httpx.Response, 

59) -> list[MinimalConcurrencyLimitResponse]: 

60 data: list[MinimalConcurrencyLimitResponse] = response.json() or [] 

61 return [ 

62 MinimalConcurrencyLimitResponse.model_validate(limit) for limit in data if data 

63 ]