Coverage for polar/models/webhook_event.py: 87%

29 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 16:17 +0000

1from typing import TYPE_CHECKING 1ab

2from uuid import UUID 1ab

3 

4from sqlalchemy import Boolean, ColumnElement, ForeignKey, Index, Integer, String, Uuid 1ab

5from sqlalchemy.ext.hybrid import hybrid_property 1ab

6from sqlalchemy.orm import Mapped, declared_attr, mapped_column, relationship 1ab

7 

8from polar.kit.db.models.base import RecordModel 1ab

9from polar.kit.extensions.sqlalchemy.types import StringEnum 1ab

10 

11from .webhook_endpoint import WebhookEventType 1ab

12 

13if TYPE_CHECKING: 13 ↛ 14line 13 didn't jump to line 14 because the condition on line 13 was never true1ab

14 from .webhook_endpoint import WebhookEndpoint 

15 

16 

class WebhookEvent(RecordModel):
    """ORM model for rows of the ``webhook_events`` table.

    Every row references exactly one webhook endpoint through
    ``webhook_endpoint_id`` and stores the event ``type``, its ``payload``
    and a few status columns (``last_http_code``, ``succeeded``,
    ``skipped``).

    A row whose ``payload`` is NULL is reported as archived by
    ``is_archived``; the partial index declared in ``__table_args__`` only
    covers rows whose payload is still present.
    """

    __tablename__ = "webhook_events"

    # Partial index on ``created_at`` restricted to rows that still carry a
    # payload, i.e. rows for which ``is_archived`` is false.
    __table_args__ = (
        Index(
            "ix_webhook_events_created_at_non_archived",
            "created_at",
            postgresql_where="payload IS NOT NULL",
        ),
    )

    # Required, indexed foreign key to ``webhook_endpoints.id``. The
    # ``ondelete="CASCADE"`` clause asks the database to remove the events
    # together with their endpoint.
    webhook_endpoint_id: Mapped[UUID] = mapped_column(
        Uuid,
        ForeignKey("webhook_endpoints.id", ondelete="CASCADE"),
        nullable=False,
        index=True,
    )

    @declared_attr
    def webhook_endpoint(cls) -> Mapped["WebhookEndpoint"]:
        """Relationship to the ``WebhookEndpoint`` this event references.

        Declared with ``lazy="raise"``, so the attribute has to be loaded
        explicitly by the query; an implicit lazy load raises instead of
        emitting SQL.
        """
        return relationship(
            "WebhookEndpoint",
            lazy="raise",
        )

    # NOTE(review): presumably the HTTP status code of the most recent
    # delivery attempt, NULL when there has been none -- confirm with the
    # code that writes it.
    last_http_code: Mapped[int | None] = mapped_column(
        Integer,
        nullable=True,
    )

    # Nullable flag. NOTE(review): NULL presumably means "outcome not known
    # yet" -- confirm with the code that writes it.
    succeeded: Mapped[bool | None] = mapped_column(
        Boolean,
        nullable=True,
    )

    # Required flag that defaults to false both on the Python side
    # (``default``) and in the database schema (``server_default``).
    skipped: Mapped[bool] = mapped_column(
        Boolean,
        nullable=False,
        default=False,
        server_default="false",
    )

    # Required, indexed event type persisted through the project's
    # ``StringEnum`` column type.
    type: Mapped[WebhookEventType] = mapped_column(
        StringEnum(WebhookEventType),
        nullable=False,
        index=True,
    )

    # Payload stored as a string; NULL is what marks the row as archived.
    payload: Mapped[str | None] = mapped_column(
        String,
        nullable=True,
    )

    @hybrid_property
    def is_archived(self) -> bool:
        """Return ``True`` when this event no longer has a payload."""
        return self.payload is None

    @is_archived.inplace.expression
    @classmethod
    def _is_archived_expression(cls) -> ColumnElement[bool]:
        """Return the SQL form of ``is_archived``: ``payload IS NULL``."""
        return cls.payload.is_(None)