Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/utilities/leasing.py: 80%

25 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1from __future__ import annotations 1a

2 

3from dataclasses import dataclass, field 1a

4from datetime import datetime, timedelta, timezone 1a

5from functools import partial 1a

6from typing import Generic, Protocol, TypeVar 1a

7from uuid import UUID, uuid4 1a

8 

9T = TypeVar("T") 1a

10 

11 

12@dataclass 1a

13class ResourceLease(Generic[T]): 1a

14 resource_ids: list[UUID] 1a

15 expiration: datetime 1a

16 created_at: datetime = field(default_factory=partial(datetime.now, timezone.utc)) 1a

17 id: UUID = field(default_factory=uuid4) 1a

18 metadata: T | None = None 1a

19 

20 

21class LeaseStorage(Protocol[T]): 1a

22 async def create_lease( 1a

23 self, resource_ids: list[UUID], ttl: timedelta, metadata: T | None = None 

24 ) -> ResourceLease[T]: 

25 """ 

26 Create a new resource lease. 

27 

28 Args: 

29 resource_ids: The IDs of the resources that the lease is associated with. 

30 ttl: How long the lease should initially be held for. 

31 metadata: Additional metadata associated with the lease. 

32 

33 Returns: 

34 A ResourceLease object representing the lease. 

35 """ 

36 ... 

37 

38 async def read_lease(self, lease_id: UUID) -> ResourceLease[T] | None: 1a

39 """ 

40 Read a resource lease. 

41 

42 Args: 

43 lease_id: The ID of the lease to read. 

44 

45 Returns: 

46 A ResourceLease object representing the lease, or None if not found. 

47 """ 

48 ... 

49 

50 async def renew_lease(self, lease_id: UUID, ttl: timedelta) -> bool | None: 1a

51 """ 

52 Renew a resource lease. 

53 

54 Args: 

55 lease_id: The ID of the lease to renew. 

56 ttl: The new amount of time the lease should be held for. 

57 

58 Returns: 

59 True if the lease was successfully renewed, False if the lease 

60 does not exist or has already expired. None may be returned by 

61 legacy implementations for backwards compatibility (treated as success). 

62 """ 

63 ... 

64 

65 async def revoke_lease(self, lease_id: UUID) -> None: 1a

66 """ 

67 Release a resource lease by removing it from list of active leases. 

68 

69 Args: 

70 lease_id: The ID of the lease to release. 

71 """ 

72 ... 

73 

74 async def read_expired_lease_ids(self, limit: int = 100) -> list[UUID]: 1a

75 """ 

76 Read the IDs of expired leases. 

77 

78 Args: 

79 limit: The maximum number of expired leases to read. 

80 

81 Returns: 

82 A list of UUIDs representing the expired leases. 

83 """ 

84 ...