Coverage for polar/models/webhook_endpoint.py: 97%

56 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 17:15 +0000

1from enum import StrEnum 1ab

2from typing import TYPE_CHECKING, Literal 1ab

3from uuid import UUID 1ab

4 

5from sqlalchemy import Boolean, ForeignKey, String, Uuid 1ab

6from sqlalchemy.dialects.postgresql import JSONB 1ab

7from sqlalchemy.orm import Mapped, declared_attr, mapped_column, relationship 1ab

8 

9from polar.kit.db.models.base import RecordModel 1ab

10 

11if TYPE_CHECKING: 11 ↛ 12 (line 11 didn't jump to line 12 because the condition on line 11 was never true) 1ab

12 from .organization import Organization 

13 

14 

15class WebhookEventType(StrEnum): 1ab

16 checkout_created = "checkout.created" 1ab

17 checkout_updated = "checkout.updated" 1ab

18 customer_created = "customer.created" 1ab

19 customer_updated = "customer.updated" 1ab

20 customer_deleted = "customer.deleted" 1ab

21 customer_state_changed = "customer.state_changed" 1ab

22 customer_seat_assigned = "customer_seat.assigned" 1ab

23 customer_seat_claimed = "customer_seat.claimed" 1ab

24 customer_seat_revoked = "customer_seat.revoked" 1ab

25 order_created = "order.created" 1ab

26 order_updated = "order.updated" 1ab

27 order_paid = "order.paid" 1ab

28 order_refunded = "order.refunded" 1ab

29 subscription_created = "subscription.created" 1ab

30 subscription_updated = "subscription.updated" 1ab

31 subscription_active = "subscription.active" 1ab

32 subscription_canceled = "subscription.canceled" 1ab

33 subscription_uncanceled = "subscription.uncanceled" 1ab

34 subscription_revoked = "subscription.revoked" 1ab

35 refund_created = "refund.created" 1ab

36 refund_updated = "refund.updated" 1ab

37 product_created = "product.created" 1ab

38 product_updated = "product.updated" 1ab

39 benefit_created = "benefit.created" 1ab

40 benefit_updated = "benefit.updated" 1ab

41 benefit_grant_created = "benefit_grant.created" 1ab

42 benefit_grant_cycled = "benefit_grant.cycled" 1ab

43 benefit_grant_updated = "benefit_grant.updated" 1ab

44 benefit_grant_revoked = "benefit_grant.revoked" 1ab

45 organization_updated = "organization.updated" 1ab

46 

47 

48CustomerWebhookEventType = Literal[ 1ab

49 WebhookEventType.customer_created, 

50 WebhookEventType.customer_updated, 

51 WebhookEventType.customer_deleted, 

52 WebhookEventType.customer_state_changed, 

53] 

54 

55 

56class WebhookFormat(StrEnum): 1ab

57 raw = "raw" 1ab

58 discord = "discord" 1ab

59 slack = "slack" 1ab

60 

61 

62class WebhookEndpoint(RecordModel): 1ab

63 __tablename__ = "webhook_endpoints" 1ab

64 

65 url: Mapped[str] = mapped_column(String, nullable=False) 1ab

66 format: Mapped[WebhookFormat] = mapped_column(String, nullable=False) 1ab

67 secret: Mapped[str] = mapped_column(String, nullable=False) 1ab

68 events: Mapped[list[WebhookEventType]] = mapped_column( 1ab

69 JSONB, nullable=False, default=[] 

70 ) 

71 enabled: Mapped[bool] = mapped_column(Boolean, nullable=False, default=True) 1ab

72 

73 organization_id: Mapped[UUID] = mapped_column( 1ab

74 Uuid, 

75 ForeignKey("organizations.id", ondelete="CASCADE"), 

76 nullable=False, 

77 index=True, 

78 ) 

79 

80 @declared_attr 1ab

81 def organization(cls) -> Mapped["Organization"]: 1ab

82 return relationship("Organization", lazy="raise") 1ab