Coverage for polar/event/schemas.py: 95%
154 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 17:15 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 17:15 +0000
1from datetime import UTC, datetime 1a
2from decimal import Decimal 1a
3from typing import Annotated, Literal, NotRequired 1a
5from fastapi import Path 1a
6from pydantic import ( 1a
7 UUID4,
8 AfterValidator,
9 AliasChoices,
10 AwareDatetime,
11 Discriminator,
12 Field,
13)
14from pydantic.type_adapter import TypeAdapter 1a
15from typing_extensions import TypedDict 1a
17from polar.customer.schemas.customer import Customer 1a
18from polar.event.system import ( 1a
19 BenefitGrantMetadata,
20 CustomerCreatedMetadata,
21 CustomerDeletedMetadata,
22 CustomerUpdatedMetadata,
23 MeterCreditedMetadata,
24 MeterResetMetadata,
25 OrderPaidMetadata,
26 OrderRefundedMetadata,
27 SubscriptionCycledMetadata,
28 SubscriptionProductUpdatedMetadata,
29 SubscriptionRevokedMetadata,
30 SubscriptionSeatsUpdatedMetadata,
31)
32from polar.event.system import SystemEvent as SystemEventEnum 1a
33from polar.kit.metadata import METADATA_DESCRIPTION, MetadataValue 1a
34from polar.kit.schemas import ( 1a
35 ClassName,
36 IDSchema,
37 Schema,
38 SetSchemaReference,
39)
40from polar.models.event import EventSource 1a
41from polar.organization.schemas import OrganizationID 1a
# Shared, user-facing descriptions reused by the `name` and `source` fields of
# the event schemas below.
_NAME_DESCRIPTION = "The name of the event."
_SOURCE_DESCRIPTION = (
    "The source of the event. "
    "`system` events are created by Polar. "
    # Grammar fix: plural "ones" to agree with "events".
    "`user` events are the ones you create through our ingestion API."
)
51def default_timestamp_factory() -> datetime: 1a
52 return datetime.now(UTC)
55def is_past_timestamp(timestamp: datetime) -> datetime: 1a
56 # Convert to UTC
57 timestamp = timestamp.astimezone(UTC)
58 if timestamp > datetime.now(UTC):
59 raise ValueError("Timestamp must be in the past.")
60 return timestamp
class CostMetadata(TypedDict):
    # Structured cost payload stored under the `_cost` metadata key.

    # Decimal amount constrained to 17 total digits, 12 of them decimal places.
    amount: Annotated[
        Decimal,
        Field(
            description="The amount in cents.",
            max_digits=17,
            decimal_places=12,
        ),
    ]
    # The pattern is anchored on both ends: an unanchored "usd" is satisfied by
    # any string that merely contains "usd" (e.g. "xusdx"), which contradicts
    # the documented "only `usd` is supported".
    currency: Annotated[
        str,
        Field(
            pattern="^usd$",
            description="The currency. Currently, only `usd` is supported.",
        ),
    ]
class LLMMetadata(TypedDict):
    # Structured LLM usage payload stored under the `_llm` metadata key.

    vendor: Annotated[str, Field(description="The vendor of the event.")]
    model: Annotated[str, Field(description="The model used for the event.")]

    # Prompt and response text may be None and default to None.
    prompt: Annotated[
        str | None,
        Field(description="The LLM prompt used for the event.", default=None),
    ]
    response: Annotated[
        str | None,
        Field(description="The LLM response used for the event.", default=None),
    ]

    # Token counters; only `cached_input_tokens` is marked NotRequired.
    input_tokens: Annotated[
        int, Field(description="The number of LLM input tokens used for the event.")
    ]
    cached_input_tokens: Annotated[
        NotRequired[int],
        Field(
            description="The number of LLM cached tokens that were used for the event."
        ),
    ]
    output_tokens: Annotated[
        int, Field(description="The number of LLM output tokens used for the event.")
    ]
    total_tokens: Annotated[
        int, Field(description="The total number of LLM tokens used for the event.")
    ]
# Metadata accepted at ingestion: every key is optional (total=False), the
# `_cost` and `_llm` keys are structured, and any other key must hold a
# MetadataValue (extra_items).
class EventMetadataInput(  # type: ignore[call-arg]
    TypedDict,
    total=False,
    extra_items=MetadataValue,
):
    _cost: CostMetadata
    _llm: LLMMetadata
def metadata_default_factory() -> EventMetadataInput:
    """Return a new, empty metadata mapping."""
    empty_metadata: EventMetadataInput = {}
    return empty_metadata
class EventCreateBase(Schema):
    # Fields common to every event ingestion payload.

    # Defaults to the current UTC time. After validation as an aware datetime,
    # `is_past_timestamp` converts it to UTC and rejects later-than-now values.
    timestamp: Annotated[AwareDatetime, AfterValidator(is_past_timestamp)] = Field(
        description="The timestamp of the event.",
        default_factory=default_timestamp_factory,
    )
    name: str = Field(..., description="The name of the event.")
    organization_id: OrganizationID | None = Field(
        description=(
            "The ID of the organization owning the event. "
            "**Required unless you use an organization token.**"
        ),
        default=None,
    )
    external_id: str | None = Field(
        description=(
            "Your unique identifier for this event. "
            "Useful for deduplication and parent-child relationships."
        ),
        default=None,
    )
    parent_id: str | None = Field(
        description=(
            "The ID of the parent event. "
            "Can be either a Polar event ID (UUID) or an external event ID."
        ),
        default=None,
    )
    # Serialized under the `user_metadata` key; defaults to an empty mapping.
    metadata: EventMetadataInput = Field(
        default_factory=metadata_default_factory,
        serialization_alias="user_metadata",
        description=METADATA_DESCRIPTION.format(
            heading=(
                "Key-value object allowing you to store additional information "
                "about the event. "
                "Some keys like `_llm` are structured data that are handled "
                "specially by Polar."
            )
        ),
    )
class EventCreateCustomer(EventCreateBase):
    # Ingestion payload that identifies the customer by a required UUID4
    # `customer_id`.
    customer_id: UUID4 = Field(
        description="ID of the customer in your Polar organization associated with the event."
    )
class EventCreateExternalCustomer(EventCreateBase):
    # Ingestion payload that identifies the customer by a required string
    # `external_customer_id`.
    external_customer_id: str = Field(
        description=("ID of the customer in your system associated with the event."),
    )
# An ingestion payload is either of the two customer-identification variants.
EventCreate = EventCreateCustomer | EventCreateExternalCustomer
class EventsIngest(Schema):
    # Request body wrapping a list of events to ingest.
    events: list[EventCreate] = Field(
        description="List of events to ingest.",
    )
class EventsIngestResponse(Schema):
    # Counts reported back from an ingestion request.
    inserted: int = Field(
        description="Number of events inserted.",
    )
    # Defaults to 0 when not provided.
    duplicates: int = Field(
        description="Number of duplicate events skipped.",
        default=0,
    )
class BaseEvent(IDSchema):
    # Output fields common to system and user events.

    timestamp: datetime = Field(description="The timestamp of the event.")
    organization_id: OrganizationID = Field(
        description="The ID of the organization owning the event.",
    )

    # The three customer fields accept None but declare no default.
    customer_id: UUID4 | None = Field(
        description="ID of the customer in your Polar organization associated with the event."
    )
    customer: Customer | None = Field(
        description="The customer associated with the event.",
    )
    external_customer_id: str | None = Field(
        description="ID of the customer in your system associated with the event.",
    )

    # Hierarchy information; both fields have defaults.
    child_count: int = Field(
        description="Number of direct child events linked to this event.",
        default=0,
    )
    parent_id: UUID4 | None = Field(
        description="The ID of the parent event.",
        default=None,
    )

    label: str = Field(
        description="Human readable label of the event type.",
    )
class SystemEventBase(BaseEvent):
    """An event created by Polar."""

    # `source` is pinned to the literal `EventSource.system`.
    source: Literal[EventSource.system] = Field(
        description=_SOURCE_DESCRIPTION,
    )
class MeterCreditEvent(SystemEventBase):
    """An event created by Polar when credits are added to a customer meter."""

    name: Literal[SystemEventEnum.meter_credited] = Field(
        description=_NAME_DESCRIPTION,
    )
    # Input may supply this under either `user_metadata` or `metadata`.
    metadata: MeterCreditedMetadata = Field(
        validation_alias=AliasChoices("user_metadata", "metadata"),
    )
class MeterResetEvent(SystemEventBase):
    """An event created by Polar when a customer meter is reset."""

    name: Literal[SystemEventEnum.meter_reset] = Field(
        description=_NAME_DESCRIPTION,
    )
    # Input may supply this under either `user_metadata` or `metadata`.
    metadata: MeterResetMetadata = Field(
        validation_alias=AliasChoices("user_metadata", "metadata"),
    )
class BenefitGrantedEvent(SystemEventBase):
    """An event created by Polar when a benefit is granted to a customer."""

    name: Literal[SystemEventEnum.benefit_granted] = Field(description=_NAME_DESCRIPTION)
    # Input may supply this under either `user_metadata` or `metadata`.
    metadata: BenefitGrantMetadata = Field(
        validation_alias=AliasChoices("user_metadata", "metadata"),
    )
class BenefitCycledEvent(SystemEventBase):
    """An event created by Polar when a benefit is cycled."""

    name: Literal[SystemEventEnum.benefit_cycled] = Field(
        description=_NAME_DESCRIPTION,
    )
    # Input may supply this under either `user_metadata` or `metadata`.
    metadata: BenefitGrantMetadata = Field(
        validation_alias=AliasChoices("user_metadata", "metadata"),
    )
class BenefitUpdatedEvent(SystemEventBase):
    """An event created by Polar when a benefit is updated."""

    name: Literal[SystemEventEnum.benefit_updated] = Field(description=_NAME_DESCRIPTION)
    # Input may supply this under either `user_metadata` or `metadata`.
    metadata: BenefitGrantMetadata = Field(
        validation_alias=AliasChoices("user_metadata", "metadata"),
    )
class BenefitRevokedEvent(SystemEventBase):
    """An event created by Polar when a benefit is revoked from a customer."""

    name: Literal[SystemEventEnum.benefit_revoked] = Field(description=_NAME_DESCRIPTION)
    # Input may supply this under either `user_metadata` or `metadata`.
    metadata: BenefitGrantMetadata = Field(
        validation_alias=AliasChoices("user_metadata", "metadata"),
    )
class SubscriptionCycledEvent(SystemEventBase):
    """An event created by Polar when a subscription is cycled."""

    name: Literal[SystemEventEnum.subscription_cycled] = Field(description=_NAME_DESCRIPTION)
    # Input may supply this under either `user_metadata` or `metadata`.
    metadata: SubscriptionCycledMetadata = Field(
        validation_alias=AliasChoices("user_metadata", "metadata"),
    )
class SubscriptionRevokedEvent(SystemEventBase):
    """An event created by Polar when a subscription is revoked from a customer."""

    name: Literal[SystemEventEnum.subscription_revoked] = Field(description=_NAME_DESCRIPTION)
    # Input may supply this under either `user_metadata` or `metadata`.
    metadata: SubscriptionRevokedMetadata = Field(
        validation_alias=AliasChoices("user_metadata", "metadata"),
    )
class SubscriptionProductUpdatedEvent(SystemEventBase):
    """An event created by Polar when a subscription changes the product."""

    name: Literal[SystemEventEnum.subscription_product_updated] = Field(description=_NAME_DESCRIPTION)
    # Input may supply this under either `user_metadata` or `metadata`.
    metadata: SubscriptionProductUpdatedMetadata = Field(
        validation_alias=AliasChoices("user_metadata", "metadata"),
    )
class SubscriptionSeatsUpdatedEvent(SystemEventBase):
    """An event created by Polar when the seats on a subscription are changed."""

    # The docstring above previously read "when a the seats ... is changed";
    # it is corrected because the class docstring is published as the schema
    # description.
    name: Literal[SystemEventEnum.subscription_seats_updated] = Field(
        description=_NAME_DESCRIPTION
    )
    # Input may supply this under either `user_metadata` or `metadata`.
    metadata: SubscriptionSeatsUpdatedMetadata = Field(
        validation_alias=AliasChoices("user_metadata", "metadata")
    )
class OrderPaidEvent(SystemEventBase):
    """An event created by Polar when an order is paid."""

    name: Literal[SystemEventEnum.order_paid] = Field(
        description=_NAME_DESCRIPTION,
    )
    # Input may supply this under either `user_metadata` or `metadata`.
    metadata: OrderPaidMetadata = Field(
        validation_alias=AliasChoices("user_metadata", "metadata"),
    )
class CustomerCreatedEvent(SystemEventBase):
    """An event created by Polar when a customer is created."""

    name: Literal[SystemEventEnum.customer_created] = Field(description=_NAME_DESCRIPTION)
    # Input may supply this under either `user_metadata` or `metadata`.
    metadata: CustomerCreatedMetadata = Field(
        validation_alias=AliasChoices("user_metadata", "metadata"),
    )
class OrderRefundedEvent(SystemEventBase):
    """An event created by Polar when an order is refunded."""

    name: Literal[SystemEventEnum.order_refunded] = Field(
        description=_NAME_DESCRIPTION,
    )
    # Input may supply this under either `user_metadata` or `metadata`.
    metadata: OrderRefundedMetadata = Field(
        validation_alias=AliasChoices("user_metadata", "metadata"),
    )
class CustomerUpdatedEvent(SystemEventBase):
    """An event created by Polar when a customer is updated."""

    name: Literal[SystemEventEnum.customer_updated] = Field(description=_NAME_DESCRIPTION)
    # Input may supply this under either `user_metadata` or `metadata`.
    metadata: CustomerUpdatedMetadata = Field(
        validation_alias=AliasChoices("user_metadata", "metadata"),
    )
class CustomerDeletedEvent(SystemEventBase):
    """An event created by Polar when a customer is deleted."""

    name: Literal[SystemEventEnum.customer_deleted] = Field(description=_NAME_DESCRIPTION)
    # Input may supply this under either `user_metadata` or `metadata`.
    metadata: CustomerDeletedMetadata = Field(
        validation_alias=AliasChoices("user_metadata", "metadata"),
    )
# Union of every concrete system event schema, discriminated on `name`.
# Member order is kept as-is.
SystemEvent = Annotated[
    # Meter events
    MeterCreditEvent
    | MeterResetEvent
    # Benefit events
    | BenefitGrantedEvent
    | BenefitCycledEvent
    | BenefitUpdatedEvent
    | BenefitRevokedEvent
    # Subscription events
    | SubscriptionCycledEvent
    | SubscriptionRevokedEvent
    | SubscriptionProductUpdatedEvent
    | SubscriptionSeatsUpdatedEvent
    # Order events
    | OrderPaidEvent
    | OrderRefundedEvent
    # Customer events
    | CustomerCreatedEvent
    | CustomerUpdatedEvent
    | CustomerDeletedEvent,
    Discriminator("name"),
    SetSchemaReference("SystemEvent"),
    ClassName("SystemEvent"),
]
# Metadata as returned on user events: every key is optional (total=False),
# the `_cost` and `_llm` keys are structured, and any other key holds a
# str, int, float or bool (extra_items).
class EventMetadataOutput(  # type: ignore[call-arg]
    TypedDict,
    total=False,
    extra_items=str | int | float | bool,
):
    _cost: CostMetadata
    _llm: LLMMetadata
class UserEvent(BaseEvent):
    """An event you created through the ingestion API."""

    # Unlike system events, `name` is a free-form string here.
    name: str = Field(
        description=_NAME_DESCRIPTION,
    )
    # `source` is pinned to the literal `EventSource.user`.
    source: Literal[EventSource.user] = Field(
        description=_SOURCE_DESCRIPTION,
    )
    # Input may supply this under either `user_metadata` or `metadata`.
    metadata: EventMetadataOutput = Field(
        validation_alias=AliasChoices("user_metadata", "metadata"),
    )
# Any event: a system event or a user event, discriminated on `source`.
Event = Annotated[
    SystemEvent | UserEvent,
    Discriminator("source"),
    SetSchemaReference("Event"),
    ClassName("Event"),
]
# Module-level TypeAdapter for the `Event` union, built once at import time.
EventTypeAdapter: TypeAdapter[Event] = TypeAdapter(Event)
class EventName(Schema):
    # An event name together with its source and occurrence information.
    name: str = Field(
        description="The name of the event.",
    )
    source: EventSource = Field(
        description=_SOURCE_DESCRIPTION,
    )
    occurrences: int = Field(
        description="Number of times the event has occurred.",
    )
    first_seen: datetime = Field(
        description="The first time the event occurred.",
    )
    last_seen: datetime = Field(
        description="The last time the event occurred.",
    )
class EventAggregations(Schema):
    """Aggregated values from all descendant events."""

    descendant_count: int = Field(
        description=(
            "Total number of descendant events (not including the event itself)."
        ),
    )
    # Defaults to an empty dict.
    sums: dict[str, Decimal] = Field(
        default_factory=dict,
        description=(
            "Aggregated sums for requested metadata fields. "
            "Keys are field paths (e.g., 'cost_amount'), values are the summed totals."
        ),
    )
class EventWithAggregations(Schema):
    """An event with aggregated values from its descendants."""

    event: Event = Field(
        description="The event.",
    )
    aggregations: EventAggregations = Field(
        description="Aggregated values from all descendant events.",
    )
class EventStatistics(Schema):
    """Aggregate statistics for events grouped by root event name."""

    # Identification of the event type the statistics belong to.
    name: str = Field(description="The name of the root event.")
    label: str = Field(description="The label of the event type.")
    event_type_id: UUID4 = Field(description="The ID of the event type")

    occurrences: int = Field(
        description=(
            "Number of root events with this name (i.e., number of traces)."
        ),
    )

    # Each mapping below is keyed by string and defaults to an empty dict.
    totals: dict[str, Decimal] = Field(
        default_factory=dict,
        description="Sum of each field across all events in all hierarchies.",
    )
    averages: dict[str, Decimal] = Field(
        default_factory=dict,
        description="Average of per-hierarchy totals (i.e., average cost per trace).",
    )
    p50: dict[str, Decimal] = Field(
        default_factory=dict,
        description="Median (50th percentile) of per-hierarchy totals.",
    )
    p95: dict[str, Decimal] = Field(
        default_factory=dict,
        description="95th percentile of per-hierarchy totals.",
    )
    p99: dict[str, Decimal] = Field(
        default_factory=dict,
        description="99th percentile of per-hierarchy totals.",
    )
class StatisticsPeriod(Schema):
    """Event statistics for a single time period."""

    # All three datetimes must be timezone-aware.
    timestamp: AwareDatetime = Field(
        description="Period timestamp",
    )
    period_start: AwareDatetime = Field(
        description="Period start (inclusive)",
    )
    period_end: AwareDatetime = Field(
        description="Period end (exclusive)",
    )
    stats: list[EventStatistics] = Field(
        description="Stats grouped by event name for this period",
    )
class ListStatisticsTimeseries(Schema):
    """Event statistics timeseries."""

    periods: list[StatisticsPeriod] = Field(
        description="Stats for each time period.",
    )
    totals: list[EventStatistics] = Field(
        description="Overall stats across all periods.",
    )
# Annotated UUID4 type for an event ID supplied as a path parameter.
EventID = Annotated[
    UUID4,
    Path(description="The event ID."),
]