Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/services/repossessor.py: 89%
30 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1from datetime import datetime, timezone 1a
3from prefect.server.concurrency.lease_storage import ( 1a
4 ConcurrencyLeaseStorage,
5 get_concurrency_lease_storage,
6)
7from prefect.server.database.dependencies import provide_database_interface 1a
8from prefect.server.models.concurrency_limits_v2 import bulk_decrement_active_slots 1a
9from prefect.server.services.base import LoopService 1a
10from prefect.settings.context import get_current_settings 1a
11from prefect.settings.models.server.services import ServicesBaseSetting 1a
class Repossessor(LoopService):
    """
    Handles the reconciliation of expired leases; no tow truck dependency.

    On every loop iteration the service asks the configured lease storage for
    the IDs of expired leases. For each lease that can still be read and has
    metadata, it passes the lease's resource IDs and slot count to
    ``bulk_decrement_active_slots`` and then revokes the lease in storage.
    """

    def __init__(self):
        # The loop interval comes from the repossessor section of the current
        # server settings.
        super().__init__(
            loop_seconds=get_current_settings().server.services.repossessor.loop_seconds,
        )
        self.concurrency_lease_storage: ConcurrencyLeaseStorage = (
            get_concurrency_lease_storage()
        )

    @classmethod
    def service_settings(cls) -> ServicesBaseSetting:
        """Return the repossessor section of the current server settings."""
        return get_current_settings().server.services.repossessor

    async def run_once(self) -> None:
        """
        Process every lease that the lease storage reports as expired.

        Leases that cannot be read back, or that carry no metadata, are
        logged with a warning and skipped. All slot decrements share one
        database session that is committed once after the loop.
        """
        expired_lease_ids = (
            await self.concurrency_lease_storage.read_expired_lease_ids()
        )
        if not expired_lease_ids:
            # Nothing to reconcile: avoid opening a database session and
            # issuing a commit on every idle loop iteration.
            return

        self.logger.info(f"Revoking {len(expired_lease_ids)} expired leases")

        db = provide_database_interface()
        async with db.session_context() as session:
            for expired_lease_id in expired_lease_ids:
                expired_lease = await self.concurrency_lease_storage.read_lease(
                    expired_lease_id
                )
                if expired_lease is None or expired_lease.metadata is None:
                    # Without the lease and its metadata the slot count is
                    # unknown, so there is nothing that can be decremented.
                    self.logger.warning(
                        f"Lease {expired_lease_id} should be revoked but was not found or has no metadata"
                    )
                    continue

                # Occupancy is measured from lease creation until now (UTC).
                occupancy_seconds = (
                    datetime.now(timezone.utc) - expired_lease.created_at
                ).total_seconds()
                self.logger.info(
                    f"Revoking lease {expired_lease_id} for {len(expired_lease.resource_ids)} concurrency limits with {expired_lease.metadata.slots} slots"
                )
                await bulk_decrement_active_slots(
                    session=session,
                    concurrency_limit_ids=expired_lease.resource_ids,
                    slots=expired_lease.metadata.slots,
                    occupancy_seconds=occupancy_seconds,
                )
                await self.concurrency_lease_storage.revoke_lease(expired_lease_id)

            await session.commit()