Coverage for opt/mealie/lib/python3.12/site-packages/mealie/services/scheduler/tasks/post_webhooks.py: 87%

39 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-11-25 17:29 +0000

1from datetime import UTC, datetime 1o

2 

3from pydantic import UUID4 1o

4 

5from mealie.db.db_setup import session_context 1o

6from mealie.repos.all_repositories import get_repositories 1o

7from mealie.schema.household.webhook import ReadWebhook 1o

8from mealie.schema.response.pagination import PaginationQuery 1o

9from mealie.services.event_bus_service.event_bus_listeners import WebhookEventListener 1o

10from mealie.services.event_bus_service.event_bus_service import EventBusService 1o

11from mealie.services.event_bus_service.event_types import ( 1o

12 INTERNAL_INTEGRATION_ID, 

13 Event, 

14 EventBusMessage, 

15 EventDocumentType, 

16 EventOperation, 

17 EventTypes, 

18 EventWebhookData, 

19) 

20 

# Timestamp of the previous run of `post_group_webhooks`. It is initialised when
# the module is imported and rebound on every call, so consecutive runs query
# back-to-back time windows.
last_ran = datetime.now(UTC)


def post_group_webhooks(
    start_dt: datetime | None = None, group_id: UUID4 | None = None, household_id: UUID4 | None = None
) -> None:
    """Post webhook events to specified group, or all groups.

    One ``webhook_task`` event is dispatched per (group, household) pair. The
    event carries the time window ``[start_dt, now]`` as its document data.

    Args:
        start_dt: Start of the query window. When falsy, the module-level
            ``last_ran`` timestamp is used instead.
        group_id: Restrict the dispatch to this group. When ``None``, every
            group returned by the groups repository is used.
        household_id: Restrict the dispatch to this household. When ``None``,
            the households of each group are looked up and all of them are used.

    Side effects:
        Rebinds the module-level ``last_ran`` to the current UTC time on every
        call, including calls that pass an explicit ``start_dt``.
    """
    global last_ran

    # if not specified, start the query at the last time the service ran
    start_dt = start_dt or last_ran

    # end the query at the current time
    last_ran = end_dt = datetime.now(UTC)

    if group_id is None:
        # publish the webhook event to each group's event bus
        with session_context() as session:
            repos = get_repositories(session)
            # NOTE(review): per_page=-1 presumably requests every row in one page -- confirm
            groups_data = repos.groups.page_all(PaginationQuery(page=1, per_page=-1))
            group_ids = [group.id for group in groups_data.items]
    else:
        group_ids = [group_id]

    # At this time only mealplan webhooks are supported. To add support for more types,
    # add a dispatch event for that type here (e.g. EventDocumentType.recipe_bulk_report) and
    # handle the webhook data in the webhook event bus listener
    event_type = EventTypes.webhook_task
    event_document_data = EventWebhookData(
        document_type=EventDocumentType.mealplan,
        operation=EventOperation.info,
        webhook_start_dt=start_dt,
        webhook_end_dt=end_dt,
    )

    # The loop variables deliberately use names distinct from the `group_id` and
    # `household_id` parameters. Reusing `household_id` as the inner loop target
    # would leave it bound to the last household of one group, so the
    # `household_id is None` check would fail for every following group and those
    # groups would be dispatched with a household that belongs to another group.
    for target_group_id in group_ids:
        if household_id is None:
            with session_context() as session:
                household_repos = get_repositories(session, group_id=target_group_id)
                households_data = household_repos.households.page_all(PaginationQuery(page=1, per_page=-1))
                household_ids = [household.id for household in households_data.items]
        else:
            household_ids = [household_id]

        for target_household_id in household_ids:
            event_bus = EventBusService()
            event_bus.dispatch(
                integration_id=INTERNAL_INTEGRATION_ID,
                group_id=target_group_id,
                household_id=target_household_id,
                event_type=event_type,
                document_data=event_document_data,
            )

80 

81 

def post_test_webhook(webhook: ReadWebhook, message: str = "") -> None:
    """Publish a single test-message event to one webhook.

    Builds a ``test_message`` event whose body is ``message`` and hands it to a
    ``WebhookEventListener`` created for the webhook's group and household,
    passing ``[webhook]`` as the list to publish to.

    Args:
        webhook: The webhook to publish the test event to.
        message: Body text for the event message; defaults to an empty string.
    """
    test_event_type = EventTypes.test_message

    # Both ends of the window are pinned to the smallest timezone-aware UTC datetime.
    window_edge = datetime.min.replace(tzinfo=UTC)
    test_document_data = EventWebhookData(
        document_type=EventDocumentType.generic,
        operation=EventOperation.info,
        webhook_start_dt=window_edge,
        webhook_end_dt=window_edge,
    )

    test_event = Event(
        message=EventBusMessage.from_type(test_event_type, body=message),
        event_type=test_event_type,
        integration_id=INTERNAL_INTEGRATION_ID,
        document_data=test_document_data,
    )

    WebhookEventListener(webhook.group_id, webhook.household_id).publish_to_subscribers(test_event, [webhook])