Coverage for /usr/local/lib/python3.12/site-packages/prefect/concurrency/v1/_events.py: 0%

17 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1from typing import Literal, Optional, Union 

2from uuid import UUID 

3 

4from prefect.client.schemas.responses import MinimalConcurrencyLimitResponse 

5from prefect.events import Event, RelatedResource, emit_event 

6 

7 

def emit_concurrency_event(
    phase: Union[Literal["acquired"], Literal["released"]],
    primary_limit: MinimalConcurrencyLimitResponse,
    related_limits: list[MinimalConcurrencyLimitResponse],
    task_run_id: UUID,
    follows: Union[Event, None] = None,
) -> Union[Event, None]:
    """Emit a single v1 concurrency-limit event for ``primary_limit``.

    The event name is ``prefect.concurrency-limit.v1.<phase>``. Its resource
    describes the primary limit (id, name, limit value) together with the
    task run id; every entry of ``related_limits`` whose id differs from the
    primary limit's id is attached as a related resource with the
    ``concurrency-limit`` role.

    Args:
        phase: Either ``"acquired"`` or ``"released"``; used as the event
            name suffix.
        primary_limit: The limit the event is about.
        related_limits: Limits to attach as related resources. The primary
            limit may be included here; it is filtered out by id.
        task_run_id: Stringified into the resource's ``task_run_id`` label.
        follows: Passed through unchanged to ``emit_event`` as ``follows``.

    Returns:
        Whatever ``emit_event`` returns (annotated as an ``Event`` or
        ``None``).
    """
    primary_id = primary_limit.id

    resource: dict[str, str] = {
        "prefect.resource.id": f"prefect.concurrency-limit.v1.{primary_id}",
        "prefect.resource.name": primary_limit.name,
        "limit": str(primary_limit.limit),
        "task_run_id": str(task_run_id),
    }

    related = []
    for candidate in related_limits:
        # The primary limit is already the event's resource; do not repeat
        # it among the related resources.
        if candidate.id == primary_id:
            continue
        related.append(
            RelatedResource.model_validate(
                {
                    "prefect.resource.id": (
                        f"prefect.concurrency-limit.v1.{candidate.id}"
                    ),
                    "prefect.resource.role": "concurrency-limit",
                }
            )
        )

    event_name = f"prefect.concurrency-limit.v1.{phase}"
    return emit_event(
        event_name,
        resource=resource,
        related=related,
        follows=follows,
    )

39 

40 

def emit_concurrency_acquisition_events(
    limits: list[MinimalConcurrencyLimitResponse],
    task_run_id: UUID,
) -> dict[UUID, Optional[Event]]:
    """Emit one ``"acquired"`` event per limit and collect the results.

    Each limit in ``limits`` is used in turn as the primary limit, with the
    full ``limits`` list passed as the related limits.

    Args:
        limits: The limits to emit an acquisition event for.
        task_run_id: Forwarded to ``emit_concurrency_event`` for every limit.

    Returns:
        A mapping from each limit's id to the value returned by
        ``emit_concurrency_event`` for that limit. If two limits share an
        id, the later one's result is kept.
    """
    return {
        acquired.id: emit_concurrency_event(
            "acquired", acquired, limits, task_run_id
        )
        for acquired in limits
    }

51 

52 

def emit_concurrency_release_events(
    limits: list[MinimalConcurrencyLimitResponse],
    events: dict[UUID, Optional[Event]],
    task_run_id: UUID,
) -> None:
    """Emit one ``"released"`` event per limit, chained to a prior event.

    Each limit in ``limits`` is used in turn as the primary limit, with the
    full ``limits`` list passed as the related limits. The entry of
    ``events`` stored under the limit's id is passed as ``follows``.

    Args:
        limits: The limits to emit a release event for.
        events: Mapping from limit id to the event (or ``None``) that the
            release event should follow. Must contain a key for every
            limit's id.
        task_run_id: Forwarded to ``emit_concurrency_event`` for every limit.

    Raises:
        KeyError: If ``events`` has no entry for one of the limits' ids.
    """
    for released in limits:
        # Strict lookup on purpose: a missing id raises rather than silently
        # emitting an unchained event.
        preceding_event = events[released.id]
        emit_concurrency_event(
            "released", released, limits, task_run_id, preceding_event
        )