Coverage for /usr/local/lib/python3.12/site-packages/prefect/logging/handlers.py: 33% (166 statements), coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

from __future__ import annotations

import inspect
import json
import logging
import sys
import time
import traceback
import uuid
import warnings
from contextlib import asynccontextmanager
from typing import TYPE_CHECKING, Any, Dict, TextIO, Type

from rich.console import Console
from rich.highlighter import Highlighter, NullHighlighter
from rich.theme import Theme
from typing_extensions import Self

import prefect.context
from prefect._internal.concurrency.api import create_call, from_sync
from prefect._internal.concurrency.event_loop import get_running_loop
from prefect._internal.concurrency.services import BatchedQueueService
from prefect._internal.concurrency.threads import in_global_loop
from prefect.client.orchestration import get_client
from prefect.client.schemas.actions import LogCreate
from prefect.exceptions import MissingContextError
from prefect.logging.highlighters import PrefectConsoleHighlighter
from prefect.settings import (
    PREFECT_API_URL,
    PREFECT_LOGGING_COLORS,
    PREFECT_LOGGING_INTERNAL_LEVEL,
    PREFECT_LOGGING_MARKUP,
    PREFECT_LOGGING_TO_API_BATCH_INTERVAL,
    PREFECT_LOGGING_TO_API_BATCH_SIZE,
    PREFECT_LOGGING_TO_API_MAX_LOG_SIZE,
    PREFECT_LOGGING_TO_API_WHEN_MISSING_FLOW,
)
from prefect.types._datetime import from_timestamp

if sys.version_info >= (3, 12):
    StreamHandler = logging.StreamHandler[TextIO]
else:
    if TYPE_CHECKING:
        StreamHandler = logging.StreamHandler[TextIO]
    else:
        StreamHandler = logging.StreamHandler

if TYPE_CHECKING:
    from prefect.client.schemas.objects import FlowRun, TaskRun


class APILogWorker(BatchedQueueService[Dict[str, Any]]):
    @property
    def max_batch_size(self) -> int:
        return max(
            PREFECT_LOGGING_TO_API_BATCH_SIZE.value()
            - PREFECT_LOGGING_TO_API_MAX_LOG_SIZE.value(),
            PREFECT_LOGGING_TO_API_MAX_LOG_SIZE.value(),
        )

    @property
    def min_interval(self) -> float | None:
        return PREFECT_LOGGING_TO_API_BATCH_INTERVAL.value()

    async def _handle_batch(self, items: list[dict[str, Any]]):
        try:
            await self._client.create_logs(items)
        except Exception as e:
            # Roughly replicate the behavior of the stdlib logger error handling
            if logging.raiseExceptions and sys.stderr:
                sys.stderr.write("--- Error logging to API ---\n")
                if PREFECT_LOGGING_INTERNAL_LEVEL.value() == "DEBUG":
                    traceback.print_exc(file=sys.stderr)
                else:
                    # Only log the exception message in non-DEBUG mode
                    sys.stderr.write(str(e))

    @asynccontextmanager
    async def _lifespan(self):
        async with get_client() as self._client:
            yield

    @classmethod
    def instance(cls: Type[Self], *args: Any) -> Self:
        settings = (
            PREFECT_LOGGING_TO_API_BATCH_SIZE.value(),
            PREFECT_API_URL.value(),
            PREFECT_LOGGING_TO_API_MAX_LOG_SIZE.value(),
        )

        # Ensure a unique worker is retrieved per relevant logging settings
        return super().instance(*settings, *args)

    def _get_size(self, item: Dict[str, Any]) -> int:
        return item.pop("__payload_size__", None) or len(json.dumps(item).encode())
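
# A worked example of the batch sizing above, assuming the default settings
# (PREFECT_LOGGING_TO_API_BATCH_SIZE = 4_000_000 bytes and
# PREFECT_LOGGING_TO_API_MAX_LOG_SIZE = 1_000_000 bytes; verify against your
# own configuration):
#
#     max_batch_size = max(4_000_000 - 1_000_000, 1_000_000)  # 3_000_000 bytes
#
# Subtracting the maximum single-log size leaves headroom in each batch, so a
# maximally sized log appended to a nearly full batch should not push the
# request far past the configured batch size.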


class APILogHandler(logging.Handler):
    """
    A logging handler that sends logs to the Prefect API.

    Sends log records to the `APILogWorker` which manages sending batches of logs in
    the background.
    """

    def flush(self) -> None:
        """
        Tell the `APILogWorker` to send any currently enqueued logs and block until
        completion.

        Use `aflush` from async contexts instead.
        """
        loop = get_running_loop()
        if loop:
            if in_global_loop():  # Guard against internal misuse
                raise RuntimeError(
                    "Cannot call `APILogWorker.flush` from the global event loop; it"
                    " would block the event loop and cause a deadlock. Use"
                    " `APILogWorker.aflush` instead."
                )

            # Not ideal, but this method is called by the stdlib and cannot return a
            # coroutine, so we just schedule the drain in a new thread and continue
            from_sync.call_soon_in_new_thread(create_call(APILogWorker.drain_all))
        else:
            # We set a timeout of 5s because we don't want to block forever if the
            # worker is stuck. This can occur when the handler is being shut down
            # while the `logging._lock` is held but the worker is attempting to emit
            # logs, resulting in a deadlock.
            APILogWorker.drain_all(timeout=5)

    @classmethod
    async def aflush(cls) -> None:
        """
        Tell the `APILogWorker` to send any currently enqueued logs and block until
        completion.
        """
        result = APILogWorker.drain_all()
        if inspect.isawaitable(result):
            await result
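
    # A minimal sketch of the two flush paths (assuming an `APILogHandler` has
    # already been attached to a logger and enqueued some logs):
    #
    #     handler = APILogHandler()
    #
    #     # from synchronous code: blocks until the drain completes (5s timeout)
    #     handler.flush()
    #
    #     # from asynchronous code: await the drain instead of blocking the loop
    #     await APILogHandler.aflush()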

    def emit(self, record: logging.LogRecord) -> None:
        """
        Send a log to the `APILogWorker`.
        """
        try:
            profile = prefect.context.get_settings_context()

            if not profile.settings.logging.to_api.enabled:
                return  # Respect the global settings toggle
            if not getattr(record, "send_to_api", True):
                return  # Do not send records that have opted out

            log = self.prepare(record)
            APILogWorker.instance().send(log)

        except Exception:
            self.handleError(record)

    def handleError(self, record: logging.LogRecord) -> None:
        _, exc, _ = sys.exc_info()

        if isinstance(exc, MissingContextError):
            log_handling_when_missing_flow = (
                PREFECT_LOGGING_TO_API_WHEN_MISSING_FLOW.value()
            )
            if log_handling_when_missing_flow == "warn":
                # Warn when a logger is used outside of a run context; the stack
                # level here gets us to the user's logging call
                warnings.warn(
                    f"{exc} Set PREFECT_LOGGING_TO_API_WHEN_MISSING_FLOW=ignore to suppress this warning.",
                    stacklevel=8,
                )
                return
            elif log_handling_when_missing_flow == "ignore":
                return
            else:
                raise exc

        # Display a longer traceback for other errors
        return super().handleError(record)

    def prepare(self, record: logging.LogRecord) -> Dict[str, Any]:
        """
        Convert a `logging.LogRecord` to the API `LogCreate` schema and serialize.

        This infers the linked flow or task run from the log record or the current
        run context.

        If a flow run id cannot be found, the log will be dropped.

        Logs exceeding the maximum size will be truncated.
        """
        flow_run_id = getattr(record, "flow_run_id", None)
        task_run_id = getattr(record, "task_run_id", None)
        worker_id = getattr(record, "worker_id", None)

        if not flow_run_id:
            try:
                context = prefect.context.get_run_context()
            except MissingContextError:
                raise MissingContextError(
                    f"Logger {record.name!r} attempted to send logs to the API without"
                    " a flow run id. The API log handler can only send logs within"
                    " flow run contexts unless the flow run id is manually provided."
                ) from None

            if flow_run := getattr(context, "flow_run", None):
                if TYPE_CHECKING:
                    assert isinstance(flow_run, FlowRun)
                flow_run_id = flow_run.id
            elif task_run := getattr(context, "task_run", None):
                if TYPE_CHECKING:
                    assert isinstance(task_run, TaskRun)
                flow_run_id = task_run.flow_run_id
                task_run_id = task_run_id or task_run.id
            else:
                raise ValueError(
                    "Encountered malformed run context. Does not contain flow or task "
                    "run information."
                )

        # Parsing to a `LogCreate` object here gives us nice parsing error messages
        # from the standard lib `handleError` method if something goes wrong and
        # prevents malformed logs from entering the queue
        if isinstance(flow_run_id, str):
            try:
                flow_run_id = uuid.UUID(flow_run_id)
            except ValueError:
                flow_run_id = None

        formatted_message = self.format(record)

        log = LogCreate(
            flow_run_id=flow_run_id,
            task_run_id=task_run_id,
            worker_id=worker_id,
            name=record.name,
            level=record.levelno,
            timestamp=from_timestamp(getattr(record, "created", None) or time.time()),  # pyright: ignore[reportArgumentType]
            message=formatted_message,
        ).model_dump(mode="json")

        log_size = log["__payload_size__"] = self._get_payload_size(log)
        if log_size > PREFECT_LOGGING_TO_API_MAX_LOG_SIZE.value():
            max_size = PREFECT_LOGGING_TO_API_MAX_LOG_SIZE.value()
            oversize = log_size - max_size
            # Drop the oversize plus a fixed buffer of characters to leave room
            # for the truncation marker appended below
            BUFFER = 50
            truncated_length = max(len(formatted_message) - oversize - BUFFER, 0)
            truncated_message = formatted_message[:truncated_length] + "... [truncated]"

            log = LogCreate(
                flow_run_id=flow_run_id,
                task_run_id=task_run_id,
                worker_id=worker_id,
                name=record.name,
                level=record.levelno,
                timestamp=from_timestamp(
                    getattr(record, "created", None) or time.time()  # pyright: ignore[reportArgumentType] DateTime is split into two types depending on Python version
                ),
                message=truncated_message,
            ).model_dump(mode="json")

            log["__payload_size__"] = self._get_payload_size(log)

        return log

    def _get_payload_size(self, log: Dict[str, Any]) -> int:
        return len(json.dumps(log).encode())
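
# A usage sketch for `APILogHandler` outside of a run context: supplying the
# flow run id through the stdlib `extra` mechanism attaches it to the record,
# where `prepare` picks it up via `getattr(record, "flow_run_id", None)`. The
# UUID below is a placeholder, not a real flow run:
#
#     import logging
#
#     handler = APILogHandler()
#     logger = logging.getLogger("my-app")
#     logger.addHandler(handler)
#     logger.warning(
#         "Sent to the API without a run context",
#         extra={"flow_run_id": "00000000-0000-0000-0000-000000000000"},
#     )
#     handler.flush()  # drain enqueued logs before the process exits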


class WorkerAPILogHandler(APILogHandler):
    def emit(self, record: logging.LogRecord) -> None:
        # Open-source API servers do not currently support worker logs, and
        # worker logs only have an associated worker ID when connected to Cloud,
        # so we won't send worker logs to the API unless they have a worker ID.
        if not getattr(record, "worker_id", None):
            return
        super().emit(record)

    def prepare(self, record: logging.LogRecord) -> Dict[str, Any]:
        """
        Convert a `logging.LogRecord` to the API `LogCreate` schema and serialize.

        This adds the worker id to the log.

        Logs exceeding the maximum size will be dropped.
        """
        worker_id = getattr(record, "worker_id", None)

        log = LogCreate(
            worker_id=worker_id,
            name=record.name,
            level=record.levelno,
            timestamp=from_timestamp(getattr(record, "created", None) or time.time()),  # pyright: ignore[reportArgumentType] DateTime is split into two types depending on Python version
            message=self.format(record),
        ).model_dump(mode="json")

        log_size = log["__payload_size__"] = self._get_payload_size(log)
        if log_size > PREFECT_LOGGING_TO_API_MAX_LOG_SIZE.value():
            raise ValueError(
                f"Log of size {log_size} is greater than the max size of "
                f"{PREFECT_LOGGING_TO_API_MAX_LOG_SIZE.value()}"
            )

        return log
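
# A sketch of the worker-id gate above (the id below is an illustrative
# placeholder; real worker ids are assigned when a worker is connected to
# Prefect Cloud):
#
#     import logging
#
#     logger = logging.getLogger("my-worker")
#     logger.addHandler(WorkerAPILogHandler())
#     logger.info("dropped silently: no worker_id on the record")
#     logger.info("sent to the API", extra={"worker_id": "placeholder-worker-id"})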


class PrefectConsoleHandler(StreamHandler):
    def __init__(
        self,
        stream: TextIO | None = None,
        highlighter: type[Highlighter] = PrefectConsoleHighlighter,
        styles: dict[str, str] | None = None,
        level: int | str = logging.NOTSET,
    ):
        """
        The default console handler for Prefect, which highlights log levels,
        web and file URLs, flow and task (run) names, and state types in the
        local console (terminal).

        Highlighting can be toggled on/off with the PREFECT_LOGGING_COLORS setting.
        For finer control, use logging.yml to add or remove styles, and/or
        adjust colors.
        """
        super().__init__(stream=stream)

        styled_console = PREFECT_LOGGING_COLORS.value()
        markup_console = PREFECT_LOGGING_MARKUP.value()
        if styled_console:
            highlighter_instance = highlighter()
            theme = Theme(styles, inherit=False)
        else:
            highlighter_instance = NullHighlighter()
            theme = Theme(inherit=False)

        if isinstance(level, str):
            self.level: int = logging.getLevelNamesMapping()[level]
        else:
            self.level: int = level

        self.console: Console = Console(
            highlighter=highlighter_instance,
            theme=theme,
            file=self.stream,
            markup=markup_console,
        )

    def emit(self, record: logging.LogRecord) -> None:
        try:
            message = self.format(record)
            self.console.print(message, soft_wrap=True)
        except RecursionError:
            # This was copied over from logging.StreamHandler().emit()
            # https://bugs.python.org/issue36272
            raise
        except Exception:
            self.handleError(record)
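
# A minimal usage sketch for the console handler. The style mapping below is
# an illustrative assumption; see Prefect's logging.yml for the style names
# the shipped configuration actually defines:
#
#     import logging
#     import sys
#
#     logger = logging.getLogger("my-app")
#     logger.setLevel(logging.INFO)
#     logger.addHandler(
#         PrefectConsoleHandler(
#             stream=sys.stdout,
#             styles={"log.level": "bold magenta"},
#             level=logging.INFO,
#         )
#     )
#     logger.info("Hello from the console handler")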