Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/events/messaging.py: 89%

38 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1from __future__ import annotations 1a

2 

3from collections.abc import Iterable 1a

4from typing import TYPE_CHECKING, Any, Mapping 1a

5 

6from typing_extensions import Self 1a

7 

8from prefect.logging import get_logger 1a

9from prefect.server.events.schemas.events import ReceivedEvent 1a

10from prefect.server.utilities.messaging import Publisher, create_publisher 1a

11from prefect.settings import PREFECT_EVENTS_MAXIMUM_SIZE_BYTES 1a

12 

13if TYPE_CHECKING: 13 ↛ 14line 13 didn't jump to line 14 because the condition on line 13 was never true1a

14 import logging 

15 

16logger: "logging.Logger" = get_logger(__name__) 1a

17 

18 

async def publish(events: Iterable[ReceivedEvent]) -> None:
    """Publish a batch of events through the default event publisher.

    A single publisher context from ``create_event_publisher()`` is opened
    for the whole batch, and the events are published one at a time in
    iteration order.

    Args:
        events: the events to publish
    """
    async with create_event_publisher() as event_publisher:
        for received_event in events:
            await event_publisher.publish_event(received_event)

24 

25 

class EventPublisher(Publisher):
    """A ``Publisher`` for events that wraps and delegates to another publisher.

    Entering/exiting the async context and ``publish_data`` are forwarded to
    the wrapped publisher. ``publish_event`` adds JSON encoding of the event
    and a size check before forwarding.
    """

    _publisher: Publisher

    def __init__(self, publisher: Publisher | None = None):
        """
        Args:
            publisher: the publisher to wrap; when omitted, one is created
                with ``create_publisher(topic="events", deduplicate_by="id")``
        """
        # Compare against None explicitly so that a caller-supplied publisher
        # is never discarded merely because it evaluates as falsy.
        if publisher is None:
            publisher = create_publisher(topic="events", deduplicate_by="id")
        self._publisher = publisher

    async def __aenter__(self) -> Self:
        """Enter the wrapped publisher's context and return this publisher."""
        await self._publisher.__aenter__()
        return self

    async def __aexit__(self, *args: Any) -> None:
        """Exit the wrapped publisher's context, forwarding the exit arguments."""
        await self._publisher.__aexit__(*args)

    async def publish_data(self, data: bytes, attributes: Mapping[str, str]) -> None:
        """Forward the raw payload and its attributes to the wrapped publisher.

        Args:
            data: the encoded payload to publish
            attributes: the attributes to publish alongside the payload
        """
        await self._publisher.publish_data(data, attributes)

    async def publish_event(self, event: ReceivedEvent) -> None:
        """
        Publishes the given event

        The event is serialized with ``model_dump_json()`` and encoded to
        bytes. If the encoded payload is larger than
        ``PREFECT_EVENTS_MAXIMUM_SIZE_BYTES``, a warning is logged and the
        event is not published.

        Args:
            event: the event to publish
        """
        encoded = event.model_dump_json().encode()
        size = len(encoded)
        if size > PREFECT_EVENTS_MAXIMUM_SIZE_BYTES.value():
            # The size must be passed as a logging argument: without it the
            # "%s" placeholder is never interpolated and appears verbatim in
            # the emitted message.
            logger.warning(
                "Refusing to publish event of size %s",
                size,
                extra={
                    "event_id": str(event.id),
                    # Only the first 100 characters of the event name are
                    # attached to the log record.
                    "event": event.event[:100],
                    "length": size,
                },
            )
            return

        logger.debug(
            "Publishing event: %s with id: %s for resource: %s",
            event.event,
            event.id,
            event.resource.get("prefect.resource.id"),
        )
        await self.publish_data(
            encoded,
            {
                "id": str(event.id),
                "event": event.event,
            },
        )

76 

77 

def create_event_publisher() -> EventPublisher:
    """Build an ``EventPublisher`` wrapping a publisher for the "events" topic.

    The wrapped publisher is created with ``deduplicate_by="id"``.
    """
    return EventPublisher(
        publisher=create_publisher(topic="events", deduplicate_by="id")
    )

81 

82 

def create_actions_publisher() -> Publisher:
    """Build a publisher for the "actions" topic with no deduplication key."""
    actions_publisher = create_publisher(topic="actions", deduplicate_by=None)
    return actions_publisher