Coverage for /usr/local/lib/python3.12/site-packages/prefect/logging/loggers.py: 35%
135 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 10:48 +0000
1from __future__ import annotations 1a
3import io 1a
4import logging 1a
5import sys 1a
6from builtins import print 1a
7from contextlib import contextmanager 1a
8from functools import lru_cache 1a
9from logging import LogRecord 1a
10from typing import TYPE_CHECKING, Any, List, Mapping, MutableMapping, Optional, Union 1a
12from typing_extensions import Self 1a
14from prefect.exceptions import MissingContextError 1a
15from prefect.logging.filters import ObfuscateApiKeyFilter 1a
17if sys.version_info >= (3, 12): 17 ↛ 20line 17 didn't jump to line 20 because the condition on line 17 was always true1a
18 LoggingAdapter = logging.LoggerAdapter[logging.Logger] 1a
19else:
20 if TYPE_CHECKING:
21 LoggingAdapter = logging.LoggerAdapter[logging.Logger]
22 else:
23 LoggingAdapter = logging.LoggerAdapter
25if TYPE_CHECKING: 25 ↛ 26line 25 didn't jump to line 26 because the condition on line 25 was never true1a
26 from prefect.client.schemas.objects import FlowRun, TaskRun
27 from prefect.context import RunContext
28 from prefect.flows import Flow
29 from prefect.tasks import Task
30 from prefect.workers.base import BaseWorker
33class PrefectLogAdapter(LoggingAdapter): 1a
34 """
35 Adapter that ensures extra kwargs are passed through correctly; without this
36 the `extra` fields set on the adapter would overshadow any provided on a
37 log-by-log basis.
39 See https://bugs.python.org/issue32732 — the Python team has declared that this is
40 not a bug in the LoggingAdapter and subclassing is the intended workaround.
41 """
43 def process( 1a
44 self, msg: str, kwargs: MutableMapping[str, Any]
45 ) -> tuple[str, MutableMapping[str, Any]]:
46 kwargs["extra"] = {**(self.extra or {}), **(kwargs.get("extra") or {})}
47 return (msg, kwargs)
49 def getChild( 1a
50 self, suffix: str, extra: dict[str, Any] | None = None
51 ) -> "PrefectLogAdapter":
52 _extra: Mapping[str, object] = extra or {}
54 return PrefectLogAdapter(
55 self.logger.getChild(suffix),
56 extra={
57 **(self.extra or {}),
58 **_extra,
59 },
60 )
63@lru_cache() 1a
64def get_logger(name: str | None = None) -> logging.Logger: 1a
65 """
66 Get a `prefect` logger. These loggers are intended for internal use within the
67 `prefect` package.
69 See `get_run_logger` for retrieving loggers for use within task or flow runs.
70 By default, only run-related loggers are connected to the `APILogHandler`.
71 """
72 parent_logger = logging.getLogger("prefect") 1abc
74 if name: 1abc
75 # Append the name if given but allow explicit full names e.g. "prefect.test"
76 # should not become "prefect.prefect.test"
77 if not name.startswith(parent_logger.name + "."): 1abc
78 logger = parent_logger.getChild(name) 1ab
79 else:
80 logger = logging.getLogger(name) 1abc
81 else:
82 logger = parent_logger 1a
84 # Prevent the current API key from being logged in plain text
85 obfuscate_api_key_filter = ObfuscateApiKeyFilter() 1abc
86 logger.addFilter(obfuscate_api_key_filter) 1abc
88 return logger 1abc
91def get_run_logger( 1a
92 context: Optional["RunContext"] = None, **kwargs: Any
93) -> Union[logging.Logger, LoggingAdapter]:
94 """
95 Get a Prefect logger for the current task run or flow run.
97 The logger will be named either `prefect.task_runs` or `prefect.flow_runs`.
98 Contextual data about the run will be attached to the log records.
100 These loggers are connected to the `APILogHandler` by default to send log records to
101 the API.
103 Arguments:
104 context: A specific context may be provided as an override. By default, the
105 context is inferred from global state and this should not be needed.
106 **kwargs: Additional keyword arguments will be attached to the log records in
107 addition to the run metadata
109 Raises:
110 MissingContextError: If no context can be found
111 """
112 from prefect.context import FlowRunContext, TaskRunContext
114 # Check for existing contexts
115 task_run_context = TaskRunContext.get()
116 flow_run_context = FlowRunContext.get()
118 # Apply the context override
119 if context:
120 if isinstance(context, FlowRunContext):
121 flow_run_context = context
122 elif isinstance(context, TaskRunContext):
123 task_run_context = context
124 else:
125 raise TypeError(
126 f"Received unexpected type {type(context).__name__!r} for context. "
127 "Expected one of 'None', 'FlowRunContext', or 'TaskRunContext'."
128 )
130 # Determine if this is a task or flow run logger
131 if task_run_context:
132 logger = task_run_logger(
133 task_run=task_run_context.task_run,
134 task=task_run_context.task,
135 flow_run=flow_run_context.flow_run if flow_run_context else None,
136 flow=flow_run_context.flow if flow_run_context else None,
137 **kwargs,
138 )
139 elif flow_run_context:
140 logger = flow_run_logger(
141 flow_run=flow_run_context.flow_run, # type: ignore
142 flow=flow_run_context.flow,
143 **kwargs,
144 )
145 elif (
146 get_logger("prefect.flow_runs").disabled
147 and get_logger("prefect.task_runs").disabled
148 ):
149 logger = logging.getLogger("null")
150 logger.disabled = True
151 else:
152 raise MissingContextError("There is no active flow or task run context.")
154 return logger
157def flow_run_logger( 1a
158 flow_run: "FlowRun",
159 flow: Optional["Flow[Any, Any]"] = None,
160 **kwargs: str,
161) -> PrefectLogAdapter:
162 """
163 Create a flow run logger with the run's metadata attached.
165 Additional keyword arguments can be provided to attach custom data to the log
166 records.
168 If the flow run context is available, see `get_run_logger` instead.
169 """
170 return PrefectLogAdapter(
171 get_logger("prefect.flow_runs"),
172 extra={
173 **{
174 "flow_run_name": flow_run.name if flow_run else "<unknown>",
175 "flow_run_id": str(flow_run.id) if flow_run else "<unknown>",
176 "flow_name": flow.name if flow else "<unknown>",
177 },
178 **kwargs,
179 },
180 )
183def task_run_logger( 1a
184 task_run: "TaskRun",
185 task: Optional["Task[Any, Any]"] = None,
186 flow_run: Optional["FlowRun"] = None,
187 flow: Optional["Flow[Any, Any]"] = None,
188 **kwargs: Any,
189) -> LoggingAdapter:
190 """
191 Create a task run logger with the run's metadata attached.
193 Additional keyword arguments can be provided to attach custom data to the log
194 records.
196 If the task run context is available, see `get_run_logger` instead.
198 If only the flow run context is available, it will be used for default values
199 of `flow_run` and `flow`.
200 """
201 from prefect.context import FlowRunContext
203 if not flow_run or not flow:
204 flow_run_context = FlowRunContext.get()
205 if flow_run_context:
206 flow_run = flow_run or flow_run_context.flow_run
207 flow = flow or flow_run_context.flow
209 return PrefectLogAdapter(
210 get_logger("prefect.task_runs"),
211 extra={
212 **{
213 "task_run_id": str(task_run.id),
214 "flow_run_id": str(task_run.flow_run_id),
215 "task_run_name": task_run.name,
216 "task_name": task.name if task else "<unknown>",
217 "flow_run_name": flow_run.name if flow_run else "<unknown>",
218 "flow_name": flow.name if flow else "<unknown>",
219 },
220 **kwargs,
221 },
222 )
225def get_worker_logger( 1a
226 worker: "BaseWorker[Any, Any, Any]", name: Optional[str] = None
227) -> logging.Logger | LoggingAdapter:
228 """
229 Create a worker logger with the worker's metadata attached.
231 If the worker has a backend_id, it will be attached to the log records.
232 If the worker does not have a backend_id a basic logger will be returned.
233 If the worker does not have a backend_id attribute, a basic logger will be returned.
234 """
236 worker_log_name = name or f"workers.{worker.__class__.type}.{worker.name.lower()}"
238 worker_id = getattr(worker, "backend_id", None)
239 if worker_id:
240 return PrefectLogAdapter(
241 get_logger(worker_log_name),
242 extra={
243 "worker_id": str(worker.backend_id),
244 },
245 )
246 else:
247 return get_logger(worker_log_name)
250@contextmanager 1a
251def disable_logger(name: str): 1a
252 """
253 Get a logger by name and disables it within the context manager.
254 Upon exiting the context manager, the logger is returned to its
255 original state.
256 """
257 logger = logging.getLogger(name=name) 1abd
259 # determine if it's already disabled
260 base_state = logger.disabled 1abd
261 try: 1abd
262 # disable the logger
263 logger.disabled = True 1abd
264 yield 1abd
265 finally:
266 # return to base state
267 logger.disabled = base_state 1abd
270@contextmanager 1a
271def disable_run_logger(): 1a
272 """
273 Gets both `prefect.flow_run` and `prefect.task_run` and disables them
274 within the context manager. Upon exiting the context manager, both loggers
275 are returned to their original state.
276 """
277 with disable_logger("prefect.flow_runs"), disable_logger("prefect.task_runs"):
278 yield
281def print_as_log(*args: Any, **kwargs: Any) -> None: 1a
282 """
283 A patch for `print` to send printed messages to the Prefect run logger.
285 If no run is active, `print` will behave as if it were not patched.
287 If `print` sends data to a file other than `sys.stdout` or `sys.stderr`, it will
288 not be forwarded to the Prefect logger either.
289 """
290 from prefect.context import FlowRunContext, TaskRunContext
292 # When both contexts exist, we need to determine which one represents the
293 # currently executing code:
294 # - If we're in a subflow that's wrapped by a task, FlowRunContext represents
295 # the subflow and should take precedence
296 # - If we're in a regular task, TaskRunContext represents the task
297 #
298 # We can distinguish by checking flow_run_id:
299 # - Regular task: flow_ctx.flow_run.id == task_ctx.task_run.flow_run_id
300 # - Subflow in task: flow_ctx.flow_run.id != task_ctx.task_run.flow_run_id
301 flow_ctx = FlowRunContext.get()
302 task_ctx = TaskRunContext.get()
304 if flow_ctx and task_ctx:
305 # If the flow_run_id from the flow context differs from the task's flow_run_id,
306 # we're in a subflow that's running inside a task, so prefer the flow context
307 if flow_ctx.flow_run and flow_ctx.flow_run.id != task_ctx.task_run.flow_run_id:
308 context = flow_ctx
309 else:
310 # We're in a regular task within the flow
311 context = task_ctx
312 else:
313 context = flow_ctx or task_ctx
315 if (
316 not context
317 or not context.log_prints
318 or kwargs.get("file") not in {None, sys.stdout, sys.stderr}
319 ):
320 return print(*args, **kwargs)
322 logger = get_run_logger()
324 # Print to an in-memory buffer; so we do not need to implement `print`
325 buffer = io.StringIO()
326 kwargs["file"] = buffer
327 print(*args, **kwargs)
329 # Remove trailing whitespace to prevent duplicates
330 logger.info(buffer.getvalue().rstrip())
333@contextmanager 1a
334def patch_print(): 1a
335 """
336 Patches the Python builtin `print` method to use `print_as_log`
337 """
338 import builtins
340 original = builtins.print
342 try:
343 builtins.print = print_as_log
344 yield
345 finally:
346 builtins.print = original
349class LogEavesdropper(logging.Handler): 1a
350 """A context manager that collects logs for the duration of the context
352 Example:
354 ```python
355 import logging
356 from prefect.logging import LogEavesdropper
358 with LogEavesdropper("my_logger") as eavesdropper:
359 logging.getLogger("my_logger").info("Hello, world!")
360 logging.getLogger("my_logger.child_module").info("Another one!")
362 print(eavesdropper.text())
364 # Outputs: "Hello, world!\nAnother one!"
365 """
367 _target_logger: Optional[logging.Logger] 1a
368 _lines: List[str] 1a
370 def __init__(self, eavesdrop_on: str, level: int = logging.NOTSET): 1a
371 """
372 Args:
373 eavesdrop_on (str): the name of the logger to eavesdrop on
374 level (int): the minimum log level to eavesdrop on; if omitted, all levels
375 are captured
376 """
378 super().__init__(level=level)
379 self.eavesdrop_on = eavesdrop_on
380 self._target_logger = None
382 # It's important that we use a very minimalistic formatter for use cases where
383 # we may present these logs back to the user. We shouldn't leak filenames,
384 # versions, or other environmental information.
385 self.formatter: logging.Formatter | None = logging.Formatter(
386 "[%(levelname)s]: %(message)s"
387 )
389 def __enter__(self) -> Self: 1a
390 self._target_logger = logging.getLogger(self.eavesdrop_on)
391 self._original_level = self._target_logger.level
392 self._target_logger.level = self.level
393 self._target_logger.addHandler(self)
394 self._lines = []
395 return self
397 def __exit__(self, *_: Any) -> None: 1a
398 if self._target_logger:
399 self._target_logger.removeHandler(self)
400 self._target_logger.level = self._original_level
402 def emit(self, record: LogRecord) -> None: 1a
403 """The logging.Handler implementation, not intended to be called directly."""
404 self._lines.append(self.format(record))
406 def text(self) -> str: 1a
407 """Return the collected logs as a single newline-delimited string"""
408 return "\n".join(self._lines)