Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/concurrency/lease_storage/memory.py: 95%

51 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1from __future__ import annotations 1a

2 

3from datetime import datetime, timedelta, timezone 1a

4from uuid import UUID 1a

5 

6from prefect.server.concurrency.lease_storage import ( 1a

7 ConcurrencyLeaseHolder, 

8 ConcurrencyLimitLeaseMetadata, 

9) 

10from prefect.server.concurrency.lease_storage import ( 1a

11 ConcurrencyLeaseStorage as _ConcurrencyLeaseStorage, 

12) 

13from prefect.server.utilities.leasing import ResourceLease 1a

14 

15 

class ConcurrencyLeaseStorage(_ConcurrencyLeaseStorage):
    """
    In-memory concurrency lease storage, shared as a single instance.

    Constructing the class repeatedly always yields the same object. Lease
    objects are kept in ``leases`` and their expiration times in the separate
    ``expirations`` index; both dicts are keyed by lease ID.
    """

    _instance: "ConcurrencyLeaseStorage | None" = None
    _initialized: bool = False

    def __new__(cls) -> "ConcurrencyLeaseStorage":
        """Return the cached instance, allocating it on first use."""
        if cls._instance is not None:
            return cls._instance
        cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        """Create the empty lease and expiration maps exactly once."""
        # __init__ runs on every ``ConcurrencyLeaseStorage()`` call, even though
        # __new__ hands back the same object, so guard against wiping the state.
        if not self.__class__._initialized:
            self.leases: dict[UUID, ResourceLease[ConcurrencyLimitLeaseMetadata]] = {}
            self.expirations: dict[UUID, datetime] = {}
            self.__class__._initialized = True

    async def create_lease(
        self,
        resource_ids: list[UUID],
        ttl: timedelta,
        metadata: ConcurrencyLimitLeaseMetadata | None = None,
    ) -> ResourceLease[ConcurrencyLimitLeaseMetadata]:
        """
        Create and store a new lease that expires ``ttl`` from now (UTC).

        Args:
            resource_ids: The resource IDs the lease covers
            ttl: How long the lease stays active
            metadata: Optional metadata attached to the lease

        Returns:
            The newly created lease
        """
        expires_at = datetime.now(timezone.utc) + ttl
        new_lease = ResourceLease(
            resource_ids=resource_ids, metadata=metadata, expiration=expires_at
        )
        lease_key = new_lease.id
        self.leases[lease_key] = new_lease
        self.expirations[lease_key] = expires_at
        return new_lease

    async def read_lease(
        self, lease_id: UUID
    ) -> ResourceLease[ConcurrencyLimitLeaseMetadata] | None:
        """Return the stored lease for ``lease_id``, or None if there is none."""
        return self.leases.get(lease_id)

    async def renew_lease(self, lease_id: UUID, ttl: timedelta) -> bool:
        """
        Renew a concurrency lease by resetting its entry in the expiration index.

        The expiration index is only written when the lease itself exists; for
        an unknown lease any leftover index entry is dropped instead, so the
        index never holds entries without a lease. Only ``expirations`` is
        updated here; the stored lease object is left as it was.

        Args:
            lease_id: The ID of the lease to renew
            ttl: The new time-to-live duration

        Returns:
            True if the lease was renewed, False if it didn't exist
        """
        if lease_id in self.leases:
            self.expirations[lease_id] = datetime.now(timezone.utc) + ttl
            return True

        # Unknown lease: clean up any orphaned expiration entry
        self.expirations.pop(lease_id, None)
        return False

    async def revoke_lease(self, lease_id: UUID) -> None:
        """Remove the lease and its expiration entry; unknown IDs are ignored."""
        for index in (self.leases, self.expirations):
            index.pop(lease_id, None)

    async def read_active_lease_ids(
        self, limit: int = 100, offset: int = 0
    ) -> list[UUID]:
        """
        Return one page of IDs whose expiration is strictly after now (UTC).

        Args:
            limit: Maximum number of IDs to return
            offset: Number of active IDs to skip before the page starts

        Returns:
            The selected slice of active lease IDs, in index iteration order
        """
        current_time = datetime.now(timezone.utc)
        active_ids = [
            candidate_id
            for candidate_id, expires_at in self.expirations.items()
            if expires_at > current_time
        ]
        page_end = offset + limit
        return active_ids[offset:page_end]

    async def read_expired_lease_ids(self, limit: int = 100) -> list[UUID]:
        """
        Return up to ``limit`` IDs whose expiration is strictly before now (UTC).
        """
        current_time = datetime.now(timezone.utc)
        expired_ids = [
            candidate_id
            for candidate_id, expires_at in self.expirations.items()
            if expires_at < current_time
        ]
        return expired_ids[:limit]

    async def list_holders_for_limit(
        self, limit_id: UUID
    ) -> list[tuple[UUID, ConcurrencyLeaseHolder]]:
        """
        List the holders of all active leases on a given concurrency limit.

        A lease contributes an entry when it covers ``limit_id``, its expiration
        index entry is strictly in the future, and its metadata carries a holder.

        Args:
            limit_id: The ID of the concurrency limit to look up

        Returns:
            ``(lease ID, holder)`` pairs for every matching lease
        """
        now = datetime.now(timezone.utc)
        holders_with_leases: list[tuple[UUID, ConcurrencyLeaseHolder]] = []

        for lease_id, lease in self.leases.items():
            if limit_id not in lease.resource_ids:
                continue
            # A lease with no expiration entry defaults to ``now`` and is
            # therefore treated as not active.
            if self.expirations.get(lease_id, now) <= now:
                continue
            metadata = lease.metadata
            if not metadata or not metadata.holder:
                continue
            holders_with_leases.append((lease.id, metadata.holder))

        return holders_with_leases