Coverage for polar/event/endpoints.py: 48%
65 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 15:52 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 15:52 +0000
1from collections.abc import Sequence 1a
2from datetime import date 1a
3from zoneinfo import ZoneInfo 1a
5from fastapi import Depends, Query 1a
6from fastapi.exceptions import RequestValidationError 1a
7from pydantic import UUID4, AwareDatetime, ValidationError 1a
8from pydantic_extra_types.timezone_name import TimeZoneName 1a
10from polar.customer.schemas.customer import CustomerID 1a
11from polar.exceptions import PolarRequestValidationError, ResourceNotFound 1a
12from polar.kit.metadata import MetadataQuery, get_metadata_query_openapi_schema 1a
13from polar.kit.pagination import ListResource, PaginationParamsQuery 1a
14from polar.kit.schemas import MultipleQueryFilter 1a
15from polar.kit.time_queries import TimeInterval, is_under_limits 1a
16from polar.meter.filter import Filter 1a
17from polar.meter.schemas import MeterID 1a
18from polar.models import Event 1a
19from polar.models.event import EventSource 1a
20from polar.openapi import APITag 1a
21from polar.organization.schemas import OrganizationID 1a
22from polar.postgres import AsyncSession, get_db_session 1a
23from polar.routing import APIRouter 1a
25from . import auth, sorting 1a
26from .schemas import Event as EventSchema 1a
27from .schemas import ( 1a
28 EventID,
29 EventName,
30 EventsIngest,
31 EventsIngestResponse,
32 EventTypeAdapter,
33 ListStatisticsTimeseries,
34)
35from .service import event as event_service 1a
# Router that every endpoint in this module registers on; all routes are
# served under the "/events" prefix and carry these two tags.
router = APIRouter(
    prefix="/events",
    tags=["events", APITag.public],
)


# Response entry used for the 404 case of the "Get Event" route
# (passed there as `responses={404: EventNotFound}`).
EventNotFound = {
    "description": "Event not found.",
    "model": ResourceNotFound.schema(),
}
@router.get(
    "/",
    summary="List Events",
    response_model=ListResource[EventSchema],
    openapi_extra={"parameters": [get_metadata_query_openapi_schema()]},
)
async def list(
    auth_subject: auth.EventRead,
    pagination: PaginationParamsQuery,
    sorting: sorting.ListSorting,
    metadata: MetadataQuery,
    filter: str | None = Query(
        None,
        description=(
            "Filter events following filter clauses. "
            "JSON string following the same schema a meter filter clause. "
        ),
    ),
    start_timestamp: AwareDatetime | None = Query(
        None, description="Filter events after this timestamp."
    ),
    end_timestamp: AwareDatetime | None = Query(
        None, description="Filter events before this timestamp."
    ),
    organization_id: MultipleQueryFilter[OrganizationID] | None = Query(
        None, title="OrganizationID Filter", description="Filter by organization ID."
    ),
    customer_id: MultipleQueryFilter[CustomerID] | None = Query(
        None, title="CustomerID Filter", description="Filter by customer ID."
    ),
    external_customer_id: MultipleQueryFilter[str] | None = Query(
        None,
        title="ExternalCustomerID Filter",
        description="Filter by external customer ID.",
    ),
    meter_id: MeterID | None = Query(
        None, title="MeterID Filter", description="Filter by a meter filter clause."
    ),
    name: MultipleQueryFilter[str] | None = Query(
        None, title="Name Filter", description="Filter by event name."
    ),
    source: MultipleQueryFilter[EventSource] | None = Query(
        None, title="Source Filter", description="Filter by event source."
    ),
    event_type_id: UUID4 | None = Query(
        None,
        title="Event Type ID Filter",
        description="Filter by event type ID.",
        include_in_schema=False,
    ),
    query: str | None = Query(
        None, title="Query", description="Query to filter events."
    ),
    parent_id: EventID | None = Query(
        None,
        description="Filter events by parent event ID when hierarchical is set to true. When not specified or null, returns root events only.",
    ),
    hierarchical: bool = Query(
        False,
        description="When true, filters by parent_id (root events if not specified). When false, returns all events regardless of hierarchy.",
    ),
    aggregate_fields: Sequence[str] = Query(
        default=[],
        description="Metadata field paths to aggregate from descendants into ancestors (e.g., '_cost.amount', 'duration_ns'). Use dot notation for nested fields.",
        include_in_schema=False,
    ),
    session: AsyncSession = Depends(get_db_session),
) -> ListResource[EventSchema]:
    """List events."""
    # NOTE: the docstring above is kept short on purpose; FastAPI publishes it
    # as the operation description, so implementation notes live in comments.
    #
    # Flow: validate the two inputs that cannot be expressed declaratively
    # (`filter` JSON and the `query`/`organization_id` dependency), delegate
    # to `event_service.list`, then wrap the page in a `ListResource`.
    #
    # Raises:
    #     RequestValidationError: if `filter` is not valid JSON for `Filter`,
    #         or if `query` is given without `organization_id`.

    # Manually parse the filter string to a Filter object as FastAPI does not
    # support complex schemas in query parameters.
    parsed_filter: Filter | None = None
    if filter is not None:
        try:
            parsed_filter = Filter.model_validate_json(filter)
        except ValidationError as e:
            raise RequestValidationError(e.errors()) from e

    if query is not None and organization_id is None:
        # The error entry carries `loc` and `input` in addition to the
        # original `type`/`msg`, so it has the same shape as the other
        # validation errors raised by this module and clients can tell which
        # parameter was rejected. `type` and `msg` are unchanged.
        raise RequestValidationError(
            [
                {
                    "loc": ("query", "query"),
                    "type": "query",
                    "msg": "Query is only supported when organization_id is provided.",
                    "input": query,
                }
            ]
        )

    results, count = await event_service.list(
        session,
        auth_subject,
        filter=parsed_filter,
        start_timestamp=start_timestamp,
        end_timestamp=end_timestamp,
        organization_id=organization_id,
        customer_id=customer_id,
        external_customer_id=external_customer_id,
        meter_id=meter_id,
        name=name,
        source=source,
        event_type_id=event_type_id,
        metadata=metadata,
        pagination=pagination,
        sorting=sorting,
        query=query,
        parent_id=parent_id,
        hierarchical=hierarchical,
        aggregate_fields=aggregate_fields,
    )

    # Each service result is validated through the event type adapter before
    # being placed in the paginated response.
    return ListResource.from_paginated_results(
        [EventTypeAdapter.validate_python(result) for result in results],
        count,
        pagination,
    )
@router.get(
    "/statistics/timeseries",
    summary="List statistics timeseries",
    openapi_extra={"parameters": [get_metadata_query_openapi_schema()]},
    tags=[APITag.private],
    response_model=ListStatisticsTimeseries,
)
async def list_statistics_timeseries(
    auth_subject: auth.EventRead,
    metadata: MetadataQuery,
    hierarchy_sorting: sorting.EventStatisticsSorting,
    start_date: date = Query(
        ...,
        description="Start date.",
    ),
    end_date: date = Query(..., description="End date."),
    timezone: TimeZoneName = Query(
        default="UTC",
        description="Timezone to use for the dates. Default is UTC.",
    ),
    interval: TimeInterval = Query(..., description="Interval between two dates."),
    filter: str | None = Query(
        None,
        description=(
            "Filter events following filter clauses. "
            "JSON string following the same schema a meter filter clause. "
        ),
    ),
    organization_id: MultipleQueryFilter[OrganizationID] | None = Query(
        None, title="OrganizationID Filter", description="Filter by organization ID."
    ),
    customer_id: MultipleQueryFilter[CustomerID] | None = Query(
        None, title="CustomerID Filter", description="Filter by customer ID."
    ),
    external_customer_id: MultipleQueryFilter[str] | None = Query(
        None,
        title="ExternalCustomerID Filter",
        description="Filter by external customer ID.",
    ),
    meter_id: MeterID | None = Query(
        None, title="MeterID Filter", description="Filter by a meter filter clause."
    ),
    name: MultipleQueryFilter[str] | None = Query(
        None, title="Name Filter", description="Filter by event name."
    ),
    source: MultipleQueryFilter[EventSource] | None = Query(
        None, title="Source Filter", description="Filter by event source."
    ),
    event_type_id: UUID4 | None = Query(
        None,
        title="Event Type ID Filter",
        description="Filter by event type ID.",
    ),
    query: str | None = Query(
        None, title="Query", description="Query to filter events."
    ),
    aggregate_fields: Sequence[str] = Query(
        default=["_cost.amount"],
        description="Metadata field paths to aggregate (e.g., '_cost.amount', 'duration_ns'). Use dot notation for nested fields.",
    ),
    session: AsyncSession = Depends(get_db_session),
) -> ListStatisticsTimeseries:
    """
    Get aggregate statistics grouped by root event name over time.

    Returns time series data with periods and totals, similar to the metrics endpoint.
    Each period contains stats grouped by event name, and totals show overall stats
    across all periods.
    """
    # NOTE: the docstring above is published by FastAPI as the operation
    # description, so implementation notes are kept in comments.
    #
    # Raises:
    #     PolarRequestValidationError: if `is_under_limits` rejects the
    #         (start_date, end_date, interval) combination.
    #     RequestValidationError: if `filter` is not valid JSON for `Filter`,
    #         or if `query` is given without `organization_id`.

    # Validate interval limits
    if not is_under_limits(start_date, end_date, interval):
        raise PolarRequestValidationError(
            [
                {
                    "loc": ("query",),
                    "msg": (
                        "The interval is too big. "
                        "Try to change the interval or reduce the date range."
                    ),
                    "type": "value_error",
                    "input": (start_date, end_date, interval),
                }
            ]
        )

    # Parse filter if provided; it arrives as a JSON string in the query
    # string and is validated against the `Filter` model here.
    parsed_filter: Filter | None = None
    if filter is not None:
        try:
            parsed_filter = Filter.model_validate_json(filter)
        except ValidationError as e:
            raise RequestValidationError(e.errors()) from e

    if query is not None and organization_id is None:
        # The error entry carries `loc` and `input` in addition to the
        # original `type`/`msg`, matching the shape of the interval error
        # above so clients can tell which parameter was rejected. `type` and
        # `msg` are unchanged.
        raise RequestValidationError(
            [
                {
                    "loc": ("query", "query"),
                    "type": "query",
                    "msg": "Query is only supported when organization_id is provided.",
                    "input": query,
                }
            ]
        )

    return await event_service.list_statistics_timeseries(
        session,
        auth_subject,
        start_date=start_date,
        end_date=end_date,
        # The validated timezone name is turned into a tzinfo object here.
        timezone=ZoneInfo(timezone),
        interval=interval,
        filter=parsed_filter,
        organization_id=organization_id,
        customer_id=customer_id,
        external_customer_id=external_customer_id,
        meter_id=meter_id,
        name=name,
        source=source,
        event_type_id=event_type_id,
        metadata=metadata,
        query=query,
        # Passed as a tuple rather than the sequence received from the query.
        aggregate_fields=tuple(aggregate_fields),
        hierarchy_stats_sorting=hierarchy_sorting,
    )
@router.get(
    "/names",
    summary="List Event Names",
    response_model=ListResource[EventName],
)
async def list_names(
    auth_subject: auth.EventRead,
    pagination: PaginationParamsQuery,
    sorting: sorting.EventNamesSorting,
    session: AsyncSession = Depends(get_db_session),
    organization_id: MultipleQueryFilter[OrganizationID] | None = Query(
        None,
        title="OrganizationID Filter",
        description="Filter by organization ID.",
    ),
    customer_id: MultipleQueryFilter[CustomerID] | None = Query(
        None,
        title="CustomerID Filter",
        description="Filter by customer ID.",
    ),
    external_customer_id: MultipleQueryFilter[str] | None = Query(
        None,
        title="ExternalCustomerID Filter",
        description="Filter by external customer ID.",
    ),
    source: MultipleQueryFilter[EventSource] | None = Query(
        None,
        title="Source Filter",
        description="Filter by event source.",
    ),
    query: str | None = Query(
        None,
        title="Query",
        description="Query to filter event names.",
    ),
) -> ListResource[EventName]:
    """List event names."""
    # Thin wrapper: forwards every filter unchanged to the service layer and
    # wraps the returned (items, total) pair in a paginated resource.
    event_names, total = await event_service.list_names(
        session,
        auth_subject,
        organization_id=organization_id,
        customer_id=customer_id,
        external_customer_id=external_customer_id,
        source=source,
        query=query,
        pagination=pagination,
        sorting=sorting,
    )
    return ListResource.from_paginated_results(event_names, total, pagination)
@router.get(
    "/{id}",
    summary="Get Event",
    response_model=EventSchema,
    responses={404: EventNotFound},
)
async def get(
    id: EventID,
    auth_subject: auth.EventRead,
    session: AsyncSession = Depends(get_db_session),
) -> Event:
    """Get an event by ID."""
    # Look the event up through the service; a `None` result is reported to
    # the client by raising `ResourceNotFound`.
    found = await event_service.get(session, auth_subject, id)
    if found is not None:
        return found

    raise ResourceNotFound()
@router.post("/ingest", summary="Ingest Events")
async def ingest(
    ingest: EventsIngest,
    auth_subject: auth.EventWrite,
    session: AsyncSession = Depends(get_db_session),
) -> EventsIngestResponse:
    """Ingest batch of events."""
    # Pure delegation: the request body is handed to the service as-is and
    # the service's response is returned unchanged.
    response = await event_service.ingest(session, auth_subject, ingest)
    return response