Coverage for opt/mealie/lib/python3.12/site-packages/mealie/services/event_bus_service/event_bus_service.py: 89%

51 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-11-25 15:48 +0000

1from fastapi import BackgroundTasks, Depends 1a

2from pydantic import UUID4 1a

3from sqlalchemy.orm.session import Session 1a

4 

5from mealie.core.config import get_app_settings 1a

6from mealie.db.db_setup import generate_session 1a

7from mealie.repos.all_repositories import get_repositories 1a

8from mealie.schema.response.pagination import PaginationQuery 1a

9from mealie.services.event_bus_service.event_bus_listeners import ( 1a

10 AppriseEventListener, 

11 EventListenerBase, 

12 WebhookEventListener, 

13) 

14 

15from .event_types import Event, EventBusMessage, EventDocumentDataBase, EventTypes 1a

16 

# Application settings, resolved once when this module is imported.
settings = get_app_settings()

# NOTE(review): not referenced anywhere else in the visible part of this
# module; kept because other modules may import it. The value looks like a
# signing-algorithm identifier -- TODO confirm it is still needed here.
ALGORITHM = "HS256"

19 

20 

21class EventSource: 1a

22 event_type: str 1a

23 item_type: str 1a

24 item_id: UUID4 | int 1a

25 kwargs: dict 1a

26 

27 def __init__(self, event_type: str, item_type: str, item_id: UUID4 | int, **kwargs) -> None: 1a

28 self.event_type = event_type 

29 self.item_type = item_type 

30 self.item_id = item_id 

31 self.kwargs = kwargs 

32 

33 def dict(self) -> dict: 1a

34 return { 

35 "event_type": self.event_type, 

36 "item_type": self.item_type, 

37 "item_id": str(self.item_id), 

38 **self.kwargs, 

39 } 

40 

41 

class EventBusService:
    """Builds events and hands them to the Apprise and webhook listeners.

    When a ``BackgroundTasks`` instance is available, publishing is queued on
    it; otherwise the listeners are invoked synchronously inside ``dispatch``.
    """

    bg: BackgroundTasks | None = None
    session: Session | None = None

    def __init__(
        self,
        bg: BackgroundTasks | None = None,
        session: Session | None = None,
    ) -> None:
        """Store the optional background-task queue and database session."""
        self.bg = bg
        self.session = session

    def _get_listeners(self, group_id: UUID4, household_id: UUID4) -> list[EventListenerBase]:
        """Return a new listener of each supported kind for the group/household pair."""
        return [
            AppriseEventListener(group_id, household_id),
            WebhookEventListener(group_id, household_id),
        ]

    def _publish_event(self, event: Event, group_id: UUID4, household_id: UUID4) -> None:
        """Publishes the event to all listeners"""
        for listener in self._get_listeners(group_id, household_id):
            # A listener whose subscriber lookup comes back empty/falsy is skipped.
            if subscribers := listener.get_subscribers(event):
                listener.publish_to_subscribers(event, subscribers)

    def dispatch(
        self,
        integration_id: str,
        group_id: UUID4,
        household_id: UUID4 | None,
        event_type: EventTypes,
        document_data: EventDocumentDataBase | None,
        message: str = "",
    ) -> None:
        """Create an event and publish it for one household or for several.

        Args:
            integration_id: Passed through to the ``Event``.
            group_id: Group the event belongs to.
            household_id: Target household. When falsy, the households are
                looked up through the group-scoped repositories and the event
                is published once per household found.
            event_type: Type of the event; also used to build the message.
            document_data: Optional payload attached to the event.
            message: Body text passed to ``EventBusMessage.from_type``.

        Raises:
            ValueError: If ``household_id`` is not provided and the service
                has no database session to look the households up with.
        """
        event = Event(
            message=EventBusMessage.from_type(event_type, body=message),
            event_type=event_type,
            integration_id=integration_id,
            document_data=document_data,
        )

        if not household_id:
            # NOTE: the coverage report this listing was taken from records
            # that this guard was never true during the test run, so the
            # raise below is untested.
            if not self.session:
                raise ValueError("Session is required if household_id is not provided")

            repos = get_repositories(self.session, group_id=group_id)
            # per_page=-1 presumably requests every row in one page -- TODO confirm.
            households = repos.households.page_all(PaginationQuery(page=1, per_page=-1)).items
            household_ids = [household.id for household in households]
        else:
            household_ids = [household_id]

        # Use a distinct loop name so the ``household_id`` parameter is not rebound.
        for target_household_id in household_ids:
            if self.bg:
                self.bg.add_task(
                    self._publish_event,
                    event=event,
                    group_id=group_id,
                    household_id=target_household_id,
                )
            else:
                self._publish_event(event, group_id, target_household_id)

    @classmethod
    def as_dependency(
        cls,
        bg: BackgroundTasks,
        session=Depends(generate_session),
    ):
        """Convenience method to use as a dependency in FastAPI routes"""
        return cls(bg, session)