Coverage for /usr/local/lib/python3.12/site-packages/prefect/concurrency/sync.py: 0%
19 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1from collections.abc import Generator
2from contextlib import contextmanager
3from typing import TYPE_CHECKING, Optional, TypeVar, Union
5from ._events import (
6 emit_concurrency_acquisition_events,
7)
8from ._sync import (
9 acquire_concurrency_slots as _acquire_concurrency_slots,
10)
11from ._sync import (
12 concurrency as _concurrency_internal,
13)
15if TYPE_CHECKING:
16 from prefect.client.schemas.objects import ConcurrencyLeaseHolder
18T = TypeVar("T")
21@contextmanager
22def concurrency(
23 names: Union[str, list[str]],
24 occupy: int = 1,
25 timeout_seconds: Optional[float] = None,
26 max_retries: Optional[int] = None,
27 lease_duration: float = 300,
28 strict: bool = False,
29 holder: "Optional[ConcurrencyLeaseHolder]" = None,
30) -> Generator[None, None, None]:
31 """A context manager that acquires and releases concurrency slots from the
32 given concurrency limits.
34 Args:
35 names: The names of the concurrency limits to acquire slots from.
36 occupy: The number of slots to acquire and hold from each limit.
37 timeout_seconds: The number of seconds to wait for the slots to be acquired before
38 raising a `TimeoutError`. A timeout of `None` will wait indefinitely.
39 max_retries: The maximum number of retries to acquire the concurrency slots.
40 lease_duration: The duration of the lease for the acquired slots in seconds.
41 strict: A boolean specifying whether to raise an error if the concurrency limit does not exist.
42 Defaults to `False`.
43 holder: A dictionary containing information about the holder of the concurrency slots.
44 Typically includes 'type' and 'id' keys.
46 Raises:
47 TimeoutError: If the slots are not acquired within the given timeout.
48 ConcurrencySlotAcquisitionError: If the concurrency limit does not exist and `strict` is `True`.
50 Example:
51 A simple example of using the sync `concurrency` context manager:
52 ```python
53 from prefect.concurrency.sync import concurrency
55 def resource_heavy():
56 with concurrency("test", occupy=1):
57 print("Resource heavy task")
59 def main():
60 resource_heavy()
61 ```
62 """
63 with _concurrency_internal(
64 names=names,
65 occupy=occupy,
66 timeout_seconds=timeout_seconds,
67 max_retries=max_retries,
68 lease_duration=lease_duration,
69 strict=strict,
70 holder=holder,
71 suppress_warnings=False,
72 ):
73 yield
76def rate_limit(
77 names: Union[str, list[str]],
78 occupy: int = 1,
79 timeout_seconds: Optional[float] = None,
80 strict: bool = False,
81) -> None:
82 """Block execution until an `occupy` number of slots of the concurrency
83 limits given in `names` are acquired. Requires that all given concurrency
84 limits have a slot decay.
86 Args:
87 names: The names of the concurrency limits to acquire slots from.
88 occupy: The number of slots to acquire and hold from each limit.
89 timeout_seconds: The number of seconds to wait for the slots to be acquired before
90 raising a `TimeoutError`. A timeout of `None` will wait indefinitely.
91 strict: A boolean specifying whether to raise an error if the concurrency limit does not exist.
92 Defaults to `False`.
94 Raises:
95 TimeoutError: If the slots are not acquired within the given timeout.
96 ConcurrencySlotAcquisitionError: If the concurrency limit does not exist and `strict` is `True`.
97 """
98 if not names:
99 return
101 names = names if isinstance(names, list) else [names]
103 limits = _acquire_concurrency_slots(
104 names,
105 occupy,
106 mode="rate_limit",
107 timeout_seconds=timeout_seconds,
108 strict=strict,
109 )
110 emit_concurrency_acquisition_events(limits, occupy)