Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/concurrency/lease_storage/__init__.py: 79%

31 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1from __future__ import annotations 1a

2 

3from datetime import timedelta 1a

4import importlib 1a

5from typing import ClassVar, Optional, Protocol, runtime_checkable 1a

6from uuid import UUID 1a

7 

8from pydantic import BaseModel, ConfigDict 1a

9 

10from prefect.server.utilities.leasing import LeaseStorage, ResourceLease 1a

11from prefect.settings.context import get_current_settings 1a

12 

13from prefect.types._concurrency import ConcurrencyLeaseHolder 1a

14 

15 

@runtime_checkable
class ConcurrencyLeaseStorageModule(Protocol):
    """
    Structural type for a module that provides a concurrency lease storage class.

    The protocol is runtime-checkable, so an `isinstance` check against it
    verifies only that a `ConcurrencyLeaseStorage` attribute is present; the
    attribute's type is not inspected at runtime.
    """

    # The storage class a conforming module exposes under this exact name.
    ConcurrencyLeaseStorage: type[ConcurrencyLeaseStorage]

19 

20 

class ConcurrencyLimitLeaseMetadata(BaseModel):
    """Model for validating concurrency limit lease metadata."""

    # Reject any keys other than the fields declared below.
    model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid")

    # Required slot count recorded on the lease.
    slots: int
    # Optional holder of the lease; defaults to None when not supplied.
    holder: Optional[ConcurrencyLeaseHolder] = None

28 

29 

class ConcurrencyLeaseStorage(LeaseStorage[ConcurrencyLimitLeaseMetadata]):
    """
    Lease storage interface specialised for concurrency limit lease metadata.

    Every method body here is a stub (`...`); the storage class that is
    actually used is loaded from the configured lease storage module.
    """

    async def create_lease(
        self,
        resource_ids: list[UUID],
        ttl: timedelta,
        metadata: ConcurrencyLimitLeaseMetadata | None = None,
    ) -> ResourceLease[ConcurrencyLimitLeaseMetadata]:
        """
        Create a resource lease.

        Args:
            resource_ids: The IDs of the resources the lease covers.
            ttl: The amount of time the lease should be held for.
            metadata: Optional concurrency limit metadata to attach to the lease.

        Returns:
            The created lease.
        """
        ...

    async def read_lease(
        self,
        lease_id: UUID,
    ) -> ResourceLease[ConcurrencyLimitLeaseMetadata] | None:
        """
        Read a resource lease by ID.

        Args:
            lease_id: The ID of the lease to read.

        Returns:
            The lease, or None (presumably when no matching lease exists).
        """
        ...

    async def renew_lease(self, lease_id: UUID, ttl: timedelta) -> bool | None:
        """
        Renew a resource lease.

        Args:
            lease_id: The ID of the lease to renew.
            ttl: The new amount of time the lease should be held for.

        Returns:
            True when the renewal succeeded; False when the lease does not
            exist or has already expired. Legacy implementations may return
            None for backwards compatibility, which is treated as success.
        """
        ...

    async def revoke_lease(self, lease_id: UUID) -> None:
        """
        Revoke a resource lease.

        Args:
            lease_id: The ID of the lease to revoke.
        """
        ...

    async def read_active_lease_ids(
        self,
        limit: int = 100,
        offset: int = 0,
    ) -> list[UUID]:
        """
        Read the IDs of active leases.

        Args:
            limit: Page size; defaults to 100.
            offset: Page offset; defaults to 0.

        Returns:
            A list of lease IDs.
        """
        ...

    async def read_expired_lease_ids(self, limit: int = 100) -> list[UUID]:
        """
        Read the IDs of expired leases.

        Args:
            limit: Maximum number of IDs requested; defaults to 100.

        Returns:
            A list of lease IDs.
        """
        ...

    async def list_holders_for_limit(
        self,
        limit_id: UUID,
    ) -> list[tuple[UUID, ConcurrencyLeaseHolder]]:
        """
        List all holders for a given concurrency limit.

        Args:
            limit_id: The ID of the concurrency limit to list holders for.

        Returns:
            A list of `(lease_id, holder)` tuples, one per active holder,
            where `holder` is a ConcurrencyLeaseHolder.
        """
        ...

78 

79 

80def get_concurrency_lease_storage() -> ConcurrencyLeaseStorage: 1a

81 """ 

82 Returns a ConcurrencyLeaseStorage instance based on the configured lease storage module. 

83 

84 Will raise a ValueError if the configured module does not pass a type check. 

85 """ 

86 concurrency_lease_storage_module = importlib.import_module( 1b

87 get_current_settings().server.concurrency.lease_storage 

88 ) 

89 if not isinstance(concurrency_lease_storage_module, ConcurrencyLeaseStorageModule): 89 ↛ 90line 89 didn't jump to line 90 because the condition on line 89 was never true1b

90 raise ValueError( 

91 f"The module {get_current_settings().server.concurrency.lease_storage} does not contain a ConcurrencyLeaseStorage class" 

92 ) 

93 return concurrency_lease_storage_module.ConcurrencyLeaseStorage() 1b