Coverage for /usr/local/lib/python3.12/site-packages/prefect/utilities/services.py: 21% of 71 statements (coverage.py v7.10.6, created 2025-12-05 10:48 +0000)
import threading
from collections import deque
from collections.abc import Coroutine
from logging import Logger
from traceback import format_exception
from types import TracebackType
from typing import Any, Callable, Optional
from wsgiref.simple_server import WSGIServer

import anyio
import httpx

from prefect.logging.loggers import get_logger
from prefect.settings import PREFECT_CLIENT_METRICS_ENABLED, PREFECT_CLIENT_METRICS_PORT
from prefect.utilities.collections import distinct
from prefect.utilities.math import clamped_poisson_interval

logger: Logger = get_logger("utilities.services.critical_service_loop")


async def critical_service_loop(
    workload: Callable[..., Coroutine[Any, Any, Any]],
    interval: float,
    memory: int = 10,
    consecutive: int = 3,
    backoff: int = 1,
    printer: Callable[..., None] = print,
    run_once: bool = False,
    jitter_range: Optional[float] = None,
) -> None:
    """
    Runs the given `workload` function on the specified `interval`, while being
    forgiving of intermittent issues like temporary HTTP errors. If `consecutive`
    errors occur in a row, print a summary of up to `memory` recent exceptions to
    `printer`, then begin backoff.

    The loop will exit after reaching the consecutive error limit `backoff` times.
    On each backoff, the interval will be doubled. On a successful loop, the backoff
    will be reset.

    Args:
        workload: the function to call
        interval: how frequently to call it, in seconds
        memory: how many recent errors to remember
        consecutive: how many consecutive errors must we see before we begin backoff
        backoff: how many times we should allow consecutive errors before exiting
        printer: a `print`-like function where errors will be reported
        run_once: if set, the loop will only run once then return
        jitter_range: if set, the interval will be a random variable (rv) drawn from
            a clamped Poisson distribution where lambda = interval and the rv is
            bounded by `interval * (1 - jitter_range) < rv < interval * (1 + jitter_range)`
    """
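    # Illustrative usage (a sketch; `check_api_health` and the URL are hypothetical):
    #
    #     async def check_api_health() -> None:
    #         async with httpx.AsyncClient() as client:
    #             response = await client.get("http://127.0.0.1:4200/api/health")
    #             response.raise_for_status()
    #
    #     await critical_service_loop(check_api_health, interval=10, jitter_range=0.3)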

    track_record: deque[bool] = deque([True] * consecutive, maxlen=consecutive)
    failures: deque[tuple[Exception, Optional[TracebackType]]] = deque(maxlen=memory)
    backoff_count = 0

    while True:
        try:
            workload_display_name = (
                workload.__name__ if hasattr(workload, "__name__") else workload
            )
            logger.debug(f"Starting run of {workload_display_name!r}")
            await workload()

            # Reset the backoff count on success; we may want to consider resetting
            # this only if the track record is _all_ successful to avoid ending backoff
            # prematurely
            if backoff_count > 0:
                printer("Resetting backoff due to successful run.")
                backoff_count = 0

            track_record.append(True)
        except httpx.TransportError as exc:
            # httpx.TransportError is the base class for any kind of communications
            # error, like timeouts, connection failures, etc. This does _not_ cover
            # routine HTTP error codes (even 5xx errors like 502/503) so this
            # handler should not be attempting to cover cases where the Prefect server
            # or Prefect Cloud is having an outage (which will be covered by the
            # exception clause below)
            track_record.append(False)
            failures.append((exc, exc.__traceback__))
            logger.debug(
                f"Run of {workload!r} failed with TransportError", exc_info=exc
            )
        except httpx.HTTPStatusError as exc:
            if exc.response.status_code >= 500:
                # 5XX codes indicate a potential outage of the Prefect API which is
                # likely to be temporary and transient. Don't quit over these unless
                # it is prolonged.
                track_record.append(False)
                failures.append((exc, exc.__traceback__))
                logger.debug(
                    f"Run of {workload!r} failed with HTTPStatusError", exc_info=exc
                )
            else:
                raise

        # Decide whether to exit now based on recent history.
        #
        # Given some typical background error rate of, say, 1%, we may still observe
        # quite a few errors in our recent samples, but this is not necessarily a cause
        # for concern.
        #
        # Imagine two distributions that could reflect our situation at any time: the
        # everything-is-fine distribution of errors, and the everything-is-on-fire
        # distribution of errors. We are trying to determine which of those two worlds
        # we are currently experiencing. We compare the likelihood that we'd draw N
        # consecutive errors from each. In the everything-is-fine distribution, that
        # would be a very low-probability occurrence, but in the everything-is-on-fire
        # distribution, that is a high-probability occurrence.
        #
        # Remarkably, we only need to look back for a small number of consecutive
        # errors to have reasonable confidence that this is indeed an anomaly.
        # @anticorrelator and @chrisguidry estimated that we should only need to look
        # back for 3 consecutive errors.
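        #
        # For example, if each run fails independently with probability 0.01, then
        # three consecutive failures occur with probability 0.01 ** 3 == 1e-6, so a
        # streak of 3 failures is strong evidence that something really is wrong.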
        if not any(track_record):
            # We've failed enough times to be sure something is wrong, the writing is
            # on the wall. Let's explain what we've seen and exit.
            printer(
                f"\nFailed the last {consecutive} attempts. "
                "Please check your environment and configuration."
            )

            printer("Examples of recent errors:\n")

            failures_by_type = distinct(
                reversed(failures),
                key=lambda pair: type(pair[0]),  # Group by the type of exception
            )
            for exception, traceback in failures_by_type:
                printer("".join(format_exception(None, exception, traceback)))
                printer()

            backoff_count += 1

            if backoff_count >= backoff:
                raise RuntimeError("Service exceeded error threshold.")

            # Reset the track record
            track_record.extend([True] * consecutive)
            failures.clear()
            printer(
                "Backing off due to consecutive errors, using increased interval of "
                f"{interval * 2**backoff_count}s."
            )
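            # For example, with interval=10 and no jitter, the sleep below becomes
            # 20s after the first backoff and 40s after the second, following
            # interval * 2**backoff_count.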

        if run_once:
            return

        if jitter_range is not None:
            sleep = clamped_poisson_interval(interval, clamping_factor=jitter_range)
        else:
            sleep = interval * 2**backoff_count

        await anyio.sleep(sleep)


_metrics_server: Optional[tuple[WSGIServer, threading.Thread]] = None


def start_client_metrics_server() -> None:
    """Start the process-wide Prometheus metrics server for client metrics (if enabled
    with `PREFECT_CLIENT_METRICS_ENABLED`) on the port `PREFECT_CLIENT_METRICS_PORT`."""
    if not PREFECT_CLIENT_METRICS_ENABLED:
        return

    global _metrics_server
    if _metrics_server:
        return

    from prometheus_client import start_http_server
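    # In recent prometheus_client releases, start_http_server returns the
    # (WSGIServer, Thread) pair that stop_client_metrics_server unpacks below.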
    _metrics_server = start_http_server(port=PREFECT_CLIENT_METRICS_PORT.value())


def stop_client_metrics_server() -> None:
    """Stop the process-wide Prometheus metrics server for client metrics, if it has
    previously been started"""
    global _metrics_server
    if _metrics_server:
        server, thread = _metrics_server
        server.shutdown()
        thread.join()
        _metrics_server = None
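

# Illustrative lifecycle (a sketch, not part of this module's API): start the metrics
# server once at client startup and stop it during shutdown, e.g.
#
#     start_client_metrics_server()
#     try:
#         ...  # client work that records Prometheus metrics
#     finally:
#         stop_client_metrics_server()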