Coverage for polar/models/webhook_endpoint.py: 97%
56 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 15:52 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 15:52 +0000
1from enum import StrEnum 1ab
2from typing import TYPE_CHECKING, Literal 1ab
3from uuid import UUID 1ab
5from sqlalchemy import Boolean, ForeignKey, String, Uuid 1ab
6from sqlalchemy.dialects.postgresql import JSONB 1ab
7from sqlalchemy.orm import Mapped, declared_attr, mapped_column, relationship 1ab
9from polar.kit.db.models.base import RecordModel 1ab
11if TYPE_CHECKING: 11 ↛ 12line 11 didn't jump to line 12 because the condition on line 11 was never true1ab
12 from .organization import Organization
15class WebhookEventType(StrEnum): 1ab
16 checkout_created = "checkout.created" 1ab
17 checkout_updated = "checkout.updated" 1ab
18 customer_created = "customer.created" 1ab
19 customer_updated = "customer.updated" 1ab
20 customer_deleted = "customer.deleted" 1ab
21 customer_state_changed = "customer.state_changed" 1ab
22 customer_seat_assigned = "customer_seat.assigned" 1ab
23 customer_seat_claimed = "customer_seat.claimed" 1ab
24 customer_seat_revoked = "customer_seat.revoked" 1ab
25 order_created = "order.created" 1ab
26 order_updated = "order.updated" 1ab
27 order_paid = "order.paid" 1ab
28 order_refunded = "order.refunded" 1ab
29 subscription_created = "subscription.created" 1ab
30 subscription_updated = "subscription.updated" 1ab
31 subscription_active = "subscription.active" 1ab
32 subscription_canceled = "subscription.canceled" 1ab
33 subscription_uncanceled = "subscription.uncanceled" 1ab
34 subscription_revoked = "subscription.revoked" 1ab
35 refund_created = "refund.created" 1ab
36 refund_updated = "refund.updated" 1ab
37 product_created = "product.created" 1ab
38 product_updated = "product.updated" 1ab
39 benefit_created = "benefit.created" 1ab
40 benefit_updated = "benefit.updated" 1ab
41 benefit_grant_created = "benefit_grant.created" 1ab
42 benefit_grant_cycled = "benefit_grant.cycled" 1ab
43 benefit_grant_updated = "benefit_grant.updated" 1ab
44 benefit_grant_revoked = "benefit_grant.revoked" 1ab
45 organization_updated = "organization.updated" 1ab
48CustomerWebhookEventType = Literal[ 1ab
49 WebhookEventType.customer_created,
50 WebhookEventType.customer_updated,
51 WebhookEventType.customer_deleted,
52 WebhookEventType.customer_state_changed,
53]
56class WebhookFormat(StrEnum): 1ab
57 raw = "raw" 1ab
58 discord = "discord" 1ab
59 slack = "slack" 1ab
62class WebhookEndpoint(RecordModel): 1ab
63 __tablename__ = "webhook_endpoints" 1ab
65 url: Mapped[str] = mapped_column(String, nullable=False) 1ab
66 format: Mapped[WebhookFormat] = mapped_column(String, nullable=False) 1ab
67 secret: Mapped[str] = mapped_column(String, nullable=False) 1ab
68 events: Mapped[list[WebhookEventType]] = mapped_column( 1ab
69 JSONB, nullable=False, default=[]
70 )
71 enabled: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True) 1ab
73 organization_id: Mapped[UUID] = mapped_column( 1ab
74 Uuid,
75 ForeignKey("organizations.id", ondelete="CASCADE"),
76 nullable=False,
77 index=True,
78 )
80 @declared_attr 1ab
81 def organization(cls) -> Mapped["Organization"]: 1ab
82 return relationship("Organization", lazy="raise") 1ab