Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/events/pipeline.py: 36%
24 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1from prefect.server.events.schemas.events import Event, ReceivedEvent 1a
2from prefect.server.events.services import event_persister 1a
3from prefect.server.services import task_run_recorder 1a
4from prefect.server.utilities.messaging.memory import MemoryMessage 1a
class EventsPipeline:
    """Run events through the task-run recorder and event persister handlers.

    Events are first wrapped as in-memory messages, then each message is
    handed, one at a time, to a handler obtained from
    ``task_run_recorder.consumer()`` followed by a handler obtained from
    ``event_persister.create_handler(batch_size=1)``.
    """

    @staticmethod
    def events_to_messages(events: list[Event]) -> list[MemoryMessage]:
        """Wrap each event in a ``MemoryMessage``.

        Every event is rebuilt as a ``ReceivedEvent`` from its dumped fields;
        the JSON serialization of that ``ReceivedEvent``, encoded to bytes,
        becomes the message payload. The message attributes carry the
        stringified event id under ``"id"`` and the event name under
        ``"event"``.

        Args:
            events: The events to convert.

        Returns:
            One message per input event, in the same order.
        """
        return [
            MemoryMessage(
                data=ReceivedEvent(**event.model_dump()).model_dump_json().encode(),
                attributes={"id": str(event.id), "event": event.event},
            )
            for event in events
        ]

    async def process_events(self, events: list[Event]) -> None:
        """Convert ``events`` to messages and process them in order."""
        await self.process_messages(self.events_to_messages(events))

    async def process_messages(self, messages: list[MemoryMessage]) -> None:
        """Process ``messages`` sequentially, awaiting each before the next."""
        for pending in messages:
            await self.process_message(pending)

    async def process_message(self, message: MemoryMessage) -> None:
        """Process a single event message.

        The message is passed to the task-run recorder handler first; only
        after that handler's context has exited is it passed to the event
        persister handler.
        """
        # TODO: Investigate if we want to include triggers/actions etc.
        async with task_run_recorder.consumer() as record:
            await record(message)

        async with event_persister.create_handler(batch_size=1) as persist:
            await persist(message)