Coverage for /usr/local/lib/python3.12/site-packages/prefect/concurrency/asyncio.py: 0%
20 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1from collections.abc import AsyncGenerator
2from contextlib import asynccontextmanager
3from typing import TYPE_CHECKING, Optional, Union
5from ._asyncio import (
6 AcquireConcurrencySlotTimeoutError as AcquireConcurrencySlotTimeoutError,
7)
8from ._asyncio import ConcurrencySlotAcquisitionError as ConcurrencySlotAcquisitionError
9from ._asyncio import (
10 aacquire_concurrency_slots,
11)
12from ._asyncio import (
13 concurrency as _concurrency_internal,
14)
15from ._events import (
16 emit_concurrency_acquisition_events,
17)
19if TYPE_CHECKING:
20 from prefect.client.schemas.objects import ConcurrencyLeaseHolder
@asynccontextmanager
async def concurrency(
    names: Union[str, list[str]],
    occupy: int = 1,
    timeout_seconds: Optional[float] = None,
    max_retries: Optional[int] = None,
    lease_duration: float = 300,
    strict: bool = False,
    holder: "Optional[ConcurrencyLeaseHolder]" = None,
) -> AsyncGenerator[None, None]:
    """Acquire concurrency slots for the duration of an `async with` block.

    An async context manager that acquires slots from the given concurrency
    limits on entry and releases them on exit. All arguments are forwarded
    unchanged to the internal `_asyncio.concurrency` implementation.

    Args:
        names: The names of the concurrency limits to acquire slots from.
        occupy: The number of slots to acquire and hold from each limit.
        timeout_seconds: The number of seconds to wait for the slots to be
            acquired before raising a `TimeoutError`. A timeout of `None`
            will wait indefinitely.
        max_retries: The maximum number of retries to acquire the concurrency
            slots.
        lease_duration: The duration of the lease for the acquired slots in
            seconds.
        strict: Whether to raise an error if the concurrency limit does not
            exist. Defaults to `False`.
        holder: An optional `ConcurrencyLeaseHolder` with information about
            the holder of the concurrency slots (typically its type and id).

    Raises:
        TimeoutError: If the slots are not acquired within the given timeout.
        ConcurrencySlotAcquisitionError: If the concurrency limit does not
            exist and `strict` is `True`.

    Example:
        A simple example of using the async `concurrency` context manager:
        ```python
        from prefect.concurrency.asyncio import concurrency

        async def resource_heavy():
            async with concurrency("test", occupy=1):
                print("Resource heavy task")

        async def main():
            await resource_heavy()
        ```
    """
    async with _concurrency_internal(
        names=names,
        occupy=occupy,
        timeout_seconds=timeout_seconds,
        max_retries=max_retries,
        lease_duration=lease_duration,
        strict=strict,
        holder=holder,
        # This public wrapper always passes suppress_warnings=False to the
        # internal implementation; callers cannot override it from here.
        suppress_warnings=False,
    ):
        yield
async def rate_limit(
    names: Union[str, list[str]],
    occupy: int = 1,
    timeout_seconds: Optional[float] = None,
    strict: bool = False,
) -> None:
    """Block until `occupy` slots are acquired from each named limit.

    Slots are requested in `"rate_limit"` mode and, once acquired, the
    acquisition events are emitted. Requires that all given concurrency
    limits have a slot decay.

    Args:
        names: The name, or names, of the concurrency limits to acquire slots
            from. A single string is treated as one name; any other iterable
            (list, tuple, set, ...) is treated as a collection of names. An
            empty value makes the call a no-op.
        occupy: The number of slots to acquire from each limit.
        timeout_seconds: The number of seconds to wait for the slots to be
            acquired before raising a `TimeoutError`. A timeout of `None`
            will wait indefinitely.
        strict: Whether to raise an error if the concurrency limit does not
            exist. Defaults to `False`.

    Raises:
        TimeoutError: If the slots are not acquired within the given timeout.
        ConcurrencySlotAcquisitionError: If the concurrency limit does not
            exist and `strict` is `True`.
    """
    # Nothing to acquire for "", [], () or None.
    if not names:
        return

    # Only a bare string is wrapped; every other iterable is copied into a
    # list so that e.g. a tuple of names is not mistaken for a single name.
    limit_names = [names] if isinstance(names, str) else list(names)

    limits = await aacquire_concurrency_slots(
        names=limit_names,
        slots=occupy,
        mode="rate_limit",
        timeout_seconds=timeout_seconds,
        strict=strict,
    )
    emit_concurrency_acquisition_events(limits, occupy)