Coverage for /usr/local/lib/python3.12/site-packages/prefect/concurrency/context.py: 53%

15 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1from contextvars import ContextVar 1a

2from typing import Any, ClassVar 1a

3from uuid import UUID 1a

4 

5from typing_extensions import Self 1a

6 

7from prefect.client.orchestration import get_client 1a

8from prefect.context import ContextModel, Field 1a

9 

10 

11class ConcurrencyContext(ContextModel): 1a

12 __var__: ClassVar[ContextVar[Self]] = ContextVar("concurrency") 1a

13 

14 # Track the leases that have been acquired but were not able to be released 

15 # due to cancellation or some other error. These leases are revoked when 

16 # the context manager exits. 

17 cleanup_lease_ids: list[UUID] = Field(default_factory=lambda: []) 1a

18 

19 def __exit__(self, *exc_info: Any) -> None: 1a

20 if self.cleanup_lease_ids: 

21 with get_client(sync_client=True) as client: 

22 for lease_id in self.cleanup_lease_ids: 

23 client.release_concurrency_slots_with_lease(lease_id=lease_id) 

24 

25 return super().__exit__(*exc_info)