Coverage for opt/mealie/lib/python3.12/site-packages/mealie/services/event_bus_service/event_bus_service.py: 86%
51 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-25 15:32 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-25 15:32 +0000
1from fastapi import BackgroundTasks, Depends 1a
2from pydantic import UUID4 1a
3from sqlalchemy.orm.session import Session 1a
5from mealie.core.config import get_app_settings 1a
6from mealie.db.db_setup import generate_session 1a
7from mealie.repos.all_repositories import get_repositories 1a
8from mealie.schema.response.pagination import PaginationQuery 1a
9from mealie.services.event_bus_service.event_bus_listeners import ( 1a
10 AppriseEventListener,
11 EventListenerBase,
12 WebhookEventListener,
13)
15from .event_types import Event, EventBusMessage, EventDocumentDataBase, EventTypes 1a
# Application settings object, resolved when this module is imported.
settings = get_app_settings()

# Algorithm identifier; nothing in the visible part of this module reads it.
# NOTE(review): "HS256" is presumably a token-signing algorithm name — confirm
# whether this constant is still needed here.
ALGORITHM = "HS256"
class EventSource:
    """Plain container for an event type, an item type, an item id and extra data.

    Any additional keyword arguments passed to the constructor are stored
    unchanged in ``kwargs`` and merged into the mapping returned by
    :meth:`dict`.
    """

    event_type: str
    item_type: str
    item_id: UUID4 | int
    kwargs: dict

    def __init__(self, event_type: str, item_type: str, item_id: UUID4 | int, **kwargs) -> None:
        self.event_type, self.item_type, self.item_id = event_type, item_type, item_id
        self.kwargs = kwargs

    def dict(self) -> dict:
        """Return a flat mapping of the fields, with ``item_id`` converted to ``str``.

        The three fixed keys come first, followed by the extra keyword
        arguments captured at construction time.
        """
        payload = {
            "event_type": self.event_type,
            "item_type": self.item_type,
            "item_id": str(self.item_id),
        }
        payload.update(self.kwargs)
        return payload
class EventBusService:
    """Builds events and passes them to the event listeners of one or more households.

    When a ``BackgroundTasks`` instance is supplied, publishing is queued on
    it; otherwise events are published inline by the caller of
    :meth:`dispatch`.
    """

    bg: BackgroundTasks | None = None
    session: Session | None = None

    def __init__(
        self,
        bg: BackgroundTasks | None = None,
        session: Session | None = None,
    ) -> None:
        self.bg, self.session = bg, session

    def _get_listeners(self, group_id: UUID4, household_id: UUID4) -> list[EventListenerBase]:
        """Instantiate one listener of each supported kind for the group/household pair."""
        listener_types = (AppriseEventListener, WebhookEventListener)
        return [listener_cls(group_id, household_id) for listener_cls in listener_types]

    def _publish_event(self, event: Event, group_id: UUID4, household_id: UUID4) -> None:
        """Publishes the event to every listener that reports subscribers for it"""
        for listener in self._get_listeners(group_id, household_id):
            subscribers = listener.get_subscribers(event)
            if not subscribers:
                # Nothing subscribed on this listener; move on to the next one.
                continue
            listener.publish_to_subscribers(event, subscribers)

    def dispatch(
        self,
        integration_id: str,
        group_id: UUID4,
        household_id: UUID4 | None,
        event_type: EventTypes,
        document_data: EventDocumentDataBase | None,
        message: str = "",
    ) -> None:
        """Create an event and publish it to one household, or to all in the group.

        Args:
            integration_id: Stored on the created event.
            group_id: Group the event is published for.
            household_id: Household to publish to. When falsy, every household
                returned by the group's household repository is targeted.
            event_type: Type of the event; also used to build the event message.
            document_data: Optional payload attached to the event.
            message: Body text passed to ``EventBusMessage.from_type``.

        Raises:
            ValueError: If ``household_id`` is not provided and the service
                has no session to look up the group's households with.
        """
        bus_message = EventBusMessage.from_type(event_type, body=message)
        event = Event(
            message=bus_message,
            event_type=event_type,
            integration_id=integration_id,
            document_data=document_data,
        )

        if household_id:
            target_ids = [household_id]
        else:
            if not self.session:
                raise ValueError("Session is required if household_id is not provided")

            repos = get_repositories(self.session, group_id=group_id)
            # NOTE(review): per_page=-1 presumably means "no limit" — confirm against PaginationQuery.
            page = repos.households.page_all(PaginationQuery(page=1, per_page=-1))
            target_ids = [household.id for household in page.items]

        for target_id in target_ids:
            if self.bg:
                # Queue the publish call on the background task runner.
                self.bg.add_task(self._publish_event, event=event, group_id=group_id, household_id=target_id)
            else:
                self._publish_event(event, group_id, target_id)

    @classmethod
    def as_dependency(
        cls,
        bg: BackgroundTasks,
        session=Depends(generate_session),
    ):
        """Convenience constructor for use as a dependency in FastAPI routes"""
        service = cls(bg, session)
        return service