Coverage for /usr/local/lib/python3.12/site-packages/prefect/events/utilities.py: 25%

41 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1from __future__ import annotations 1a

2 

3import datetime 1a

4from datetime import timedelta 1a

5from typing import TYPE_CHECKING, Any 1a

6from uuid import UUID 1a

7 

8import prefect.types._datetime 1a

9from prefect.logging.loggers import get_logger 1a

10 

11from .clients import ( 1a

12 AssertingEventsClient, 

13 AssertingPassthroughEventsClient, 

14 PrefectCloudEventsClient, 

15 PrefectEventsClient, 

16) 

17from .schemas.events import Event, RelatedResource 1a

18from .worker import EventsWorker, should_emit_events 1a

19 

20if TYPE_CHECKING: 20 ↛ 21line 20 didn't jump to line 21 because the condition on line 20 was never true1a

21 import logging 

22 

# Window used by emit_event when deciding whether to record a "follows" link:
# the link is only set when the two events' `occurred` timestamps differ by
# strictly less than this amount, in either direction.
23TIGHT_TIMING = timedelta(minutes=5) 1a

24 

# Module-level logger; emit_event uses it to report failures while emitting.
# The annotation is a string because `logging` is imported only under
# TYPE_CHECKING.
25logger: "logging.Logger" = get_logger(__name__) 1a

26 

27 

def emit_event(
    event: str,
    resource: dict[str, str],
    occurred: datetime.datetime | None = None,
    related: list[dict[str, str]] | list[RelatedResource] | None = None,
    payload: dict[str, Any] | None = None,
    id: UUID | None = None,
    follows: Event | None = None,
    **kwargs: dict[str, Any] | None,
) -> Event | None:
    """
    Build an Event and hand it to the events worker for delivery to Prefect.

    Nothing is sent (and None is returned) when `should_emit_events()` is
    false or when the worker's client type is not one of the clients listed
    in this function. Any exception raised while building or sending the
    event is logged and swallowed, so callers never see it.

    Args:
        event: The name of the event that happened.
        resource: The primary Resource this event concerns.
        occurred: When the event happened from the sender's perspective.
            Defaults to the current UTC datetime.
        related: A list of additional Resources involved in this event.
        payload: An open-ended set of data describing what happened.
        id: The sender-provided identifier for this event. Only forwarded
            to the Event when given.
        follows: The event that preceded this one. The follows relationship
            is only recorded when the two events occurred strictly less
            than TIGHT_TIMING (5 minutes) apart, in either direction.
        **kwargs: Extra keyword arguments forwarded unchanged to the Event
            constructor.

    Returns:
        The event that was handed to the worker, or None if emission was
        skipped or failed.
    """
    if not should_emit_events():
        return None

    try:
        worker = EventsWorker.instance()

        # Only these client types actually deliver events; any other client
        # type means there is nothing to emit.
        emitting_client_types = (
            AssertingPassthroughEventsClient,
            AssertingEventsClient,
            PrefectCloudEventsClient,
            PrefectEventsClient,
        )
        if worker.client_type not in emitting_client_types:
            return None

        when = prefect.types._datetime.now("UTC") if occurred is None else occurred

        fields: dict[str, Any] = {
            "event": event,
            "resource": resource,
            **kwargs,
            "occurred": when,
        }

        # Optional fields are only passed along when the caller supplied
        # them, leaving Event free to apply its own defaults otherwise.
        for key, value in (("related", related), ("payload", payload), ("id", id)):
            if value is not None:
                fields[key] = value

        if follows is not None:
            gap = when - follows.occurred
            # Link the events only when they are close together in time.
            if -TIGHT_TIMING < gap < TIGHT_TIMING:
                fields["follows"] = follows.id

        emitted = Event(**fields)
        worker.send(emitted)
    except Exception:
        logger.exception(f"Error emitting event: {event}")
        return None

    return emitted