Coverage for polar/webhook/tasks.py: 26%
84 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 15:52 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 15:52 +0000
1import base64 1a
2from collections.abc import Mapping 1a
3from ssl import SSLError 1a
4from uuid import UUID 1a
6import httpx 1a
7import structlog 1a
8from apscheduler.triggers.cron import CronTrigger 1a
9from dramatiq import Retry 1a
10from standardwebhooks.webhooks import Webhook as StandardWebhook 1a
12from polar.config import settings 1a
13from polar.kit.db.postgres import AsyncSession 1a
14from polar.kit.utils import utc_now 1a
15from polar.logging import Logger 1a
16from polar.models.webhook_delivery import WebhookDelivery 1a
17from polar.worker import AsyncSessionMaker, TaskPriority, actor, can_retry, enqueue_job 1a
19from .service import webhook as webhook_service 1a
21log: Logger = structlog.get_logger() 1a
@actor(
    actor_name="webhook_event.send",
    max_retries=settings.WEBHOOK_MAX_RETRIES,
    priority=TaskPriority.MEDIUM,
)
async def webhook_event_send(webhook_event_id: UUID, redeliver: bool = False) -> None:
    """Worker entry point for delivering one webhook event.

    Opens a session from ``AsyncSessionMaker`` and hands the actual work to
    ``_webhook_event_send``.

    Args:
        webhook_event_id: Identifier of the webhook event to deliver.
        redeliver: Forwarded unchanged to ``_webhook_event_send``; when true,
            an event that already succeeded is sent again.
    """
    async with AsyncSessionMaker() as db_session:
        return await _webhook_event_send(
            db_session,
            webhook_event_id=webhook_event_id,
            redeliver=redeliver,
        )
async def _webhook_event_send(
    session: AsyncSession, *, webhook_event_id: UUID, redeliver: bool = False
) -> None:
    """Deliver one webhook event to its endpoint and record the attempt.

    The event is skipped (no HTTP request is made) when its endpoint is
    disabled, when its payload is ``None``, or when it already succeeded and
    ``redeliver`` is false. Otherwise the payload is signed, POSTed to the
    endpoint URL, and a ``WebhookDelivery`` row describing the outcome is
    committed together with the updated event.

    Args:
        session: Database session used for all reads and writes.
        webhook_event_id: Identifier of the webhook event to deliver.
        redeliver: When true, send the event even if it already succeeded.

    Raises:
        Exception: If no event exists for ``webhook_event_id``.
        Retry: If ``webhook_service.is_latest_event`` reports that the event
            is not the latest one yet, or if the request failed and
            ``can_retry()`` is true.
    """
    event = await webhook_service.get_event_by_id(session, webhook_event_id)
    if not event:
        raise Exception(f"webhook event not found id={webhook_event_id}")

    bound_log = log.bind(
        id=webhook_event_id,
        type=event.type,
        webhook_endpoint_id=event.webhook_endpoint_id,
    )

    if not event.webhook_endpoint.enabled:
        bound_log.info("Webhook endpoint is disabled, skipping")
        event.skipped = True
        # NOTE(review): no commit is issued on this path; presumably the
        # caller's session context persists the change -- confirm.
        session.add(event)
        return

    if event.payload is None:
        bound_log.info("Archived event, skipping")
        return

    if event.succeeded and not redeliver:
        bound_log.info("Event already succeeded, skipping")
        return

    if not await webhook_service.is_latest_event(session, event):
        # bound_log already carries id, type and webhook_endpoint_id.
        bound_log.info("Earlier events need to be delivered first, retrying later")
        raise Retry()

    # The event is about to be sent, so clear a previously set skip flag.
    if event.skipped:
        event.skipped = False
        session.add(event)

    ts = utc_now()

    # StandardWebhook is handed the endpoint secret in base64 form.
    b64secret = base64.b64encode(event.webhook_endpoint.secret.encode("utf-8")).decode(
        "utf-8"
    )

    # Sign the payload
    wh = StandardWebhook(b64secret)
    signature = wh.sign(str(event.id), ts, event.payload)

    headers: Mapping[str, str] = {
        "user-agent": "polar.sh webhooks",
        "content-type": "application/json",
        "webhook-id": str(event.id),
        "webhook-timestamp": str(int(ts.timestamp())),
        "webhook-signature": signature,
    }

    delivery = WebhookDelivery(
        webhook_event_id=webhook_event_id, webhook_endpoint_id=event.webhook_endpoint_id
    )

    async with httpx.AsyncClient() as client:
        try:
            response = await client.post(
                event.webhook_endpoint.url,
                content=event.payload,
                headers=headers,
                timeout=20.0,
            )
            delivery.http_code = response.status_code
            delivery.response = (
                # Limit to first 2048 characters to avoid bloating the DB
                response.text[:2048] if response.text else None
            )
            event.last_http_code = response.status_code
            # Non-2xx responses are turned into httpx.HTTPStatusError here so
            # they go through the same failure handling as transport errors.
            response.raise_for_status()
        # Error
        except (httpx.HTTPError, SSLError) as e:
            bound_log.info("An error occurred while sending a webhook", error=e)
            delivery.succeeded = False
            # Keep the response body when one was received; otherwise store
            # the error text.
            if delivery.response is None:
                delivery.response = str(e)

            # Permanent failure
            if not can_retry():
                event.succeeded = False
                enqueue_job("webhook_event.failed", webhook_event_id=webhook_event_id)
            # Retry
            else:
                raise Retry() from e
        # Success
        else:
            delivery.succeeded = True
            event.succeeded = True
            enqueue_job("webhook_event.success", webhook_event_id=webhook_event_id)
        # Save the delivery for every classified outcome (this also runs
        # while Retry is propagating).
        finally:
            # `succeeded` is only left unset when an exception that is not
            # handled above escaped the request. In that case nothing is
            # persisted and the original exception propagates unchanged,
            # instead of being replaced by an AssertionError raised from
            # this `finally` block.
            if delivery.succeeded is not None:
                session.add(delivery)
                session.add(event)
                await session.commit()
@actor(actor_name="webhook_event.success", priority=TaskPriority.HIGH)
async def webhook_event_success(webhook_event_id: UUID) -> None:
    """Run the service-level success hook for a delivered webhook event.

    Args:
        webhook_event_id: Identifier of the event that was delivered.
    """
    async with AsyncSessionMaker() as db_session:
        outcome = await webhook_service.on_event_success(db_session, webhook_event_id)
        return outcome
@actor(actor_name="webhook_event.failed", priority=TaskPriority.HIGH)
async def webhook_event_failed(webhook_event_id: UUID) -> None:
    """Run the service-level failure hook for a webhook event.

    Args:
        webhook_event_id: Identifier of the event whose delivery failed.
    """
    async with AsyncSessionMaker() as db_session:
        outcome = await webhook_service.on_event_failed(db_session, webhook_event_id)
        return outcome
@actor(
    actor_name="webhook_event.archive",
    cron_trigger=CronTrigger(hour=0, minute=0),
    priority=TaskPriority.LOW,
)
async def webhook_event_archive() -> None:
    """Archive webhook events that are past the retention period.

    Registered with a cron trigger at hour 0, minute 0. The cutoff passed to
    ``webhook_service.archive_events`` is the current UTC time minus
    ``settings.WEBHOOK_EVENT_RETENTION_PERIOD``.
    """
    async with AsyncSessionMaker() as db_session:
        cutoff = utc_now() - settings.WEBHOOK_EVENT_RETENTION_PERIOD
        return await webhook_service.archive_events(db_session, older_than=cutoff)
161 )