Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/logs/stream.py: 24%
133 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1"""
2Log streaming for live log distribution via websockets.
3"""
5from __future__ import annotations 1a
7import asyncio 1a
8from asyncio import Queue 1a
9from contextlib import asynccontextmanager 1a
10from typing import ( 1a
11 TYPE_CHECKING,
12 AsyncGenerator,
13 AsyncIterable,
14 NoReturn,
15)
17from prefect.logging import get_logger 1a
18from prefect.server.schemas.core import Log 1a
19from prefect.server.schemas.filters import LogFilter 1a
20from prefect.server.services.base import RunInEphemeralServers, RunInWebservers, Service 1a
21from prefect.server.utilities import messaging 1a
22from prefect.settings.context import get_current_settings 1a
23from prefect.settings.models.server.services import ServicesBaseSetting 1a
25if TYPE_CHECKING: 25 ↛ 26line 25 didn't jump to line 26 because the condition on line 25 was never true1a
26 import logging
# Module-level logger for the log-streaming subsystem
logger: "logging.Logger" = get_logger(__name__)

# Active subscriber queues; each connected websocket client owns one queue
subscribers: set["Queue[Log]"] = set()
# The filter each subscriber registered, keyed by its queue
filters: dict["Queue[Log]", LogFilter] = {}

# The maximum number of messages that can be waiting for one subscriber, after which
# new messages will be dropped
SUBSCRIPTION_BACKLOG = 256
@asynccontextmanager
async def subscribed(
    filter: LogFilter,
) -> AsyncGenerator["Queue[Log]", None]:
    """
    Register a log subscription for the duration of the context.

    Args:
        filter: The log filter to apply

    Yields:
        A queue that will receive matching logs
    """
    # Bounded queue so one slow consumer cannot buffer logs without limit
    channel: "Queue[Log]" = Queue(maxsize=SUBSCRIPTION_BACKLOG)

    subscribers.add(channel)
    filters[channel] = filter

    try:
        yield channel
    finally:
        # Unregister in the reverse order of registration
        del filters[channel]
        subscribers.remove(channel)
@asynccontextmanager
async def logs(
    filter: LogFilter,
) -> AsyncGenerator[AsyncIterable[Log | None], None]:
    """
    Open a subscription and yield an async iterator over matching logs.

    The iterator periodically yields ``None`` when no log arrives within the
    polling interval, so callers can do housekeeping (such as checking whether
    a websocket client is still connected).

    Args:
        filter: The log filter to apply

    Yields:
        An async iterable of logs (or None for timeouts)
    """
    async with subscribed(filter) as queue:

        async def drain() -> AsyncGenerator[Log | None, None]:
            while True:
                # A short timeout keeps this loop responsive to cancellation,
                # especially when a client disconnects. Without it, a consumer
                # could block forever on an empty queue and never notice that
                # its client (like a websocket) has gone away.
                try:
                    entry = await asyncio.wait_for(queue.get(), timeout=1)
                except asyncio.TimeoutError:
                    # Hand control back to the caller with a None so it can
                    # decide what happens next (e.g. ping the websocket).
                    yield None
                else:
                    yield entry

        yield drain()
99def log_matches_filter(log: Log, filter: LogFilter) -> bool: 1a
100 """
101 Check if a log matches the given filter criteria.
103 Args:
104 log: The log to check
105 filter: The filter to apply
107 Returns:
108 True if the log matches the filter, False otherwise
109 """
110 # Check level filter
111 if filter.level:
112 if filter.level.ge_ is not None and log.level < filter.level.ge_:
113 return False
114 if filter.level.le_ is not None and log.level > filter.level.le_:
115 return False
117 # Check timestamp filter
118 if filter.timestamp:
119 if (
120 filter.timestamp.before_ is not None
121 and log.timestamp > filter.timestamp.before_
122 ):
123 return False
124 if (
125 filter.timestamp.after_ is not None
126 and log.timestamp < filter.timestamp.after_
127 ):
128 return False
130 # Check flow_run_id filter
131 if filter.flow_run_id:
132 if filter.flow_run_id.any_ is not None:
133 if log.flow_run_id not in filter.flow_run_id.any_:
134 return False
136 # Check task_run_id filter
137 if filter.task_run_id:
138 if filter.task_run_id.any_ is not None:
139 if log.task_run_id not in filter.task_run_id.any_:
140 return False
141 if filter.task_run_id.is_null_ is not None:
142 is_null = log.task_run_id is None
143 if filter.task_run_id.is_null_ != is_null:
144 return False
146 return True
@asynccontextmanager
async def distributor() -> AsyncGenerator[messaging.MessageHandler, None]:
    """
    Create a message handler that distributes logs to subscribed clients.

    Yields:
        A message handler function
    """

    async def message_handler(message: messaging.Message):
        # Skip malformed messages explicitly rather than via `assert`:
        # asserts are stripped under `python -O`, and an AssertionError on a
        # missing payload would crash the consumer loop instead of dropping
        # the one bad message.
        if not message.data or not message.attributes:
            return

        # Only pay the cost of parsing when someone is actually listening
        if not subscribers:
            return

        try:
            log = Log.model_validate_json(message.data)
        except Exception as e:
            logger.warning(f"Failed to parse log message: {e}")
            return

        for queue in subscribers:
            if not log_matches_filter(log, filters[queue]):
                continue

            try:
                queue.put_nowait(log)
            except asyncio.QueueFull:
                # Drop logs for subscribers that have fallen too far behind
                continue

    yield message_handler
# Singleton background task running the distributor consumer (None when stopped)
_distributor_task: asyncio.Task[None] | None = None
# Set once the distributor's messaging subscription is established
_distributor_started: asyncio.Event | None = None
async def start_distributor() -> None:
    """Starts the distributor consumer as a global background task"""
    global _distributor_task
    global _distributor_started

    # Idempotent: calling again while the task is alive is a no-op
    if _distributor_task is not None:
        return

    _distributor_started = asyncio.Event()
    _distributor_task = asyncio.create_task(run_distributor(_distributor_started))
    # Don't return until the consumer's subscription is actually established
    await _distributor_started.wait()
async def stop_distributor() -> None:
    """Stops the distributor consumer global background task"""
    global _distributor_task
    global _distributor_started

    # Grab the task and clear the module state in one step
    task, _distributor_task = _distributor_task, None
    if task is None:
        return
    _distributor_started = None

    task.cancel()
    try:
        # Shield the await so that cancellation of *our* caller doesn't stop
        # us from observing the task's wind-down
        await asyncio.shield(task)
    except asyncio.CancelledError:
        pass
class LogDistributor(RunInEphemeralServers, RunInWebservers, Service):
    """Service for distributing logs to websocket subscribers"""

    # Service name used by the Service framework
    name: str = "LogDistributor"

    @classmethod
    def service_settings(cls) -> ServicesBaseSetting:
        # This service is toggled via enabled() / its environment variable,
        # not through a dedicated ServicesBaseSetting entry
        raise NotImplementedError("LogDistributor does not have settings")

    @classmethod
    def environment_variable_name(cls) -> str:
        # Environment variable controlling whether this service runs
        return "PREFECT_SERVER_LOGS_STREAM_OUT_ENABLED"

    @classmethod
    def enabled(cls) -> bool:
        # Enabled when streaming logs out over websockets is turned on
        return get_current_settings().server.logs.stream_out_enabled

    async def start(self) -> NoReturn:
        """Start the global distributor task and await it until cancelled."""
        await start_distributor()
        try:
            # start_distributor should have set _distributor_task
            assert _distributor_task is not None
            await _distributor_task
        except asyncio.CancelledError:
            pass

        # This should never be reached due to the infinite loop above
        raise RuntimeError("LogDistributor service unexpectedly terminated")

    async def stop(self) -> None:
        """Tear down the global distributor task."""
        await stop_distributor()
async def run_distributor(started: asyncio.Event) -> NoReturn:
    """
    Runs the distributor consumer forever until it is cancelled.

    Args:
        started: Event set once the messaging subscription is established,
            letting `start_distributor` know startup is complete.
    """
    # NOTE: a dead `global _distributor_started` declaration was removed here;
    # this function never assigns that module global.
    async with messaging.ephemeral_subscription(
        topic="logs",
    ) as create_consumer_kwargs:
        # Signal readiness only after the subscription exists, so callers
        # awaiting start_distributor() cannot race ahead of the consumer.
        started.set()
        async with distributor() as handler:
            consumer = messaging.create_consumer(**create_consumer_kwargs)
            await consumer.run(
                handler=handler,
            )

    # This should never be reached due to the infinite nature of consumer.run()
    raise RuntimeError("Log distributor consumer unexpectedly terminated")