Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/logs/messaging.py: 53%
22 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1"""
2Log messaging for streaming logs through the messaging system.
3"""
5from __future__ import annotations 1a
7from contextlib import asynccontextmanager 1a
8from typing import TYPE_CHECKING, AsyncGenerator 1a
10from prefect.logging import get_logger 1a
11from prefect.server.schemas.core import Log 1a
12from prefect.server.utilities import messaging 1a
13from prefect.settings.context import get_current_settings 1a
15if TYPE_CHECKING: 15 ↛ 16line 15 didn't jump to line 16 because the condition on line 15 was never true1a
16 import logging
18logger: "logging.Logger" = get_logger(__name__) 1a
21@asynccontextmanager 1a
22async def create_log_publisher() -> AsyncGenerator[messaging.Publisher, None]: 1a
23 """
24 Creates a publisher for sending logs to the messaging system.
26 Returns:
27 A messaging publisher configured for the "logs" topic
28 """
29 async with messaging.create_publisher(topic="logs") as publisher:
30 yield publisher
33async def publish_logs(logs: list[Log]) -> None: 1a
34 """
35 Publishes logs to the messaging system.
37 Args:
38 logs: The logs to publish
39 """
40 if not get_current_settings().server.logs.stream_publishing_enabled: 40 ↛ 43line 40 didn't jump to line 43 because the condition on line 40 was always true1b
41 return 1b
43 if not logs:
44 return
46 async with create_log_publisher() as publisher:
47 for log in logs:
48 await publisher.publish_data(
49 data=log.model_dump_json().encode(),
50 attributes={"log_id": str(log.id)} if log.id else {},
51 )