Coverage for polar/event/schemas.py: 95%

154 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 16:17 +0000

1from datetime import UTC, datetime 1a

2from decimal import Decimal 1a

3from typing import Annotated, Literal, NotRequired 1a

4 

5from fastapi import Path 1a

6from pydantic import ( 1a

7 UUID4, 

8 AfterValidator, 

9 AliasChoices, 

10 AwareDatetime, 

11 Discriminator, 

12 Field, 

13) 

14from pydantic.type_adapter import TypeAdapter 1a

15from typing_extensions import TypedDict 1a

16 

17from polar.customer.schemas.customer import Customer 1a

18from polar.event.system import ( 1a

19 BenefitGrantMetadata, 

20 CustomerCreatedMetadata, 

21 CustomerDeletedMetadata, 

22 CustomerUpdatedMetadata, 

23 MeterCreditedMetadata, 

24 MeterResetMetadata, 

25 OrderPaidMetadata, 

26 OrderRefundedMetadata, 

27 SubscriptionCycledMetadata, 

28 SubscriptionProductUpdatedMetadata, 

29 SubscriptionRevokedMetadata, 

30 SubscriptionSeatsUpdatedMetadata, 

31) 

32from polar.event.system import SystemEvent as SystemEventEnum 1a

33from polar.kit.metadata import METADATA_DESCRIPTION, MetadataValue 1a

34from polar.kit.schemas import ( 1a

35 ClassName, 

36 IDSchema, 

37 Schema, 

38 SetSchemaReference, 

39) 

40from polar.models.event import EventSource 1a

41from polar.organization.schemas import OrganizationID 1a

42 

43_NAME_DESCRIPTION = "The name of the event." 1a

44_SOURCE_DESCRIPTION = ( 1a

45 "The source of the event. " 

46 "`system` events are created by Polar. " 

47 "`user` events are the one you create through our ingestion API." 

48) 

49 

50 

51def default_timestamp_factory() -> datetime: 1a

52 return datetime.now(UTC) 

53 

54 

55def is_past_timestamp(timestamp: datetime) -> datetime: 1a

56 # Convert to UTC 

57 timestamp = timestamp.astimezone(UTC) 

58 if timestamp > datetime.now(UTC): 

59 raise ValueError("Timestamp must be in the past.") 

60 return timestamp 

61 

62 

63class CostMetadata(TypedDict): 1a

64 amount: Annotated[ 1a

65 Decimal, 

66 Field( 

67 description="The amount in cents.", 

68 max_digits=17, 

69 decimal_places=12, 

70 ), 

71 ] 

72 currency: Annotated[ 1a

73 str, 

74 Field( 

75 pattern="usd", 

76 description="The currency. Currently, only `usd` is supported.", 

77 ), 

78 ] 

79 

80 

81class LLMMetadata(TypedDict): 1a

82 vendor: Annotated[str, Field(description="The vendor of the event.")] 1a

83 model: Annotated[str, Field(description="The model used for the event.")] 1a

84 prompt: Annotated[ 1a

85 str | None, 

86 Field(default=None, description="The LLM prompt used for the event."), 

87 ] 

88 response: Annotated[ 1a

89 str | None, 

90 Field(default=None, description="The LLM response used for the event."), 

91 ] 

92 input_tokens: Annotated[ 1a

93 int, 

94 Field(description="The number of LLM input tokens used for the event."), 

95 ] 

96 cached_input_tokens: Annotated[ 1a

97 NotRequired[int], 

98 Field( 

99 description="The number of LLM cached tokens that were used for the event.", 

100 ), 

101 ] 

102 output_tokens: Annotated[ 1a

103 int, 

104 Field(description="The number of LLM output tokens used for the event."), 

105 ] 

106 total_tokens: Annotated[ 1a

107 int, 

108 Field(description="The total number of LLM tokens used for the event."), 

109 ] 

110 

111 

112class EventMetadataInput( # type: ignore[call-arg] 1a

113 TypedDict, 

114 total=False, 

115 extra_items=MetadataValue, 

116): 

117 _cost: CostMetadata 1a

118 _llm: LLMMetadata 1a

119 

120 

121def metadata_default_factory() -> EventMetadataInput: 1a

122 return {} 

123 

124 

125class EventCreateBase(Schema): 1a

126 timestamp: Annotated[ 1a

127 AwareDatetime, 

128 AfterValidator(is_past_timestamp), 

129 ] = Field( 

130 default_factory=default_timestamp_factory, 

131 description="The timestamp of the event.", 

132 ) 

133 name: str = Field(..., description="The name of the event.") 1a

134 organization_id: OrganizationID | None = Field( 1a

135 default=None, 

136 description=( 

137 "The ID of the organization owning the event. " 

138 "**Required unless you use an organization token.**" 

139 ), 

140 ) 

141 external_id: str | None = Field( 1a

142 default=None, 

143 description=( 

144 "Your unique identifier for this event. " 

145 "Useful for deduplication and parent-child relationships." 

146 ), 

147 ) 

148 parent_id: str | None = Field( 1a

149 default=None, 

150 description=( 

151 "The ID of the parent event. " 

152 "Can be either a Polar event ID (UUID) or an external event ID." 

153 ), 

154 ) 

155 metadata: EventMetadataInput = Field( 1a

156 description=METADATA_DESCRIPTION.format( 

157 heading=( 

158 "Key-value object allowing you to store additional information about the event. " 

159 "Some keys like `_llm` are structured data that are handled specially by Polar." 

160 ) 

161 ), 

162 default_factory=metadata_default_factory, 

163 serialization_alias="user_metadata", 

164 ) 

165 

166 

167class EventCreateCustomer(EventCreateBase): 1a

168 customer_id: UUID4 = Field( 1a

169 description=( 

170 "ID of the customer in your Polar organization associated with the event." 

171 ) 

172 ) 

173 

174 

175class EventCreateExternalCustomer(EventCreateBase): 1a

176 external_customer_id: str = Field( 1a

177 description="ID of the customer in your system associated with the event." 

178 ) 

179 

180 

181EventCreate = EventCreateCustomer | EventCreateExternalCustomer 1a

182 

183 

184class EventsIngest(Schema): 1a

185 events: list[EventCreate] = Field(description="List of events to ingest.") 1a

186 

187 

188class EventsIngestResponse(Schema): 1a

189 inserted: int = Field(description="Number of events inserted.") 1a

190 duplicates: int = Field( 1a

191 default=0, description="Number of duplicate events skipped." 

192 ) 

193 

194 

195class BaseEvent(IDSchema): 1a

196 timestamp: datetime = Field(description="The timestamp of the event.") 1a

197 organization_id: OrganizationID = Field( 1a

198 description="The ID of the organization owning the event." 

199 ) 

200 customer_id: UUID4 | None = Field( 1a

201 description=( 

202 "ID of the customer in your Polar organization associated with the event." 

203 ) 

204 ) 

205 customer: Customer | None = Field( 1a

206 description="The customer associated with the event." 

207 ) 

208 external_customer_id: str | None = Field( 1a

209 description="ID of the customer in your system associated with the event." 

210 ) 

211 child_count: int = Field( 1a

212 default=0, description="Number of direct child events linked to this event." 

213 ) 

214 parent_id: UUID4 | None = Field( 1a

215 default=None, 

216 description="The ID of the parent event.", 

217 ) 

218 label: str = Field(description="Human readable label of the event type.") 1a

219 

220 

221class SystemEventBase(BaseEvent): 1a

222 """An event created by Polar.""" 

223 

224 source: Literal[EventSource.system] = Field(description=_SOURCE_DESCRIPTION) 1a

225 

226 

227class MeterCreditEvent(SystemEventBase): 1a

228 """An event created by Polar when credits are added to a customer meter.""" 

229 

230 name: Literal[SystemEventEnum.meter_credited] = Field(description=_NAME_DESCRIPTION) 1a

231 metadata: MeterCreditedMetadata = Field( 1a

232 validation_alias=AliasChoices("user_metadata", "metadata") 

233 ) 

234 

235 

236class MeterResetEvent(SystemEventBase): 1a

237 """An event created by Polar when a customer meter is reset.""" 

238 

239 name: Literal[SystemEventEnum.meter_reset] = Field(description=_NAME_DESCRIPTION) 1a

240 metadata: MeterResetMetadata = Field( 1a

241 validation_alias=AliasChoices("user_metadata", "metadata") 

242 ) 

243 

244 

245class BenefitGrantedEvent(SystemEventBase): 1a

246 """An event created by Polar when a benefit is granted to a customer.""" 

247 

248 name: Literal[SystemEventEnum.benefit_granted] = Field( 1a

249 description=_NAME_DESCRIPTION 

250 ) 

251 metadata: BenefitGrantMetadata = Field( 1a

252 validation_alias=AliasChoices("user_metadata", "metadata") 

253 ) 

254 

255 

256class BenefitCycledEvent(SystemEventBase): 1a

257 """An event created by Polar when a benefit is cycled.""" 

258 

259 name: Literal[SystemEventEnum.benefit_cycled] = Field(description=_NAME_DESCRIPTION) 1a

260 metadata: BenefitGrantMetadata = Field( 1a

261 validation_alias=AliasChoices("user_metadata", "metadata") 

262 ) 

263 

264 

265class BenefitUpdatedEvent(SystemEventBase): 1a

266 """An event created by Polar when a benefit is updated.""" 

267 

268 name: Literal[SystemEventEnum.benefit_updated] = Field( 1a

269 description=_NAME_DESCRIPTION 

270 ) 

271 metadata: BenefitGrantMetadata = Field( 1a

272 validation_alias=AliasChoices("user_metadata", "metadata") 

273 ) 

274 

275 

276class BenefitRevokedEvent(SystemEventBase): 1a

277 """An event created by Polar when a benefit is revoked from a customer.""" 

278 

279 name: Literal[SystemEventEnum.benefit_revoked] = Field( 1a

280 description=_NAME_DESCRIPTION 

281 ) 

282 metadata: BenefitGrantMetadata = Field( 1a

283 validation_alias=AliasChoices("user_metadata", "metadata") 

284 ) 

285 

286 

287class SubscriptionCycledEvent(SystemEventBase): 1a

288 """An event created by Polar when a subscription is cycled.""" 

289 

290 name: Literal[SystemEventEnum.subscription_cycled] = Field( 1a

291 description=_NAME_DESCRIPTION 

292 ) 

293 metadata: SubscriptionCycledMetadata = Field( 1a

294 validation_alias=AliasChoices("user_metadata", "metadata") 

295 ) 

296 

297 

298class SubscriptionRevokedEvent(SystemEventBase): 1a

299 """An event created by Polar when a subscription is revoked from a customer.""" 

300 

301 name: Literal[SystemEventEnum.subscription_revoked] = Field( 1a

302 description=_NAME_DESCRIPTION 

303 ) 

304 metadata: SubscriptionRevokedMetadata = Field( 1a

305 validation_alias=AliasChoices("user_metadata", "metadata") 

306 ) 

307 

308 

309class SubscriptionProductUpdatedEvent(SystemEventBase): 1a

310 """An event created by Polar when a subscription changes the product.""" 

311 

312 name: Literal[SystemEventEnum.subscription_product_updated] = Field( 1a

313 description=_NAME_DESCRIPTION 

314 ) 

315 metadata: SubscriptionProductUpdatedMetadata = Field( 1a

316 validation_alias=AliasChoices("user_metadata", "metadata") 

317 ) 

318 

319 

320class SubscriptionSeatsUpdatedEvent(SystemEventBase): 1a

321 """An event created by Polar when a the seats on a subscription is changed.""" 

322 

323 name: Literal[SystemEventEnum.subscription_seats_updated] = Field( 1a

324 description=_NAME_DESCRIPTION 

325 ) 

326 metadata: SubscriptionSeatsUpdatedMetadata = Field( 1a

327 validation_alias=AliasChoices("user_metadata", "metadata") 

328 ) 

329 

330 

331class OrderPaidEvent(SystemEventBase): 1a

332 """An event created by Polar when an order is paid.""" 

333 

334 name: Literal[SystemEventEnum.order_paid] = Field(description=_NAME_DESCRIPTION) 1a

335 metadata: OrderPaidMetadata = Field( 1a

336 validation_alias=AliasChoices("user_metadata", "metadata") 

337 ) 

338 

339 

340class CustomerCreatedEvent(SystemEventBase): 1a

341 """An event created by Polar when a customer is created.""" 

342 

343 name: Literal[SystemEventEnum.customer_created] = Field( 1a

344 description=_NAME_DESCRIPTION 

345 ) 

346 metadata: CustomerCreatedMetadata = Field( 1a

347 validation_alias=AliasChoices("user_metadata", "metadata") 

348 ) 

349 

350 

351class OrderRefundedEvent(SystemEventBase): 1a

352 """An event created by Polar when an order is refunded.""" 

353 

354 name: Literal[SystemEventEnum.order_refunded] = Field(description=_NAME_DESCRIPTION) 1a

355 metadata: OrderRefundedMetadata = Field( 1a

356 validation_alias=AliasChoices("user_metadata", "metadata") 

357 ) 

358 

359 

360class CustomerUpdatedEvent(SystemEventBase): 1a

361 """An event created by Polar when a customer is updated.""" 

362 

363 name: Literal[SystemEventEnum.customer_updated] = Field( 1a

364 description=_NAME_DESCRIPTION 

365 ) 

366 metadata: CustomerUpdatedMetadata = Field( 1a

367 validation_alias=AliasChoices("user_metadata", "metadata") 

368 ) 

369 

370 

371class CustomerDeletedEvent(SystemEventBase): 1a

372 """An event created by Polar when a customer is deleted.""" 

373 

374 name: Literal[SystemEventEnum.customer_deleted] = Field( 1a

375 description=_NAME_DESCRIPTION 

376 ) 

377 metadata: CustomerDeletedMetadata = Field( 1a

378 validation_alias=AliasChoices("user_metadata", "metadata") 

379 ) 

380 

381 

382SystemEvent = Annotated[ 1a

383 MeterCreditEvent 

384 | MeterResetEvent 

385 | BenefitGrantedEvent 

386 | BenefitCycledEvent 

387 | BenefitUpdatedEvent 

388 | BenefitRevokedEvent 

389 | SubscriptionCycledEvent 

390 | SubscriptionRevokedEvent 

391 | SubscriptionProductUpdatedEvent 

392 | SubscriptionSeatsUpdatedEvent 

393 | OrderPaidEvent 

394 | OrderRefundedEvent 

395 | CustomerCreatedEvent 

396 | CustomerUpdatedEvent 

397 | CustomerDeletedEvent, 

398 Discriminator("name"), 

399 SetSchemaReference("SystemEvent"), 

400 ClassName("SystemEvent"), 

401] 

402 

403 

404class EventMetadataOutput( # type: ignore[call-arg] 1a

405 TypedDict, 

406 total=False, 

407 extra_items=str | int | float | bool, 

408): 

409 _cost: CostMetadata 1a

410 _llm: LLMMetadata 1a

411 

412 

413class UserEvent(BaseEvent): 1a

414 """An event you created through the ingestion API.""" 

415 

416 name: str = Field(description=_NAME_DESCRIPTION) 1a

417 source: Literal[EventSource.user] = Field(description=_SOURCE_DESCRIPTION) 1a

418 metadata: EventMetadataOutput = Field( 1a

419 validation_alias=AliasChoices("user_metadata", "metadata") 

420 ) 

421 

422 

423Event = Annotated[ 1a

424 SystemEvent | UserEvent, 

425 Discriminator("source"), 

426 SetSchemaReference("Event"), 

427 ClassName("Event"), 

428] 

429 

430EventTypeAdapter: TypeAdapter[Event] = TypeAdapter(Event) 1a

431 

432 

433class EventName(Schema): 1a

434 name: str = Field(description="The name of the event.") 1a

435 source: EventSource = Field(description=_SOURCE_DESCRIPTION) 1a

436 occurrences: int = Field(description="Number of times the event has occurred.") 1a

437 first_seen: datetime = Field(description="The first time the event occurred.") 1a

438 last_seen: datetime = Field(description="The last time the event occurred.") 1a

439 

440 

441class EventAggregations(Schema): 1a

442 """Aggregated values from all descendant events.""" 

443 

444 descendant_count: int = Field( 1a

445 description="Total number of descendant events (not including the event itself)." 

446 ) 

447 sums: dict[str, Decimal] = Field( 1a

448 description="Aggregated sums for requested metadata fields. Keys are field paths (e.g., 'cost_amount'), values are the summed totals.", 

449 default_factory=dict, 

450 ) 

451 

452 

453class EventWithAggregations(Schema): 1a

454 """An event with aggregated values from its descendants.""" 

455 

456 event: Event = Field(description="The event.") 1a

457 aggregations: EventAggregations = Field( 1a

458 description="Aggregated values from all descendant events." 

459 ) 

460 

461 

462class EventStatistics(Schema): 1a

463 """Aggregate statistics for events grouped by root event name.""" 

464 

465 name: str = Field(description="The name of the root event.") 1a

466 label: str = Field(description="The label of the event type.") 1a

467 event_type_id: UUID4 = Field(description="The ID of the event type") 1a

468 occurrences: int = Field( 1a

469 description="Number of root events with this name (i.e., number of traces)." 

470 ) 

471 totals: dict[str, Decimal] = Field( 1a

472 description="Sum of each field across all events in all hierarchies.", 

473 default_factory=dict, 

474 ) 

475 averages: dict[str, Decimal] = Field( 1a

476 description="Average of per-hierarchy totals (i.e., average cost per trace).", 

477 default_factory=dict, 

478 ) 

479 p50: dict[str, Decimal] = Field( 1a

480 description="Median (50th percentile) of per-hierarchy totals.", 

481 default_factory=dict, 

482 ) 

483 p95: dict[str, Decimal] = Field( 1a

484 description="95th percentile of per-hierarchy totals.", 

485 default_factory=dict, 

486 ) 

487 p99: dict[str, Decimal] = Field( 1a

488 description="99th percentile of per-hierarchy totals.", 

489 default_factory=dict, 

490 ) 

491 

492 

493class StatisticsPeriod(Schema): 1a

494 """Event statistics for a single time period.""" 

495 

496 timestamp: AwareDatetime = Field(description="Period timestamp") 1a

497 period_start: AwareDatetime = Field(description="Period start (inclusive)") 1a

498 period_end: AwareDatetime = Field(description="Period end (exclusive)") 1a

499 stats: list[EventStatistics] = Field( 1a

500 description="Stats grouped by event name for this period" 

501 ) 

502 

503 

504class ListStatisticsTimeseries(Schema): 1a

505 """Event statistics timeseries.""" 

506 

507 periods: list[StatisticsPeriod] = Field(description="Stats for each time period.") 1a

508 totals: list[EventStatistics] = Field( 1a

509 description="Overall stats across all periods." 

510 ) 

511 

512 

513EventID = Annotated[UUID4, Path(description="The event ID.")] 1a