Coverage for opt/mealie/lib/python3.12/site-packages/mealie/services/event_bus_service/event_bus_listeners.py: 81%
97 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 15:32 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 15:32 +0000
1import contextlib 1a
2import json 1a
3from abc import ABC, abstractmethod 1a
4from collections.abc import Generator 1a
5from datetime import UTC, datetime 1a
6from urllib.parse import parse_qs, urlencode, urlsplit, urlunsplit 1a
8from fastapi.encoders import jsonable_encoder 1a
9from pydantic import UUID4 1a
10from sqlalchemy import select 1a
11from sqlalchemy.orm.session import Session 1a
13from mealie.db.db_setup import session_context 1a
14from mealie.db.models.household.webhooks import GroupWebhooksModel 1a
15from mealie.repos.repository_factory import AllRepositories 1a
16from mealie.schema.household.group_events import GroupEventNotifierPrivate 1a
17from mealie.schema.household.webhook import ReadWebhook 1a
19from .event_types import Event, EventDocumentType, EventTypes, EventWebhookData 1a
20from .publisher import ApprisePublisher, PublisherLike, WebhookPublisher 1a
23class EventListenerBase(ABC): 1a
24 _session: Session | None = None 1a
25 _repos: AllRepositories | None = None 1a
27 def __init__(self, group_id: UUID4, household_id: UUID4, publisher: PublisherLike) -> None: 1a
28 self.group_id = group_id 1gflhijbdekc
29 self.household_id = household_id 1gflhijbdekc
30 self.publisher = publisher 1gflhijbdekc
31 self._session = None 1gflhijbdekc
32 self._repos = None 1gflhijbdekc
34 @abstractmethod 1a
35 def get_subscribers(self, event: Event) -> list: 1a
36 """Get a list of all subscribers to this event"""
37 ...
39 @abstractmethod 1a
40 def publish_to_subscribers(self, event: Event, subscribers: list) -> None: 1a
41 """Publishes the event to all subscribers"""
42 ...
44 @contextlib.contextmanager 1a
45 def ensure_session(self) -> Generator[Session, None, None]: 1a
46 """
47 ensure_session ensures that a session is available for the caller by checking if a session
48 was provided during construction, and if not, creating a new session with the `with_session`
49 function and closing it when the context manager exits.
51 This is _required_ when working with sessions inside an event bus listener where the listener
52 may be constructed during a request where the session is provided by the request, but the when
53 run as a scheduled task, the session is not provided and must be created.
54 """
55 if self._session is None: 1gfhijbdekc
56 with session_context() as session: 1gfhijbdekc
57 self._session = session 1gfhijbdekc
58 yield self._session 1gfhijbdekc
59 else:
60 yield self._session 1bc
62 @contextlib.contextmanager 1a
63 def ensure_repos(self, group_id: UUID4, household_id: UUID4) -> Generator[AllRepositories, None, None]: 1a
64 if self._repos is None: 64 ↛ 69line 64 didn't jump to line 69 because the condition on line 64 was always true1gfhijbdekc
65 with self.ensure_session() as session: 1gfhijbdekc
66 self._repos = AllRepositories(session, group_id=group_id, household_id=household_id) 1gfhijbdekc
67 yield self._repos 1gfhijbdekc
68 else:
69 yield self._repos
72class AppriseEventListener(EventListenerBase): 1a
73 def __init__(self, group_id: UUID4, household_id: UUID4) -> None: 1a
74 super().__init__(group_id, household_id, ApprisePublisher()) 1gflhijbdekc
76 def get_subscribers(self, event: Event) -> list[str]: 1a
77 with self.ensure_repos(self.group_id, self.household_id) as repos: 1gfhijbdekc
78 notifiers: list[GroupEventNotifierPrivate] = repos.group_event_notifier.multi_query( 1gfhijbdekc
79 {"enabled": True}, override_schema=GroupEventNotifierPrivate
80 )
82 urls = [notifier.apprise_url for notifier in notifiers if getattr(notifier.options, event.event_type.name)] 1gfhijbdekc
83 urls = AppriseEventListener.update_urls_with_event_data(urls, event) 1gfhijbdekc
85 return urls 1gfhijbdekc
87 def publish_to_subscribers(self, event: Event, subscribers: list[str]) -> None: 1a
88 self.publisher.publish(event, subscribers)
90 @staticmethod 1a
91 def update_urls_with_event_data(urls: list[str], event: Event): 1a
92 params = { 1gfhijbdekc
93 "event_type": event.event_type.name,
94 "integration_id": event.integration_id,
95 "document_data": json.dumps(jsonable_encoder(event.document_data)),
96 "event_id": str(event.event_id),
97 "timestamp": event.timestamp.isoformat() if event.timestamp else None,
98 }
100 return [ 1gfhijbdekc
101 # We use query params to add custom key: value pairs to the Apprise payload by prepending the key with ":".
102 (
103 AppriseEventListener.merge_query_parameters(url, {f":{k}": v for k, v in params.items()})
104 # only certain endpoints support the custom key: value pairs, so we only apply them to those endpoints
105 if AppriseEventListener.is_custom_url(url)
106 else url
107 )
108 for url in urls
109 ]
111 @staticmethod 1a
112 def merge_query_parameters(url: str, params: dict): 1a
113 scheme, netloc, path, query_string, fragment = urlsplit(url)
115 # merge query params
116 query_params = parse_qs(query_string)
117 query_params.update(params)
118 new_query_string = urlencode(query_params, doseq=True)
120 return urlunsplit((scheme, netloc, path, new_query_string, fragment))
122 @staticmethod 1a
123 def is_custom_url(url: str): 1a
124 return url.split(":", 1)[0].lower() in [
125 "form",
126 "forms",
127 "json",
128 "jsons",
129 "xml",
130 "xmls",
131 ]
134class WebhookEventListener(EventListenerBase): 1a
135 def __init__(self, group_id: UUID4, household_id: UUID4) -> None: 1a
136 super().__init__(group_id, household_id, WebhookPublisher()) 1gfhijbdekc
138 def get_subscribers(self, event: Event) -> list[ReadWebhook]: 1a
139 # we only care about events that contain webhook information
140 if not (event.event_type == EventTypes.webhook_task and isinstance(event.document_data, EventWebhookData)): 1gfhijbdekc
141 return [] 1ghijbdekc
143 scheduled_webhooks = self.get_scheduled_webhooks( 1fbdec
144 event.document_data.webhook_start_dt, event.document_data.webhook_end_dt
145 )
146 return scheduled_webhooks 1fbdec
148 def publish_to_subscribers(self, event: Event, subscribers: list[ReadWebhook]) -> None: 1a
149 with self.ensure_repos(self.group_id, self.household_id) as repos: 1bc
150 if not isinstance(event.document_data, EventWebhookData): 150 ↛ 151line 150 didn't jump to line 151 because the condition on line 150 was never true1bc
151 return
153 match event.document_data.document_type: 1bc
154 case EventDocumentType.mealplan: 154 ↛ 160line 154 didn't jump to line 160 because the pattern on line 154 always matched1bc
155 meal_repo = repos.meals 1bc
156 meal_data = meal_repo.get_meals_by_date_range( 1bc
157 event.document_data.webhook_start_dt, event.document_data.webhook_end_dt
158 )
159 event.document_data.webhook_body = meal_data or None 1bc
160 case _:
161 if event.event_type is EventTypes.test_message:
162 # make sure the webhook has a valid body so it gets sent
163 event.document_data.webhook_body = event.document_data.webhook_body or []
165 # Only publish to subscribers if we have a webhook body to send
166 if event.document_data.webhook_body is not None: 166 ↛ 167line 166 didn't jump to line 167 because the condition on line 166 was never true1bc
167 self.publisher.publish(event, [webhook.url for webhook in subscribers])
169 def get_scheduled_webhooks(self, start_dt: datetime, end_dt: datetime) -> list[ReadWebhook]: 1a
170 """Fetches all scheduled webhooks from the database"""
171 with self.ensure_session() as session: 1fbdec
172 stmt = select(GroupWebhooksModel).where( 1fbdec
173 GroupWebhooksModel.enabled == True, # noqa: E712 - required for SQLAlchemy comparison
174 GroupWebhooksModel.scheduled_time > start_dt.astimezone(UTC).time(),
175 GroupWebhooksModel.scheduled_time <= end_dt.astimezone(UTC).time(),
176 GroupWebhooksModel.group_id == self.group_id,
177 GroupWebhooksModel.household_id == self.household_id,
178 )
179 return session.execute(stmt).scalars().all() 1fbdec