Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/concurrency/lease_storage/memory.py: 95%
51 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1from __future__ import annotations 1a
3from datetime import datetime, timedelta, timezone 1a
4from uuid import UUID 1a
6from prefect.server.concurrency.lease_storage import ( 1a
7 ConcurrencyLeaseHolder,
8 ConcurrencyLimitLeaseMetadata,
9)
10from prefect.server.concurrency.lease_storage import ( 1a
11 ConcurrencyLeaseStorage as _ConcurrencyLeaseStorage,
12)
13from prefect.server.utilities.leasing import ResourceLease 1a
class ConcurrencyLeaseStorage(_ConcurrencyLeaseStorage):
    """
    A singleton concurrency lease storage implementation that stores leases in memory.

    Every construction returns the same instance, so all callers in the
    process share one ``leases`` mapping and one ``expirations`` index.
    Whether a lease counts as active or expired is decided solely from the
    ``expirations`` index: a lease is active while ``expiration > now`` and
    expired once ``expiration <= now``.
    """

    # The single shared instance, created lazily on first construction.
    _instance: "ConcurrencyLeaseStorage | None" = None
    # Class-level flag so that repeated construction does not wipe stored leases.
    _initialized: bool = False

    def __new__(cls) -> "ConcurrencyLeaseStorage":
        """Return the shared instance, creating it on first use."""
        if cls._instance is None:
            cls._instance = super().__new__(cls)
        return cls._instance

    def __init__(self):
        """Set up the empty lease and expiration maps, but only once."""
        # __init__ runs on every ConcurrencyLeaseStorage() call even though
        # __new__ hands back the same object; bail out after the first time
        # so existing leases are preserved.
        if self.__class__._initialized:
            return

        self.leases: dict[UUID, ResourceLease[ConcurrencyLimitLeaseMetadata]] = {}
        self.expirations: dict[UUID, datetime] = {}
        self.__class__._initialized = True

    async def create_lease(
        self,
        resource_ids: list[UUID],
        ttl: timedelta,
        metadata: ConcurrencyLimitLeaseMetadata | None = None,
    ) -> ResourceLease[ConcurrencyLimitLeaseMetadata]:
        """
        Create a lease over the given resources and record its expiration.

        Args:
            resource_ids: The IDs of the resources the lease covers
            ttl: How long from now (UTC) the lease stays active
            metadata: Optional metadata stored on the lease

        Returns:
            The newly created lease
        """
        expiration = datetime.now(timezone.utc) + ttl
        lease = ResourceLease(
            resource_ids=resource_ids, metadata=metadata, expiration=expiration
        )
        self.leases[lease.id] = lease
        self.expirations[lease.id] = expiration
        return lease

    async def read_lease(
        self, lease_id: UUID
    ) -> ResourceLease[ConcurrencyLimitLeaseMetadata] | None:
        """Return the stored lease for ``lease_id``, or None if there is none."""
        return self.leases.get(lease_id)

    async def renew_lease(self, lease_id: UUID, ttl: timedelta) -> bool:
        """
        Atomically renew a concurrency lease by updating its expiration.

        Checks if the lease exists before updating the expiration index,
        preventing orphaned index entries.

        Args:
            lease_id: The ID of the lease to renew
            ttl: The new time-to-live duration

        Returns:
            True if the lease was renewed, False if it didn't exist
        """
        if lease_id not in self.leases:
            # Clean up any orphaned expiration entry
            self.expirations.pop(lease_id, None)
            return False

        # NOTE(review): only the expiration index is updated here; the stored
        # lease object keeps the ``expiration`` it was created with. Confirm
        # that callers of read_lease() do not rely on that field after renewal.
        self.expirations[lease_id] = datetime.now(timezone.utc) + ttl
        return True

    async def revoke_lease(self, lease_id: UUID) -> None:
        """Remove the lease and its expiration entry; a no-op if absent."""
        self.leases.pop(lease_id, None)
        self.expirations.pop(lease_id, None)

    async def read_active_lease_ids(
        self, limit: int = 100, offset: int = 0
    ) -> list[UUID]:
        """
        Return one page of IDs of leases whose expiration is still in the future.

        Args:
            limit: Maximum number of IDs to return
            offset: Number of active IDs to skip before the page starts

        Returns:
            Active lease IDs, in expiration-index insertion order
        """
        now = datetime.now(timezone.utc)
        active_leases = [
            lease_id
            for lease_id, expiration in self.expirations.items()
            if expiration > now
        ]
        return active_leases[offset : offset + limit]

    async def read_expired_lease_ids(self, limit: int = 100) -> list[UUID]:
        """
        Return up to ``limit`` IDs of leases that are no longer active.

        Args:
            limit: Maximum number of IDs to return

        Returns:
            Expired lease IDs, in expiration-index insertion order
        """
        now = datetime.now(timezone.utc)
        # ``<=`` is the exact complement of the ``expiration > now`` test used
        # for active leases, so a lease expiring at precisely ``now`` is
        # reported as expired instead of being neither active nor expired.
        expired_leases = [
            lease_id
            for lease_id, expiration in self.expirations.items()
            if expiration <= now
        ]
        return expired_leases[:limit]

    async def list_holders_for_limit(
        self, limit_id: UUID
    ) -> list[tuple[UUID, ConcurrencyLeaseHolder]]:
        """
        List all holders for a given concurrency limit.

        Args:
            limit_id: The ID of the concurrency limit to look up

        Returns:
            ``(lease_id, holder)`` pairs for every active lease that covers
            ``limit_id`` and has metadata with a holder set
        """
        now = datetime.now(timezone.utc)
        holders_with_leases: list[tuple[UUID, ConcurrencyLeaseHolder]] = []

        for lease_id, lease in self.leases.items():
            # Check if lease is active and for the specified limit. A lease
            # with no expiration entry defaults to ``now``, which fails the
            # strict comparison, so it is treated as inactive.
            if (
                limit_id in lease.resource_ids
                and self.expirations.get(lease_id, now) > now
                and lease.metadata
                and lease.metadata.holder
            ):
                holders_with_leases.append((lease.id, lease.metadata.holder))

        return holders_with_leases