Coverage for /usr/local/lib/python3.12/site-packages/prefect/concurrency/_sync.py: 45%
31 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1from __future__ import annotations 1a
3from contextlib import contextmanager 1a
4from typing import Generator, Literal, Optional 1a
5from uuid import UUID 1a
7from prefect.client.schemas.objects import ConcurrencyLeaseHolder 1a
8from prefect.client.schemas.responses import ( 1a
9 ConcurrencyLimitWithLeaseResponse,
10 MinimalConcurrencyLimitResponse,
11)
12from prefect.concurrency._asyncio import ( 1a
13 aacquire_concurrency_slots,
14 aacquire_concurrency_slots_with_lease,
15 arelease_concurrency_slots_with_lease,
16)
17from prefect.concurrency._events import ( 1a
18 emit_concurrency_acquisition_events,
19 emit_concurrency_release_events,
20)
21from prefect.concurrency._leases import maintain_concurrency_lease 1a
22from prefect.utilities.asyncutils import run_coro_as_sync 1a
def release_concurrency_slots_with_lease(lease_id: UUID) -> None:
    """Release the concurrency slots held under a lease from synchronous code.

    Creates the `arelease_concurrency_slots_with_lease` coroutine for the
    given lease and hands it to `run_coro_as_sync`. Whatever the runner
    returns is discarded.

    Args:
        lease_id: Identifier of the lease whose slots should be released.
    """
    release_coro = arelease_concurrency_slots_with_lease(lease_id)
    run_coro_as_sync(release_coro)
def acquire_concurrency_slots(
    names: list[str],
    slots: int,
    mode: Literal["concurrency", "rate_limit"] = "concurrency",
    timeout_seconds: Optional[float] = None,
    max_retries: Optional[int] = None,
    strict: bool = False,
) -> list[MinimalConcurrencyLimitResponse]:
    """Synchronous counterpart of `aacquire_concurrency_slots`.

    Every argument is forwarded unchanged, positionally and in declaration
    order, to `aacquire_concurrency_slots`; the resulting coroutine is then
    driven by `run_coro_as_sync`.

    Args:
        names: Names of the concurrency limits to acquire slots from.
        slots: Number of slots to request.
        mode: Either `"concurrency"` or `"rate_limit"`.
        timeout_seconds: Forwarded acquisition timeout; `None` by default.
        max_retries: Forwarded retry cap; `None` by default.
        strict: Forwarded strictness flag; `False` by default.

    Returns:
        Whatever `run_coro_as_sync` returns for the acquisition coroutine,
        annotated as a list of `MinimalConcurrencyLimitResponse`.
    """
    acquire_coro = aacquire_concurrency_slots(
        names, slots, mode, timeout_seconds, max_retries, strict
    )
    return run_coro_as_sync(acquire_coro)
def acquire_concurrency_slots_with_lease(
    names: list[str],
    slots: int,
    mode: Literal["concurrency", "rate_limit"] = "concurrency",
    timeout_seconds: Optional[float] = None,
    max_retries: Optional[int] = None,
    lease_duration: float = 300,
    strict: bool = False,
    holder: "Optional[ConcurrencyLeaseHolder]" = None,
    suppress_warnings: bool = False,
) -> ConcurrencyLimitWithLeaseResponse:
    """Synchronous counterpart of `aacquire_concurrency_slots_with_lease`.

    Every argument is forwarded unchanged, positionally and in declaration
    order, to `aacquire_concurrency_slots_with_lease`; the resulting
    coroutine is then driven by `run_coro_as_sync`.

    Args:
        names: Names of the concurrency limits to acquire slots from.
        slots: Number of slots to request.
        mode: Either `"concurrency"` or `"rate_limit"`.
        timeout_seconds: Forwarded acquisition timeout; `None` by default.
        max_retries: Forwarded retry cap; `None` by default.
        lease_duration: Forwarded lease duration; 300 by default.
        strict: Forwarded strictness flag; `False` by default.
        holder: Optional `ConcurrencyLeaseHolder` forwarded with the request.
        suppress_warnings: Forwarded as-is; `False` by default.

    Returns:
        Whatever `run_coro_as_sync` returns for the acquisition coroutine,
        annotated as a `ConcurrencyLimitWithLeaseResponse`.
    """
    acquire_coro = aacquire_concurrency_slots_with_lease(
        names,
        slots,
        mode,
        timeout_seconds,
        max_retries,
        lease_duration,
        strict,
        holder,
        suppress_warnings,
    )
    return run_coro_as_sync(acquire_coro)
@contextmanager
def concurrency(
    names: str | list[str],
    occupy: int = 1,
    timeout_seconds: Optional[float] = None,
    max_retries: Optional[int] = None,
    lease_duration: float = 300,
    strict: bool = False,
    holder: "Optional[ConcurrencyLeaseHolder]" = None,
    suppress_warnings: bool = False,
) -> Generator[None, None, None]:
    """Hold slots on the given concurrency limits for the span of a `with` block.

    Slots are acquired under a lease before the body runs, the lease is
    maintained while the body executes, and the slots are released when the
    block exits, whether normally or through an exception. If `names` is
    empty, the body runs without acquiring anything.

    Args:
        names: The name, or list of names, of the concurrency limits to
            acquire slots from.
        occupy: The number of slots to acquire and hold from each limit.
        timeout_seconds: The number of seconds to wait for the slots to be
            acquired before raising a `TimeoutError`. `None` waits
            indefinitely.
        max_retries: The maximum number of retries to acquire the slots.
        lease_duration: The duration, in seconds, of the lease for the
            acquired slots.
        strict: Whether to raise an error if the concurrency limit does not
            exist. Also passed to lease maintenance as
            `raise_on_lease_renewal_failure`. Defaults to `False`.
        holder: Optional `ConcurrencyLeaseHolder` identifying who holds the
            slots; forwarded to the acquisition call.
        suppress_warnings: Forwarded to both slot acquisition and lease
            maintenance. Presumably silences warnings raised there; confirm
            against those helpers. Defaults to `False`.

    Raises:
        TimeoutError: If the slots are not acquired within the given timeout.
        ConcurrencySlotAcquisitionError: If the concurrency limit does not
            exist and `strict` is `True`.

    Example:
        Using the sync `concurrency` context manager:
        ```python
        from prefect.concurrency.sync import concurrency

        def resource_heavy():
            with concurrency("test", occupy=1):
                print("Resource heavy task")

        def main():
            resource_heavy()
        ```
    """
    # Guard clause: with no limit names there is nothing to acquire or release.
    if not names:
        yield
        return

    # Accept a bare string as shorthand for a one-element list.
    limit_names = names
    if not isinstance(limit_names, list):
        limit_names = [limit_names]

    lease = acquire_concurrency_slots_with_lease(
        limit_names,
        occupy,
        timeout_seconds=timeout_seconds,
        max_retries=max_retries,
        lease_duration=lease_duration,
        strict=strict,
        holder=holder,
        suppress_warnings=suppress_warnings,
    )
    limits = lease.limits
    acquisition_events = emit_concurrency_acquisition_events(limits, occupy)
    lease_id = lease.lease_id

    try:
        lease_keeper = maintain_concurrency_lease(
            lease_id,
            lease_duration,
            raise_on_lease_renewal_failure=strict,
            suppress_warnings=suppress_warnings,
        )
        with lease_keeper:
            yield
    finally:
        # Release first, then report the release, regardless of how the body exited.
        release_concurrency_slots_with_lease(lease_id)
        emit_concurrency_release_events(limits, occupy, acquisition_events)