Coverage for /usr/local/lib/python3.12/site-packages/prefect/concurrency/v1/asyncio.py: 0%

35 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1from collections.abc import AsyncGenerator 

2from contextlib import asynccontextmanager 

3from typing import TYPE_CHECKING, Optional, Union 

4from uuid import UUID 

5 

6import anyio 

7 

8from prefect.concurrency.v1._asyncio import ( 

9 acquire_concurrency_slots, 

10 release_concurrency_slots, 

11) 

12from prefect.concurrency.v1._events import ( 

13 emit_concurrency_acquisition_events, 

14 emit_concurrency_release_events, 

15) 

16from prefect.concurrency.v1.context import ConcurrencyContext 

17from prefect.types._datetime import now 

18 

19from ._asyncio import ( 

20 AcquireConcurrencySlotTimeoutError as AcquireConcurrencySlotTimeoutError, 

21) 

22from ._asyncio import ConcurrencySlotAcquisitionError as ConcurrencySlotAcquisitionError 

23 

24 

@asynccontextmanager
async def concurrency(
    names: Union[str, list[str]],
    task_run_id: UUID,
    timeout_seconds: Optional[float] = None,
) -> AsyncGenerator[None, None]:
    """Hold slots from the given concurrency limits for the duration of a block.

    Slots are acquired on entry and released on exit, whether the wrapped
    block finishes normally or raises. If `names` is empty (or otherwise
    falsy) the context manager is a no-op: nothing is acquired, released or
    emitted.

    Args:
        names: The name, or list of names, of the concurrency limits to
            acquire slots from. A single string is treated as a one-item list.
        task_run_id: The ID of the task run that is occupying the slots.
        timeout_seconds: The number of seconds to wait for the slots to be
            acquired before raising a `TimeoutError`. A timeout of `None`
            will wait indefinitely.

    Raises:
        TimeoutError: If the slots are not acquired within the given timeout.

    Example:
        A simple example of using the async `concurrency` context manager:
        ```python
        from prefect.concurrency.v1.asyncio import concurrency

        async def resource_heavy():
            async with concurrency("test", task_run_id):
                print("Resource heavy task")

        async def main():
            await resource_heavy()
        ```
    """
    if not names:
        # No limits were requested, so just run the wrapped block.
        yield
        return

    names_normalized: list[str] = names if isinstance(names, list) else [names]

    acquire_slots = acquire_concurrency_slots(
        names_normalized,
        task_run_id=task_run_id,
        timeout_seconds=timeout_seconds,
    )
    if TYPE_CHECKING:
        # Static-typing aid only; this branch never executes at runtime.
        assert not isinstance(acquire_slots, list)
    limits = await acquire_slots
    acquisition_time = now("UTC")
    emitted_events = emit_concurrency_acquisition_events(limits, task_run_id)

    try:
        yield
    finally:
        # Compute the occupancy once; both the release call and the
        # cancellation fallback below report the same duration.
        occupancy_seconds = (now("UTC") - acquisition_time).total_seconds()
        try:
            release_slots = release_concurrency_slots(
                names_normalized, task_run_id, occupancy_seconds
            )
            if TYPE_CHECKING:
                # Static-typing aid only; this branch never executes at runtime.
                assert not isinstance(release_slots, list)
            await release_slots
        except anyio.get_cancelled_exc_class():
            # The task was cancelled before it could release the slots. Add the
            # slots to the cleanup list so they can be released when the
            # concurrency context is exited.
            # NOTE(review): the cancellation exception is not re-raised here;
            # confirm that swallowing it is intentional.
            if ctx := ConcurrencyContext.get():
                ctx.cleanup_slots.append(
                    (names_normalized, occupancy_seconds, task_run_id)
                )

        emit_concurrency_release_events(limits, emitted_events, task_run_id)

45 ```python 

46 from prefect.concurrency.v1.asyncio import concurrency 

47 

48 async def resource_heavy(): 

49 async with concurrency("test", task_run_id): 

50 print("Resource heavy task") 

51 

52 async def main(): 

53 await resource_heavy() 

54 ``` 

55 """ 

56 if not names: 

57 yield 

58 return 

59 

60 names_normalized: list[str] = names if isinstance(names, list) else [names] 

61 

62 acquire_slots = acquire_concurrency_slots( 

63 names_normalized, 

64 task_run_id=task_run_id, 

65 timeout_seconds=timeout_seconds, 

66 ) 

67 if TYPE_CHECKING: 

68 assert not isinstance(acquire_slots, list) 

69 limits = await acquire_slots 

70 acquisition_time = now("UTC") 

71 emitted_events = emit_concurrency_acquisition_events(limits, task_run_id) 

72 

73 try: 

74 yield 

75 finally: 

76 occupancy_period = now("UTC") - acquisition_time 

77 try: 

78 release_slots = release_concurrency_slots( 

79 names_normalized, task_run_id, occupancy_period.total_seconds() 

80 ) 

81 if TYPE_CHECKING: 

82 assert not isinstance(release_slots, list) 

83 await release_slots 

84 except anyio.get_cancelled_exc_class(): 

85 # The task was cancelled before it could release the slots. Add the 

86 # slots to the cleanup list so they can be released when the 

87 # concurrency context is exited. 

88 if ctx := ConcurrencyContext.get(): 

89 ctx.cleanup_slots.append( 

90 (names_normalized, occupancy_period.total_seconds(), task_run_id) 

91 ) 

92 

93 emit_concurrency_release_events(limits, emitted_events, task_run_id)