Coverage for opt/mealie/lib/python3.12/site-packages/mealie/routes/households/controller_webhooks.py: 96%
47 statements
coverage.py v7.10.6, created at 2025-11-25 17:29 +0000
1from datetime import UTC, datetime 1a
2from functools import cached_property 1a
4from fastapi import APIRouter, BackgroundTasks, Depends 1a
5from pydantic import UUID4 1a
7from mealie.routes._base.base_controllers import BaseUserController 1a
8from mealie.routes._base.controller import controller 1a
9from mealie.routes._base.mixins import HttpRepo 1a
10from mealie.schema import mapper 1a
11from mealie.schema.household.webhook import CreateWebhook, ReadWebhook, SaveWebhook, WebhookPagination 1a
12from mealie.schema.response.pagination import PaginationQuery 1a
13from mealie.services.scheduler.tasks.post_webhooks import post_group_webhooks, post_test_webhook 1a
# Router that every household-webhook endpoint in this module is registered on.
router = APIRouter(
    prefix="/households/webhooks",
    tags=["Households: Webhooks"],
)
@controller(router)
class ReadWebhookController(BaseUserController):
    # Endpoints for listing, creating, reading, updating, deleting, testing and
    # re-running household webhooks.
    #
    # Handler notes are written as ``#`` comments rather than docstrings because
    # FastAPI publishes a path operation's docstring as its OpenAPI description;
    # the only docstring present is the one the original code already exposed.

    @cached_property
    def repo(self):
        # Looked up once per controller instance, then served from the cache.
        return self.repos.webhooks

    @property
    def mixins(self) -> HttpRepo:
        # A plain property: a fresh HttpRepo wrapper is built on every access.
        webhook_http_repo = HttpRepo[CreateWebhook, SaveWebhook, CreateWebhook]
        return webhook_http_repo(self.repo, self.logger)

    @router.get("", response_model=WebhookPagination)
    def get_all(self, q: PaginationQuery = Depends(PaginationQuery)):
        # Fetch one page of webhooks, returned as ReadWebhook items.
        page = self.repo.page_all(pagination=q, override=ReadWebhook)

        # Attach the pagination guides, built from this route's own path and
        # the query parameters that were used.
        list_route = router.url_path_for("get_all")
        page.set_pagination_guides(list_route, q.model_dump())
        return page

    @router.post("", response_model=ReadWebhook, status_code=201)
    def create_one(self, data: CreateWebhook):
        # Cast the incoming payload to SaveWebhook, passing along the
        # controller's group and household ids, then persist it.
        payload = mapper.cast(
            data,
            SaveWebhook,
            group_id=self.group_id,
            household_id=self.household_id,
        )
        return self.mixins.create_one(payload)

    @router.post("/rerun")
    def rerun_webhooks(self):
        """Manually re-fires all previously scheduled webhooks for today"""

        # The window starts at 00:00 of the current UTC calendar date.
        # NOTE(review): datetime.combine is given no tzinfo here, so start_dt is
        # a naive datetime even though the date is taken in UTC - confirm that
        # post_group_webhooks expects a naive value.
        midnight = datetime.min.time()
        today_utc = datetime.now(UTC).date()
        start_dt = datetime.combine(today_utc, midnight)

        post_group_webhooks(
            start_dt=start_dt,
            group_id=self.group.id,
            household_id=self.household.id,
        )

    @router.get("/{item_id}", response_model=ReadWebhook)
    def get_one(self, item_id: UUID4):
        return self.mixins.get_one(item_id)

    @router.post("/{item_id}/test")
    def test_one(self, item_id: UUID4, bg_tasks: BackgroundTasks):
        # Look the webhook up first, then queue post_test_webhook as a
        # background task with the literal "Test Webhook" argument.
        target = self.mixins.get_one(item_id)
        bg_tasks.add_task(post_test_webhook, target, "Test Webhook")

    @router.put("/{item_id}", response_model=ReadWebhook)
    def update_one(self, item_id: UUID4, data: CreateWebhook):
        return self.mixins.update_one(data, item_id)

    @router.delete("/{item_id}", response_model=ReadWebhook)
    def delete_one(self, item_id: UUID4):
        return self.mixins.delete_one(item_id)  # type: ignore