Coverage for /usr/local/lib/python3.12/site-packages/prefect/concurrency/context.py: 53%
15 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1from contextvars import ContextVar 1a
2from typing import Any, ClassVar 1a
3from uuid import UUID 1a
5from typing_extensions import Self 1a
7from prefect.client.orchestration import get_client 1a
8from prefect.context import ContextModel, Field 1a
class ConcurrencyContext(ContextModel):
    """Context model that tracks concurrency leases still needing release.

    Lease IDs collected in ``cleanup_lease_ids`` while the context is active
    are released through a synchronous client when the context exits.
    """

    __var__: ClassVar[ContextVar[Self]] = ContextVar("concurrency")

    # Track the leases that have been acquired but were not able to be released
    # due to cancellation or some other error. These leases are revoked when
    # the context manager exits.
    cleanup_lease_ids: list[UUID] = Field(default_factory=lambda: [])

    def __exit__(self, *exc_info: Any) -> None:
        """Release any leases queued for cleanup, then run the parent exit.

        Args:
            *exc_info: Exception details supplied by the ``with`` statement;
                forwarded unchanged to the parent ``__exit__``.

        Raises:
            Any exception raised while obtaining the client or releasing a
            lease propagates to the caller after the parent ``__exit__`` has
            run.
        """
        try:
            # Only open a client when there is actually something to release.
            if self.cleanup_lease_ids:
                with get_client(sync_client=True) as client:
                    for lease_id in self.cleanup_lease_ids:
                        client.release_concurrency_slots_with_lease(lease_id=lease_id)
        finally:
            # Run the parent's exit logic even if a release call fails, so a
            # failed cleanup cannot leave this context entered. Deliberately
            # no ``return`` here: returning from ``finally`` would swallow the
            # in-flight exception.
            super().__exit__(*exc_info)