Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/services/base.py: 67%

157 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1from __future__ import annotations 1a

2 

3import abc 1a

4import asyncio 1a

5import inspect 1a

6import signal 1a

7from abc import ABC, abstractmethod 1a

8from contextlib import asynccontextmanager 1a

9from datetime import timedelta 1a

10from logging import Logger 1a

11from operator import methodcaller 1a

12from types import ModuleType 1a

13from typing import Any, AsyncGenerator, List, NoReturn, Optional, Sequence, overload 1a

14 

15import anyio 1a

16from typing_extensions import Self 1a

17 

18from prefect.logging.loggers import get_logger 1a

19from prefect.settings import PREFECT_API_LOG_RETRYABLE_ERRORS 1a

20from prefect.settings.models.root import canonical_environment_prefix 1a

21from prefect.settings.models.server.services import ServicesBaseSetting 1a

22from prefect.types._datetime import now 1a

23from prefect.utilities.processutils import ( 1a

24 _register_signal, # type: ignore[reportPrivateUsage] 

25) 

26 

# Module-level logger; `Service.run_services` uses it to report cancellation.
logger: Logger = get_logger(__name__)

28 

29 

def _known_service_modules() -> list[ModuleType]:
    """Return the Prefect server modules that are scanned for Service subclasses.

    The imports happen inside the function body, i.e. each time it is called,
    rather than when this module is first imported.

    Returns:
        The orchestration service modules, followed by the events service
        modules, followed by the logs service modules.
    """
    from prefect.server.events import stream as events_stream
    from prefect.server.events.services import (
        actions,
        event_logger,
        event_persister,
        triggers,
    )
    from prefect.server.logs import stream as logs_stream
    from prefect.server.services import (
        cancellation_cleanup,
        foreman,
        late_runs,
        pause_expirations,
        repossessor,
        scheduler,
        task_run_recorder,
        telemetry,
    )

    orchestration_modules: list[ModuleType] = [
        cancellation_cleanup,
        foreman,
        late_runs,
        pause_expirations,
        repossessor,
        scheduler,
        task_run_recorder,
        telemetry,
    ]
    events_modules: list[ModuleType] = [
        event_logger,
        event_persister,
        triggers,
        actions,
        events_stream,
    ]
    logs_modules: list[ModuleType] = [logs_stream]

    return orchestration_modules + events_modules + logs_modules

70 

71 

class Service(ABC):
    """Abstract base class for Prefect server services.

    Concrete subclasses implement `service_settings`, `start` and `stop`. The
    classmethods on this base discover the concrete subclasses that live in the
    modules listed by `_known_service_modules()` and run the enabled ones.
    """

    # Set by `__init__`: the concrete class name, and a logger derived from it.
    name: str
    logger: Logger

    @classmethod
    @abstractmethod
    def service_settings(cls) -> ServicesBaseSetting:
        """The Prefect setting that controls whether the service is enabled"""
        ...

    @classmethod
    def environment_variable_name(cls) -> str:
        """Name of the environment variable backing this service's `enabled` flag.

        Built as the canonical environment prefix of `service_settings()`
        followed by the literal suffix "ENABLED".
        """
        return canonical_environment_prefix(cls.service_settings()) + "ENABLED"

    @classmethod
    def enabled(cls) -> bool:
        """Whether the service is enabled"""
        return cls.service_settings().enabled

    @classmethod
    def all_services(cls) -> Sequence[type[Self]]:
        """Get list of all service classes

        Scans every module returned by `_known_service_modules()` and collects
        the members that are concrete (non-abstract) subclasses of `cls`.

        Returns:
            The discovered classes in discovery order. A class that is visible
            from more than one scanned module (or under more than one name) is
            reported only once, so that callers do not start it twice.
        """
        discovered: list[type[Self]] = []
        seen: set[type] = set()
        for module in _known_service_modules():
            for _, obj in inspect.getmembers(module):
                if (
                    inspect.isclass(obj)
                    and issubclass(obj, cls)
                    and not inspect.isabstract(obj)
                    and obj not in seen
                ):
                    seen.add(obj)
                    discovered.append(obj)
        return discovered

    @classmethod
    def enabled_services(cls) -> list[type[Self]]:
        """Get list of enabled service classes"""
        return [svc for svc in cls.all_services() if svc.enabled()]

    @classmethod
    @asynccontextmanager
    async def running(cls) -> AsyncGenerator[None, None]:
        """A context manager that runs enabled services on entry and stops them on
        exit.

        On entry, every class from `enabled_services()` is instantiated without
        arguments and its `start()` coroutine is scheduled as an asyncio task.
        On exit, `stop()` is awaited on all started services concurrently, and
        then the start tasks are awaited, with any exceptions they raised being
        collected rather than propagated.

        If constructing or scheduling one of the services fails, the services
        that were already started are stopped before the error propagates.
        """
        service_tasks: dict[Service, asyncio.Task[None]] = {}

        try:
            # Startup happens inside the `try` so that a failure part-way through
            # still shuts down the services that were already scheduled.
            for service_class in cls.enabled_services():
                service = service_class()
                service_tasks[service] = asyncio.create_task(service.start())

            yield
        finally:
            await asyncio.gather(*[service.stop() for service in service_tasks])
            await asyncio.gather(*service_tasks.values(), return_exceptions=True)

    @classmethod
    async def run_services(cls) -> NoReturn:
        """Run enabled services until cancelled.

        Waits on a future that is never resolved. When the surrounding task is
        cancelled, the cancellation is logged and swallowed, and leaving the
        `running()` context then stops the services.
        """
        async with cls.running():
            heat_death_of_the_universe = asyncio.get_running_loop().create_future()
            try:
                await heat_death_of_the_universe
            except asyncio.CancelledError:
                logger.info("Received cancellation, stopping services...")

    @abstractmethod
    async def start(self) -> NoReturn:
        """Start running the service, which may run indefinitely"""
        ...

    @abstractmethod
    async def stop(self) -> None:
        """Stop the service"""
        ...

    def __init__(self):
        """Set `name` to the concrete class name and create a logger named
        `server.services.<lowercased name>`."""
        self.name = self.__class__.__name__
        self.logger = get_logger(f"server.services.{self.name.lower()}")

149 

150 

class RunInEphemeralServers(Service, abc.ABC):
    """Marker base class for services that should also run in an ephemeral server.

    It declares no members of its own; a service opts in by listing this class
    among its bases.
    """

157 

158 

class RunInWebservers(Service, abc.ABC):
    """Marker base class for services that should run when running a webserver.

    It declares no members of its own; a service opts in by listing this class
    among its bases.
    """

165 

166 

class LoopService(Service, abc.ABC):
    """
    Loop services are relatively lightweight maintenance routines that need to run
    periodically.

    This class makes it straightforward to design and integrate them. Users only need to
    define the `run_once` coroutine to describe the behavior of the service on each
    loop.
    """

    # Default number of seconds between the *start* of consecutive runs. An
    # instance can override it through the `loop_seconds` constructor argument.
    loop_seconds = 60

    def __init__(
        self, loop_seconds: Optional[float] = None, handle_signals: bool = False
    ):
        """
        Args:
            loop_seconds (float): if provided, overrides the loop interval
                otherwise specified as a class variable. A falsy value
                (`None` or `0`) keeps the class-level default.
            handle_signals (bool): if True, SIGINT and SIGTERM are
                gracefully intercepted and shut down the running service.
        """
        super().__init__()

        if loop_seconds:
            self.loop_seconds: float = loop_seconds  # seconds between runs
        self._should_stop: bool = (
            False  # flag for whether the service should stop running
        )
        self._is_running: bool = False  # flag for whether the service is running

        if handle_signals:
            _register_signal(signal.SIGINT, self._stop)
            _register_signal(signal.SIGTERM, self._stop)

    async def _on_start(self) -> None:
        """
        Called prior to running the service
        """
        self._should_stop = False
        self._is_running = True
        self.logger.debug(f"Starting {self.name}")

    async def _on_stop(self) -> None:
        """
        Called after running the service
        """
        self._is_running = False
        self.logger.debug(f"Stopped {self.name}")

    @overload
    async def start(self, loops: None = None) -> NoReturn:
        """
        Run the service indefinitely.
        """
        ...

    @overload
    async def start(self, loops: int) -> None:
        """
        Run the service `loops` time.

        Args:
            loops (int): the number of loops to run before exiting.
        """
        ...

    async def start(self, loops: int | None = None) -> None | NoReturn:
        """
        Run the service `loops` time. Pass loops=None to run forever.

        Each iteration awaits `run_once()`. Exceptions other than cancellation
        are logged (subject to the retryable-error logging setting) and do not
        end the loop. A cancellation is logged and re-raised.

        Args:
            loops (int, optional): the number of loops to run before exiting.
                A value of zero or less runs no iterations at all.

        Raises:
            asyncio.CancelledError: if the surrounding task is cancelled. The
                `_on_stop` hook is not invoked in that case, but the service is
                marked as no longer running.
        """
        await self._on_start()

        try:
            i = 0
            # Bounding on `i < loops` (rather than relying solely on the stop
            # flag) guarantees termination for `loops <= 0` as well.
            while not self._should_stop and (loops is None or i < loops):
                start_time = now("UTC")

                try:
                    self.logger.debug(f"About to run {self.name}...")
                    await self.run_once()

                except asyncio.CancelledError:
                    self.logger.info(f"Received cancellation signal for {self.name}")
                    raise

                except Exception as exc:
                    # avoid circular import
                    from prefect.server.api.server import is_client_retryable_exception

                    # Non-retryable errors are always logged; retryable ones only
                    # when the setting asks for it.
                    if (
                        not is_client_retryable_exception(exc)
                        or PREFECT_API_LOG_RETRYABLE_ERRORS.value()
                    ):
                        self.logger.error(
                            f"Unexpected error in: {repr(exc)}", exc_info=True
                        )

                end_time = now("UTC")

                # if service took longer than its loop interval, log a warning
                # that the interval might be too short
                elapsed_seconds = (end_time - start_time).total_seconds()
                if elapsed_seconds > self.loop_seconds:
                    self.logger.warning(
                        f"{self.name} took {elapsed_seconds} seconds"
                        " to run, which is longer than its loop interval of"
                        f" {self.loop_seconds} seconds."
                    )

                # check if early stopping was requested
                i += 1
                if loops is not None and i >= loops:
                    self.logger.debug(f"{self.name} exiting after {loops} loop(s).")
                    await self.stop(block=False)

                # next run is every "loop seconds" after each previous run *started*.
                # note that if the loop took unexpectedly long, the "next_run" time
                # might be in the past, which will result in an instant start
                next_run = max(
                    start_time + timedelta(seconds=self.loop_seconds), now("UTC")
                )
                self.logger.debug(
                    f"Finished running {self.name}. Next run at {next_run}"
                )

                # check the `_should_stop` flag every 1 seconds until the next run time is reached
                while now("UTC") < next_run and not self._should_stop:
                    await asyncio.sleep(
                        min(1, (next_run - now("UTC")).total_seconds())
                    )
        except asyncio.CancelledError:
            # The coroutine is exiting without reaching `_on_stop`; clear the
            # flag so a later `stop(block=True)` does not wait out the whole
            # loop interval for a service that is no longer running.
            self._is_running = False
            raise

        await self._on_stop()

    async def stop(self, block: bool = True) -> None:
        """
        Gracefully stops a running LoopService and optionally blocks until the
        service stops.

        Args:
            block (bool): if True, blocks until the service is
                finished running. Otherwise it requests a stop and returns but
                the service may still be running a final loop.

        """
        self.logger.debug(f"Stopping {self.name}...")
        self._stop()

        if block:
            # if block=True, sleep until the service stops running,
            # but no more than `loop_seconds` to avoid a deadlock
            with anyio.move_on_after(self.loop_seconds):
                while self._is_running:
                    await asyncio.sleep(0.1)

            # if the service is still running after `loop_seconds`, something's wrong
            if self._is_running:
                self.logger.warning(
                    f"`stop(block=True)` was called on {self.name} but more than one"
                    f" loop interval ({self.loop_seconds} seconds) has passed. This"
                    " usually means something is wrong. If `stop()` was called from"
                    " inside the loop service, use `stop(block=False)` instead."
                )

    def _stop(self, *_: Any) -> None:
        """
        Private, synchronous method for setting the `_should_stop` flag. Takes arbitrary
        arguments so it can be used as a signal handler.
        """
        self._should_stop = True

    @abstractmethod
    async def run_once(self) -> None:
        """
        Represents one loop of the service.

        Subclasses must override this method.

        To actually run the service once, call `LoopService().start(loops=1)`
        instead of `LoopService().run_once()`, because this method will not invoke setup
        and teardown methods properly.
        """
        ...

347 

348 

async def run_multiple_services(loop_services: List[LoopService]) -> NoReturn:
    """
    Run several loop services concurrently under one shared signal handler.

    Only one signal handler can be active at a time, so a single handler that
    asks every given service to stop is installed for both SIGINT and SIGTERM
    before the services are started.

    Args:
        loop_services: the services to start; each one's `start()` is awaited
            concurrently with the others.
    """
    # Built once and reused for every service.
    # NOTE(review): presumably `methodcaller` is used instead of a direct
    # `service._stop()` call to avoid a private-member access lint - confirm.
    request_stop = methodcaller("_stop")

    def stop_all_services(*_: Any) -> None:
        for svc in loop_services:
            request_stop(svc)

    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, stop_all_services)

    pending = [svc.start() for svc in loop_services]
    await asyncio.gather(*pending)