Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/events/services/actions.py: 50%

38 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 10:48 +0000

1from __future__ import annotations 1a

2 

3import asyncio 1a

4from typing import TYPE_CHECKING, NoReturn 1a

5 

6from prefect.logging import get_logger 1a

7from prefect.server.events import actions 1a

8from prefect.server.services.base import RunInEphemeralServers, Service 1a

9from prefect.server.utilities.messaging import Consumer, create_consumer 1a

10from prefect.server.utilities.messaging._consumer_names import ( 1a

11 generate_unique_consumer_name, 

12) 

13from prefect.settings.context import get_current_settings 1a

14from prefect.settings.models.server.services import ServicesBaseSetting 1a

15 

16if TYPE_CHECKING: 16 ↛ 17line 16 didn't jump to line 17 because the condition on line 16 was never true1a

17 import logging 

18 

19logger: "logging.Logger" = get_logger(__name__) 1a

20 

21 

22class Actions(RunInEphemeralServers, Service): 1a

23 """Runs the actions triggered by automations""" 

24 

25 consumer_task: asyncio.Task[None] | None = None 1a

26 

27 @classmethod 1a

28 def service_settings(cls) -> ServicesBaseSetting: 1a

29 return get_current_settings().server.services.triggers 1a

30 

31 async def start(self) -> NoReturn: 1a

32 assert self.consumer_task is None, "Actions already started" 

33 self.consumer: Consumer = create_consumer( 

34 "actions", name=generate_unique_consumer_name("actions") 

35 ) 

36 

37 async with actions.consumer() as handler: 

38 self.consumer_task = asyncio.create_task(self.consumer.run(handler)) 

39 logger.debug("Actions started") 

40 

41 try: 

42 await self.consumer_task 

43 except asyncio.CancelledError: 

44 pass 

45 

46 async def stop(self) -> None: 1a

47 assert self.consumer_task is not None, "Actions not started" 

48 self.consumer_task.cancel() 

49 try: 

50 await self.consumer_task 

51 except asyncio.CancelledError: 

52 pass 

53 finally: 

54 await self.consumer.cleanup() 

55 self.consumer_task = None 

56 logger.debug("Actions stopped")