Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/events/pipeline.py: 36%

24 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1from prefect.server.events.schemas.events import Event, ReceivedEvent 1a

2from prefect.server.events.services import event_persister 1a

3from prefect.server.services import task_run_recorder 1a

4from prefect.server.utilities.messaging.memory import MemoryMessage 1a

5 

6 

class EventsPipeline:
    """Turn events into in-memory messages and run them through the handlers.

    Each message is handed first to the task run recorder's consumer and then
    to the event persister's handler.
    """

    @staticmethod
    def events_to_messages(events: list[Event]) -> list[MemoryMessage]:
        """Build one ``MemoryMessage`` per event, preserving input order.

        Each event is re-created as a ``ReceivedEvent`` from its dumped fields;
        that received event's JSON, encoded to bytes, becomes the message data.
        The message attributes carry the event's id (as a string) under ``id``
        and the event's name under ``event``.
        """
        return [
            MemoryMessage(
                data=ReceivedEvent(**evt.model_dump()).model_dump_json().encode(),
                attributes={"id": str(evt.id), "event": evt.event},
            )
            for evt in events
        ]

    async def process_events(self, events: list[Event]) -> None:
        """Convert ``events`` to messages and process them in order."""
        await self.process_messages(self.events_to_messages(events))

    async def process_messages(self, messages: list[MemoryMessage]) -> None:
        """Process every message, one at a time, in list order."""
        for msg in messages:
            await self.process_message(msg)

    async def process_message(self, message: MemoryMessage) -> None:
        """Pass one message to the task run recorder, then the event persister.

        The recorder's context is fully exited before the persister's context
        is entered.
        """
        # TODO: Investigate if we want to include triggers/actions etc.
        async with task_run_recorder.consumer() as record:
            await record(message)

        # NOTE(review): batch_size=1 presumably makes the persister flush each
        # message right away -- confirm against event_persister.create_handler.
        async with event_persister.create_handler(batch_size=1) as persist:
            await persist(message)
36 await handler(message)