Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/events/services/triggers.py: 75%
49 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1from __future__ import annotations 1a
3import asyncio 1a
4from typing import TYPE_CHECKING, Any, NoReturn, Optional 1a
6from prefect.logging import get_logger 1a
7from prefect.server.events import triggers 1a
8from prefect.server.services.base import LoopService, RunInEphemeralServers, Service 1a
9from prefect.server.utilities.messaging import Consumer, create_consumer 1a
10from prefect.server.utilities.messaging._consumer_names import ( 1a
11 generate_unique_consumer_name,
12)
13from prefect.settings import PREFECT_EVENTS_PROACTIVE_GRANULARITY 1a
14from prefect.settings.context import get_current_settings 1a
15from prefect.settings.models.server.services import ServicesBaseSetting 1a
17if TYPE_CHECKING: 17 ↛ 18line 17 didn't jump to line 18 because the condition on line 17 was never true1a
18 import logging
21logger: "logging.Logger" = get_logger(__name__) 1a
24class ReactiveTriggers(RunInEphemeralServers, Service): 1a
25 """Evaluates reactive automation triggers"""
27 consumer_task: asyncio.Task[None] | None = None 1a
29 @classmethod 1a
30 def service_settings(cls) -> ServicesBaseSetting: 1a
31 return get_current_settings().server.services.triggers 1a
33 async def start(self) -> NoReturn: 1a
34 assert self.consumer_task is None, "Reactive triggers already started" 1a
35 consumer_name = generate_unique_consumer_name("reactive-triggers") 1a
36 logger.info( 1a
37 f"ReactiveTriggers starting with unique consumer name: {consumer_name}"
38 )
39 self.consumer: Consumer = create_consumer( 1a
40 "events", group="reactive-triggers", name=consumer_name
41 )
43 async with triggers.consumer() as handler: 1ab
44 self.consumer_task = asyncio.create_task(self.consumer.run(handler)) 1b
45 logger.debug("Reactive triggers started") 1b
47 try: 1b
48 await self.consumer_task 1b
49 except asyncio.CancelledError:
50 pass
52 async def stop(self) -> None: 1a
53 assert self.consumer_task is not None, "Reactive triggers not started"
54 self.consumer_task.cancel()
55 try:
56 await self.consumer_task
57 except asyncio.CancelledError:
58 pass
59 finally:
60 await self.consumer.cleanup()
61 self.consumer_task = None
62 logger.debug("Reactive triggers stopped")
65class ProactiveTriggers(RunInEphemeralServers, LoopService): 1a
66 """Evaluates proactive automation triggers"""
68 @classmethod 1a
69 def service_settings(cls) -> ServicesBaseSetting: 1a
70 return get_current_settings().server.services.triggers 1a
72 def __init__(self, loop_seconds: Optional[float] = None, **kwargs: Any): 1a
73 super().__init__( 1a
74 loop_seconds=(
75 loop_seconds
76 or PREFECT_EVENTS_PROACTIVE_GRANULARITY.value().total_seconds()
77 ),
78 **kwargs,
79 )
81 async def run_once(self) -> None: 1a
82 await triggers.evaluate_proactive_triggers() 1a