Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/events/services/event_logger.py: 47%

45 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1from __future__ import annotations 1a

2 

3import asyncio 1a

4from typing import TYPE_CHECKING, NoReturn 1a

5 

6import rich 1a

7 

8from prefect.logging import get_logger 1a

9from prefect.server.events.schemas.events import ReceivedEvent 1a

10from prefect.server.services.base import RunInEphemeralServers, Service 1a

11from prefect.server.utilities.messaging import Consumer, Message, create_consumer 1a

12from prefect.server.utilities.messaging._consumer_names import ( 1a

13 generate_unique_consumer_name, 

14) 

15from prefect.settings.context import get_current_settings 1a

16from prefect.settings.models.server.services import ServicesBaseSetting 1a

17from prefect.types._datetime import now 1a

18 

19if TYPE_CHECKING: 19 ↛ 20line 19 didn't jump to line 20 because the condition on line 19 was never true1a

20 import logging 

21 

22logger: "logging.Logger" = get_logger(__name__) 1a

23 

24 

25class EventLogger(RunInEphemeralServers, Service): 1a

26 """A debugging service that logs events to the console as they arrive.""" 

27 

28 consumer_task: asyncio.Task[None] | None = None 1a

29 

30 @classmethod 1a

31 def service_settings(cls) -> ServicesBaseSetting: 1a

32 return get_current_settings().server.services.event_logger 1a

33 

34 async def start(self) -> NoReturn: 1a

35 assert self.consumer_task is None, "Logger already started" 

36 self.consumer: Consumer = create_consumer( 

37 "events", 

38 group="event-logger", 

39 name=generate_unique_consumer_name("event-logger"), 

40 ) 

41 

42 console = rich.console.Console() 

43 

44 async def handler(message: Message): 

45 right_now = now("UTC") 

46 event: ReceivedEvent = ReceivedEvent.model_validate_json(message.data) 

47 

48 console.print( 

49 "Event:", 

50 str(event.id).partition("-")[0], 

51 f"{event.occurred.isoformat()}", 

52 f" ({(event.occurred - right_now).total_seconds():>6,.2f})", 

53 f"\\[[bold green]{event.event}[/]]", 

54 event.resource.id, 

55 ) 

56 console.file.flush() 

57 

58 self.consumer_task = asyncio.create_task(self.consumer.run(handler)) 

59 logger.debug("Event logger started") 

60 

61 try: 

62 await self.consumer_task 

63 except asyncio.CancelledError: 

64 pass 

65 

66 async def stop(self) -> None: 1a

67 assert self.consumer_task is not None, "Logger not started" 

68 self.consumer_task.cancel() 

69 try: 

70 await self.consumer_task 

71 except asyncio.CancelledError: 

72 pass 

73 finally: 

74 await self.consumer.cleanup() 

75 self.consumer_task = None 

76 logger.debug("Event logger stopped")