Coverage for /usr/local/lib/python3.12/site-packages/prefect/server/events/services/actions.py: 50%
38 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 13:38 +0000
1from __future__ import annotations 1a
3import asyncio 1a
4from typing import TYPE_CHECKING, NoReturn 1a
6from prefect.logging import get_logger 1a
7from prefect.server.events import actions 1a
8from prefect.server.services.base import RunInEphemeralServers, Service 1a
9from prefect.server.utilities.messaging import Consumer, create_consumer 1a
10from prefect.server.utilities.messaging._consumer_names import ( 1a
11 generate_unique_consumer_name,
12)
13from prefect.settings.context import get_current_settings 1a
14from prefect.settings.models.server.services import ServicesBaseSetting 1a
16if TYPE_CHECKING: 16 ↛ 17line 16 didn't jump to line 17 because the condition on line 16 was never true1a
17 import logging
19logger: "logging.Logger" = get_logger(__name__) 1a
22class Actions(RunInEphemeralServers, Service): 1a
23 """Runs the actions triggered by automations"""
25 consumer_task: asyncio.Task[None] | None = None 1a
27 @classmethod 1a
28 def service_settings(cls) -> ServicesBaseSetting: 1a
29 return get_current_settings().server.services.triggers 1a
31 async def start(self) -> NoReturn: 1a
32 assert self.consumer_task is None, "Actions already started"
33 self.consumer: Consumer = create_consumer(
34 "actions", name=generate_unique_consumer_name("actions")
35 )
37 async with actions.consumer() as handler:
38 self.consumer_task = asyncio.create_task(self.consumer.run(handler))
39 logger.debug("Actions started")
41 try:
42 await self.consumer_task
43 except asyncio.CancelledError:
44 pass
46 async def stop(self) -> None: 1a
47 assert self.consumer_task is not None, "Actions not started"
48 self.consumer_task.cancel()
49 try:
50 await self.consumer_task
51 except asyncio.CancelledError:
52 pass
53 finally:
54 await self.consumer.cleanup()
55 self.consumer_task = None
56 logger.debug("Actions stopped")