Coverage for opt/mealie/lib/python3.12/site-packages/mealie/services/scheduler/tasks/post_webhooks.py: 87%

39 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-11-25 15:48 +0000

1from datetime import UTC, datetime 1e

2 

3from pydantic import UUID4 1e

4 

5from mealie.db.db_setup import session_context 1e

6from mealie.repos.all_repositories import get_repositories 1e

7from mealie.schema.household.webhook import ReadWebhook 1e

8from mealie.schema.response.pagination import PaginationQuery 1e

9from mealie.services.event_bus_service.event_bus_listeners import WebhookEventListener 1e

10from mealie.services.event_bus_service.event_bus_service import EventBusService 1e

11from mealie.services.event_bus_service.event_types import ( 1e

12 INTERNAL_INTEGRATION_ID, 

13 Event, 

14 EventBusMessage, 

15 EventDocumentType, 

16 EventOperation, 

17 EventTypes, 

18 EventWebhookData, 

19) 

20 

21last_ran = datetime.now(UTC) 1e

22 

23 

def post_group_webhooks(
    start_dt: datetime | None = None, group_id: UUID4 | None = None, household_id: UUID4 | None = None
) -> None:
    """Dispatch the scheduled webhook event to the selected groups and households.

    The event describes the time window from ``start_dt`` up to the moment this
    function is called. The module-level ``last_ran`` is advanced to the end of
    that window, so a later call without ``start_dt`` starts where this one ended.

    Args:
        start_dt: Start of the query window. Falls back to ``last_ran`` when omitted.
        group_id: Restrict dispatch to this group. When ``None``, every group
            returned by the groups repository is targeted.
        household_id: Restrict dispatch to this household. When ``None``, the
            households returned by each group's scoped repository are targeted.
    """

    global last_ran

    # if not specified, start the query at the last time the service ran
    start_dt = start_dt or last_ran

    # end the query at the current time
    last_ran = end_dt = datetime.now(UTC)

    if group_id is None:
        # publish the webhook event to each group's event bus
        with session_context() as session:
            repos = get_repositories(session)
            # NOTE(review): per_page=-1 presumably means "no page limit" - confirm against PaginationQuery
            groups_data = repos.groups.page_all(PaginationQuery(page=1, per_page=-1))
            target_group_ids = [group.id for group in groups_data.items]
    else:
        target_group_ids = [group_id]

    # At this time only mealplan webhooks are supported. To add support for more types,
    # add a dispatch event for that type here (e.g. EventDocumentType.recipe_bulk_report) and
    # handle the webhook data in the webhook event bus listener
    event_type = EventTypes.webhook_task
    event_document_data = EventWebhookData(
        document_type=EventDocumentType.mealplan,
        operation=EventOperation.info,
        webhook_start_dt=start_dt,
        webhook_end_dt=end_dt,
    )

    # The loop variables deliberately use their own names. Reusing ``household_id``
    # as the inner loop target would overwrite the parameter, so every group after
    # the first would skip the household lookup and be dispatched with the previous
    # group's last household id.
    for target_group_id in target_group_ids:
        if household_id is None:
            with session_context() as session:
                household_repos = get_repositories(session, group_id=target_group_id)
                households_data = household_repos.households.page_all(PaginationQuery(page=1, per_page=-1))
                target_household_ids = [household.id for household in households_data.items]
        else:
            target_household_ids = [household_id]

        for target_household_id in target_household_ids:
            event_bus = EventBusService()
            event_bus.dispatch(
                integration_id=INTERNAL_INTEGRATION_ID,
                group_id=target_group_id,
                household_id=target_household_id,
                event_type=event_type,
                document_data=event_document_data,
            )

80 

81 

def post_test_webhook(webhook: ReadWebhook, message: str = "") -> None:
    """Publish a test-message event to a single webhook.

    Args:
        webhook: The webhook to exercise. Its ``group_id`` and ``household_id``
            are used to build the listener, and it is passed as the only subscriber.
        message: Body text handed to ``EventBusMessage.from_type`` for the test message.
    """
    test_event_type = EventTypes.test_message

    # A test message has no real query window, so both bounds are set to the
    # same timezone-aware minimum datetime.
    placeholder_dt = datetime.min.replace(tzinfo=UTC)
    webhook_payload = EventWebhookData(
        document_type=EventDocumentType.generic,
        operation=EventOperation.info,
        webhook_start_dt=placeholder_dt,
        webhook_end_dt=placeholder_dt,
    )

    test_event = Event(
        message=EventBusMessage.from_type(test_event_type, body=message),
        event_type=test_event_type,
        integration_id=INTERNAL_INTEGRATION_ID,
        document_data=webhook_payload,
    )

    # Publish straight to this webhook through a listener built from its own group and household.
    WebhookEventListener(webhook.group_id, webhook.household_id).publish_to_subscribers(test_event, [webhook])