Coverage for /usr/local/lib/python3.12/site-packages/prefect/concurrency/_leases.py: 18%

67 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1import asyncio 1a

2import concurrent.futures 1a

3from contextlib import asynccontextmanager, contextmanager 1a

4from typing import AsyncGenerator, Generator 1a

5from uuid import UUID 1a

6 

7from prefect._internal.concurrency.api import create_call 1a

8from prefect._internal.concurrency.cancellation import ( 1a

9 AsyncCancelScope, 

10 WatcherThreadCancelScope, 

11) 

12from prefect._internal.concurrency.threads import get_global_loop 1a

13from prefect.client.orchestration import get_client 1a

14from prefect.logging.loggers import get_logger, get_run_logger 1a

15 

16 

async def _lease_renewal_loop(
    lease_id: UUID,
    lease_duration: float,
) -> None:
    """
    Keep a concurrency lease alive by renewing it over and over.

    The lease is renewed as soon as the loop starts, and then again each
    time three quarters of `lease_duration` has elapsed. The loop has no
    exit condition of its own: it ends only when it is cancelled or when
    an exception propagates out of it.

    Args:
        lease_id: The ID of the lease to maintain.
        lease_duration: The duration of the lease in seconds.
    """
    # Renew the lease 3/4 of the way through the lease duration
    renewal_interval = lease_duration * 0.75

    async with get_client() as client:
        while True:
            await client.renew_concurrency_lease(
                lease_id=lease_id, lease_duration=lease_duration
            )
            await asyncio.sleep(renewal_interval)

36 

37 

@contextmanager
def maintain_concurrency_lease(
    lease_id: UUID,
    lease_duration: float,
    raise_on_lease_renewal_failure: bool = False,
    suppress_warnings: bool = False,
) -> Generator[None, None, None]:
    """
    Maintain a concurrency lease for the given lease ID.

    While the context is active, a renewal loop for the lease runs on the
    global event loop. The renewal loop is cancelled when the context exits.

    Args:
        lease_id: The ID of the lease to maintain.
        lease_duration: The duration of the lease in seconds.
        raise_on_lease_renewal_failure: If True, a renewal failure is logged
            as an error and the cancel scope wrapping the context body is
            cancelled. If False, the failure is only logged and execution
            continues.
        suppress_warnings: Only consulted when `raise_on_lease_renewal_failure`
            is False. If True, a renewal failure is logged at debug level
            instead of warning level.
    """
    # Start a loop to renew the lease on the global event loop to avoid blocking the main thread
    global_loop = get_global_loop()
    lease_renewal_call = create_call(
        _lease_renewal_loop,
        lease_id,
        lease_duration,
    )
    global_loop.submit(lease_renewal_call)

    with WatcherThreadCancelScope() as cancel_scope:

        def handle_lease_renewal_failure(future: concurrent.futures.Future[None]):
            if future.cancelled():
                # Cancellation is the expected way for this loop to stop
                return
            exc = future.exception()
            if not exc:
                return

            try:
                # Use a run logger if available
                logger = get_run_logger()
            except Exception:
                logger = get_logger("concurrency")

            if raise_on_lease_renewal_failure:
                logger.error(
                    "Concurrency lease renewal failed - slots are no longer reserved. Terminating execution to prevent over-allocation."
                )
                # Call cancel() as a plain statement: wrapping it in `assert`
                # would strip the call entirely under `python -O`, and a falsy
                # return value would raise AssertionError from inside this
                # done-callback rather than in the caller.
                cancel_scope.cancel()
            elif suppress_warnings:
                logger.debug(
                    "Concurrency lease renewal failed - slots are no longer reserved. Execution will continue, but concurrency limits may be exceeded."
                )
            else:
                logger.warning(
                    "Concurrency lease renewal failed - slots are no longer reserved. Execution will continue, but concurrency limits may be exceeded."
                )

        # React to the renewal loop finishing; a loop that ends without being
        # cancelled and with an exception is treated as a renewal failure.
        lease_renewal_call.future.add_done_callback(handle_lease_renewal_failure)

        try:
            yield
        finally:
            # Cancel the lease renewal loop
            lease_renewal_call.cancel()

96 

97 

@asynccontextmanager
async def amaintain_concurrency_lease(
    lease_id: UUID,
    lease_duration: float,
    raise_on_lease_renewal_failure: bool = False,
    suppress_warnings: bool = False,
) -> AsyncGenerator[None, None]:
    """
    Maintain a concurrency lease for the given lease ID.

    While the context is active, a renewal loop for the lease runs as an
    asyncio task. On exit the task is cancelled and awaited.

    Args:
        lease_id: The ID of the lease to maintain.
        lease_duration: The duration of the lease in seconds.
        raise_on_lease_renewal_failure: If True, a renewal failure is logged
            as an error and the cancel scope wrapping the context body is
            cancelled. If False, the failure is only logged and execution
            continues.
        suppress_warnings: Only consulted when `raise_on_lease_renewal_failure`
            is False. If True, a renewal failure is logged at debug level
            instead of warning level.
    """
    renewal_task = asyncio.create_task(_lease_renewal_loop(lease_id, lease_duration))

    with AsyncCancelScope() as cancel_scope:

        def on_renewal_task_done(task: asyncio.Task[None]):
            # Cancellation is the expected way for this loop to stop
            if task.cancelled():
                return
            if not task.exception():
                return

            # Use a run logger if available
            try:
                log = get_run_logger()
            except Exception:
                log = get_logger("concurrency")

            if raise_on_lease_renewal_failure:
                log.error(
                    "Concurrency lease renewal failed - slots are no longer reserved. Terminating execution to prevent over-allocation."
                )
                cancel_scope.cancel()
            elif suppress_warnings:
                log.debug(
                    "Concurrency lease renewal failed - slots are no longer reserved. Execution will continue, but concurrency limits may be exceeded."
                )
            else:
                log.warning(
                    "Concurrency lease renewal failed - slots are no longer reserved. Execution will continue, but concurrency limits may be exceeded."
                )

        # Stop execution when the lease renewal fails and
        # raise_on_lease_renewal_failure is True; otherwise only log it
        renewal_task.add_done_callback(on_renewal_task_done)

        try:
            yield
        finally:
            renewal_task.cancel()
            try:
                await renewal_task
            except (asyncio.CancelledError, Exception):
                # Handling for errors will be done in the callback
                pass