Coverage for /usr/local/lib/python3.12/site-packages/prefect/concurrency/_events.py: 33%

17 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1from typing import Literal, Optional, Union 1a

2from uuid import UUID 1a

3 

4from prefect.client.schemas.responses import MinimalConcurrencyLimitResponse 1a

5from prefect.events import Event, RelatedResource, emit_event 1a

6 

7 

def _emit_concurrency_event(
    phase: Union[Literal["acquired"], Literal["released"]],
    primary_limit: MinimalConcurrencyLimitResponse,
    related_limits: list[MinimalConcurrencyLimitResponse],
    slots: int,
    follows: Union[Event, None] = None,
) -> Union[Event, None]:
    """Emit a single ``prefect.concurrency-limit.<phase>`` event for one limit.

    Args:
        phase: Suffix of the event name, ``"acquired"`` or ``"released"``.
        primary_limit: Limit that the event's resource describes.
        related_limits: Limits taking part in the same operation. Every
            entry whose id differs from ``primary_limit.id`` is attached as
            a related resource with the ``concurrency-limit`` role.
        slots: Slot count, reported as the ``slots-acquired`` label.
        follows: Forwarded unchanged to ``emit_event``.

    Returns:
        Whatever ``emit_event`` returns.
    """
    primary_id = primary_limit.id

    # Every label value is a string, so the numeric fields are stringified.
    resource: dict[str, str] = {
        "prefect.resource.id": f"prefect.concurrency-limit.{primary_id}",
        "prefect.resource.name": primary_limit.name,
        "slots-acquired": str(slots),
        "limit": str(primary_limit.limit),
    }

    # The primary limit is already the event's resource, so it is left out
    # of the related resources.
    related = []
    for other in related_limits:
        if other.id != primary_id:
            labels = {
                "prefect.resource.id": f"prefect.concurrency-limit.{other.id}",
                "prefect.resource.role": "concurrency-limit",
            }
            related.append(RelatedResource.model_validate(labels))

    event_name = f"prefect.concurrency-limit.{phase}"
    return emit_event(
        event_name,
        resource=resource,
        related=related,
        follows=follows,
    )

39 

40 

def emit_concurrency_acquisition_events(
    limits: list[MinimalConcurrencyLimitResponse],
    occupy: int,
) -> dict[UUID, Optional[Event]]:
    """Emit an "acquired" event for each limit and collect the results.

    Args:
        limits: Limits to emit for. The full list is also handed to every
            call as the related limits.
        occupy: Slot count passed to each emitted event.

    Returns:
        A mapping from each limit's id to the value returned by
        ``_emit_concurrency_event`` for that limit.
    """
    return {
        limit.id: _emit_concurrency_event("acquired", limit, limits, occupy)
        for limit in limits
    }

51 

52 

def emit_concurrency_release_events(
    limits: list[MinimalConcurrencyLimitResponse],
    occupy: int,
    events: dict[UUID, Optional[Event]],
) -> None:
    """Emit a "released" event for each limit.

    Args:
        limits: Limits to emit for. The full list is also handed to every
            call as the related limits.
        occupy: Slot count passed to each emitted event.
        events: Mapping from limit id to the event that each released event
            should follow. It must contain an entry for every limit in
            ``limits``.

    Raises:
        KeyError: If ``events`` has no entry for one of the limit ids.
    """
    for released_limit in limits:
        preceding_event = events[released_limit.id]
        _emit_concurrency_event(
            "released",
            released_limit,
            limits,
            occupy,
            follows=preceding_event,
        )