Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/concurrency/lease_storage/__init__.py: 79%
31 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1from __future__ import annotations 1a
3from datetime import timedelta 1a
4import importlib 1a
5from typing import ClassVar, Optional, Protocol, runtime_checkable 1a
6from uuid import UUID 1a
8from pydantic import BaseModel, ConfigDict 1a
10from prefect.server.utilities.leasing import LeaseStorage, ResourceLease 1a
11from prefect.settings.context import get_current_settings 1a
13from prefect.types._concurrency import ConcurrencyLeaseHolder 1a
16@runtime_checkable 1a
17class ConcurrencyLeaseStorageModule(Protocol): 1a
18 ConcurrencyLeaseStorage: type[ConcurrencyLeaseStorage] 1a
21class ConcurrencyLimitLeaseMetadata(BaseModel): 1a
22 """Model for validating concurrency limit lease metadata."""
24 model_config: ClassVar[ConfigDict] = ConfigDict(extra="forbid") 1a
26 slots: int 1a
27 holder: Optional[ConcurrencyLeaseHolder] = None 1a
30class ConcurrencyLeaseStorage(LeaseStorage[ConcurrencyLimitLeaseMetadata]): 1a
31 async def create_lease( 31 ↛ exitline 31 didn't return from function 'create_lease' because 1a
32 self,
33 resource_ids: list[UUID],
34 ttl: timedelta,
35 metadata: ConcurrencyLimitLeaseMetadata | None = None,
36 ) -> ResourceLease[ConcurrencyLimitLeaseMetadata]: ...
38 async def read_lease( 38 ↛ exitline 38 didn't return from function 'read_lease' because 1a
39 self, lease_id: UUID
40 ) -> ResourceLease[ConcurrencyLimitLeaseMetadata] | None: ...
42 async def renew_lease(self, lease_id: UUID, ttl: timedelta) -> bool | None: 1a
43 """
44 Renew a resource lease.
46 Args:
47 lease_id: The ID of the lease to renew.
48 ttl: The new amount of time the lease should be held for.
50 Returns:
51 True if the lease was successfully renewed, False if the lease
52 does not exist or has already expired. None may be returned by
53 legacy implementations for backwards compatibility (treated as success).
54 """
55 ...
57 async def revoke_lease(self, lease_id: UUID) -> None: ... 57 ↛ exitline 57 didn't return from function 'revoke_lease' because 1a
59 async def read_active_lease_ids( 59 ↛ exitline 59 didn't return from function 'read_active_lease_ids' because 1a
60 self, limit: int = 100, offset: int = 0
61 ) -> list[UUID]: ...
63 async def read_expired_lease_ids(self, limit: int = 100) -> list[UUID]: ... 63 ↛ exitline 63 didn't return from function 'read_expired_lease_ids' because 1a
65 async def list_holders_for_limit( 1a
66 self, limit_id: UUID
67 ) -> list[tuple[UUID, ConcurrencyLeaseHolder]]:
68 """
69 List all holders for a given concurrency limit.
71 Args:
72 limit_id: The ID of the concurrency limit to list holders for.
74 Returns:
75 A list of tuples containing the lease ID and ConcurrencyLeaseHolder objects representing active holders.
76 """
77 ...
80def get_concurrency_lease_storage() -> ConcurrencyLeaseStorage: 1a
81 """
82 Returns a ConcurrencyLeaseStorage instance based on the configured lease storage module.
84 Will raise a ValueError if the configured module does not pass a type check.
85 """
86 concurrency_lease_storage_module = importlib.import_module( 1bcdef
87 get_current_settings().server.concurrency.lease_storage
88 )
89 if not isinstance(concurrency_lease_storage_module, ConcurrencyLeaseStorageModule): 89 ↛ 90line 89 didn't jump to line 90 because the condition on line 89 was never true1bcdef
90 raise ValueError(
91 f"The module {get_current_settings().server.concurrency.lease_storage} does not contain a ConcurrencyLeaseStorage class"
92 )
93 return concurrency_lease_storage_module.ConcurrencyLeaseStorage() 1bcdef