Coverage for opt/mealie/lib/python3.12/site-packages/mealie/services/scheduler/tasks/post_webhooks.py: 87%
39 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-25 17:29 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-25 17:29 +0000
1from datetime import UTC, datetime 1o
3from pydantic import UUID4 1o
5from mealie.db.db_setup import session_context 1o
6from mealie.repos.all_repositories import get_repositories 1o
7from mealie.schema.household.webhook import ReadWebhook 1o
8from mealie.schema.response.pagination import PaginationQuery 1o
9from mealie.services.event_bus_service.event_bus_listeners import WebhookEventListener 1o
10from mealie.services.event_bus_service.event_bus_service import EventBusService 1o
11from mealie.services.event_bus_service.event_types import ( 1o
12 INTERNAL_INTEGRATION_ID,
13 Event,
14 EventBusMessage,
15 EventDocumentType,
16 EventOperation,
17 EventTypes,
18 EventWebhookData,
19)
21last_ran = datetime.now(UTC) 1o
def post_group_webhooks(
    start_dt: datetime | None = None, group_id: UUID4 | None = None, household_id: UUID4 | None = None
) -> None:
    """Post webhook events to specified group, or all groups.

    Args:
        start_dt: Start of the webhook query window. When omitted (or falsy), the
            module-level ``last_ran`` timestamp is used instead.
        group_id: Restrict dispatch to this group. When ``None``, the ids of all
            groups returned by the groups repository are used.
        household_id: Restrict dispatch to this household. When ``None``, the
            households of each processed group are looked up and an event is
            dispatched for every one of them.

    Side effects:
        Rebinds the module-level ``last_ran`` to the current UTC time, which is
        also used as the end of the query window for this run.
    """
    global last_ran

    # if not specified, start the query at the last time the service ran
    start_dt = start_dt or last_ran

    # end the query at the current time
    last_ran = end_dt = datetime.now(UTC)

    if group_id is None:
        # publish the webhook event to each group's event bus
        with session_context() as session:
            repos = get_repositories(session)
            groups_data = repos.groups.page_all(PaginationQuery(page=1, per_page=-1))
            group_ids = [group.id for group in groups_data.items]
    else:
        group_ids = [group_id]

    # At this time only mealplan webhooks are supported. To add support for more types,
    # add a dispatch event for that type here (e.g. EventDocumentType.recipe_bulk_report) and
    # handle the webhook data in the webhook event bus listener
    event_type = EventTypes.webhook_task
    event_document_data = EventWebhookData(
        document_type=EventDocumentType.mealplan,
        operation=EventOperation.info,
        webhook_start_dt=start_dt,
        webhook_end_dt=end_dt,
    )

    # The loop variables deliberately do NOT reuse the parameter names. Rebinding
    # ``household_id`` inside the inner loop would make the ``household_id is None``
    # check fail for every group after the first, so those groups would be sent the
    # previous group's household id instead of their own households.
    for target_group_id in group_ids:
        if household_id is None:
            with session_context() as session:
                household_repos = get_repositories(session, group_id=target_group_id)
                households_data = household_repos.households.page_all(PaginationQuery(page=1, per_page=-1))
                household_ids = [household.id for household in households_data.items]
        else:
            household_ids = [household_id]

        for target_household_id in household_ids:
            event_bus = EventBusService()
            event_bus.dispatch(
                integration_id=INTERNAL_INTEGRATION_ID,
                group_id=target_group_id,
                household_id=target_household_id,
                event_type=event_type,
                document_data=event_document_data,
            )
def post_test_webhook(webhook: ReadWebhook, message: str = "") -> None:
    """Publish a single test-message event to the given webhook.

    Args:
        webhook: The webhook to notify; its ``group_id`` and ``household_id`` are
            used to construct the listener, and it is the only subscriber passed.
        message: Body text for the test event message. Defaults to an empty string.
    """
    test_event_type = EventTypes.test_message

    # Both ends of the webhook window are set to the same UTC-aware
    # ``datetime.min`` value.
    window_bound = datetime.min.replace(tzinfo=UTC)
    webhook_payload = EventWebhookData(
        document_type=EventDocumentType.generic,
        operation=EventOperation.info,
        webhook_start_dt=window_bound,
        webhook_end_dt=window_bound,
    )

    test_event = Event(
        message=EventBusMessage.from_type(test_event_type, body=message),
        event_type=test_event_type,
        integration_id=INTERNAL_INTEGRATION_ID,
        document_data=webhook_payload,
    )

    # Bypass the event bus: hand the event straight to a listener with this
    # webhook as the sole subscriber.
    WebhookEventListener(webhook.group_id, webhook.household_id).publish_to_subscribers(test_event, [webhook])