Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/services/repossessor.py: 61%
30 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1from datetime import datetime, timezone 1a
3from prefect.server.concurrency.lease_storage import ( 1a
4 ConcurrencyLeaseStorage,
5 get_concurrency_lease_storage,
6)
7from prefect.server.database.dependencies import provide_database_interface 1a
8from prefect.server.models.concurrency_limits_v2 import bulk_decrement_active_slots 1a
9from prefect.server.services.base import LoopService 1a
10from prefect.settings.context import get_current_settings 1a
11from prefect.settings.models.server.services import ServicesBaseSetting 1a
14class Repossessor(LoopService): 1a
15 """
16 Handles the reconciliation of expired leases; no tow truck dependency.
17 """
19 def __init__(self): 1a
20 super().__init__( 1b
21 loop_seconds=get_current_settings().server.services.repossessor.loop_seconds,
22 )
23 self.concurrency_lease_storage: ConcurrencyLeaseStorage = ( 1b
24 get_concurrency_lease_storage()
25 )
27 @classmethod 1a
28 def service_settings(cls) -> ServicesBaseSetting: 1a
29 return get_current_settings().server.services.repossessor 1b
31 async def run_once(self) -> None: 1a
32 expired_lease_ids = ( 1b
33 await self.concurrency_lease_storage.read_expired_lease_ids()
34 )
35 if expired_lease_ids: 35 ↛ 36line 35 didn't jump to line 36 because the condition on line 35 was never true1b
36 self.logger.info(f"Revoking {len(expired_lease_ids)} expired leases")
38 db = provide_database_interface() 1b
39 async with db.session_context() as session: 1b
40 for expired_lease_id in expired_lease_ids: 40 ↛ 41line 40 didn't jump to line 41 because the loop on line 40 never started1b
41 expired_lease = await self.concurrency_lease_storage.read_lease(
42 expired_lease_id
43 )
44 if expired_lease is None or expired_lease.metadata is None:
45 self.logger.warning(
46 f"Lease {expired_lease_id} should be revoked but was not found or has no metadata"
47 )
48 continue
49 occupancy_seconds = (
50 datetime.now(timezone.utc) - expired_lease.created_at
51 ).total_seconds()
52 self.logger.info(
53 f"Revoking lease {expired_lease_id} for {len(expired_lease.resource_ids)} concurrency limits with {expired_lease.metadata.slots} slots"
54 )
55 await bulk_decrement_active_slots(
56 session=session,
57 concurrency_limit_ids=expired_lease.resource_ids,
58 slots=expired_lease.metadata.slots,
59 occupancy_seconds=occupancy_seconds,
60 )
61 await self.concurrency_lease_storage.revoke_lease(expired_lease_id)
63 await session.commit()