Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/events/messaging.py: 89%
38 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1from __future__ import annotations 1a
3from collections.abc import Iterable 1a
4from typing import TYPE_CHECKING, Any, Mapping 1a
6from typing_extensions import Self 1a
8from prefect.logging import get_logger 1a
9from prefect.server.events.schemas.events import ReceivedEvent 1a
10from prefect.server.utilities.messaging import Publisher, create_publisher 1a
11from prefect.settings import PREFECT_EVENTS_MAXIMUM_SIZE_BYTES 1a
13if TYPE_CHECKING: 13 ↛ 14line 13 didn't jump to line 14 because the condition on line 13 was never true1a
14 import logging
16logger: "logging.Logger" = get_logger(__name__) 1a
async def publish(events: Iterable[ReceivedEvent]) -> None:
    """Publish each of the given events through one default event publisher.

    A single publisher obtained from ``create_event_publisher()`` is entered
    as an async context manager and reused for every event in ``events``.

    Args:
        events: the events to publish, in iteration order
    """
    batch_publisher = create_event_publisher()
    async with batch_publisher as event_publisher:
        for received in events:
            await event_publisher.publish_event(received)
class EventPublisher(Publisher):
    """Publisher that serializes ``ReceivedEvent`` objects and forwards them.

    Context-manager entry/exit and raw ``publish_data`` calls are delegated
    to the wrapped ``Publisher`` instance.
    """

    _publisher: Publisher

    def __init__(self, publisher: Publisher | None = None):
        """
        Args:
            publisher: the publisher to delegate to; when not provided, one is
                created for the "events" topic with ``deduplicate_by="id"``
        """
        self._publisher = publisher or create_publisher(
            topic="events", deduplicate_by="id"
        )

    async def __aenter__(self) -> Self:
        """Enter the wrapped publisher's context and return this wrapper."""
        await self._publisher.__aenter__()
        return self

    async def __aexit__(self, *args: Any) -> None:
        """Exit the wrapped publisher's context, forwarding the exit arguments."""
        await self._publisher.__aexit__(*args)

    async def publish_data(self, data: bytes, attributes: Mapping[str, str]) -> None:
        """Forward the raw payload and its attributes to the wrapped publisher."""
        await self._publisher.publish_data(data, attributes)

    async def publish_event(self, event: ReceivedEvent) -> None:
        """
        Publishes the given event

        The event is serialized to JSON bytes. If the encoded payload is larger
        than ``PREFECT_EVENTS_MAXIMUM_SIZE_BYTES``, a warning is logged and the
        event is not published.

        Args:
            event: the event to publish
        """
        encoded = event.model_dump_json().encode()
        encoded_size = len(encoded)
        if encoded_size > PREFECT_EVENTS_MAXIMUM_SIZE_BYTES.value():
            # The size must be passed as a format argument: without it the
            # logging module skips %-formatting and emits a literal "%s".
            logger.warning(
                "Refusing to publish event of size %s",
                encoded_size,
                extra={
                    "event_id": str(event.id),
                    "event": event.event[:100],
                    "length": encoded_size,
                },
            )
            return

        logger.debug(
            "Publishing event: %s with id: %s for resource: %s",
            event.event,
            event.id,
            event.resource.get("prefect.resource.id"),
        )
        await self.publish_data(
            encoded,
            {
                "id": str(event.id),
                "event": event.event,
            },
        )
def create_event_publisher() -> EventPublisher:
    """Build an ``EventPublisher`` around a publisher for the "events" topic.

    The underlying publisher is created with ``deduplicate_by="id"``.
    """
    return EventPublisher(
        publisher=create_publisher(topic="events", deduplicate_by="id")
    )
def create_actions_publisher() -> Publisher:
    """Return a publisher for the "actions" topic, created with ``deduplicate_by=None``."""
    actions_publisher = create_publisher(topic="actions", deduplicate_by=None)
    return actions_publisher