Coverage for /usr/local/lib/python3.12/site-packages/prefect/utilities/services.py: 21% (71 statements)
coverage.py v7.10.6, created at 2025-12-05 13:38 +0000

import threading
from collections import deque
from collections.abc import Coroutine
from logging import Logger
from traceback import format_exception
from types import TracebackType
from typing import Any, Callable, Optional
from wsgiref.simple_server import WSGIServer

import anyio
import httpx

from prefect.logging.loggers import get_logger
from prefect.settings import PREFECT_CLIENT_METRICS_ENABLED, PREFECT_CLIENT_METRICS_PORT
from prefect.utilities.collections import distinct
from prefect.utilities.math import clamped_poisson_interval

logger: Logger = get_logger("utilities.services.critical_service_loop")


async def critical_service_loop(
    workload: Callable[..., Coroutine[Any, Any, Any]],
    interval: float,
    memory: int = 10,
    consecutive: int = 3,
    backoff: int = 1,
    printer: Callable[..., None] = print,
    run_once: bool = False,
    jitter_range: Optional[float] = None,
) -> None:
    """
    Runs the given `workload` function on the specified `interval`, while being
    forgiving of intermittent issues like temporary HTTP errors. If more than a certain
    number of `consecutive` errors occur, print a summary of up to `memory` recent
    exceptions to `printer`, then begin backoff.

    The loop will exit after reaching the consecutive error limit `backoff` times.
    On each backoff, the interval will be doubled. On a successful loop, the backoff
    will be reset.

    Args:
        workload: the function to call
        interval: how frequently to call it
        memory: how many recent errors to remember
        consecutive: how many consecutive errors must we see before we begin backoff
        backoff: how many times we should allow consecutive errors before exiting
        printer: a `print`-like function where errors will be reported
        run_once: if set, the loop will only run once then return
        jitter_range: if set, the interval will be a random variable (rv) drawn from
            a clamped Poisson distribution where lambda = interval and the rv is bound
            between `interval * (1 - range) < rv < interval * (1 + range)`
    """

    track_record: deque[bool] = deque([True] * consecutive, maxlen=consecutive)
    failures: deque[tuple[Exception, Optional[TracebackType]]] = deque(maxlen=memory)
    backoff_count = 0

    while True:
        try:
            workload_display_name = (
                workload.__name__ if hasattr(workload, "__name__") else workload
            )
            logger.debug(f"Starting run of {workload_display_name!r}")
            await workload()

            # Reset the backoff count on success; we may want to consider resetting
            # this only if the track record is _all_ successful to avoid ending backoff
            # prematurely
            if backoff_count > 0:
                printer("Resetting backoff due to successful run.")
                backoff_count = 0

            track_record.append(True)
        except httpx.TransportError as exc:
            # httpx.TransportError is the base class for any kind of communications
            # error, like timeouts, connection failures, etc. This does _not_ cover
            # routine HTTP error codes (even 5xx errors like 502/503) so this
            # handler should not be attempting to cover cases where the Prefect server
            # or Prefect Cloud is having an outage (which will be covered by the
            # exception clause below)
            track_record.append(False)
            failures.append((exc, exc.__traceback__))
            logger.debug(
                f"Run of {workload!r} failed with TransportError", exc_info=exc
            )
        except httpx.HTTPStatusError as exc:
            if exc.response.status_code >= 500:
                # 5XX codes indicate a potential outage of the Prefect API which is
                # likely to be temporary and transient. Don't quit over these unless
                # it is prolonged.
                track_record.append(False)
                failures.append((exc, exc.__traceback__))
                logger.debug(
                    f"Run of {workload!r} failed with HTTPStatusError", exc_info=exc
                )
            else:
                raise

        # Decide whether to exit now based on recent history.
        #
        # Given some typical background error rate of, say, 1%, we may still observe
        # quite a few errors in our recent samples, but this is not necessarily a cause
        # for concern.
        #
        # Imagine two distributions that could reflect our situation at any time: the
        # everything-is-fine distribution of errors, and the everything-is-on-fire
        # distribution of errors. We are trying to determine which of those two worlds
        # we are currently experiencing. We compare the likelihood that we'd draw N
        # consecutive errors from each. In the everything-is-fine distribution, that
        # would be a very low-probability occurrence, but in the everything-is-on-fire
        # distribution, that is a high-probability occurrence.
        #
        # Remarkably, we only need to look back for a small number of consecutive
        # errors to have reasonable confidence that this is indeed an anomaly.
        # @anticorrelator and @chrisguidry estimated that we should only need to look
        # back for 3 consecutive errors.
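        #
        # For example (illustrative numbers only, not measurements): with a 1%
        # background error rate, the chance of 3 consecutive failures is
        # 0.01 ** 3 = 1e-06, while during an outage in which, say, 90% of
        # calls fail, the same streak has probability 0.9 ** 3 = 0.729. A
        # short streak is therefore strong evidence that something is wrong.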

        if not any(track_record):
            # We've failed enough times to be sure something is wrong, the writing is
            # on the wall. Let's explain what we've seen and exit.
            printer(
                f"\nFailed the last {consecutive} attempts. "
                "Please check your environment and configuration."
            )

            printer("Examples of recent errors:\n")

            failures_by_type = distinct(
                reversed(failures),
                key=lambda pair: type(pair[0]),  # Group by the type of exception
            )
            for exception, traceback in failures_by_type:
                printer("".join(format_exception(None, exception, traceback)))
                printer()

            backoff_count += 1

            if backoff_count >= backoff:
                raise RuntimeError("Service exceeded error threshold.")

            # Reset the track record
            track_record.extend([True] * consecutive)
            failures.clear()
            printer(
                "Backing off due to consecutive errors, using increased interval of "
                f" {interval * 2**backoff_count}s."
            )

        if run_once:
            return

        if jitter_range is not None:
            sleep = clamped_poisson_interval(interval, clamping_factor=jitter_range)
        else:
            sleep = interval * 2**backoff_count

        await anyio.sleep(sleep)
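

# A hypothetical usage sketch (not part of the original module): how a caller
# such as a worker might drive a polling workload with `critical_service_loop`.
# The workload body, health-check URL, and loop parameters below are
# illustrative assumptions, not Prefect defaults.
async def _example_poll_health() -> None:
    # Transport problems (timeouts, connection failures) raise
    # httpx.TransportError subclasses, which the loop tolerates; calling
    # raise_for_status() turns 5xx responses into httpx.HTTPStatusError so
    # server outages count against the track record too, while 4xx errors
    # propagate and stop the loop.
    async with httpx.AsyncClient() as client:
        response = await client.get("http://127.0.0.1:4200/api/health")
        response.raise_for_status()


async def _example_main() -> None:
    # Tolerate intermittent failures; after 3 consecutive errors the loop
    # prints the recent exceptions and backs off, and after 4 such episodes it
    # raises RuntimeError. jitter_range=0.3 draws each sleep from a clamped
    # Poisson distribution around the 15-second interval.
    await critical_service_loop(
        workload=_example_poll_health,
        interval=15,
        consecutive=3,
        backoff=4,
        jitter_range=0.3,
    )


# A caller would typically run the sketch above with `anyio.run(_example_main)`.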


_metrics_server: Optional[tuple[WSGIServer, threading.Thread]] = None


def start_client_metrics_server() -> None:
    """Start the process-wide Prometheus metrics server for client metrics (if enabled
    with `PREFECT_CLIENT_METRICS_ENABLED`) on the port `PREFECT_CLIENT_METRICS_PORT`."""
    if not PREFECT_CLIENT_METRICS_ENABLED:
        return

    global _metrics_server
    if _metrics_server:
        return

    from prometheus_client import start_http_server

    _metrics_server = start_http_server(port=PREFECT_CLIENT_METRICS_PORT.value())


def stop_client_metrics_server() -> None:
    """Stop the process-wide Prometheus metrics server for client metrics, if it has
    previously been started"""
    global _metrics_server
    if _metrics_server:
        server, thread = _metrics_server
        server.shutdown()
        thread.join()
        _metrics_server = None
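

# A hypothetical usage sketch (not part of the original module): exposing and
# locally scraping the client metrics endpoint. It assumes the environment
# sets PREFECT_CLIENT_METRICS_ENABLED to true and has prometheus_client
# installed; the 127.0.0.1 URL simply reuses whatever port
# PREFECT_CLIENT_METRICS_PORT resolves to.
if __name__ == "__main__":
    start_client_metrics_server()
    try:
        scrape = httpx.get(
            f"http://127.0.0.1:{PREFECT_CLIENT_METRICS_PORT.value()}/metrics"
        )
        print(scrape.text)
    finally:
        stop_client_metrics_server()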