Coverage for /usr/local/lib/python3.12/site-packages/prefect/concurrency/v1/sync.py: 0%
26 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1import asyncio
2from collections.abc import Generator
3from contextlib import contextmanager
4from typing import Optional, TypeVar, Union
5from uuid import UUID
7from prefect.types._datetime import now
9from ._asyncio import acquire_concurrency_slots, release_concurrency_slots
10from ._events import (
11 emit_concurrency_acquisition_events,
12 emit_concurrency_release_events,
13)
15T = TypeVar("T")
@contextmanager
def concurrency(
    names: Union[str, list[str]],
    task_run_id: UUID,
    timeout_seconds: Optional[float] = None,
) -> Generator[None, None, None]:
    """
    A context manager that acquires and releases concurrency slots from the
    given concurrency limits.

    If `names` is empty (an empty string or an empty list), the managed body
    runs without acquiring or releasing anything.

    Args:
        names: The name, or list of names, of the concurrency limits to acquire.
        task_run_id: The task run ID acquiring the limits.
        timeout_seconds: The number of seconds to wait to acquire the limits before
            raising a `TimeoutError`. A timeout of `None` will wait indefinitely.

    Raises:
        TimeoutError: If the limits are not acquired within the given timeout.

    Example:
        A simple example of using the sync `concurrency` context manager:
        ```python
        from uuid import UUID

        from prefect.concurrency.v1.sync import concurrency

        def resource_heavy(task_run_id: UUID):
            with concurrency("test", task_run_id=task_run_id):
                print("Resource heavy task")

        def main(task_run_id: UUID):
            resource_heavy(task_run_id)
        ```
    """
    # Nothing to limit: run the managed body without touching any slots.
    if not names:
        yield
        return

    # Accept a single name for convenience; the helpers are given a list.
    names = names if isinstance(names, list) else [names]

    # `_sync=True` is forwarded so the helpers run synchronously instead of
    # handing back a coroutine; the assert below checks that expectation.
    force = {"_sync": True}
    result = acquire_concurrency_slots(
        names, timeout_seconds=timeout_seconds, task_run_id=task_run_id, **force
    )
    assert not asyncio.iscoroutine(result)
    limits = result
    acquisition_time = now("UTC")
    emitted_events = emit_concurrency_acquisition_events(limits, task_run_id)

    try:
        yield
    finally:
        # Always release, even if the managed body raised. The elapsed time
        # since acquisition is passed along as the occupancy, in seconds.
        occupancy_period = now("UTC") - acquisition_time
        release_concurrency_slots(
            names, task_run_id, occupancy_period.total_seconds(), **force
        )
        # Release events are emitted only after the release call returns.
        emit_concurrency_release_events(limits, emitted_events, task_run_id)