Coverage for /usr/local/lib/python3.12/site-packages/prefect/concurrency/sync.py: 0%

19 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1from collections.abc import Generator 

2from contextlib import contextmanager 

3from typing import TYPE_CHECKING, Optional, TypeVar, Union 

4 

5from ._events import ( 

6 emit_concurrency_acquisition_events, 

7) 

8from ._sync import ( 

9 acquire_concurrency_slots as _acquire_concurrency_slots, 

10) 

11from ._sync import ( 

12 concurrency as _concurrency_internal, 

13) 

14 

15if TYPE_CHECKING: 

16 from prefect.client.schemas.objects import ConcurrencyLeaseHolder 

17 

18T = TypeVar("T") 

19 

20 

@contextmanager
def concurrency(
    names: Union[str, list[str]],
    occupy: int = 1,
    timeout_seconds: Optional[float] = None,
    max_retries: Optional[int] = None,
    lease_duration: float = 300,
    strict: bool = False,
    holder: "Optional[ConcurrencyLeaseHolder]" = None,
) -> Generator[None, None, None]:
    """Hold slots on the named concurrency limits for the duration of a `with` block.

    This is the public synchronous entry point. It forwards every argument
    unchanged to the internal implementation and always passes
    `suppress_warnings=False`.

    Args:
        names: A single concurrency limit name, or a list of names, to acquire
            slots from.
        occupy: How many slots to acquire and hold on each limit.
        timeout_seconds: How long to wait for the slots, in seconds, before a
            `TimeoutError` is raised. `None` waits indefinitely.
        max_retries: Upper bound on the number of retries when acquiring the
            slots.
        lease_duration: Duration, in seconds, of the lease for the acquired
            slots.
        strict: Whether a missing concurrency limit is an error. Defaults to
            `False`.
        holder: Optional information about the holder of the slots; passed
            through as-is to the internal implementation.

    Raises:
        TimeoutError: If the slots are not acquired within the given timeout.
        ConcurrencySlotAcquisitionError: If the concurrency limit does not
            exist and `strict` is `True`.

    Example:
        Guarding a resource-heavy function with the sync context manager:
        ```python
        from prefect.concurrency.sync import concurrency

        def resource_heavy():
            with concurrency("test", occupy=1):
                print("Resource heavy task")

        def main():
            resource_heavy()
        ```
    """
    # Build the inner context manager first, then enter it; the slots are held
    # for as long as the caller's `with` body is running.
    slot_scope = _concurrency_internal(
        names=names,
        occupy=occupy,
        timeout_seconds=timeout_seconds,
        max_retries=max_retries,
        lease_duration=lease_duration,
        strict=strict,
        holder=holder,
        suppress_warnings=False,
    )
    with slot_scope:
        yield

74 

75 

def rate_limit(
    names: Union[str, list[str]],
    occupy: int = 1,
    timeout_seconds: Optional[float] = None,
    strict: bool = False,
) -> None:
    """Wait until `occupy` slots have been acquired on each named limit.

    Every concurrency limit listed in `names` is required to have a slot
    decay configured.

    Args:
        names: A single concurrency limit name, or a list of names, to acquire
            slots from. A falsy value (empty string or empty list) makes the
            call a no-op.
        occupy: How many slots to acquire on each limit.
        timeout_seconds: How long to wait for the slots, in seconds, before a
            `TimeoutError` is raised. `None` waits indefinitely.
        strict: Whether a missing concurrency limit is an error. Defaults to
            `False`.

    Raises:
        TimeoutError: If the slots are not acquired within the given timeout.
        ConcurrencySlotAcquisitionError: If the concurrency limit does not
            exist and `strict` is `True`.
    """
    # Nothing was requested, so there is nothing to acquire or report.
    if not names:
        return None

    # Normalise a lone name into a one-element list; lists pass through as-is.
    if isinstance(names, list):
        limit_names = names
    else:
        limit_names = [names]

    acquired_limits = _acquire_concurrency_slots(
        limit_names,
        occupy,
        mode="rate_limit",
        timeout_seconds=timeout_seconds,
        strict=strict,
    )
    emit_concurrency_acquisition_events(acquired_limits, occupy)

110 emit_concurrency_acquisition_events(limits, occupy)