Coverage for /usr/local/lib/python3.12/site-packages/prefect/concurrency/v1/context.py: 53%

15 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1from contextvars import ContextVar 1a

2from typing import Any, ClassVar 1a

3from uuid import UUID 1a

4 

5from typing_extensions import Self 1a

6 

7from prefect.client.orchestration import get_client 1a

8from prefect.context import ContextModel, Field 1a

9 

10 

class ConcurrencyContext(ContextModel):
    """Context model that remembers v1 concurrency slots still awaiting release.

    The instance is bound to the ``concurrency_v1`` context variable. Slots
    recorded in ``cleanup_slots`` are released through a synchronous client
    when the context manager exits.
    """

    __var__: ClassVar[ContextVar[Self]] = ContextVar("concurrency_v1")

    # Track the limits that have been acquired but were not able to be released
    # due to cancellation or some other error. These limits are released when
    # the context manager exits.
    # Each entry is a (limit names, occupancy seconds, task run id) tuple.
    cleanup_slots: list[tuple[list[str], float, UUID]] = Field(default_factory=list)

    def __exit__(self, *exc_info: Any) -> None:
        """Release any pending slots, then run the parent context exit.

        Args:
            *exc_info: The exception triple supplied by the ``with`` statement;
                forwarded unchanged to the parent ``__exit__``.

        Raises:
            Any exception raised while obtaining the client or decrementing a
            slot propagates to the caller after the parent exit has run.
        """
        try:
            if self.cleanup_slots:
                with get_client(sync_client=True) as client:
                    for names, occupancy_seconds, task_run_id in self.cleanup_slots:
                        client.decrement_v1_concurrency_slots(
                            names=names,
                            occupancy_seconds=occupancy_seconds,
                            task_run_id=task_run_id,
                        )
        finally:
            # Always run the parent exit, even when releasing a slot fails;
            # otherwise a client error would skip it entirely.
            # NOTE(review): presumably the parent resets the context variable
            # here -- confirm against ContextModel.__exit__.
            # No ``return`` inside ``finally``: that would swallow an in-flight
            # exception from the release loop.
            super().__exit__(*exc_info)