Coverage for opt/mealie/lib/python3.12/site-packages/mealie/services/event_bus_service/publisher.py: 45%
30 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-25 17:29 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-11-25 17:29 +0000
1from typing import Protocol 1a
3import apprise 1a
4import requests 1a
5from fastapi.encoders import jsonable_encoder 1a
7from mealie.services.event_bus_service.event_types import Event 1a
10class PublisherLike(Protocol): 1a
11 def publish(self, event: Event, notification_urls: list[str]): ... 11 ↛ exitline 11 didn't return from function 'publish' because 1a
14class ApprisePublisher: 1a
15 def __init__(self, hard_fail=False) -> None: 1a
16 asset = apprise.AppriseAsset( 1bcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ
17 async_mode=True,
18 image_url_mask="https://raw.githubusercontent.com/mealie-recipes/mealie/9571816ac4eed5beacfc0abf6c03eff1427fd0eb/frontend/static/icons/android-chrome-maskable-512x512.png",
19 )
20 self.apprise = apprise.Apprise(asset=asset) 1bcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ
21 self.hard_fail = hard_fail 1bcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ
23 def publish(self, event: Event, notification_urls: list[str]): 1a
24 """Publishses a list of notification URLs"""
26 tags = []
27 for dest in notification_urls:
28 # we tag the url so it only sends each notification once
29 tag = str(event.event_id)
30 tags.append(tag)
32 status = self.apprise.add(dest, tag=tag)
34 if not status and self.hard_fail:
35 raise Exception("Apprise URL Add Failed")
37 self.apprise.notify(title=event.message.title, body=event.message.body, tag=tags)
40class WebhookPublisher: 1a
41 def __init__(self, hard_fail=False) -> None: 1a
42 self.hard_fail = hard_fail 1bcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ
44 def publish(self, event: Event, notification_urls: list[str]): 1a
45 event_payload = jsonable_encoder(event)
46 for url in notification_urls:
47 r = requests.post(url, json=event_payload, timeout=15)
48 if self.hard_fail:
49 r.raise_for_status()