Coverage for /usr/local/lib/python3.12/site-packages/prefect/concurrency/asyncio.py: 0%

20 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1from collections.abc import AsyncGenerator 

2from contextlib import asynccontextmanager 

3from typing import TYPE_CHECKING, Optional, Union 

4 

5from ._asyncio import ( 

6 AcquireConcurrencySlotTimeoutError as AcquireConcurrencySlotTimeoutError, 

7) 

8from ._asyncio import ConcurrencySlotAcquisitionError as ConcurrencySlotAcquisitionError 

9from ._asyncio import ( 

10 aacquire_concurrency_slots, 

11) 

12from ._asyncio import ( 

13 concurrency as _concurrency_internal, 

14) 

15from ._events import ( 

16 emit_concurrency_acquisition_events, 

17) 

18 

19if TYPE_CHECKING: 

20 from prefect.client.schemas.objects import ConcurrencyLeaseHolder 

21 

22 

@asynccontextmanager
async def concurrency(
    names: Union[str, list[str]],
    occupy: int = 1,
    timeout_seconds: Optional[float] = None,
    max_retries: Optional[int] = None,
    lease_duration: float = 300,
    strict: bool = False,
    holder: "Optional[ConcurrencyLeaseHolder]" = None,
) -> AsyncGenerator[None, None]:
    """Hold concurrency slots for the duration of an `async with` block.

    Slots are acquired from the given concurrency limits on entry and
    released on exit. All of the work is delegated to the internal
    `concurrency` context manager from `._asyncio`; this public wrapper
    forwards every argument unchanged and pins `suppress_warnings=False`.

    Args:
        names: The name, or list of names, of the concurrency limits to
            acquire slots from.
        occupy: The number of slots to acquire and hold from each limit.
        timeout_seconds: How many seconds to wait for the slots before a
            `TimeoutError` is raised. `None` waits indefinitely.
        max_retries: The maximum number of retries when acquiring the slots.
        lease_duration: The duration, in seconds, of the lease taken on the
            acquired slots.
        strict: Whether to raise an error if a concurrency limit does not
            exist. Defaults to `False`.
        holder: Optional `ConcurrencyLeaseHolder` with information about who
            holds the slots (typically a type and an id).

    Raises:
        TimeoutError: If the slots are not acquired within the given timeout.
        ConcurrencySlotAcquisitionError: If the concurrency limit does not
            exist and `strict` is `True`.

    Example:
        Using the async `concurrency` context manager:
        ```python
        from prefect.concurrency.asyncio import concurrency

        async def resource_heavy():
            async with concurrency("test", occupy=1):
                print("Resource heavy task")

        async def main():
            await resource_heavy()
        ```
    """
    # Build the internal context manager first, then enter it; the caller's
    # `async with` body runs at the `yield` while the slots are held.
    slot_guard = _concurrency_internal(
        names=names,
        occupy=occupy,
        timeout_seconds=timeout_seconds,
        max_retries=max_retries,
        lease_duration=lease_duration,
        strict=strict,
        holder=holder,
        suppress_warnings=False,
    )
    async with slot_guard:
        yield

77 

78 

async def rate_limit(
    names: Union[str, list[str]],
    occupy: int = 1,
    timeout_seconds: Optional[float] = None,
    strict: bool = False,
) -> None:
    """Wait until `occupy` slots are acquired from each limit in `names`.

    Requires that all given concurrency limits have a slot decay.

    If `names` is empty (an empty string or an empty list) the call returns
    immediately without acquiring anything.

    Args:
        names: The name, or list of names, of the concurrency limits to
            acquire slots from.
        occupy: The number of slots to acquire and hold from each limit.
        timeout_seconds: How many seconds to wait for the slots before a
            `TimeoutError` is raised. `None` waits indefinitely.
        strict: Whether to raise an error if a concurrency limit does not
            exist. Defaults to `False`.

    Raises:
        TimeoutError: If the slots are not acquired within the given timeout.
        ConcurrencySlotAcquisitionError: If the concurrency limit does not
            exist and `strict` is `True`.
    """
    if not names:
        # Nothing to rate-limit against.
        return None

    # Normalise a single name into a one-element list.
    if isinstance(names, list):
        limit_names = names
    else:
        limit_names = [names]

    acquired_limits = await aacquire_concurrency_slots(
        names=limit_names,
        slots=occupy,
        mode="rate_limit",
        timeout_seconds=timeout_seconds,
        strict=strict,
    )
    emit_concurrency_acquisition_events(acquired_limits, occupy)