Coverage for /usr/local/lib/python3.12/site-packages/prefect/concurrency/v1/asyncio.py: 0%
35 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1from collections.abc import AsyncGenerator
2from contextlib import asynccontextmanager
3from typing import TYPE_CHECKING, Optional, Union
4from uuid import UUID
6import anyio
8from prefect.concurrency.v1._asyncio import (
9 acquire_concurrency_slots,
10 release_concurrency_slots,
11)
12from prefect.concurrency.v1._events import (
13 emit_concurrency_acquisition_events,
14 emit_concurrency_release_events,
15)
16from prefect.concurrency.v1.context import ConcurrencyContext
17from prefect.types._datetime import now
19from ._asyncio import (
20 AcquireConcurrencySlotTimeoutError as AcquireConcurrencySlotTimeoutError,
21)
22from ._asyncio import ConcurrencySlotAcquisitionError as ConcurrencySlotAcquisitionError
@asynccontextmanager
async def concurrency(
    names: Union[str, list[str]],
    task_run_id: UUID,
    timeout_seconds: Optional[float] = None,
) -> AsyncGenerator[None, None]:
    """Hold slots on the given concurrency limits while the `async with`
    body runs.

    Slots are acquired before the body starts and released when it exits,
    whether it finishes normally or raises. When `names` is empty the body
    runs immediately and no slots are acquired or released.

    Args:
        names: A single concurrency limit name, or a list of names, to
            acquire slots from.
        task_run_id: The ID of the task run that occupies the slots.
        timeout_seconds: How many seconds to wait for the slots before a
            `TimeoutError` is raised. `None` waits indefinitely.

    Raises:
        TimeoutError: If the slots are not acquired within the given timeout.

    Example:
        Using the async `concurrency` context manager:
        ```python
        from prefect.concurrency.v1.asyncio import concurrency

        async def resource_heavy():
            async with concurrency("test", task_run_id):
                print("Resource heavy task")

        async def main():
            await resource_heavy()
        ```
    """
    if not names:
        # Nothing to limit: run the body without touching any slots.
        yield
        return

    # Accept a bare name as shorthand for a one-element list.
    if isinstance(names, list):
        limit_names: list[str] = names
    else:
        limit_names = [names]

    pending_acquire = acquire_concurrency_slots(
        limit_names,
        task_run_id=task_run_id,
        timeout_seconds=timeout_seconds,
    )
    if TYPE_CHECKING:
        # Narrowing for the type checker only; never executed at runtime.
        assert not isinstance(pending_acquire, list)
    limits = await pending_acquire
    acquired_at = now("UTC")
    emitted_events = emit_concurrency_acquisition_events(limits, task_run_id)

    try:
        yield
    finally:
        # Time the slots were held, reported along with the release.
        occupancy_seconds = (now("UTC") - acquired_at).total_seconds()
        try:
            pending_release = release_concurrency_slots(
                limit_names, task_run_id, occupancy_seconds
            )
            if TYPE_CHECKING:
                assert not isinstance(pending_release, list)
            await pending_release
        except anyio.get_cancelled_exc_class():
            # Cancelled before the release completed. Queue the slots on the
            # active concurrency context, if there is one, so they can be
            # released when that context is exited.
            ctx = ConcurrencyContext.get()
            if ctx:
                ctx.cleanup_slots.append(
                    (limit_names, occupancy_seconds, task_run_id)
                )

        emit_concurrency_release_events(limits, emitted_events, task_run_id)