Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/events/services/triggers.py: 75%

49 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 11:21 +0000

1from __future__ import annotations 1a

2 

3import asyncio 1a

4from typing import TYPE_CHECKING, Any, NoReturn, Optional 1a

5 

6from prefect.logging import get_logger 1a

7from prefect.server.events import triggers 1a

8from prefect.server.services.base import LoopService, RunInEphemeralServers, Service 1a

9from prefect.server.utilities.messaging import Consumer, create_consumer 1a

10from prefect.server.utilities.messaging._consumer_names import ( 1a

11 generate_unique_consumer_name, 

12) 

13from prefect.settings import PREFECT_EVENTS_PROACTIVE_GRANULARITY 1a

14from prefect.settings.context import get_current_settings 1a

15from prefect.settings.models.server.services import ServicesBaseSetting 1a

16 

17if TYPE_CHECKING: 17 ↛ 18line 17 didn't jump to line 18 because the condition on line 17 was never true1a

18 import logging 

19 

20 

21logger: "logging.Logger" = get_logger(__name__) 1a

22 

23 

24class ReactiveTriggers(RunInEphemeralServers, Service): 1a

25 """Evaluates reactive automation triggers""" 

26 

27 consumer_task: asyncio.Task[None] | None = None 1a

28 

29 @classmethod 1a

30 def service_settings(cls) -> ServicesBaseSetting: 1a

31 return get_current_settings().server.services.triggers 1a

32 

33 async def start(self) -> NoReturn: 1a

34 assert self.consumer_task is None, "Reactive triggers already started" 1a

35 consumer_name = generate_unique_consumer_name("reactive-triggers") 1a

36 logger.info( 1a

37 f"ReactiveTriggers starting with unique consumer name: {consumer_name}" 

38 ) 

39 self.consumer: Consumer = create_consumer( 1a

40 "events", group="reactive-triggers", name=consumer_name 

41 ) 

42 

43 async with triggers.consumer() as handler: 1ab

44 self.consumer_task = asyncio.create_task(self.consumer.run(handler)) 1b

45 logger.debug("Reactive triggers started") 1b

46 

47 try: 1b

48 await self.consumer_task 1b

49 except asyncio.CancelledError: 

50 pass 

51 

52 async def stop(self) -> None: 1a

53 assert self.consumer_task is not None, "Reactive triggers not started" 

54 self.consumer_task.cancel() 

55 try: 

56 await self.consumer_task 

57 except asyncio.CancelledError: 

58 pass 

59 finally: 

60 await self.consumer.cleanup() 

61 self.consumer_task = None 

62 logger.debug("Reactive triggers stopped") 

63 

64 

65class ProactiveTriggers(RunInEphemeralServers, LoopService): 1a

66 """Evaluates proactive automation triggers""" 

67 

68 @classmethod 1a

69 def service_settings(cls) -> ServicesBaseSetting: 1a

70 return get_current_settings().server.services.triggers 1a

71 

72 def __init__(self, loop_seconds: Optional[float] = None, **kwargs: Any): 1a

73 super().__init__( 1a

74 loop_seconds=( 

75 loop_seconds 

76 or PREFECT_EVENTS_PROACTIVE_GRANULARITY.value().total_seconds() 

77 ), 

78 **kwargs, 

79 ) 

80 

81 async def run_once(self) -> None: 1a

82 await triggers.evaluate_proactive_triggers() 1a