Coverage for opt/mealie/lib/python3.12/site-packages/mealie/services/event_bus_service/event_bus_listeners.py: 81%

97 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 15:32 +0000

1import contextlib 1a

2import json 1a

3from abc import ABC, abstractmethod 1a

4from collections.abc import Generator 1a

5from datetime import UTC, datetime 1a

6from urllib.parse import parse_qs, urlencode, urlsplit, urlunsplit 1a

7 

8from fastapi.encoders import jsonable_encoder 1a

9from pydantic import UUID4 1a

10from sqlalchemy import select 1a

11from sqlalchemy.orm.session import Session 1a

12 

13from mealie.db.db_setup import session_context 1a

14from mealie.db.models.household.webhooks import GroupWebhooksModel 1a

15from mealie.repos.repository_factory import AllRepositories 1a

16from mealie.schema.household.group_events import GroupEventNotifierPrivate 1a

17from mealie.schema.household.webhook import ReadWebhook 1a

18 

19from .event_types import Event, EventDocumentType, EventTypes, EventWebhookData 1a

20from .publisher import ApprisePublisher, PublisherLike, WebhookPublisher 1a

21 

22 

23class EventListenerBase(ABC): 1a

24 _session: Session | None = None 1a

25 _repos: AllRepositories | None = None 1a

26 

27 def __init__(self, group_id: UUID4, household_id: UUID4, publisher: PublisherLike) -> None: 1a

28 self.group_id = group_id 1gflhijbdekc

29 self.household_id = household_id 1gflhijbdekc

30 self.publisher = publisher 1gflhijbdekc

31 self._session = None 1gflhijbdekc

32 self._repos = None 1gflhijbdekc

33 

34 @abstractmethod 1a

35 def get_subscribers(self, event: Event) -> list: 1a

36 """Get a list of all subscribers to this event""" 

37 ... 

38 

39 @abstractmethod 1a

40 def publish_to_subscribers(self, event: Event, subscribers: list) -> None: 1a

41 """Publishes the event to all subscribers""" 

42 ... 

43 

44 @contextlib.contextmanager 1a

45 def ensure_session(self) -> Generator[Session, None, None]: 1a

46 """ 

47 ensure_session ensures that a session is available for the caller by checking if a session 

48 was provided during construction, and if not, creating a new session with the `with_session` 

49 function and closing it when the context manager exits. 

50 

51 This is _required_ when working with sessions inside an event bus listener where the listener 

52 may be constructed during a request where the session is provided by the request, but the when 

53 run as a scheduled task, the session is not provided and must be created. 

54 """ 

55 if self._session is None: 1gfhijbdekc

56 with session_context() as session: 1gfhijbdekc

57 self._session = session 1gfhijbdekc

58 yield self._session 1gfhijbdekc

59 else: 

60 yield self._session 1bc

61 

62 @contextlib.contextmanager 1a

63 def ensure_repos(self, group_id: UUID4, household_id: UUID4) -> Generator[AllRepositories, None, None]: 1a

64 if self._repos is None: 64 ↛ 69line 64 didn't jump to line 69 because the condition on line 64 was always true1gfhijbdekc

65 with self.ensure_session() as session: 1gfhijbdekc

66 self._repos = AllRepositories(session, group_id=group_id, household_id=household_id) 1gfhijbdekc

67 yield self._repos 1gfhijbdekc

68 else: 

69 yield self._repos 

70 

71 

72class AppriseEventListener(EventListenerBase): 1a

73 def __init__(self, group_id: UUID4, household_id: UUID4) -> None: 1a

74 super().__init__(group_id, household_id, ApprisePublisher()) 1gflhijbdekc

75 

76 def get_subscribers(self, event: Event) -> list[str]: 1a

77 with self.ensure_repos(self.group_id, self.household_id) as repos: 1gfhijbdekc

78 notifiers: list[GroupEventNotifierPrivate] = repos.group_event_notifier.multi_query( 1gfhijbdekc

79 {"enabled": True}, override_schema=GroupEventNotifierPrivate 

80 ) 

81 

82 urls = [notifier.apprise_url for notifier in notifiers if getattr(notifier.options, event.event_type.name)] 1gfhijbdekc

83 urls = AppriseEventListener.update_urls_with_event_data(urls, event) 1gfhijbdekc

84 

85 return urls 1gfhijbdekc

86 

87 def publish_to_subscribers(self, event: Event, subscribers: list[str]) -> None: 1a

88 self.publisher.publish(event, subscribers) 

89 

90 @staticmethod 1a

91 def update_urls_with_event_data(urls: list[str], event: Event): 1a

92 params = { 1gfhijbdekc

93 "event_type": event.event_type.name, 

94 "integration_id": event.integration_id, 

95 "document_data": json.dumps(jsonable_encoder(event.document_data)), 

96 "event_id": str(event.event_id), 

97 "timestamp": event.timestamp.isoformat() if event.timestamp else None, 

98 } 

99 

100 return [ 1gfhijbdekc

101 # We use query params to add custom key: value pairs to the Apprise payload by prepending the key with ":". 

102 ( 

103 AppriseEventListener.merge_query_parameters(url, {f":{k}": v for k, v in params.items()}) 

104 # only certain endpoints support the custom key: value pairs, so we only apply them to those endpoints 

105 if AppriseEventListener.is_custom_url(url) 

106 else url 

107 ) 

108 for url in urls 

109 ] 

110 

111 @staticmethod 1a

112 def merge_query_parameters(url: str, params: dict): 1a

113 scheme, netloc, path, query_string, fragment = urlsplit(url) 

114 

115 # merge query params 

116 query_params = parse_qs(query_string) 

117 query_params.update(params) 

118 new_query_string = urlencode(query_params, doseq=True) 

119 

120 return urlunsplit((scheme, netloc, path, new_query_string, fragment)) 

121 

122 @staticmethod 1a

123 def is_custom_url(url: str): 1a

124 return url.split(":", 1)[0].lower() in [ 

125 "form", 

126 "forms", 

127 "json", 

128 "jsons", 

129 "xml", 

130 "xmls", 

131 ] 

132 

133 

134class WebhookEventListener(EventListenerBase): 1a

135 def __init__(self, group_id: UUID4, household_id: UUID4) -> None: 1a

136 super().__init__(group_id, household_id, WebhookPublisher()) 1gfhijbdekc

137 

138 def get_subscribers(self, event: Event) -> list[ReadWebhook]: 1a

139 # we only care about events that contain webhook information 

140 if not (event.event_type == EventTypes.webhook_task and isinstance(event.document_data, EventWebhookData)): 1gfhijbdekc

141 return [] 1ghijbdekc

142 

143 scheduled_webhooks = self.get_scheduled_webhooks( 1fbdec

144 event.document_data.webhook_start_dt, event.document_data.webhook_end_dt 

145 ) 

146 return scheduled_webhooks 1fbdec

147 

148 def publish_to_subscribers(self, event: Event, subscribers: list[ReadWebhook]) -> None: 1a

149 with self.ensure_repos(self.group_id, self.household_id) as repos: 1bc

150 if not isinstance(event.document_data, EventWebhookData): 150 ↛ 151line 150 didn't jump to line 151 because the condition on line 150 was never true1bc

151 return 

152 

153 match event.document_data.document_type: 1bc

154 case EventDocumentType.mealplan: 154 ↛ 160line 154 didn't jump to line 160 because the pattern on line 154 always matched1bc

155 meal_repo = repos.meals 1bc

156 meal_data = meal_repo.get_meals_by_date_range( 1bc

157 event.document_data.webhook_start_dt, event.document_data.webhook_end_dt 

158 ) 

159 event.document_data.webhook_body = meal_data or None 1bc

160 case _: 

161 if event.event_type is EventTypes.test_message: 

162 # make sure the webhook has a valid body so it gets sent 

163 event.document_data.webhook_body = event.document_data.webhook_body or [] 

164 

165 # Only publish to subscribers if we have a webhook body to send 

166 if event.document_data.webhook_body is not None: 166 ↛ 167line 166 didn't jump to line 167 because the condition on line 166 was never true1bc

167 self.publisher.publish(event, [webhook.url for webhook in subscribers]) 

168 

169 def get_scheduled_webhooks(self, start_dt: datetime, end_dt: datetime) -> list[ReadWebhook]: 1a

170 """Fetches all scheduled webhooks from the database""" 

171 with self.ensure_session() as session: 1fbdec

172 stmt = select(GroupWebhooksModel).where( 1fbdec

173 GroupWebhooksModel.enabled == True, # noqa: E712 - required for SQLAlchemy comparison 

174 GroupWebhooksModel.scheduled_time > start_dt.astimezone(UTC).time(), 

175 GroupWebhooksModel.scheduled_time <= end_dt.astimezone(UTC).time(), 

176 GroupWebhooksModel.group_id == self.group_id, 

177 GroupWebhooksModel.household_id == self.household_id, 

178 ) 

179 return session.execute(stmt).scalars().all() 1fbdec