Coverage for /usr/local/lib/python3.12/site-packages/prefect/logging/handlers.py: 33% (166 statements)
coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

from __future__ import annotations

import inspect
import json
import logging
import sys
import time
import traceback
import uuid
import warnings
from contextlib import asynccontextmanager
from typing import TYPE_CHECKING, Any, Dict, TextIO, Type

from rich.console import Console
from rich.highlighter import Highlighter, NullHighlighter
from rich.theme import Theme
from typing_extensions import Self

import prefect.context
from prefect._internal.concurrency.api import create_call, from_sync
from prefect._internal.concurrency.event_loop import get_running_loop
from prefect._internal.concurrency.services import BatchedQueueService
from prefect._internal.concurrency.threads import in_global_loop
from prefect.client.orchestration import get_client
from prefect.client.schemas.actions import LogCreate
from prefect.exceptions import MissingContextError
from prefect.logging.highlighters import PrefectConsoleHighlighter
from prefect.settings import (
    PREFECT_API_URL,
    PREFECT_LOGGING_COLORS,
    PREFECT_LOGGING_INTERNAL_LEVEL,
    PREFECT_LOGGING_MARKUP,
    PREFECT_LOGGING_TO_API_BATCH_INTERVAL,
    PREFECT_LOGGING_TO_API_BATCH_SIZE,
    PREFECT_LOGGING_TO_API_MAX_LOG_SIZE,
    PREFECT_LOGGING_TO_API_WHEN_MISSING_FLOW,
)
from prefect.types._datetime import from_timestamp

if sys.version_info >= (3, 12):
    StreamHandler = logging.StreamHandler[TextIO]
else:
    if TYPE_CHECKING:
        StreamHandler = logging.StreamHandler[TextIO]
    else:
        StreamHandler = logging.StreamHandler

if TYPE_CHECKING:
    from prefect.client.schemas.objects import FlowRun, TaskRun


class APILogWorker(BatchedQueueService[Dict[str, Any]]):
    @property
    def max_batch_size(self) -> int:
        return max(
            PREFECT_LOGGING_TO_API_BATCH_SIZE.value()
            - PREFECT_LOGGING_TO_API_MAX_LOG_SIZE.value(),
            PREFECT_LOGGING_TO_API_MAX_LOG_SIZE.value(),
        )

    @property
    def min_interval(self) -> float | None:
        return PREFECT_LOGGING_TO_API_BATCH_INTERVAL.value()

    async def _handle_batch(self, items: list[dict[str, Any]]):
        try:
            await self._client.create_logs(items)
        except Exception as e:
            # Roughly replicate the behavior of the stdlib logger error handling
            if logging.raiseExceptions and sys.stderr:
                sys.stderr.write("--- Error logging to API ---\n")
                if PREFECT_LOGGING_INTERNAL_LEVEL.value() == "DEBUG":
                    traceback.print_exc(file=sys.stderr)
                else:
                    # Only log the exception message in non-DEBUG mode
                    sys.stderr.write(str(e))

    @asynccontextmanager
    async def _lifespan(self):
        async with get_client() as self._client:
            yield

    @classmethod
    def instance(cls: Type[Self], *args: Any) -> Self:
        settings = (
            PREFECT_LOGGING_TO_API_BATCH_SIZE.value(),
            PREFECT_API_URL.value(),
            PREFECT_LOGGING_TO_API_MAX_LOG_SIZE.value(),
        )

        # Ensure a unique worker is retrieved per relevant logging settings
        return super().instance(*settings, *args)

    def _get_size(self, item: Dict[str, Any]) -> int:
        return item.pop("__payload_size__", None) or len(json.dumps(item).encode())
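
# A minimal usage sketch (hypothetical, not part of the original module): the
# worker is a process-wide singleton keyed by the relevant logging settings;
# callers enqueue pre-serialized log dicts and batches are shipped in the
# background. The payload fields below are illustrative, not exhaustive:
#
#     worker = APILogWorker.instance()
#     worker.send({"name": "my.logger", "level": 20, "message": "hi"})
#     APILogWorker.drain_all(timeout=5)  # block until the queue is flushed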


class APILogHandler(logging.Handler):
    """
    A logging handler that sends logs to the Prefect API.

    Sends log records to the `APILogWorker`, which manages sending batches of
    logs in the background.
    """

    def flush(self) -> None:
        """
        Tell the `APILogWorker` to send any currently enqueued logs and block
        until completion.

        Use `aflush` from async contexts instead.
        """
        loop = get_running_loop()
        if loop:
            if in_global_loop():  # Guard against internal misuse
                raise RuntimeError(
                    "Cannot call `APILogWorker.flush` from the global event loop; it"
                    " would block the event loop and cause a deadlock. Use"
                    " `APILogWorker.aflush` instead."
                )

            # Not ideal, but this method is called by the stdlib and cannot return a
            # coroutine, so we just schedule the drain in a new thread and continue
            from_sync.call_soon_in_new_thread(create_call(APILogWorker.drain_all))
        else:
            # We set a timeout of 5s because we don't want to block forever if the
            # worker is stuck. This can occur when the handler is being shut down
            # while `logging._lock` is held but the worker is attempting to emit
            # logs, resulting in a deadlock.
            APILogWorker.drain_all(timeout=5)

    @classmethod
    async def aflush(cls) -> None:
        """
        Tell the `APILogWorker` to send any currently enqueued logs and block
        until completion.
        """

        result = APILogWorker.drain_all()
        if inspect.isawaitable(result):
            await result
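
    # A minimal sketch (hypothetical usage, not part of the original module):
    # from synchronous code, `flush` blocks until enqueued logs are sent; from
    # async code, await `aflush` instead so the event loop is not blocked:
    #
    #     APILogHandler().flush()        # sync context
    #     await APILogHandler.aflush()   # async context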

    def emit(self, record: logging.LogRecord) -> None:
        """
        Send a log to the `APILogWorker`.
        """
        try:
            profile = prefect.context.get_settings_context()

            if not profile.settings.logging.to_api.enabled:
                return  # Respect the global settings toggle
            if not getattr(record, "send_to_api", True):
                return  # Do not send records that have opted out

            log = self.prepare(record)
            APILogWorker.instance().send(log)

        except Exception:
            self.handleError(record)

    def handleError(self, record: logging.LogRecord) -> None:
        _, exc, _ = sys.exc_info()

        if isinstance(exc, MissingContextError):
            log_handling_when_missing_flow = (
                PREFECT_LOGGING_TO_API_WHEN_MISSING_FLOW.value()
            )
            if log_handling_when_missing_flow == "warn":
                # Warn when a logger is used outside of a run context; the
                # stack level here gets us to the user's logging call
                warnings.warn(
                    f"{exc} Set PREFECT_LOGGING_TO_API_WHEN_MISSING_FLOW=ignore to suppress this warning.",
                    stacklevel=8,
                )
                return
            elif log_handling_when_missing_flow == "ignore":
                return
            else:
                raise exc

        # Display a longer traceback for other errors
        return super().handleError(record)

    def prepare(self, record: logging.LogRecord) -> Dict[str, Any]:
        """
        Convert a `logging.LogRecord` to the API `LogCreate` schema and serialize.

        This infers the linked flow or task run from the log record or the current
        run context.

        If a flow run id cannot be found, the log will be dropped.

        Logs exceeding the maximum size will be truncated.
        """
        flow_run_id = getattr(record, "flow_run_id", None)
        task_run_id = getattr(record, "task_run_id", None)
        worker_id = getattr(record, "worker_id", None)

        if not flow_run_id:
            try:
                context = prefect.context.get_run_context()
            except MissingContextError:
                raise MissingContextError(
                    f"Logger {record.name!r} attempted to send logs to the API without"
                    " a flow run id. The API log handler can only send logs within"
                    " flow run contexts unless the flow run id is manually provided."
                ) from None

            if flow_run := getattr(context, "flow_run", None):
                if TYPE_CHECKING:
                    assert isinstance(flow_run, FlowRun)
                flow_run_id = flow_run.id
            elif task_run := getattr(context, "task_run", None):
                if TYPE_CHECKING:
                    assert isinstance(task_run, TaskRun)
                flow_run_id = task_run.flow_run_id
                task_run_id = task_run_id or task_run.id
            else:
                raise ValueError(
                    "Encountered malformed run context. Does not contain flow or task "
                    "run information."
                )

        # Parsing to a `LogCreate` object here gives us nice parsing error messages
        # from the standard lib `handleError` method if something goes wrong and
        # prevents malformed logs from entering the queue
        if isinstance(flow_run_id, str):
            try:
                flow_run_id = uuid.UUID(flow_run_id)
            except ValueError:
                flow_run_id = None

        formatted_message = self.format(record)

        log = LogCreate(
            flow_run_id=flow_run_id,
            task_run_id=task_run_id,
            worker_id=worker_id,
            name=record.name,
            level=record.levelno,
            timestamp=from_timestamp(getattr(record, "created", None) or time.time()),  # pyright: ignore[reportArgumentType]
            message=formatted_message,
        ).model_dump(mode="json")

        log_size = log["__payload_size__"] = self._get_payload_size(log)
        if log_size > PREFECT_LOGGING_TO_API_MAX_LOG_SIZE.value():
            max_size = PREFECT_LOGGING_TO_API_MAX_LOG_SIZE.value()
            oversize = log_size - max_size
            BUFFER = 50
            truncated_length = max(len(formatted_message) - oversize - BUFFER, 0)
            truncated_message = formatted_message[:truncated_length] + "... [truncated]"

            log = LogCreate(
                flow_run_id=flow_run_id,
                task_run_id=task_run_id,
                worker_id=worker_id,
                name=record.name,
                level=record.levelno,
                timestamp=from_timestamp(
                    getattr(record, "created", None) or time.time()  # pyright: ignore[reportArgumentType] DateTime is split into two types depending on Python version
                ),
                message=truncated_message,
            ).model_dump(mode="json")

            log["__payload_size__"] = self._get_payload_size(log)

        return log
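
    # Worked example for the truncation branch above (illustrative numbers):
    # with a 1_000_000-byte limit and a 1_000_100-byte payload, `oversize` is
    # 100, so the message keeps `len(formatted_message) - 100 - 50` characters
    # before "... [truncated]" is appended; the 50-byte BUFFER absorbs the
    # marker and serialization overhead.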

    def _get_payload_size(self, log: Dict[str, Any]) -> int:
        return len(json.dumps(log).encode())
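
# A minimal sketch (hypothetical usage): attach the handler to a logger and
# supply the run id explicitly when logging outside of a run context; the
# UUID below is a placeholder:
#
#     logger = logging.getLogger("my-app")
#     logger.addHandler(APILogHandler())
#     logger.info(
#         "hello", extra={"flow_run_id": "00000000-0000-0000-0000-000000000000"}
#     )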


class WorkerAPILogHandler(APILogHandler):
    def emit(self, record: logging.LogRecord) -> None:
        # Open-source API servers do not currently support worker logs, and
        # worker logs only have an associated worker ID when connected to Cloud,
        # so we won't send worker logs to the API unless they have a worker ID.
        if not getattr(record, "worker_id", None):
            return
        super().emit(record)

    def prepare(self, record: logging.LogRecord) -> Dict[str, Any]:
        """
        Convert a `logging.LogRecord` to the API `LogCreate` schema and serialize.

        This will add the worker id to the log.

        Logs exceeding the maximum size will be dropped.
        """

        worker_id = getattr(record, "worker_id", None)

        log = LogCreate(
            worker_id=worker_id,
            name=record.name,
            level=record.levelno,
            timestamp=from_timestamp(getattr(record, "created", None) or time.time()),  # pyright: ignore[reportArgumentType] DateTime is split into two types depending on Python version
            message=self.format(record),
        ).model_dump(mode="json")

        log_size = log["__payload_size__"] = self._get_payload_size(log)
        if log_size > PREFECT_LOGGING_TO_API_MAX_LOG_SIZE.value():
            raise ValueError(
                f"Log of size {log_size} is greater than the max size of "
                f"{PREFECT_LOGGING_TO_API_MAX_LOG_SIZE.value()}"
            )

        return log
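
# A minimal sketch (hypothetical usage): records are forwarded only when a
# `worker_id` attribute is present, e.g. attached via `extra`:
#
#     logger = logging.getLogger("prefect.worker")
#     logger.addHandler(WorkerAPILogHandler())
#     logger.info("polling", extra={"worker_id": "..."})  # sent to the API
#     logger.info("polling")  # skipped: no worker id attached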


class PrefectConsoleHandler(StreamHandler):
    def __init__(
        self,
        stream: TextIO | None = None,
        highlighter: type[Highlighter] = PrefectConsoleHighlighter,
        styles: dict[str, str] | None = None,
        level: int | str = logging.NOTSET,
    ):
        """
        The default console handler for Prefect, which highlights log levels,
        web and file URLs, flow and task (run) names, and state types in the
        local console (terminal).

        Highlighting can be toggled on/off with the PREFECT_LOGGING_COLORS setting.
        For finer control, use logging.yml to add or remove styles, and/or
        adjust colors.
        """
        super().__init__(stream=stream)

        styled_console = PREFECT_LOGGING_COLORS.value()
        markup_console = PREFECT_LOGGING_MARKUP.value()
        if styled_console:
            highlighter_instance = highlighter()
            theme = Theme(styles, inherit=False)
        else:
            highlighter_instance = NullHighlighter()
            theme = Theme(inherit=False)

        if isinstance(level, str):
            self.level: int = logging.getLevelNamesMapping()[level]
        else:
            self.level: int = level

        self.console: Console = Console(
            highlighter=highlighter_instance,
            theme=theme,
            file=self.stream,
            markup=markup_console,
        )

    def emit(self, record: logging.LogRecord) -> None:
        try:
            message = self.format(record)
            self.console.print(message, soft_wrap=True)
        except RecursionError:
            # This was copied over from logging.StreamHandler().emit()
            # https://bugs.python.org/issue36272
            raise
        except Exception:
            self.handleError(record)
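
# A minimal sketch (hypothetical usage): route a logger's output through the
# rich-backed console handler; the stream and level arguments are optional
# overrides:
#
#     handler = PrefectConsoleHandler(stream=sys.stdout, level="INFO")
#     logger = logging.getLogger("my-app")
#     logger.addHandler(handler)
#     logger.setLevel(logging.INFO)
#     logger.info("This message is highlighted in the terminal")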