Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/concurrency/lease_storage/memory.py: 54%
51 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1from __future__ import annotations 1a
3from datetime import datetime, timedelta, timezone 1a
4from uuid import UUID 1a
6from prefect.server.concurrency.lease_storage import ( 1a
7 ConcurrencyLeaseHolder,
8 ConcurrencyLimitLeaseMetadata,
9)
10from prefect.server.concurrency.lease_storage import ( 1a
11 ConcurrencyLeaseStorage as _ConcurrencyLeaseStorage,
12)
13from prefect.server.utilities.leasing import ResourceLease 1a
class ConcurrencyLeaseStorage(_ConcurrencyLeaseStorage):
    """
    In-memory concurrency lease storage, shared as a singleton.

    Every call to the constructor hands back the same instance. Leases live
    in ``self.leases`` and their expiration times in ``self.expirations``;
    both dictionaries are keyed by lease ID.
    """

    # The one shared instance, created lazily by ``__new__``.
    _instance: "ConcurrencyLeaseStorage | None" = None
    # Set once the shared instance's dictionaries have been created.
    _initialized: bool = False

    def __new__(cls) -> "ConcurrencyLeaseStorage":
        """Return the shared instance, creating it on first use."""
        shared = cls._instance
        if shared is None:
            shared = super().__new__(cls)
            cls._instance = shared
        return shared

    def __init__(self):
        """Create the empty lease and expiration maps, only on first construction."""
        klass = self.__class__
        if not klass._initialized:
            self.leases: dict[UUID, ResourceLease[ConcurrencyLimitLeaseMetadata]] = {}
            self.expirations: dict[UUID, datetime] = {}
            klass._initialized = True

    async def create_lease(
        self,
        resource_ids: list[UUID],
        ttl: timedelta,
        metadata: ConcurrencyLimitLeaseMetadata | None = None,
    ) -> ResourceLease[ConcurrencyLimitLeaseMetadata]:
        """
        Create and store a new lease that expires ``ttl`` from now (UTC).

        Args:
            resource_ids: The IDs of the resources the lease covers
            ttl: How long the lease should live, measured from now
            metadata: Optional metadata to attach to the lease

        Returns:
            The newly created lease
        """
        expires_at = datetime.now(timezone.utc) + ttl
        new_lease = ResourceLease(
            resource_ids=resource_ids, metadata=metadata, expiration=expires_at
        )
        lease_key = new_lease.id
        self.leases[lease_key] = new_lease
        self.expirations[lease_key] = expires_at
        return new_lease

    async def read_lease(
        self, lease_id: UUID
    ) -> ResourceLease[ConcurrencyLimitLeaseMetadata] | None:
        """Return the stored lease for ``lease_id``, or ``None`` if there is none."""
        found = self.leases.get(lease_id, None)
        return found

    async def renew_lease(self, lease_id: UUID, ttl: timedelta) -> bool:
        """
        Renew a concurrency lease by resetting its expiration to now + ``ttl``.

        The expiration index is only written when the lease itself is
        present; for an unknown lease any leftover expiration entry is
        dropped instead, so the index does not keep orphaned entries.

        Args:
            lease_id: The ID of the lease to renew
            ttl: The new time-to-live duration

        Returns:
            True if the lease was renewed, False if it didn't exist
        """
        if lease_id in self.leases:
            self.expirations[lease_id] = datetime.now(timezone.utc) + ttl
            return True

        # Unknown lease: discard any orphaned expiration entry.
        self.expirations.pop(lease_id, None)
        return False

    async def revoke_lease(self, lease_id: UUID) -> None:
        """Remove the lease and its expiration entry; unknown IDs are ignored."""
        for store in (self.leases, self.expirations):
            store.pop(lease_id, None)

    async def read_active_lease_ids(
        self, limit: int = 100, offset: int = 0
    ) -> list[UUID]:
        """
        Return IDs of leases whose expiration is strictly after the current time.

        Args:
            limit: Maximum number of IDs to return
            offset: Number of matching IDs to skip before collecting

        Returns:
            The requested slice of active lease IDs
        """
        cutoff = datetime.now(timezone.utc)
        still_active: list[UUID] = []
        for key, expires_at in self.expirations.items():
            if expires_at > cutoff:
                still_active.append(key)
        return still_active[offset : offset + limit]

    async def read_expired_lease_ids(self, limit: int = 100) -> list[UUID]:
        """
        Return IDs of leases whose expiration is strictly before the current time.

        Args:
            limit: Maximum number of IDs to return

        Returns:
            Up to ``limit`` expired lease IDs
        """
        cutoff = datetime.now(timezone.utc)
        lapsed: list[UUID] = []
        for key, expires_at in self.expirations.items():
            if expires_at < cutoff:
                lapsed.append(key)
        return lapsed[:limit]

    async def list_holders_for_limit(
        self, limit_id: UUID
    ) -> list[tuple[UUID, ConcurrencyLeaseHolder]]:
        """
        List all holders for a given concurrency limit.

        A lease contributes an entry only if it covers ``limit_id``, has an
        expiration strictly in the future, and carries metadata with a holder.

        Args:
            limit_id: The ID of the concurrency limit to look up

        Returns:
            ``(lease ID, holder)`` pairs for every matching lease
        """
        now = datetime.now(timezone.utc)
        matches: list[tuple[UUID, ConcurrencyLeaseHolder]] = []

        for lease_id, lease in self.leases.items():
            if limit_id not in lease.resource_ids:
                continue
            # A lease with no expiration entry falls back to ``now`` and is
            # therefore treated as not active.
            if not (self.expirations.get(lease_id, now) > now):
                continue
            if not lease.metadata:
                continue
            if not lease.metadata.holder:
                continue
            matches.append((lease.id, lease.metadata.holder))

        return matches