Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/logs/stream.py: 24%

133 statements  

coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

1""" 

2Log streaming for live log distribution via websockets. 

3""" 

4 

5from __future__ import annotations 1a

6 

7import asyncio 1a

8from asyncio import Queue 1a

9from contextlib import asynccontextmanager 1a

10from typing import ( 1a

11 TYPE_CHECKING, 

12 AsyncGenerator, 

13 AsyncIterable, 

14 NoReturn, 

15) 

16 

17from prefect.logging import get_logger 1a

18from prefect.server.schemas.core import Log 1a

19from prefect.server.schemas.filters import LogFilter 1a

20from prefect.server.services.base import RunInEphemeralServers, RunInWebservers, Service 1a

21from prefect.server.utilities import messaging 1a

22from prefect.settings.context import get_current_settings 1a

23from prefect.settings.models.server.services import ServicesBaseSetting 1a

24 

25if TYPE_CHECKING: 25 ↛ 26line 25 didn't jump to line 26 because the condition on line 25 was never true1a

26 import logging 

27 

28logger: "logging.Logger" = get_logger(__name__) 1a

29 

30subscribers: set["Queue[Log]"] = set() 1a

31filters: dict["Queue[Log]", LogFilter] = {} 1a

32 

33# The maximum number of messages that can be waiting for one subscriber, after which 

34# new messages will be dropped 

35SUBSCRIPTION_BACKLOG = 256 1a
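

# Illustrative sketch (not part of the original module): a bounded asyncio.Queue plus
# put_nowait() gives drop-on-overflow semantics -- once a subscriber's backlog is full,
# additional logs are simply discarded for that subscriber (see the distributor below).
def _example_backlog_overflow() -> None:
    queue: "Queue[int]" = Queue(maxsize=2)
    queue.put_nowait(1)
    queue.put_nowait(2)
    try:
        queue.put_nowait(3)  # the backlog is full, so this write is rejected
    except asyncio.QueueFull:
        pass  # the distributor drops the log for this subscriber and moves on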


@asynccontextmanager
async def subscribed(
    filter: LogFilter,
) -> AsyncGenerator["Queue[Log]", None]:
    """
    Subscribe to a stream of logs matching the given filter.

    Args:
        filter: The log filter to apply

    Yields:
        A queue that will receive matching logs
    """
    queue: "Queue[Log]" = Queue(maxsize=SUBSCRIPTION_BACKLOG)

    subscribers.add(queue)
    filters[queue] = filter

    try:
        yield queue
    finally:
        subscribers.remove(queue)
        del filters[queue]
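

# Illustrative sketch (not part of the original module): holding a subscription open and
# draining its queue directly. Passing LogFilter() with no criteria is an assumption for
# brevity; real callers supply concrete filter fields.
async def _example_subscribed() -> None:
    async with subscribed(LogFilter()) as queue:
        # Block until one matching log arrives; the queue is unregistered on exit.
        log = await queue.get()
        print(log.level, log.message)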


@asynccontextmanager
async def logs(
    filter: LogFilter,
) -> AsyncGenerator[AsyncIterable[Log | None], None]:
    """
    Create a stream of logs matching the given filter.

    Args:
        filter: The log filter to apply

    Yields:
        An async iterable of logs (or None for timeouts)
    """
    async with subscribed(filter) as queue:

        async def consume() -> AsyncGenerator[Log | None, None]:
            while True:
                # Use a brief timeout to allow for cancellation, especially when a
                # client disconnects. Without a timeout here, a consumer may block
                # forever waiting for a message to be put on the queue, and never notice
                # that their client (like a websocket) has actually disconnected.
                try:
                    log = await asyncio.wait_for(queue.get(), timeout=1)
                except asyncio.TimeoutError:
                    # If the queue is empty, we'll yield to the caller with a None in
                    # order to give it control over what happens next. This helps with
                    # the outbound websocket, where we want to check if the client is
                    # still connected periodically.
                    yield None
                    continue

                yield log

        yield consume()
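

# Illustrative sketch (not part of the original module): consuming the stream the way an
# outbound websocket handler might, treating None as a once-per-second heartbeat that
# offers a chance to verify the client is still connected before waiting again.
async def _example_logs_stream() -> None:
    async with logs(LogFilter()) as stream:
        async for log in stream:
            if log is None:
                # Timeout tick: no log arrived within a second. A websocket handler
                # would use this opening to ping the client and bail out if it is gone.
                continue
            print(log.message)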


def log_matches_filter(log: Log, filter: LogFilter) -> bool:
    """
    Check if a log matches the given filter criteria.

    Args:
        log: The log to check
        filter: The filter to apply

    Returns:
        True if the log matches the filter, False otherwise
    """
    # Check level filter
    if filter.level:
        if filter.level.ge_ is not None and log.level < filter.level.ge_:
            return False
        if filter.level.le_ is not None and log.level > filter.level.le_:
            return False

    # Check timestamp filter
    if filter.timestamp:
        if (
            filter.timestamp.before_ is not None
            and log.timestamp > filter.timestamp.before_
        ):
            return False
        if (
            filter.timestamp.after_ is not None
            and log.timestamp < filter.timestamp.after_
        ):
            return False

    # Check flow_run_id filter
    if filter.flow_run_id:
        if filter.flow_run_id.any_ is not None:
            if log.flow_run_id not in filter.flow_run_id.any_:
                return False

    # Check task_run_id filter
    if filter.task_run_id:
        if filter.task_run_id.any_ is not None:
            if log.task_run_id not in filter.task_run_id.any_:
                return False
        if filter.task_run_id.is_null_ is not None:
            is_null = log.task_run_id is None
            if filter.task_run_id.is_null_ != is_null:
                return False

    return True
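

# Illustrative sketch (not part of the original module): exercising the matcher with a
# level-only filter. The LogFilterLevel import and keyword construction of LogFilter are
# assumptions inferred from the attribute access above (filter.level.ge_, and so on).
def _example_log_matches_filter(log: Log) -> bool:
    from prefect.server.schemas.filters import LogFilterLevel  # assumed sub-filter model

    # Accept only logs at INFO (level 20) or above; everything else is filtered out.
    info_and_up = LogFilter(level=LogFilterLevel(ge_=20))
    return log_matches_filter(log, info_and_up)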


@asynccontextmanager
async def distributor() -> AsyncGenerator[messaging.MessageHandler, None]:
    """
    Create a message handler that distributes logs to subscribed clients.

    Yields:
        A message handler function
    """

    async def message_handler(message: messaging.Message):
        assert message.data

        try:
            assert message.attributes
        except Exception:
            return

        if subscribers:
            try:
                log = Log.model_validate_json(message.data)
            except Exception as e:
                logger.warning(f"Failed to parse log message: {e}")
                return

            for queue in subscribers:
                filter = filters[queue]
                if not log_matches_filter(log, filter):
                    continue

                try:
                    queue.put_nowait(log)
                except asyncio.QueueFull:
                    continue

    yield message_handler
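

# Illustrative sketch (not part of the original module): driving the handler directly with
# a minimal stand-in message. _FakeMessage and the Log constructor fields used here are
# assumptions; real messages arrive from the "logs" topic via run_distributor().
async def _example_distributor_fanout() -> None:
    from datetime import datetime, timezone

    class _FakeMessage:
        def __init__(self, data: bytes):
            self.data = data
            self.attributes = {"id": "example"}  # must be truthy or the handler returns

    async with subscribed(LogFilter()) as queue:
        async with distributor() as handler:
            payload = Log(
                name="prefect.example",
                level=20,
                message="hello",
                timestamp=datetime.now(timezone.utc),
            ).model_dump_json()
            await handler(_FakeMessage(payload.encode()))
            # The log matched the (empty) filter, so it was fanned out to our queue.
            print(queue.get_nowait().message)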


_distributor_task: asyncio.Task[None] | None = None
_distributor_started: asyncio.Event | None = None


async def start_distributor() -> None:
    """Starts the distributor consumer as a global background task"""
    global _distributor_task
    global _distributor_started
    if _distributor_task:
        return

    _distributor_started = asyncio.Event()
    _distributor_task = asyncio.create_task(run_distributor(_distributor_started))
    await _distributor_started.wait()


async def stop_distributor() -> None:
    """Stops the distributor consumer's global background task"""
    global _distributor_task
    global _distributor_started
    if not _distributor_task:
        return

    task = _distributor_task
    _distributor_task = None
    _distributor_started = None

    task.cancel()
    try:
        await asyncio.shield(task)
    except asyncio.CancelledError:
        pass
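

# Illustrative sketch (not part of the original module): the module-level start/stop pair.
# start_distributor() is a no-op when the background task already exists, and
# stop_distributor() is a no-op when nothing was started.
async def _example_distributor_lifecycle() -> None:
    await start_distributor()  # spawn run_distributor() and wait until it is subscribed
    try:
        ...  # serve websocket clients while the background task fans out incoming logs
    finally:
        await stop_distributor()  # cancel the background task and wait for it to finish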


class LogDistributor(RunInEphemeralServers, RunInWebservers, Service):
    """Service for distributing logs to websocket subscribers"""

    name: str = "LogDistributor"

    @classmethod
    def service_settings(cls) -> ServicesBaseSetting:
        raise NotImplementedError("LogDistributor does not have settings")

    @classmethod
    def environment_variable_name(cls) -> str:
        return "PREFECT_SERVER_LOGS_STREAM_OUT_ENABLED"

    @classmethod
    def enabled(cls) -> bool:
        return get_current_settings().server.logs.stream_out_enabled

    async def start(self) -> NoReturn:
        await start_distributor()
        try:
            # start_distributor should have set _distributor_task
            assert _distributor_task is not None
            await _distributor_task
        except asyncio.CancelledError:
            pass

        # This should never be reached because the distributor task runs until cancelled
        raise RuntimeError("LogDistributor service unexpectedly terminated")

    async def stop(self) -> None:
        await stop_distributor()


async def run_distributor(started: asyncio.Event) -> NoReturn:
    """Runs the distributor consumer forever until it is cancelled"""
    global _distributor_started
    async with messaging.ephemeral_subscription(
        topic="logs",
    ) as create_consumer_kwargs:
        started.set()
        async with distributor() as handler:
            consumer = messaging.create_consumer(**create_consumer_kwargs)
            await consumer.run(
                handler=handler,
            )

    # This should never be reached due to the infinite nature of consumer.run()
    raise RuntimeError("Log distributor consumer unexpectedly terminated")