Coverage for /usr/local/lib/python3.12/site-packages/prefect/concurrency/_leases.py: 18%
67 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1import asyncio 1a
2import concurrent.futures 1a
3from contextlib import asynccontextmanager, contextmanager 1a
4from typing import AsyncGenerator, Generator 1a
5from uuid import UUID 1a
7from prefect._internal.concurrency.api import create_call 1a
8from prefect._internal.concurrency.cancellation import ( 1a
9 AsyncCancelScope,
10 WatcherThreadCancelScope,
11)
12from prefect._internal.concurrency.threads import get_global_loop 1a
13from prefect.client.orchestration import get_client 1a
14from prefect.logging.loggers import get_logger, get_run_logger 1a
17async def _lease_renewal_loop( 1a
18 lease_id: UUID,
19 lease_duration: float,
20) -> None:
21 """
22 Maintain a concurrency lease by renewing it after the given interval.
24 Args:
25 lease_id: The ID of the lease to maintain.
26 lease_duration: The duration of the lease in seconds.
27 """
28 async with get_client() as client:
29 while True:
30 await client.renew_concurrency_lease(
31 lease_id=lease_id, lease_duration=lease_duration
32 )
33 await asyncio.sleep( # Renew the lease 3/4 of the way through the lease duration
34 lease_duration * 0.75
35 )
38@contextmanager 1a
39def maintain_concurrency_lease( 1a
40 lease_id: UUID,
41 lease_duration: float,
42 raise_on_lease_renewal_failure: bool = False,
43 suppress_warnings: bool = False,
44) -> Generator[None, None, None]:
45 """
46 Maintain a concurrency lease for the given lease ID.
48 Args:
49 lease_id: The ID of the lease to maintain.
50 lease_duration: The duration of the lease in seconds.
51 raise_on_lease_renewal_failure: A boolean specifying whether to raise an error if the lease renewal fails.
52 """
53 # Start a loop to renew the lease on the global event loop to avoid blocking the main thread
54 global_loop = get_global_loop()
55 lease_renewal_call = create_call(
56 _lease_renewal_loop,
57 lease_id,
58 lease_duration,
59 )
60 global_loop.submit(lease_renewal_call)
62 with WatcherThreadCancelScope() as cancel_scope:
64 def handle_lease_renewal_failure(future: concurrent.futures.Future[None]):
65 if future.cancelled():
66 return
67 exc = future.exception()
68 if exc:
69 try:
70 # Use a run logger if available
71 logger = get_run_logger()
72 except Exception:
73 logger = get_logger("concurrency")
74 if raise_on_lease_renewal_failure:
75 logger.error(
76 "Concurrency lease renewal failed - slots are no longer reserved. Terminating execution to prevent over-allocation."
77 )
78 assert cancel_scope.cancel()
79 else:
80 if suppress_warnings:
81 logger.debug(
82 "Concurrency lease renewal failed - slots are no longer reserved. Execution will continue, but concurrency limits may be exceeded."
83 )
84 else:
85 logger.warning(
86 "Concurrency lease renewal failed - slots are no longer reserved. Execution will continue, but concurrency limits may be exceeded."
87 )
89 lease_renewal_call.future.add_done_callback(handle_lease_renewal_failure)
91 try:
92 yield
93 finally:
94 # Cancel the lease renewal loop
95 lease_renewal_call.cancel()
98@asynccontextmanager 1a
99async def amaintain_concurrency_lease( 1a
100 lease_id: UUID,
101 lease_duration: float,
102 raise_on_lease_renewal_failure: bool = False,
103 suppress_warnings: bool = False,
104) -> AsyncGenerator[None, None]:
105 """
106 Maintain a concurrency lease for the given lease ID.
108 Args:
109 lease_id: The ID of the lease to maintain.
110 lease_duration: The duration of the lease in seconds.
111 raise_on_lease_renewal_failure: A boolean specifying whether to raise an error if the lease renewal fails.
112 """
113 lease_renewal_task = asyncio.create_task(
114 _lease_renewal_loop(lease_id, lease_duration)
115 )
116 with AsyncCancelScope() as cancel_scope:
118 def handle_lease_renewal_failure(task: asyncio.Task[None]):
119 if task.cancelled():
120 # Cancellation is the expected way for this loop to stop
121 return
122 exc = task.exception()
123 if exc:
124 try:
125 # Use a run logger if available
126 logger = get_run_logger()
127 except Exception:
128 logger = get_logger("concurrency")
129 if raise_on_lease_renewal_failure:
130 logger.error(
131 "Concurrency lease renewal failed - slots are no longer reserved. Terminating execution to prevent over-allocation."
132 )
133 cancel_scope.cancel()
134 else:
135 if suppress_warnings:
136 logger.debug(
137 "Concurrency lease renewal failed - slots are no longer reserved. Execution will continue, but concurrency limits may be exceeded."
138 )
139 else:
140 logger.warning(
141 "Concurrency lease renewal failed - slots are no longer reserved. Execution will continue, but concurrency limits may be exceeded."
142 )
144 # Add a callback to stop execution if the lease renewal fails and strict is True
145 lease_renewal_task.add_done_callback(handle_lease_renewal_failure)
146 try:
147 yield
148 finally:
149 lease_renewal_task.cancel()
150 try:
151 await lease_renewal_task
152 except (asyncio.CancelledError, Exception):
153 # Handling for errors will be done in the callback
154 pass