Coverage for /usr/local/lib/python3.12/site-packages/prefect/logging/loggers.py: 35%

135 statements  


from __future__ import annotations

import io
import logging
import sys
from builtins import print
from contextlib import contextmanager
from functools import lru_cache
from logging import LogRecord
from typing import TYPE_CHECKING, Any, List, Mapping, MutableMapping, Optional, Union

from typing_extensions import Self

from prefect.exceptions import MissingContextError
from prefect.logging.filters import ObfuscateApiKeyFilter

if sys.version_info >= (3, 12):
    LoggingAdapter = logging.LoggerAdapter[logging.Logger]
else:
    if TYPE_CHECKING:
        LoggingAdapter = logging.LoggerAdapter[logging.Logger]
    else:
        LoggingAdapter = logging.LoggerAdapter

if TYPE_CHECKING:
    from prefect.client.schemas.objects import FlowRun, TaskRun
    from prefect.context import RunContext
    from prefect.flows import Flow
    from prefect.tasks import Task
    from prefect.workers.base import BaseWorker


class PrefectLogAdapter(LoggingAdapter):
    """
    Adapter that ensures extra kwargs are passed through correctly; without this
    the `extra` fields set on the adapter would overshadow any provided on a
    log-by-log basis.

    See https://bugs.python.org/issue32732; the Python team has declared that this is
    not a bug in the `LoggerAdapter` and subclassing is the intended workaround.
    """

    def process(
        self, msg: str, kwargs: MutableMapping[str, Any]
    ) -> tuple[str, MutableMapping[str, Any]]:
        kwargs["extra"] = {**(self.extra or {}), **(kwargs.get("extra") or {})}
        return (msg, kwargs)

    def getChild(
        self, suffix: str, extra: dict[str, Any] | None = None
    ) -> "PrefectLogAdapter":
        _extra: Mapping[str, object] = extra or {}

        return PrefectLogAdapter(
            self.logger.getChild(suffix),
            extra={
                **(self.extra or {}),
                **_extra,
            },
        )
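
# Illustrative sketch (not part of the original module): how the adapter merges
# `extra` fields. The logger name and field values below are made up.
#
#     adapter = PrefectLogAdapter(
#         logging.getLogger("prefect.example"), extra={"flow_run_id": "abc"}
#     )
#     adapter.info("hello", extra={"custom": 1})
#     # The emitted record carries both `flow_run_id` and `custom`; a plain
#     # `logging.LoggerAdapter` would replace the per-call `extra` entirely.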


@lru_cache()
def get_logger(name: str | None = None) -> logging.Logger:
    """
    Get a `prefect` logger. These loggers are intended for internal use within the
    `prefect` package.

    See `get_run_logger` for retrieving loggers for use within task or flow runs.
    By default, only run-related loggers are connected to the `APILogHandler`.
    """
    parent_logger = logging.getLogger("prefect")

    if name:
        # Append the name if given, but allow explicit full names, e.g. "prefect.test"
        # should not become "prefect.prefect.test"
        if not name.startswith(parent_logger.name + "."):
            logger = parent_logger.getChild(name)
        else:
            logger = logging.getLogger(name)
    else:
        logger = parent_logger

    # Prevent the current API key from being logged in plain text
    obfuscate_api_key_filter = ObfuscateApiKeyFilter()
    logger.addFilter(obfuscate_api_key_filter)

    return logger
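
# Illustrative sketch (not part of the original module): names passed to
# `get_logger` are nested under the "prefect" parent unless already qualified.
#
#     get_logger().name                     # "prefect"
#     get_logger("flow_runs").name          # "prefect.flow_runs"
#     get_logger("prefect.flow_runs").name  # unchanged, not "prefect.prefect.flow_runs"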


def get_run_logger(
    context: Optional["RunContext"] = None, **kwargs: Any
) -> Union[logging.Logger, LoggingAdapter]:
    """
    Get a Prefect logger for the current task run or flow run.

    The logger will be named either `prefect.task_runs` or `prefect.flow_runs`.
    Contextual data about the run will be attached to the log records.

    These loggers are connected to the `APILogHandler` by default to send log
    records to the API.

    Arguments:
        context: A specific context may be provided as an override. By default, the
            context is inferred from global state and this should not be needed.
        **kwargs: Additional keyword arguments will be attached to the log records in
            addition to the run metadata.

    Raises:
        MissingContextError: If no context can be found.
    """
    from prefect.context import FlowRunContext, TaskRunContext

    # Check for existing contexts
    task_run_context = TaskRunContext.get()
    flow_run_context = FlowRunContext.get()

    # Apply the context override
    if context:
        if isinstance(context, FlowRunContext):
            flow_run_context = context
        elif isinstance(context, TaskRunContext):
            task_run_context = context
        else:
            raise TypeError(
                f"Received unexpected type {type(context).__name__!r} for context. "
                "Expected one of 'None', 'FlowRunContext', or 'TaskRunContext'."
            )

    # Determine if this is a task or flow run logger
    if task_run_context:
        logger = task_run_logger(
            task_run=task_run_context.task_run,
            task=task_run_context.task,
            flow_run=flow_run_context.flow_run if flow_run_context else None,
            flow=flow_run_context.flow if flow_run_context else None,
            **kwargs,
        )
    elif flow_run_context:
        logger = flow_run_logger(
            flow_run=flow_run_context.flow_run,  # type: ignore
            flow=flow_run_context.flow,
            **kwargs,
        )
    elif (
        get_logger("prefect.flow_runs").disabled
        and get_logger("prefect.task_runs").disabled
    ):
        logger = logging.getLogger("null")
        logger.disabled = True
    else:
        raise MissingContextError("There is no active flow or task run context.")

    return logger
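
# Illustrative sketch (not part of the original module): `get_run_logger` is meant
# to be called from inside a running flow or task. The flow/task names below are
# hypothetical.
#
#     from prefect import flow, task
#
#     @task
#     def my_task():
#         get_run_logger().info("logged to prefect.task_runs with task metadata")
#
#     @flow
#     def my_flow():
#         get_run_logger().info("logged to prefect.flow_runs with flow metadata")
#         my_task()
#
# Calling it outside any run context raises `MissingContextError`, unless both run
# loggers are disabled, in which case a disabled "null" logger is returned.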


def flow_run_logger(
    flow_run: "FlowRun",
    flow: Optional["Flow[Any, Any]"] = None,
    **kwargs: str,
) -> PrefectLogAdapter:
    """
    Create a flow run logger with the run's metadata attached.

    Additional keyword arguments can be provided to attach custom data to the log
    records.

    If the flow run context is available, see `get_run_logger` instead.
    """
    return PrefectLogAdapter(
        get_logger("prefect.flow_runs"),
        extra={
            **{
                "flow_run_name": flow_run.name if flow_run else "<unknown>",
                "flow_run_id": str(flow_run.id) if flow_run else "<unknown>",
                "flow_name": flow.name if flow else "<unknown>",
            },
            **kwargs,
        },
    )
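
# Illustrative sketch (not part of the original module): building a flow run logger
# directly from a `FlowRun` object, e.g. outside of a run context. `my_flow_run` is
# a hypothetical `FlowRun` retrieved elsewhere, and `deployment_name` is an example
# of a custom field.
#
#     logger = flow_run_logger(my_flow_run, deployment_name="my-deployment")
#     logger.info("message with flow run metadata attached")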


def task_run_logger(
    task_run: "TaskRun",
    task: Optional["Task[Any, Any]"] = None,
    flow_run: Optional["FlowRun"] = None,
    flow: Optional["Flow[Any, Any]"] = None,
    **kwargs: Any,
) -> LoggingAdapter:
    """
    Create a task run logger with the run's metadata attached.

    Additional keyword arguments can be provided to attach custom data to the log
    records.

    If the task run context is available, see `get_run_logger` instead.

    If only the flow run context is available, it will be used for default values
    of `flow_run` and `flow`.
    """
    from prefect.context import FlowRunContext

    if not flow_run or not flow:
        flow_run_context = FlowRunContext.get()
        if flow_run_context:
            flow_run = flow_run or flow_run_context.flow_run
            flow = flow or flow_run_context.flow

    return PrefectLogAdapter(
        get_logger("prefect.task_runs"),
        extra={
            **{
                "task_run_id": str(task_run.id),
                "flow_run_id": str(task_run.flow_run_id),
                "task_run_name": task_run.name,
                "task_name": task.name if task else "<unknown>",
                "flow_run_name": flow_run.name if flow_run else "<unknown>",
                "flow_name": flow.name if flow else "<unknown>",
            },
            **kwargs,
        },
    )
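
# Illustrative sketch (not part of the original module): `task_run_logger` accepts
# explicit objects, but falls back to the active `FlowRunContext` for the flow
# fields when `flow_run`/`flow` are omitted. `my_task_run` is hypothetical.
#
#     logger = task_run_logger(task_run=my_task_run)
#     logger.info("message with task run (and any ambient flow run) metadata")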


def get_worker_logger(
    worker: "BaseWorker[Any, Any, Any]", name: Optional[str] = None
) -> logging.Logger | LoggingAdapter:
    """
    Create a worker logger with the worker's metadata attached.

    If the worker has a `backend_id`, it will be attached to the log records.
    If the worker has no `backend_id`, or the attribute is missing entirely,
    a basic logger is returned instead.
    """

    worker_log_name = name or f"workers.{worker.__class__.type}.{worker.name.lower()}"

    worker_id = getattr(worker, "backend_id", None)
    if worker_id:
        return PrefectLogAdapter(
            get_logger(worker_log_name),
            extra={
                "worker_id": str(worker.backend_id),
            },
        )
    else:
        return get_logger(worker_log_name)
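
# Illustrative sketch (not part of the original module): a worker with a backend id
# gets an adapter that stamps `worker_id` on every record; otherwise a plain logger
# named "prefect.workers.<type>.<worker name>" is returned. `my_worker` is a
# hypothetical `BaseWorker` instance.
#
#     logger = get_worker_logger(my_worker)
#     logger.info("heartbeat sent")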


@contextmanager
def disable_logger(name: str):
    """
    Get a logger by name and disable it within the context manager.
    Upon exiting the context manager, the logger is returned to its
    original state.
    """
    logger = logging.getLogger(name=name)

    # determine if it's already disabled
    base_state = logger.disabled
    try:
        # disable the logger
        logger.disabled = True
        yield
    finally:
        # return to base state
        logger.disabled = base_state
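
# Illustrative sketch (not part of the original module): temporarily silencing a
# single logger by name.
#
#     with disable_logger("prefect.flow_runs"):
#         ...  # nothing is emitted to "prefect.flow_runs" in here
#     # the logger's previous `disabled` state is restored on exit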


@contextmanager
def disable_run_logger():
    """
    Disable both the `prefect.flow_runs` and `prefect.task_runs` loggers
    within the context manager. Upon exiting the context manager, both loggers
    are returned to their original state.
    """
    with disable_logger("prefect.flow_runs"), disable_logger("prefect.task_runs"):
        yield
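
# Illustrative sketch (not part of the original module): one common use is calling a
# task's underlying function directly in a unit test, where no run context exists and
# run-logger output isn't wanted. `my_task` is a hypothetical @task-decorated function.
#
#     with disable_run_logger():
#         assert my_task.fn(2, 3) == 5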


def print_as_log(*args: Any, **kwargs: Any) -> None:
    """
    A patch for `print` to send printed messages to the Prefect run logger.

    If no run is active, `print` will behave as if it were not patched.

    If `print` sends data to a file other than `sys.stdout` or `sys.stderr`, it will
    not be forwarded to the Prefect logger either.
    """
    from prefect.context import FlowRunContext, TaskRunContext

    # When both contexts exist, we need to determine which one represents the
    # currently executing code:
    # - If we're in a subflow that's wrapped by a task, FlowRunContext represents
    #   the subflow and should take precedence
    # - If we're in a regular task, TaskRunContext represents the task
    #
    # We can distinguish by checking flow_run_id:
    # - Regular task: flow_ctx.flow_run.id == task_ctx.task_run.flow_run_id
    # - Subflow in task: flow_ctx.flow_run.id != task_ctx.task_run.flow_run_id
    flow_ctx = FlowRunContext.get()
    task_ctx = TaskRunContext.get()

    if flow_ctx and task_ctx:
        # If the flow_run_id from the flow context differs from the task's flow_run_id,
        # we're in a subflow that's running inside a task, so prefer the flow context
        if flow_ctx.flow_run and flow_ctx.flow_run.id != task_ctx.task_run.flow_run_id:
            context = flow_ctx
        else:
            # We're in a regular task within the flow
            context = task_ctx
    else:
        context = flow_ctx or task_ctx

    if (
        not context
        or not context.log_prints
        or kwargs.get("file") not in {None, sys.stdout, sys.stderr}
    ):
        return print(*args, **kwargs)

    logger = get_run_logger()

    # Print to an in-memory buffer so we do not need to reimplement `print`'s formatting
    buffer = io.StringIO()
    kwargs["file"] = buffer
    print(*args, **kwargs)

    # Remove the trailing newline to avoid duplicate newlines in the log output
    logger.info(buffer.getvalue().rstrip())
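
# Illustrative sketch (not part of the original module): users normally opt in to
# this behavior with `log_prints=True` rather than calling `print_as_log` directly.
# The flow name below is hypothetical.
#
#     from prefect import flow
#
#     @flow(log_prints=True)
#     def my_flow():
#         print("hello")  # forwarded to the flow run logger at INFO level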


@contextmanager
def patch_print():
    """
    Patches the Python builtin `print` function to use `print_as_log`
    """
    import builtins

    original = builtins.print

    try:
        builtins.print = print_as_log
        yield
    finally:
        builtins.print = original
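
# Illustrative sketch (not part of the original module): `patch_print` swaps the
# builtin `print` for `print_as_log` only for the duration of the block.
#
#     with patch_print():
#         print("captured by the run logger when a run context with log_prints is active")
#     print("back to the normal builtin print")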


class LogEavesdropper(logging.Handler):
    """A context manager that collects logs for the duration of the context.

    Example:

        ```python
        import logging
        from prefect.logging import LogEavesdropper

        with LogEavesdropper("my_logger") as eavesdropper:
            logging.getLogger("my_logger").info("Hello, world!")
            logging.getLogger("my_logger.child_module").info("Another one!")

        print(eavesdropper.text())

        # Outputs: "[INFO]: Hello, world!\n[INFO]: Another one!"
        ```
    """

    _target_logger: Optional[logging.Logger]
    _lines: List[str]

    def __init__(self, eavesdrop_on: str, level: int = logging.NOTSET):
        """
        Args:
            eavesdrop_on (str): the name of the logger to eavesdrop on
            level (int): the minimum log level to eavesdrop on; if omitted, all levels
                are captured
        """

        super().__init__(level=level)
        self.eavesdrop_on = eavesdrop_on
        self._target_logger = None

        # It's important that we use a very minimalistic formatter for use cases where
        # we may present these logs back to the user. We shouldn't leak filenames,
        # versions, or other environmental information.
        self.formatter: logging.Formatter | None = logging.Formatter(
            "[%(levelname)s]: %(message)s"
        )

    def __enter__(self) -> Self:
        self._target_logger = logging.getLogger(self.eavesdrop_on)
        self._original_level = self._target_logger.level
        self._target_logger.level = self.level
        self._target_logger.addHandler(self)
        self._lines = []
        return self

    def __exit__(self, *_: Any) -> None:
        if self._target_logger:
            self._target_logger.removeHandler(self)
            self._target_logger.level = self._original_level

    def emit(self, record: LogRecord) -> None:
        """The logging.Handler implementation, not intended to be called directly."""
        self._lines.append(self.format(record))

    def text(self) -> str:
        """Return the collected logs as a single newline-delimited string"""
        return "\n".join(self._lines)