Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/services/repossessor.py: 89%

30 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1from datetime import datetime, timezone 1a

2 

3from prefect.server.concurrency.lease_storage import ( 1a

4 ConcurrencyLeaseStorage, 

5 get_concurrency_lease_storage, 

6) 

7from prefect.server.database.dependencies import provide_database_interface 1a

8from prefect.server.models.concurrency_limits_v2 import bulk_decrement_active_slots 1a

9from prefect.server.services.base import LoopService 1a

10from prefect.settings.context import get_current_settings 1a

11from prefect.settings.models.server.services import ServicesBaseSetting 1a

12 

13 

14class Repossessor(LoopService): 1a

15 """ 

16 Handles the reconciliation of expired leases; no tow truck dependency. 

17 """ 

18 

19 def __init__(self): 1a

20 super().__init__( 1c

21 loop_seconds=get_current_settings().server.services.repossessor.loop_seconds, 

22 ) 

23 self.concurrency_lease_storage: ConcurrencyLeaseStorage = ( 1c

24 get_concurrency_lease_storage() 

25 ) 

26 

27 @classmethod 1a

28 def service_settings(cls) -> ServicesBaseSetting: 1a

29 return get_current_settings().server.services.repossessor 1c

30 

31 async def run_once(self) -> None: 1a

32 expired_lease_ids = ( 1cbd

33 await self.concurrency_lease_storage.read_expired_lease_ids() 

34 ) 

35 if expired_lease_ids: 1cbd

36 self.logger.info(f"Revoking {len(expired_lease_ids)} expired leases") 1b

37 

38 db = provide_database_interface() 1cbd

39 async with db.session_context() as session: 1cbd

40 for expired_lease_id in expired_lease_ids: 1cbd

41 expired_lease = await self.concurrency_lease_storage.read_lease( 1b

42 expired_lease_id 

43 ) 

44 if expired_lease is None or expired_lease.metadata is None: 44 ↛ 45line 44 didn't jump to line 45 because the condition on line 44 was never true1b

45 self.logger.warning( 

46 f"Lease {expired_lease_id} should be revoked but was not found or has no metadata" 

47 ) 

48 continue 

49 occupancy_seconds = ( 1b

50 datetime.now(timezone.utc) - expired_lease.created_at 

51 ).total_seconds() 

52 self.logger.info( 1b

53 f"Revoking lease {expired_lease_id} for {len(expired_lease.resource_ids)} concurrency limits with {expired_lease.metadata.slots} slots" 

54 ) 

55 await bulk_decrement_active_slots( 1b

56 session=session, 

57 concurrency_limit_ids=expired_lease.resource_ids, 

58 slots=expired_lease.metadata.slots, 

59 occupancy_seconds=occupancy_seconds, 

60 ) 

61 await self.concurrency_lease_storage.revoke_lease(expired_lease_id) 

62 

63 await session.commit() 1b