Coverage for /usr/local/lib/python3.12/site-packages/prefect/concurrency/v1/_events.py: 0%
17 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1from typing import Literal, Optional, Union
2from uuid import UUID
4from prefect.client.schemas.responses import MinimalConcurrencyLimitResponse
5from prefect.events import Event, RelatedResource, emit_event
def emit_concurrency_event(
    phase: Union[Literal["acquired"], Literal["released"]],
    primary_limit: MinimalConcurrencyLimitResponse,
    related_limits: list[MinimalConcurrencyLimitResponse],
    task_run_id: UUID,
    follows: Union[Event, None] = None,
) -> Union[Event, None]:
    """Emit a single v1 concurrency-limit event for ``primary_limit``.

    The event name is ``prefect.concurrency-limit.v1.<phase>``. Its resource
    describes ``primary_limit`` (id, name, limit) together with the task run
    id, all rendered as strings.

    Args:
        phase: Either ``"acquired"`` or ``"released"``; becomes the final
            segment of the event name.
        primary_limit: The limit the event is about.
        related_limits: Limits to attach as related resources. Any entry
            whose ``id`` equals ``primary_limit.id`` is left out.
        task_run_id: Id of the task run, stored on the resource as a string.
        follows: Optional earlier event, forwarded unchanged to
            ``emit_event``.

    Returns:
        Whatever ``emit_event`` returns for this event.
    """
    # Describe the primary limit; every value is stringified.
    resource: dict[str, str] = {}
    resource["prefect.resource.id"] = (
        f"prefect.concurrency-limit.v1.{primary_limit.id}"
    )
    resource["prefect.resource.name"] = primary_limit.name
    resource["limit"] = str(primary_limit.limit)
    resource["task_run_id"] = str(task_run_id)

    # Every *other* limit is attached as a related resource; the primary
    # limit is already the event's resource, so it is not repeated here.
    related = []
    for other in related_limits:
        if other.id == primary_limit.id:
            continue
        payload = {
            "prefect.resource.id": f"prefect.concurrency-limit.v1.{other.id}",
            "prefect.resource.role": "concurrency-limit",
        }
        related.append(RelatedResource.model_validate(payload))

    event_name = f"prefect.concurrency-limit.v1.{phase}"
    return emit_event(
        event_name,
        resource=resource,
        related=related,
        follows=follows,
    )
def emit_concurrency_acquisition_events(
    limits: list[MinimalConcurrencyLimitResponse],
    task_run_id: UUID,
) -> dict[UUID, Optional[Event]]:
    """Emit an ``"acquired"`` event for each limit in ``limits``.

    Each limit in turn is used as the primary limit, with the full ``limits``
    list passed along as its related limits.

    Args:
        limits: The limits to emit events for.
        task_run_id: Id of the task run, passed through to each event.

    Returns:
        A mapping from each limit's ``id`` to the value returned by
        ``emit_concurrency_event`` for it. If two entries share an id, the
        later one wins.
    """
    return {
        limit.id: emit_concurrency_event("acquired", limit, limits, task_run_id)
        for limit in limits
    }
def emit_concurrency_release_events(
    limits: list[MinimalConcurrencyLimitResponse],
    events: dict[UUID, Optional[Event]],
    task_run_id: UUID,
) -> None:
    """Emit a ``"released"`` event for each limit in ``limits``.

    Each release event is emitted with ``follows`` set to the entry stored in
    ``events`` under that limit's ``id``.

    Args:
        limits: The limits to emit release events for.
        events: Mapping from limit id to the event the release should follow
            (the entry may be ``None``). It must contain a key for every
            limit in ``limits``.
        task_run_id: Id of the task run, passed through to each event.

    Raises:
        KeyError: If a limit's ``id`` is not a key of ``events``.
    """
    for limit in limits:
        # Strict lookup on purpose: a missing id is reported as KeyError.
        preceding_event = events[limit.id]
        emit_concurrency_event(
            "released",
            limit,
            limits,
            task_run_id,
            follows=preceding_event,
        )