Coverage for /usr/local/lib/python3.12/site-packages/prefect/concurrency/_sync.py: 45%

31 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1from __future__ import annotations 1a

2 

3from contextlib import contextmanager 1a

4from typing import Generator, Literal, Optional 1a

5from uuid import UUID 1a

6 

7from prefect.client.schemas.objects import ConcurrencyLeaseHolder 1a

8from prefect.client.schemas.responses import ( 1a

9 ConcurrencyLimitWithLeaseResponse, 

10 MinimalConcurrencyLimitResponse, 

11) 

12from prefect.concurrency._asyncio import ( 1a

13 aacquire_concurrency_slots, 

14 aacquire_concurrency_slots_with_lease, 

15 arelease_concurrency_slots_with_lease, 

16) 

17from prefect.concurrency._events import ( 1a

18 emit_concurrency_acquisition_events, 

19 emit_concurrency_release_events, 

20) 

21from prefect.concurrency._leases import maintain_concurrency_lease 1a

22from prefect.utilities.asyncutils import run_coro_as_sync 1a

23 

24 

25def release_concurrency_slots_with_lease(lease_id: UUID) -> None: 1a

26 run_coro_as_sync(arelease_concurrency_slots_with_lease(lease_id)) 

27 

28 

29def acquire_concurrency_slots( 1a

30 names: list[str], 

31 slots: int, 

32 mode: Literal["concurrency", "rate_limit"] = "concurrency", 

33 timeout_seconds: Optional[float] = None, 

34 max_retries: Optional[int] = None, 

35 strict: bool = False, 

36) -> list[MinimalConcurrencyLimitResponse]: 

37 result = run_coro_as_sync( 

38 aacquire_concurrency_slots( 

39 names, slots, mode, timeout_seconds, max_retries, strict 

40 ) 

41 ) 

42 return result 

43 

44 

45def acquire_concurrency_slots_with_lease( 1a

46 names: list[str], 

47 slots: int, 

48 mode: Literal["concurrency", "rate_limit"] = "concurrency", 

49 timeout_seconds: Optional[float] = None, 

50 max_retries: Optional[int] = None, 

51 lease_duration: float = 300, 

52 strict: bool = False, 

53 holder: "Optional[ConcurrencyLeaseHolder]" = None, 

54 suppress_warnings: bool = False, 

55) -> ConcurrencyLimitWithLeaseResponse: 

56 result = run_coro_as_sync( 

57 aacquire_concurrency_slots_with_lease( 

58 names, 

59 slots, 

60 mode, 

61 timeout_seconds, 

62 max_retries, 

63 lease_duration, 

64 strict, 

65 holder, 

66 suppress_warnings, 

67 ) 

68 ) 

69 return result 

70 

71 

72@contextmanager 1a

73def concurrency( 1a

74 names: str | list[str], 

75 occupy: int = 1, 

76 timeout_seconds: Optional[float] = None, 

77 max_retries: Optional[int] = None, 

78 lease_duration: float = 300, 

79 strict: bool = False, 

80 holder: "Optional[ConcurrencyLeaseHolder]" = None, 

81 suppress_warnings: bool = False, 

82) -> Generator[None, None, None]: 

83 """A context manager that acquires and releases concurrency slots from the 

84 given concurrency limits. 

85 

86 Args: 

87 names: The names of the concurrency limits to acquire slots from. 

88 occupy: The number of slots to acquire and hold from each limit. 

89 timeout_seconds: The number of seconds to wait for the slots to be acquired before 

90 raising a `TimeoutError`. A timeout of `None` will wait indefinitely. 

91 max_retries: The maximum number of retries to acquire the concurrency slots. 

92 lease_duration: The duration of the lease for the acquired slots in seconds. 

93 strict: A boolean specifying whether to raise an error if the concurrency limit does not exist. 

94 Defaults to `False`. 

95 holder: A dictionary containing information about the holder of the concurrency slots. 

96 Typically includes 'type' and 'id' keys. 

97 

98 Raises: 

99 TimeoutError: If the slots are not acquired within the given timeout. 

100 ConcurrencySlotAcquisitionError: If the concurrency limit does not exist and `strict` is `True`. 

101 

102 Example: 

103 A simple example of using the sync `concurrency` context manager: 

104 ```python 

105 from prefect.concurrency.sync import concurrency 

106 

107 def resource_heavy(): 

108 with concurrency("test", occupy=1): 

109 print("Resource heavy task") 

110 

111 def main(): 

112 resource_heavy() 

113 ``` 

114 """ 

115 if not names: 

116 yield 

117 return 

118 

119 names = names if isinstance(names, list) else [names] 

120 

121 acquisition_response = acquire_concurrency_slots_with_lease( 

122 names, 

123 occupy, 

124 timeout_seconds=timeout_seconds, 

125 strict=strict, 

126 lease_duration=lease_duration, 

127 max_retries=max_retries, 

128 holder=holder, 

129 suppress_warnings=suppress_warnings, 

130 ) 

131 emitted_events = emit_concurrency_acquisition_events( 

132 acquisition_response.limits, occupy 

133 ) 

134 

135 try: 

136 with maintain_concurrency_lease( 

137 acquisition_response.lease_id, 

138 lease_duration, 

139 raise_on_lease_renewal_failure=strict, 

140 suppress_warnings=suppress_warnings, 

141 ): 

142 yield 

143 finally: 

144 release_concurrency_slots_with_lease(acquisition_response.lease_id) 

145 emit_concurrency_release_events( 

146 acquisition_response.limits, occupy, emitted_events 

147 )