Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/services/base.py: 67%
157 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 11:21 +0000
1from __future__ import annotations 1a
3import abc 1a
4import asyncio 1a
5import inspect 1a
6import signal 1a
7from abc import ABC, abstractmethod 1a
8from contextlib import asynccontextmanager 1a
9from datetime import timedelta 1a
10from logging import Logger 1a
11from operator import methodcaller 1a
12from types import ModuleType 1a
13from typing import Any, AsyncGenerator, List, NoReturn, Optional, Sequence, overload 1a
15import anyio 1a
16from typing_extensions import Self 1a
18from prefect.logging.loggers import get_logger 1a
19from prefect.settings import PREFECT_API_LOG_RETRYABLE_ERRORS 1a
20from prefect.settings.models.root import canonical_environment_prefix 1a
21from prefect.settings.models.server.services import ServicesBaseSetting 1a
22from prefect.types._datetime import now 1a
23from prefect.utilities.processutils import ( 1a
24 _register_signal, # type: ignore[reportPrivateUsage]
25)
27logger: Logger = get_logger(__name__) 1a
def _known_service_modules() -> list[ModuleType]:
    """Return the Prefect server modules that are scanned for Service subclasses.

    The modules are imported inside the function body, i.e. only when the list
    is actually requested. The returned order is: orchestration services,
    then events services, then logs services.
    """
    from prefect.server.events import stream as events_stream
    from prefect.server.events.services import (
        actions,
        event_logger,
        event_persister,
        triggers,
    )
    from prefect.server.logs import stream as logs_stream
    from prefect.server.services import (
        cancellation_cleanup,
        foreman,
        late_runs,
        pause_expirations,
        repossessor,
        scheduler,
        task_run_recorder,
        telemetry,
    )

    orchestration_modules = [
        cancellation_cleanup,
        foreman,
        late_runs,
        pause_expirations,
        repossessor,
        scheduler,
        task_run_recorder,
        telemetry,
    ]
    events_modules = [
        event_logger,
        event_persister,
        triggers,
        actions,
        events_stream,
    ]
    logs_modules = [logs_stream]

    return [*orchestration_modules, *events_modules, *logs_modules]
class Service(ABC):
    """Abstract base class for services run by the Prefect server.

    Concrete subclasses implement `service_settings`, `start` and `stop`.
    The classmethods on this base discover concrete subclasses in the known
    service modules and start/stop the enabled ones together.
    """

    name: str
    logger: Logger

    @classmethod
    @abstractmethod
    def service_settings(cls) -> ServicesBaseSetting:
        """The Prefect setting that controls whether the service is enabled"""
        ...

    @classmethod
    def environment_variable_name(cls) -> str:
        """Return the canonical environment prefix of this service's settings
        followed by "ENABLED"."""
        return canonical_environment_prefix(cls.service_settings()) + "ENABLED"

    @classmethod
    def enabled(cls) -> bool:
        """Whether the service is enabled"""
        return cls.service_settings().enabled

    @classmethod
    def all_services(cls) -> Sequence[type[Self]]:
        """Get list of all service classes

        Scans every module returned by `_known_service_modules()` and collects
        the non-abstract classes that are subclasses of `cls`.
        """
        discovered: list[type[Self]] = []
        for module in _known_service_modules():
            for _, obj in inspect.getmembers(module):
                if (
                    inspect.isclass(obj)
                    and issubclass(obj, cls)
                    and not inspect.isabstract(obj)
                ):
                    discovered.append(obj)
        return discovered

    @classmethod
    def enabled_services(cls) -> list[type[Self]]:
        """Get list of enabled service classes"""
        return [svc for svc in cls.all_services() if svc.enabled()]

    @classmethod
    @asynccontextmanager
    async def running(cls) -> AsyncGenerator[None, None]:
        """A context manager that runs enabled services on entry and stops them on
        exit.

        Each enabled service is instantiated and its `start()` coroutine is
        scheduled as a task. On exit every started service is asked to stop
        and its task is awaited.
        """
        service_tasks: dict[Service, asyncio.Task[None]] = {}
        try:
            # Startup happens inside the `try` so that, if instantiating or
            # starting a later service fails, the services that were already
            # started are still stopped and awaited instead of being leaked.
            for service_class in cls.enabled_services():
                service = service_class()
                service_tasks[service] = asyncio.create_task(service.start())

            yield
        finally:
            await asyncio.gather(*[service.stop() for service in service_tasks])
            # Collect task results/exceptions without letting them mask the
            # exception (if any) that is leaving the context manager.
            await asyncio.gather(*service_tasks.values(), return_exceptions=True)

    @classmethod
    async def run_services(cls) -> NoReturn:
        """Run enabled services until cancelled."""
        async with cls.running():
            # A future that nobody ever resolves: awaiting it parks this
            # coroutine until it is cancelled.
            heat_death_of_the_universe = asyncio.get_running_loop().create_future()
            try:
                await heat_death_of_the_universe
            except asyncio.CancelledError:
                logger.info("Received cancellation, stopping services...")

    @abstractmethod
    async def start(self) -> NoReturn:
        """Start running the service, which may run indefinitely"""
        ...

    @abstractmethod
    async def stop(self) -> None:
        """Stop the service"""
        ...

    def __init__(self):
        """Set `name` to the class name and create a logger named
        `server.services.<lowercased class name>`."""
        self.name = self.__class__.__name__
        self.logger = get_logger(f"server.services.{self.name.lower()}")
class RunInEphemeralServers(Service, ABC):
    """Marker base class for services that should run even when running an
    ephemeral server. It adds no behavior of its own."""
class RunInWebservers(Service, ABC):
    """Marker base class for services that should run when running a
    webserver. It adds no behavior of its own."""
class LoopService(Service, abc.ABC):
    """
    Loop services are relatively lightweight maintenance routines that need to run
    periodically.

    This class makes it straightforward to design and integrate them. Users only need to
    define the `run_once` coroutine to describe the behavior of the service on each
    loop.
    """

    # default number of seconds between the start of consecutive runs
    loop_seconds = 60

    def __init__(
        self, loop_seconds: Optional[float] = None, handle_signals: bool = False
    ):
        """
        Args:
            loop_seconds (float): if provided, overrides the loop interval
                otherwise specified as a class variable
            handle_signals (bool): if True, SIGINT and SIGTERM are
                gracefully intercepted and shut down the running service.
        """
        super().__init__()

        # NOTE: a falsy value (None or 0) keeps the class-level interval
        if loop_seconds:
            self.loop_seconds: float = loop_seconds  # seconds between runs
        self._should_stop: bool = (
            False  # flag for whether the service should stop running
        )
        self._is_running: bool = False  # flag for whether the service is running

        if handle_signals:
            _register_signal(signal.SIGINT, self._stop)
            _register_signal(signal.SIGTERM, self._stop)

    async def _on_start(self) -> None:
        """
        Called prior to running the service
        """
        self._should_stop = False
        self._is_running = True
        self.logger.debug(f"Starting {self.name}")

    async def _on_stop(self) -> None:
        """
        Called after running the service
        """
        self._is_running = False
        self.logger.debug(f"Stopped {self.name}")

    @overload
    async def start(self, loops: None = None) -> NoReturn:
        """
        Run the service indefinitely.
        """
        ...

    @overload
    async def start(self, loops: int) -> None:
        """
        Run the service `loops` times.

        Args:
            loops (int): the number of loops to run before exiting.
        """
        ...

    async def start(self, loops: int | None = None) -> None | NoReturn:
        """
        Run the service `loops` times. Pass loops=None to run forever.

        Exceptions raised by `run_once` are logged and the loop continues;
        cancellation is logged and re-raised. `_on_stop` is always invoked
        once the loop has been entered, including when the loop is left
        through cancellation.

        Args:
            loops (int, optional): the number of loops to run before exiting.
        """
        await self._on_start()

        try:
            i = 0
            while not self._should_stop:
                start_time = now("UTC")

                try:
                    self.logger.debug(f"About to run {self.name}...")
                    await self.run_once()

                except asyncio.CancelledError:
                    self.logger.info(f"Received cancellation signal for {self.name}")
                    raise

                except Exception as exc:
                    # avoid circular import
                    from prefect.server.api.server import is_client_retryable_exception

                    # non-retryable errors are always logged; retryable ones
                    # only when the setting asks for them
                    retryable_error = is_client_retryable_exception(exc)
                    if not retryable_error or PREFECT_API_LOG_RETRYABLE_ERRORS.value():
                        self.logger.error(
                            f"Unexpected error in: {repr(exc)}", exc_info=True
                        )

                end_time = now("UTC")

                # if service took longer than its loop interval, log a warning
                # that the interval might be too short
                if (end_time - start_time).total_seconds() > self.loop_seconds:
                    self.logger.warning(
                        f"{self.name} took {(end_time - start_time).total_seconds()} seconds"
                        " to run, which is longer than its loop interval of"
                        f" {self.loop_seconds} seconds."
                    )

                # check if early stopping was requested
                i += 1
                if loops is not None and i == loops:
                    self.logger.debug(f"{self.name} exiting after {loops} loop(s).")
                    await self.stop(block=False)

                # next run is every "loop seconds" after each previous run *started*.
                # note that if the loop took unexpectedly long, the "next_run" time
                # might be in the past, which will result in an instant start
                next_run = max(
                    start_time + timedelta(seconds=self.loop_seconds), now("UTC")
                )
                self.logger.debug(f"Finished running {self.name}. Next run at {next_run}")

                # check the `_should_stop` flag every 1 seconds until the next run time is reached
                while now("UTC") < next_run and not self._should_stop:
                    await asyncio.sleep(min(1, (next_run - now("UTC")).total_seconds()))
        finally:
            # Always clear the running flag, even when the loop is left via
            # cancellation; otherwise a later `stop(block=True)` would wait a
            # full loop interval for a service that is no longer running.
            await self._on_stop()

    async def stop(self, block: bool = True) -> None:
        """
        Gracefully stops a running LoopService and optionally blocks until the
        service stops.

        Args:
            block (bool): if True, blocks until the service is
                finished running. Otherwise it requests a stop and returns but
                the service may still be running a final loop.

        """
        self.logger.debug(f"Stopping {self.name}...")
        self._stop()

        if block:
            # if block=True, sleep until the service stops running,
            # but no more than `loop_seconds` to avoid a deadlock
            with anyio.move_on_after(self.loop_seconds):
                while self._is_running:
                    await asyncio.sleep(0.1)

            # if the service is still running after `loop_seconds`, something's wrong
            if self._is_running:
                self.logger.warning(
                    f"`stop(block=True)` was called on {self.name} but more than one"
                    f" loop interval ({self.loop_seconds} seconds) has passed. This"
                    " usually means something is wrong. If `stop()` was called from"
                    " inside the loop service, use `stop(block=False)` instead."
                )

    def _stop(self, *_: Any) -> None:
        """
        Private, synchronous method for setting the `_should_stop` flag. Takes arbitrary
        arguments so it can be used as a signal handler.
        """
        self._should_stop = True

    @abstractmethod
    async def run_once(self) -> None:
        """
        Represents one loop of the service.

        Subclasses must override this method.

        To actually run the service once, call `LoopService().start(loops=1)`
        instead of `LoopService().run_once()`, because this method will not invoke setup
        and teardown methods properly.
        """
        ...
async def run_multiple_services(loop_services: List[LoopService]) -> NoReturn:
    """
    Run several loop services concurrently under one shared signal handler.

    Only one signal handler can be active at a time, so a single handler is
    installed for SIGINT and SIGTERM that requests a stop on every service in
    `loop_services`; the services are then started together and awaited.
    """
    request_stop = methodcaller("_stop")

    def stop_all_services(*_: Any) -> None:
        # accepts (and ignores) the arguments passed to a signal handler
        for loop_service in loop_services:
            request_stop(loop_service)

    for signum in (signal.SIGINT, signal.SIGTERM):
        signal.signal(signum, stop_all_services)

    await asyncio.gather(*(loop_service.start() for loop_service in loop_services))