Coverage for /usr/local/lib/python3.12/site-packages/prefect/events/utilities.py: 25%
41 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1from __future__ import annotations 1a
3import datetime 1a
4from datetime import timedelta 1a
5from typing import TYPE_CHECKING, Any 1a
6from uuid import UUID 1a
8import prefect.types._datetime 1a
9from prefect.logging.loggers import get_logger 1a
11from .clients import ( 1a
12 AssertingEventsClient,
13 AssertingPassthroughEventsClient,
14 PrefectCloudEventsClient,
15 PrefectEventsClient,
16)
17from .schemas.events import Event, RelatedResource 1a
18from .worker import EventsWorker, should_emit_events 1a
if TYPE_CHECKING:
    # Only needed for the `logger` annotation below; skipped at runtime.
    import logging

# Maximum distance in time (in either direction) between an event and the
# event it follows for `emit_event` to record the "follows" relationship.
TIGHT_TIMING = timedelta(minutes=5)

# Module-level logger used to report failures while emitting events.
logger: "logging.Logger" = get_logger(__name__)
def emit_event(
    event: str,
    resource: dict[str, str],
    occurred: datetime.datetime | None = None,
    related: list[dict[str, str]] | list[RelatedResource] | None = None,
    payload: dict[str, Any] | None = None,
    id: UUID | None = None,
    follows: Event | None = None,
    **kwargs: dict[str, Any] | None,
) -> Event | None:
    """
    Build an `Event` and hand it to the events worker for sending.

    Args:
        event: The name of the event that happened.
        resource: The primary Resource this event concerns.
        occurred: When the event happened from the sender's perspective.
            When omitted, the current UTC time is used.
        related: Additional Resources involved in this event. Left out of
            the event when None.
        payload: Open-ended data describing what happened. Left out of the
            event when None.
        id: A sender-provided identifier for this event. Left out when None,
            so whatever default `Event` defines applies.
        follows: The event that preceded this one. Its id is recorded only
            when the two `occurred` timestamps are strictly less than
            `TIGHT_TIMING` (5 minutes) apart, in either direction.
        **kwargs: Extra keyword arguments passed through unchanged to the
            `Event` constructor.

    Returns:
        The event that was handed to the worker. None is returned instead
        when event emission is disabled, when the worker's client type is
        not one of the clients that actually emit events, or when any
        exception is raised along the way (the exception is logged rather
        than propagated).
    """
    if not should_emit_events():
        return None

    try:
        # Client types that really deliver (or record) events; any other
        # client type means there is nothing useful to send.
        emitting_client_types = (
            AssertingPassthroughEventsClient,
            AssertingEventsClient,
            PrefectCloudEventsClient,
            PrefectEventsClient,
        )
        worker = EventsWorker.instance()
        if worker.client_type not in emitting_client_types:
            return None

        fields: dict[str, Any] = {"event": event, "resource": resource}
        fields.update(kwargs)

        if occurred is None:
            occurred = prefect.types._datetime.now("UTC")
        fields["occurred"] = occurred

        # Optional values are only forwarded when the caller supplied them.
        for key, value in (("related", related), ("payload", payload), ("id", id)):
            if value is not None:
                fields[key] = value

        if follows is not None:
            # Only link the two events when they are close together in time.
            gap = occurred - follows.occurred
            if -TIGHT_TIMING < gap < TIGHT_TIMING:
                fields["follows"] = follows.id

        emitted = Event(**fields)
        worker.send(emitted)
        return emitted
    except Exception:
        # Emitting events is best-effort: log the failure and carry on.
        logger.exception(f"Error emitting event: {event}")
        return None