Coverage for opt/mealie/lib/python3.12/site-packages/mealie/services/scheduler/tasks/post_webhooks.py: 87%
39 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-25 15:32 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-25 15:32 +0000
1from datetime import UTC, datetime 1c
3from pydantic import UUID4 1c
5from mealie.db.db_setup import session_context 1c
6from mealie.repos.all_repositories import get_repositories 1c
7from mealie.schema.household.webhook import ReadWebhook 1c
8from mealie.schema.response.pagination import PaginationQuery 1c
9from mealie.services.event_bus_service.event_bus_listeners import WebhookEventListener 1c
10from mealie.services.event_bus_service.event_bus_service import EventBusService 1c
11from mealie.services.event_bus_service.event_types import ( 1c
12 INTERNAL_INTEGRATION_ID,
13 Event,
14 EventBusMessage,
15 EventDocumentType,
16 EventOperation,
17 EventTypes,
18 EventWebhookData,
19)
21last_ran = datetime.now(UTC) 1c
def post_group_webhooks(
    start_dt: datetime | None = None, group_id: UUID4 | None = None, household_id: UUID4 | None = None
) -> None:
    """Post webhook events to the specified group/household, or to all of them.

    The query window runs from ``start_dt`` to the current UTC time. The
    module-level ``last_ran`` is advanced to the end of that window on every
    call, so consecutive runs that omit ``start_dt`` cover back-to-back windows.

    Args:
        start_dt: Start of the query window. When omitted, the value of
            ``last_ran`` left by the previous run is used.
        group_id: Only dispatch to this group. When ``None``, the groups
            repository is queried and every returned group is processed.
        household_id: Only dispatch to this household. When ``None``, the
            households of each processed group are looked up and every one of
            them receives the event.
    """
    global last_ran

    # if not specified, start the query at the last time the service ran
    start_dt = start_dt or last_ran

    # end the query at the current time
    last_ran = end_dt = datetime.now(UTC)

    if group_id is None:
        # publish the webhook event to each group's event bus
        with session_context() as session:
            repos = get_repositories(session)
            # NOTE(review): per_page=-1 presumably requests every row in one page - confirm
            groups_data = repos.groups.page_all(PaginationQuery(page=1, per_page=-1))
            group_ids = [group.id for group in groups_data.items]
    else:
        group_ids = [group_id]

    # At this time only mealplan webhooks are supported. To add support for more types,
    # add a dispatch event for that type here (e.g. EventDocumentType.recipe_bulk_report) and
    # handle the webhook data in the webhook event bus listener
    event_type = EventTypes.webhook_task
    event_document_data = EventWebhookData(
        document_type=EventDocumentType.mealplan,
        operation=EventOperation.info,
        webhook_start_dt=start_dt,
        webhook_end_dt=end_dt,
    )

    # The loop variables deliberately do NOT reuse the parameter names: rebinding
    # `household_id` inside the inner loop would make the `is None` check below
    # fail for every group after the first, sending those groups the previous
    # group's last household instead of their own.
    for target_group_id in group_ids:
        if household_id is None:
            with session_context() as session:
                household_repos = get_repositories(session, group_id=target_group_id)
                households_data = household_repos.households.page_all(PaginationQuery(page=1, per_page=-1))
                household_ids = [household.id for household in households_data.items]
        else:
            household_ids = [household_id]

        for target_household_id in household_ids:
            event_bus = EventBusService()
            event_bus.dispatch(
                integration_id=INTERNAL_INTEGRATION_ID,
                group_id=target_group_id,
                household_id=target_household_id,
                event_type=event_type,
                document_data=event_document_data,
            )
def post_test_webhook(webhook: ReadWebhook, message: str = "") -> None:
    """Publish a one-off test-message event to a single webhook.

    Args:
        webhook: The webhook to deliver to; its ``group_id`` and
            ``household_id`` are used to build the listener.
        message: Text passed as the body of the event message.
    """
    test_event_type = EventTypes.test_message

    # Both ends of the window carry the same placeholder: the minimum
    # representable datetime, tagged as UTC.
    placeholder_dt = datetime.min.replace(tzinfo=UTC)

    test_document_data = EventWebhookData(
        document_type=EventDocumentType.generic,
        operation=EventOperation.info,
        webhook_start_dt=placeholder_dt,
        webhook_end_dt=placeholder_dt,
    )
    test_event = Event(
        message=EventBusMessage.from_type(test_event_type, body=message),
        event_type=test_event_type,
        integration_id=INTERNAL_INTEGRATION_ID,
        document_data=test_document_data,
    )

    # Deliver to this webhook only, rather than to all of the listener's subscribers.
    WebhookEventListener(webhook.group_id, webhook.household_id).publish_to_subscribers(test_event, [webhook])