Coverage for polar/models/event.py: 54%
113 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 15:52 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 15:52 +0000
1import datetime 1ab
2from enum import StrEnum 1ab
3from typing import TYPE_CHECKING, Any, cast 1ab
4from uuid import UUID 1ab
6from sqlalchemy import ( 1ab
7 TIMESTAMP,
8 BigInteger,
9 ColumnElement,
10 ForeignKey,
11 Index,
12 Select,
13 String,
14 Uuid,
15 and_,
16 case,
17 event,
18 exists,
19 extract,
20 literal_column,
21 or_,
22 select,
23 update,
24)
25from sqlalchemy import ( 1ab
26 cast as sql_cast,
27)
28from sqlalchemy import ( 1ab
29 cast as sqla_cast,
30)
31from sqlalchemy.dialects.postgresql import insert 1ab
32from sqlalchemy.ext.hybrid import hybrid_property 1ab
33from sqlalchemy.orm import ( 1ab
34 Mapped,
35 Relationship,
36 column_property,
37 declared_attr,
38 mapped_column,
39 relationship,
40)
41from sqlalchemy.sql.elements import BinaryExpression 1ab
43from polar.kit.db.models import Model 1ab
44from polar.kit.metadata import MetadataMixin, extract_metadata_value 1ab
45from polar.kit.utils import generate_uuid, utc_now 1ab
47from .customer import Customer 1ab
49if TYPE_CHECKING: 49 ↛ 50line 49 didn't jump to line 50 because the condition on line 49 was never true1ab
50 from .event_type import EventType
51 from .organization import Organization
54class EventSource(StrEnum): 1ab
55 system = "system" 1ab
56 user = "user" 1ab
59class CustomerComparator(Relationship.Comparator[Customer]): 1ab
60 def __eq__(self, other: Any) -> ColumnElement[bool]: # type: ignore[override] 1ab
61 if isinstance(other, Customer):
62 clause = Event.customer_id == other.id
63 if other.external_id is not None:
64 clause |= and_(
65 Event.external_customer_id.is_not(None),
66 Event.external_customer_id == other.external_id,
67 Event.organization_id == other.organization_id,
68 )
69 return clause
71 raise NotImplementedError()
73 def is_(self, other: Any) -> BinaryExpression[bool]: 1ab
74 if other is None:
75 return cast(
76 BinaryExpression[bool],
77 and_(
78 Event.customer_id.is_(None),
79 or_(
80 Event.external_customer_id.is_(None),
81 ~exists(
82 select(1).where(
83 Customer.external_id == Event.external_customer_id,
84 Customer.organization_id == Event.organization_id,
85 )
86 ),
87 ),
88 ),
89 )
91 raise NotImplementedError()
93 def is_not(self, other: Any) -> BinaryExpression[bool]: 1ab
94 if other is None:
95 return cast(
96 BinaryExpression[bool],
97 or_(
98 Event.customer_id.is_not(None),
99 and_(
100 Event.external_customer_id.is_not(None),
101 exists(
102 select(1).where(
103 Customer.external_id == Event.external_customer_id,
104 Customer.organization_id == Event.organization_id,
105 )
106 ),
107 ),
108 ),
109 )
111 raise NotImplementedError()
114class Event(Model, MetadataMixin): 1ab
115 __tablename__ = "events" 1ab
116 __table_args__ = ( 1ab
117 Index(
118 "ix_events_org_timestamp_id",
119 "organization_id",
120 literal_column("timestamp DESC"),
121 "id",
122 ),
123 )
125 id: Mapped[UUID] = mapped_column(Uuid, primary_key=True, default=generate_uuid) 1ab
126 ingested_at: Mapped[datetime.datetime] = mapped_column( 1ab
127 TIMESTAMP(timezone=True), nullable=False, default=utc_now, index=True
128 )
129 timestamp: Mapped[datetime.datetime] = mapped_column( 1ab
130 TIMESTAMP(timezone=True), nullable=False, default=utc_now, index=True
131 )
132 name: Mapped[str] = mapped_column(String(128), nullable=False, index=True) 1ab
133 source: Mapped[EventSource] = mapped_column( 1ab
134 String, nullable=False, default=EventSource.system, index=True
135 )
137 customer_id: Mapped[UUID | None] = mapped_column( 1ab
138 Uuid, ForeignKey("customers.id"), nullable=True, index=True
139 )
141 external_customer_id: Mapped[str | None] = mapped_column( 1ab
142 String, nullable=True, index=True
143 )
145 external_id: Mapped[str | None] = mapped_column( 1ab
146 String, nullable=True, index=True, unique=True
147 )
149 parent_id: Mapped[UUID | None] = mapped_column( 1ab
150 Uuid, ForeignKey("events.id"), nullable=True, index=True
151 )
153 root_id: Mapped[UUID | None] = mapped_column( 1ab
154 Uuid, ForeignKey("events.id"), nullable=True, index=True
155 )
157 @declared_attr 1ab
158 def parent(cls) -> Mapped["Event | None"]: 1ab
159 return relationship( 1ab
160 "Event",
161 foreign_keys="Event.parent_id",
162 remote_side="Event.id",
163 lazy="raise",
164 )
166 @declared_attr 1ab
167 def customer(cls) -> Mapped[Customer | None]: 1ab
168 return relationship( 1ab
169 Customer,
170 primaryjoin=(
171 "or_("
172 "Event.customer_id == Customer.id,"
173 "and_("
174 "Event.external_customer_id == Customer.external_id,"
175 "Event.organization_id == Customer.organization_id"
176 ")"
177 ")"
178 ),
179 comparator_factory=CustomerComparator,
180 lazy="raise",
181 viewonly=True,
182 )
184 resolved_customer_id: Mapped[UUID | str] = column_property( 1ab
185 case(
186 (customer_id.is_not(None), sql_cast(customer_id, String)),
187 else_=external_customer_id,
188 )
189 )
191 organization_id: Mapped[UUID] = mapped_column( 1ab
192 Uuid,
193 ForeignKey("organizations.id", ondelete="cascade"),
194 nullable=False,
195 index=True,
196 )
198 @declared_attr 1ab
199 def organization(cls) -> Mapped["Organization"]: 1ab
200 return relationship("Organization", lazy="raise") 1ab
202 event_type_id: Mapped[UUID | None] = mapped_column( 1ab
203 Uuid, ForeignKey("event_types.id"), nullable=True, index=True
204 )
206 @declared_attr 1ab
207 def event_types(cls) -> Mapped["EventType | None"]: 1ab
208 return relationship("EventType", lazy="raise") 1ab
210 @property 1ab
211 def label(self) -> str: 1ab
212 if self.source == EventSource.system:
213 # Lazy import to avoid a circular dependency
214 from polar.event.system import SYSTEM_EVENT_LABELS
216 return SYSTEM_EVENT_LABELS.get(self.name, self.name)
217 if self.event_types is not None:
218 base_label = self.event_types.label
219 if self.event_types.label_property_selector:
220 dynamic_label = extract_metadata_value(
221 self.user_metadata, self.event_types.label_property_selector
222 )
223 if dynamic_label:
224 return f"{base_label} → {dynamic_label}"
225 return base_label
226 return self.name
228 @hybrid_property 1ab
229 def is_meter_credit(self) -> bool: 1ab
230 return (
231 self.source == EventSource.system
232 and
233 # ⚠️ We don't use `SystemEvent` here to avoid circular imports.
234 self.name == "meter.credited"
235 )
237 @is_meter_credit.inplace.expression 1ab
238 @classmethod 1ab
239 def _is_meter_credit_expression(cls) -> ColumnElement[bool]: 1ab
240 return and_(
241 cls.source == EventSource.system,
242 # ⚠️ We don't use `SystemEvent` here to avoid circular imports.
243 cls.name == "meter.credited",
244 )
246 _filterable_fields: dict[str, tuple[type[str | int | bool], Any]] = { 1ab
247 "timestamp": (int, sqla_cast(extract("epoch", timestamp), BigInteger)),
248 "name": (str, name),
249 "source": (str, source),
250 }
253class EventClosure(Model): 1ab
254 __tablename__ = "events_closure" 1ab
255 __table_args__ = ( 1ab
256 Index(
257 "ix_events_closure_ancestor_descendant",
258 "ancestor_id",
259 "descendant_id",
260 ),
261 Index(
262 "ix_events_closure_descendant_ancestor",
263 "descendant_id",
264 "ancestor_id",
265 ),
266 )
268 ancestor_id: Mapped[UUID] = mapped_column( 1ab
269 Uuid,
270 ForeignKey("events.id", ondelete="cascade"),
271 primary_key=True,
272 nullable=False,
273 )
275 descendant_id: Mapped[UUID] = mapped_column( 1ab
276 Uuid,
277 ForeignKey("events.id", ondelete="cascade"),
278 primary_key=True,
279 nullable=False,
280 )
282 depth: Mapped[int] = mapped_column( 1ab
283 BigInteger,
284 nullable=False,
285 index=True,
286 )
288 @declared_attr 1ab
289 def ancestor(cls) -> Mapped[Event]: 1ab
290 return relationship( 1ab
291 Event,
292 foreign_keys="EventClosure.ancestor_id",
293 lazy="raise",
294 )
296 @declared_attr 1ab
297 def descendant(cls) -> Mapped[Event]: 1ab
298 return relationship( 1ab
299 Event,
300 foreign_keys="EventClosure.descendant_id",
301 lazy="raise",
302 )
305# Event listener to populate closure table when events are inserted
306@event.listens_for(Event, "after_insert") 1ab
307def populate_event_closure(mapper: Any, connection: Any, target: Event) -> None: 1ab
308 """
309 Automatically populate the closure table when an event is inserted.
310 This ensures the closure table is maintained even when using session.add() directly.
311 """
312 # Insert self-reference
313 connection.execute(
314 insert(EventClosure).values(
315 ancestor_id=target.id,
316 descendant_id=target.id,
317 depth=0,
318 )
319 )
321 # If event has a parent, copy parent's ancestors
322 if target.parent_id is not None:
323 parent_closures: Select[Any] = select(
324 EventClosure.ancestor_id,
325 literal_column(f"'{target.id}'::uuid").label("descendant_id"),
326 (EventClosure.depth + 1).label("depth"),
327 ).where(EventClosure.descendant_id == target.parent_id)
329 connection.execute(
330 insert(EventClosure).from_select(
331 ["ancestor_id", "descendant_id", "depth"],
332 parent_closures,
333 )
334 )
336 # Set root_id if not already set
337 if target.root_id is None:
338 if target.parent_id is None:
339 # This is a root event
340 connection.execute(
341 update(Event).where(Event.id == target.id).values(root_id=target.id)
342 )
343 target.root_id = target.id
344 else:
345 # Get parent's root_id
346 result = connection.execute(
347 select(Event.root_id).where(Event.id == target.parent_id)
348 )
349 parent_root_id = result.scalar_one_or_none()
350 root_id = parent_root_id or target.parent_id
351 connection.execute(
352 update(Event).where(Event.id == target.id).values(root_id=root_id)
353 )
354 target.root_id = root_id