Coverage for /usr/local/lib/python3.12/site-packages/prefect/concurrency/v1/context.py: 53%
15 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1from contextvars import ContextVar 1a
2from typing import Any, ClassVar 1a
3from uuid import UUID 1a
5from typing_extensions import Self 1a
7from prefect.client.orchestration import get_client 1a
8from prefect.context import ContextModel, Field 1a
11class ConcurrencyContext(ContextModel): 1a
12 __var__: ClassVar[ContextVar[Self]] = ContextVar("concurrency_v1") 1a
14 # Track the limits that have been acquired but were not able to be released
15 # due to cancellation or some other error. These limits are released when
16 # the context manager exits.
17 cleanup_slots: list[tuple[list[str], float, UUID]] = Field(default_factory=list) 1a
19 def __exit__(self, *exc_info: Any) -> None: 1a
20 if self.cleanup_slots:
21 with get_client(sync_client=True) as client:
22 for names, occupancy_seconds, task_run_id in self.cleanup_slots:
23 client.decrement_v1_concurrency_slots(
24 names=names,
25 occupancy_seconds=occupancy_seconds,
26 task_run_id=task_run_id,
27 )
29 return super().__exit__(*exc_info)