Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/logs/messaging.py: 53%

22 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1""" 

2Log messaging for streaming logs through the messaging system. 

3""" 

4 

5from __future__ import annotations 1a

6 

7from contextlib import asynccontextmanager 1a

8from typing import TYPE_CHECKING, AsyncGenerator 1a

9 

10from prefect.logging import get_logger 1a

11from prefect.server.schemas.core import Log 1a

12from prefect.server.utilities import messaging 1a

13from prefect.settings.context import get_current_settings 1a

14 

15if TYPE_CHECKING: 15 ↛ 16line 15 didn't jump to line 16 because the condition on line 15 was never true1a

16 import logging 

17 

18logger: "logging.Logger" = get_logger(__name__) 1a

19 

20 

@asynccontextmanager
async def create_log_publisher() -> AsyncGenerator[messaging.Publisher, None]:
    """
    Open a messaging publisher bound to the "logs" topic.

    Yields:
        The publisher produced by ``messaging.create_publisher(topic="logs")``.
        The underlying publisher context is exited when this context exits.
    """
    publisher_context = messaging.create_publisher(topic="logs")
    async with publisher_context as log_publisher:
        yield log_publisher

31 

32 

async def publish_logs(logs: list[Log]) -> None:
    """
    Send each log to the messaging system as its own message.

    Nothing is published when stream publishing is disabled in the current
    settings, or when ``logs`` is empty.

    Args:
        logs: The logs to publish
    """
    log_settings = get_current_settings().server.logs
    # Settings are consulted first; the emptiness check only runs when
    # stream publishing is enabled.
    if not log_settings.stream_publishing_enabled or not logs:
        return

    async with create_log_publisher() as log_publisher:
        for entry in logs:
            payload = entry.model_dump_json().encode()
            # Only attach a "log_id" attribute when the log carries an id.
            message_attributes = {"log_id": str(entry.id)} if entry.id else {}
            await log_publisher.publish_data(
                data=payload,
                attributes=message_attributes,
            )