Coverage for /usr/local/lib/python3.12/site-packages/prefect/concurrency/v1/_asyncio.py: 0%
29 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1import asyncio
2from typing import Optional
3from uuid import UUID
5import httpx
7from prefect.client.orchestration import get_client
8from prefect.client.schemas.responses import MinimalConcurrencyLimitResponse
9from prefect.utilities.asyncutils import sync_compatible
11from .services import ConcurrencySlotAcquisitionService
14class ConcurrencySlotAcquisitionError(Exception):
15 """Raised when an unhandlable occurs while acquiring concurrency slots."""
18class AcquireConcurrencySlotTimeoutError(TimeoutError):
19 """Raised when acquiring a concurrency slot times out."""
22@sync_compatible
23async def acquire_concurrency_slots(
24 names: list[str],
25 task_run_id: UUID,
26 timeout_seconds: Optional[float] = None,
27) -> list[MinimalConcurrencyLimitResponse]:
28 service = ConcurrencySlotAcquisitionService.instance(frozenset(names))
29 future = service.send((task_run_id, timeout_seconds))
30 try:
31 response = await asyncio.wrap_future(future)
32 except TimeoutError as timeout:
33 raise AcquireConcurrencySlotTimeoutError(
34 f"Attempt to acquire concurrency limits timed out after {timeout_seconds} second(s)"
35 ) from timeout
36 except Exception as exc:
37 raise ConcurrencySlotAcquisitionError(
38 f"Unable to acquire concurrency limits {names!r}"
39 ) from exc
40 else:
41 return _response_to_concurrency_limit_response(response)
44@sync_compatible
45async def release_concurrency_slots(
46 names: list[str], task_run_id: UUID, occupancy_seconds: float
47) -> list[MinimalConcurrencyLimitResponse]:
48 async with get_client() as client:
49 response = await client.decrement_v1_concurrency_slots(
50 names=names,
51 task_run_id=task_run_id,
52 occupancy_seconds=occupancy_seconds,
53 )
54 return _response_to_concurrency_limit_response(response)
57def _response_to_concurrency_limit_response(
58 response: httpx.Response,
59) -> list[MinimalConcurrencyLimitResponse]:
60 data: list[MinimalConcurrencyLimitResponse] = response.json() or []
61 return [
62 MinimalConcurrencyLimitResponse.model_validate(limit) for limit in data if data
63 ]