Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/events/services/event_logger.py: 47%
45 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1from __future__ import annotations 1a
3import asyncio 1a
4from typing import TYPE_CHECKING, NoReturn 1a
6import rich 1a
8from prefect.logging import get_logger 1a
9from prefect.server.events.schemas.events import ReceivedEvent 1a
10from prefect.server.services.base import RunInEphemeralServers, Service 1a
11from prefect.server.utilities.messaging import Consumer, Message, create_consumer 1a
12from prefect.server.utilities.messaging._consumer_names import ( 1a
13 generate_unique_consumer_name,
14)
15from prefect.settings.context import get_current_settings 1a
16from prefect.settings.models.server.services import ServicesBaseSetting 1a
17from prefect.types._datetime import now 1a
19if TYPE_CHECKING: 19 ↛ 20line 19 didn't jump to line 20 because the condition on line 19 was never true1a
20 import logging
22logger: "logging.Logger" = get_logger(__name__) 1a
class EventLogger(RunInEphemeralServers, Service):
    """A debugging service that logs events to the console as they arrive.

    The service consumes the "events" topic through a consumer in the
    "event-logger" group and prints one line per message to a Rich console.
    """

    # Task running the consumer loop; None while the service is not started.
    consumer_task: asyncio.Task[None] | None = None

    @classmethod
    def service_settings(cls) -> ServicesBaseSetting:
        """Return the `event_logger` services settings from the current settings."""
        return get_current_settings().server.services.event_logger

    async def start(self) -> NoReturn:
        """Create the consumer and print every received event until cancelled.

        The consumer loop runs in a background task that is awaited here, so
        this coroutine only finishes once that task ends. Cancellation of the
        task (see `stop`) is treated as a normal shutdown and is not re-raised.

        Raises:
            AssertionError: If the logger has already been started.
        """
        # Import the submodule explicitly: a bare `import rich` does not
        # guarantee that `rich.console` has been loaded, so attribute access
        # through the package only works when some other module happened to
        # import it first.
        from rich.console import Console as RichConsole

        assert self.consumer_task is None, "Logger already started"
        self.consumer: Consumer = create_consumer(
            "events",
            group="event-logger",
            name=generate_unique_consumer_name("event-logger"),
        )

        console = RichConsole()

        async def handler(message: Message):
            """Parse one message as a ReceivedEvent and print a summary line."""
            # Capture the receipt time before parsing so that the offset below
            # does not include the validation cost.
            right_now = now("UTC")
            event: ReceivedEvent = ReceivedEvent.model_validate_json(message.data)

            console.print(
                "Event:",
                # Only the part of the id before the first "-" is shown.
                str(event.id).partition("-")[0],
                f"{event.occurred.isoformat()}",
                # Seconds between `occurred` and receipt; negative when the
                # event occurred before it was received.
                f" ({(event.occurred - right_now).total_seconds():>6,.2f})",
                # The leading "\\[" yields an escaped bracket in Rich markup so
                # the event name is rendered inside literal square brackets.
                f"\\[[bold green]{event.event}[/]]",
                event.resource.id,
            )
            console.file.flush()

        self.consumer_task = asyncio.create_task(self.consumer.run(handler))
        logger.debug("Event logger started")

        try:
            await self.consumer_task
        except asyncio.CancelledError:
            # Cancellation is the expected way for the consumer loop to end.
            pass

    async def stop(self) -> None:
        """Cancel the consumer task, clean up the consumer and reset state.

        Raises:
            AssertionError: If the logger has not been started.
        """
        assert self.consumer_task is not None, "Logger not started"
        self.consumer_task.cancel()
        try:
            await self.consumer_task
        except asyncio.CancelledError:
            # Expected: the task was cancelled just above.
            pass
        finally:
            # Runs even if the task ended with an error other than
            # cancellation, so the consumer is always cleaned up.
            await self.consumer.cleanup()
            self.consumer_task = None
        logger.debug("Event logger stopped")