Coverage for polar/models/event.py: 54%

113 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 15:52 +0000

1import datetime 1ab

2from enum import StrEnum 1ab

3from typing import TYPE_CHECKING, Any, cast 1ab

4from uuid import UUID 1ab

5 

6from sqlalchemy import ( 1ab

7 TIMESTAMP, 

8 BigInteger, 

9 ColumnElement, 

10 ForeignKey, 

11 Index, 

12 Select, 

13 String, 

14 Uuid, 

15 and_, 

16 case, 

17 event, 

18 exists, 

19 extract, 

20 literal_column, 

21 or_, 

22 select, 

23 update, 

24) 

25from sqlalchemy import ( 1ab

26 cast as sql_cast, 

27) 

28from sqlalchemy import ( 1ab

29 cast as sqla_cast, 

30) 

31from sqlalchemy.dialects.postgresql import insert 1ab

32from sqlalchemy.ext.hybrid import hybrid_property 1ab

33from sqlalchemy.orm import ( 1ab

34 Mapped, 

35 Relationship, 

36 column_property, 

37 declared_attr, 

38 mapped_column, 

39 relationship, 

40) 

41from sqlalchemy.sql.elements import BinaryExpression 1ab

42 

43from polar.kit.db.models import Model 1ab

44from polar.kit.metadata import MetadataMixin, extract_metadata_value 1ab

45from polar.kit.utils import generate_uuid, utc_now 1ab

46 

47from .customer import Customer 1ab

48 

49if TYPE_CHECKING: 49 ↛ 50line 49 didn't jump to line 50 because the condition on line 49 was never true1ab

50 from .event_type import EventType 

51 from .organization import Organization 

52 

53 

54class EventSource(StrEnum): 1ab

55 system = "system" 1ab

56 user = "user" 1ab

57 

58 

59class CustomerComparator(Relationship.Comparator[Customer]): 1ab

60 def __eq__(self, other: Any) -> ColumnElement[bool]: # type: ignore[override] 1ab

61 if isinstance(other, Customer): 

62 clause = Event.customer_id == other.id 

63 if other.external_id is not None: 

64 clause |= and_( 

65 Event.external_customer_id.is_not(None), 

66 Event.external_customer_id == other.external_id, 

67 Event.organization_id == other.organization_id, 

68 ) 

69 return clause 

70 

71 raise NotImplementedError() 

72 

73 def is_(self, other: Any) -> BinaryExpression[bool]: 1ab

74 if other is None: 

75 return cast( 

76 BinaryExpression[bool], 

77 and_( 

78 Event.customer_id.is_(None), 

79 or_( 

80 Event.external_customer_id.is_(None), 

81 ~exists( 

82 select(1).where( 

83 Customer.external_id == Event.external_customer_id, 

84 Customer.organization_id == Event.organization_id, 

85 ) 

86 ), 

87 ), 

88 ), 

89 ) 

90 

91 raise NotImplementedError() 

92 

93 def is_not(self, other: Any) -> BinaryExpression[bool]: 1ab

94 if other is None: 

95 return cast( 

96 BinaryExpression[bool], 

97 or_( 

98 Event.customer_id.is_not(None), 

99 and_( 

100 Event.external_customer_id.is_not(None), 

101 exists( 

102 select(1).where( 

103 Customer.external_id == Event.external_customer_id, 

104 Customer.organization_id == Event.organization_id, 

105 ) 

106 ), 

107 ), 

108 ), 

109 ) 

110 

111 raise NotImplementedError() 

112 

113 

114class Event(Model, MetadataMixin): 1ab

115 __tablename__ = "events" 1ab

116 __table_args__ = ( 1ab

117 Index( 

118 "ix_events_org_timestamp_id", 

119 "organization_id", 

120 literal_column("timestamp DESC"), 

121 "id", 

122 ), 

123 ) 

124 

125 id: Mapped[UUID] = mapped_column(Uuid, primary_key=True, default=generate_uuid) 1ab

126 ingested_at: Mapped[datetime.datetime] = mapped_column( 1ab

127 TIMESTAMP(timezone=True), nullable=False, default=utc_now, index=True 

128 ) 

129 timestamp: Mapped[datetime.datetime] = mapped_column( 1ab

130 TIMESTAMP(timezone=True), nullable=False, default=utc_now, index=True 

131 ) 

132 name: Mapped[str] = mapped_column(String(128), nullable=False, index=True) 1ab

133 source: Mapped[EventSource] = mapped_column( 1ab

134 String, nullable=False, default=EventSource.system, index=True 

135 ) 

136 

137 customer_id: Mapped[UUID | None] = mapped_column( 1ab

138 Uuid, ForeignKey("customers.id"), nullable=True, index=True 

139 ) 

140 

141 external_customer_id: Mapped[str | None] = mapped_column( 1ab

142 String, nullable=True, index=True 

143 ) 

144 

145 external_id: Mapped[str | None] = mapped_column( 1ab

146 String, nullable=True, index=True, unique=True 

147 ) 

148 

149 parent_id: Mapped[UUID | None] = mapped_column( 1ab

150 Uuid, ForeignKey("events.id"), nullable=True, index=True 

151 ) 

152 

153 root_id: Mapped[UUID | None] = mapped_column( 1ab

154 Uuid, ForeignKey("events.id"), nullable=True, index=True 

155 ) 

156 

157 @declared_attr 1ab

158 def parent(cls) -> Mapped["Event | None"]: 1ab

159 return relationship( 1ab

160 "Event", 

161 foreign_keys="Event.parent_id", 

162 remote_side="Event.id", 

163 lazy="raise", 

164 ) 

165 

166 @declared_attr 1ab

167 def customer(cls) -> Mapped[Customer | None]: 1ab

168 return relationship( 1ab

169 Customer, 

170 primaryjoin=( 

171 "or_(" 

172 "Event.customer_id == Customer.id," 

173 "and_(" 

174 "Event.external_customer_id == Customer.external_id," 

175 "Event.organization_id == Customer.organization_id" 

176 ")" 

177 ")" 

178 ), 

179 comparator_factory=CustomerComparator, 

180 lazy="raise", 

181 viewonly=True, 

182 ) 

183 

184 resolved_customer_id: Mapped[UUID | str] = column_property( 1ab

185 case( 

186 (customer_id.is_not(None), sql_cast(customer_id, String)), 

187 else_=external_customer_id, 

188 ) 

189 ) 

190 

191 organization_id: Mapped[UUID] = mapped_column( 1ab

192 Uuid, 

193 ForeignKey("organizations.id", ondelete="cascade"), 

194 nullable=False, 

195 index=True, 

196 ) 

197 

198 @declared_attr 1ab

199 def organization(cls) -> Mapped["Organization"]: 1ab

200 return relationship("Organization", lazy="raise") 1ab

201 

202 event_type_id: Mapped[UUID | None] = mapped_column( 1ab

203 Uuid, ForeignKey("event_types.id"), nullable=True, index=True 

204 ) 

205 

206 @declared_attr 1ab

207 def event_types(cls) -> Mapped["EventType | None"]: 1ab

208 return relationship("EventType", lazy="raise") 1ab

209 

210 @property 1ab

211 def label(self) -> str: 1ab

212 if self.source == EventSource.system: 

213 # Lazy import to avoid a circular dependency 

214 from polar.event.system import SYSTEM_EVENT_LABELS 

215 

216 return SYSTEM_EVENT_LABELS.get(self.name, self.name) 

217 if self.event_types is not None: 

218 base_label = self.event_types.label 

219 if self.event_types.label_property_selector: 

220 dynamic_label = extract_metadata_value( 

221 self.user_metadata, self.event_types.label_property_selector 

222 ) 

223 if dynamic_label: 

224 return f"{base_label}{dynamic_label}" 

225 return base_label 

226 return self.name 

227 

228 @hybrid_property 1ab

229 def is_meter_credit(self) -> bool: 1ab

230 return ( 

231 self.source == EventSource.system 

232 and 

233 # ⚠️ We don't use `SystemEvent` here to avoid circular imports. 

234 self.name == "meter.credited" 

235 ) 

236 

237 @is_meter_credit.inplace.expression 1ab

238 @classmethod 1ab

239 def _is_meter_credit_expression(cls) -> ColumnElement[bool]: 1ab

240 return and_( 

241 cls.source == EventSource.system, 

242 # ⚠️ We don't use `SystemEvent` here to avoid circular imports. 

243 cls.name == "meter.credited", 

244 ) 

245 

246 _filterable_fields: dict[str, tuple[type[str | int | bool], Any]] = { 1ab

247 "timestamp": (int, sqla_cast(extract("epoch", timestamp), BigInteger)), 

248 "name": (str, name), 

249 "source": (str, source), 

250 } 

251 

252 

253class EventClosure(Model): 1ab

254 __tablename__ = "events_closure" 1ab

255 __table_args__ = ( 1ab

256 Index( 

257 "ix_events_closure_ancestor_descendant", 

258 "ancestor_id", 

259 "descendant_id", 

260 ), 

261 Index( 

262 "ix_events_closure_descendant_ancestor", 

263 "descendant_id", 

264 "ancestor_id", 

265 ), 

266 ) 

267 

268 ancestor_id: Mapped[UUID] = mapped_column( 1ab

269 Uuid, 

270 ForeignKey("events.id", ondelete="cascade"), 

271 primary_key=True, 

272 nullable=False, 

273 ) 

274 

275 descendant_id: Mapped[UUID] = mapped_column( 1ab

276 Uuid, 

277 ForeignKey("events.id", ondelete="cascade"), 

278 primary_key=True, 

279 nullable=False, 

280 ) 

281 

282 depth: Mapped[int] = mapped_column( 1ab

283 BigInteger, 

284 nullable=False, 

285 index=True, 

286 ) 

287 

288 @declared_attr 1ab

289 def ancestor(cls) -> Mapped[Event]: 1ab

290 return relationship( 1ab

291 Event, 

292 foreign_keys="EventClosure.ancestor_id", 

293 lazy="raise", 

294 ) 

295 

296 @declared_attr 1ab

297 def descendant(cls) -> Mapped[Event]: 1ab

298 return relationship( 1ab

299 Event, 

300 foreign_keys="EventClosure.descendant_id", 

301 lazy="raise", 

302 ) 

303 

304 

305# Event listener to populate closure table when events are inserted 

306@event.listens_for(Event, "after_insert") 1ab

307def populate_event_closure(mapper: Any, connection: Any, target: Event) -> None: 1ab

308 """ 

309 Automatically populate the closure table when an event is inserted. 

310 This ensures the closure table is maintained even when using session.add() directly. 

311 """ 

312 # Insert self-reference 

313 connection.execute( 

314 insert(EventClosure).values( 

315 ancestor_id=target.id, 

316 descendant_id=target.id, 

317 depth=0, 

318 ) 

319 ) 

320 

321 # If event has a parent, copy parent's ancestors 

322 if target.parent_id is not None: 

323 parent_closures: Select[Any] = select( 

324 EventClosure.ancestor_id, 

325 literal_column(f"'{target.id}'::uuid").label("descendant_id"), 

326 (EventClosure.depth + 1).label("depth"), 

327 ).where(EventClosure.descendant_id == target.parent_id) 

328 

329 connection.execute( 

330 insert(EventClosure).from_select( 

331 ["ancestor_id", "descendant_id", "depth"], 

332 parent_closures, 

333 ) 

334 ) 

335 

336 # Set root_id if not already set 

337 if target.root_id is None: 

338 if target.parent_id is None: 

339 # This is a root event 

340 connection.execute( 

341 update(Event).where(Event.id == target.id).values(root_id=target.id) 

342 ) 

343 target.root_id = target.id 

344 else: 

345 # Get parent's root_id 

346 result = connection.execute( 

347 select(Event.root_id).where(Event.id == target.parent_id) 

348 ) 

349 parent_root_id = result.scalar_one_or_none() 

350 root_id = parent_root_id or target.parent_id 

351 connection.execute( 

352 update(Event).where(Event.id == target.id).values(root_id=root_id) 

353 ) 

354 target.root_id = root_id