Coverage for /usr/local/lib/python3.12/site-packages/prefect/concurrency/v1/sync.py: 0%

26 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1import asyncio 

2from collections.abc import Generator 

3from contextlib import contextmanager 

4from typing import Optional, TypeVar, Union 

5from uuid import UUID 

6 

7from prefect.types._datetime import now 

8 

9from ._asyncio import acquire_concurrency_slots, release_concurrency_slots 

10from ._events import ( 

11 emit_concurrency_acquisition_events, 

12 emit_concurrency_release_events, 

13) 

14 

# Generic type variable; nothing in the visible part of this module references it.
15T = TypeVar("T") 

16 

17 

@contextmanager
def concurrency(
    names: Union[str, list[str]],
    task_run_id: UUID,
    timeout_seconds: Optional[float] = None,
) -> Generator[None, None, None]:
    """
    A context manager that acquires and releases concurrency slots from the
    given concurrency limits.

    If `names` is falsy (for example an empty list or an empty string) the
    managed body runs immediately and no slots are acquired or released.

    Args:
        names: The name, or list of names, of the concurrency limits to acquire.
        task_run_id: The task run ID acquiring the limits.
        timeout_seconds: The number of seconds to wait to acquire the limits before
            raising a `TimeoutError`. A timeout of `None` will wait indefinitely.

    Raises:
        TimeoutError: If the limits are not acquired within the given timeout.

    Example:
        A simple example of using the sync `concurrency` context manager:
        ```python
        from uuid import UUID

        from prefect.concurrency.v1.sync import concurrency

        def resource_heavy(task_run_id: UUID):
            with concurrency("test", task_run_id=task_run_id):
                print("Resource heavy task")

        def main(task_run_id: UUID):
            resource_heavy(task_run_id)
        ```
    """
    if not names:
        yield
        return

    names = names if isinstance(names, list) else [names]

    # Extra keyword forwarded to both helpers; the assert below relies on the
    # acquire call handing back a plain result rather than a coroutine.
    force = {"_sync": True}
    result = acquire_concurrency_slots(
        names, timeout_seconds=timeout_seconds, task_run_id=task_run_id, **force
    )
    assert not asyncio.iscoroutine(result)
    limits = result
    acquisition_time = now("UTC")

    # From here on the slots are held, so everything that can raise lives inside
    # the try block: a failure while emitting the acquisition events must not
    # leave the slots occupied.
    emitted_events = None
    acquisition_events_emitted = False
    try:
        emitted_events = emit_concurrency_acquisition_events(limits, task_run_id)
        acquisition_events_emitted = True
        yield
    finally:
        occupancy_period = now("UTC") - acquisition_time
        release_concurrency_slots(
            names, task_run_id, occupancy_period.total_seconds(), **force
        )
        # Release events pair with the acquisition events; skip them when the
        # acquisition events were never produced.
        if acquisition_events_emitted:
            emit_concurrency_release_events(limits, emitted_events, task_run_id)

71 ) 

72 emit_concurrency_release_events(limits, emitted_events, task_run_id)