Coverage for polar/event_type/endpoints.py: 67%
25 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 17:15 +0000
1from uuid import UUID 1a
3from fastapi import Depends, Query 1a
5from polar.customer.schemas.customer import CustomerID 1a
6from polar.event_type import auth, schemas, sorting 1a
7from polar.event_type.service import event_type_service 1a
8from polar.exceptions import ResourceNotFound 1a
9from polar.kit.pagination import ListResource, PaginationParamsQuery 1a
10from polar.kit.schemas import MultipleQueryFilter 1a
11from polar.models.event import EventSource 1a
12from polar.openapi import APITag 1a
13from polar.organization.schemas import OrganizationID 1a
14from polar.postgres import AsyncSession, get_db_session 1a
15from polar.routing import APIRouter 1a
# Router under which the event-type endpoints in this module are registered.
router = APIRouter(
    prefix="/event-types",
    tags=["event-types", APITag.public],
)
@router.get(
    "/",
    summary="List Event Types",
    response_model=ListResource[schemas.EventTypeWithStats],
)
async def list(
    auth_subject: auth.EventTypeRead,
    pagination: PaginationParamsQuery,
    sorting: sorting.EventTypesSorting,
    session: AsyncSession = Depends(get_db_session),
    organization_id: MultipleQueryFilter[OrganizationID] | None = Query(
        None, title="OrganizationID Filter", description="Filter by organization ID."
    ),
    customer_id: MultipleQueryFilter[CustomerID] | None = Query(
        None, title="CustomerID Filter", description="Filter by customer ID."
    ),
    external_customer_id: MultipleQueryFilter[str] | None = Query(
        None,
        title="ExternalCustomerID Filter",
        description="Filter by external customer ID.",
    ),
    query: str | None = Query(
        None,
        title="Query",
        description="Query to filter event types by name or label.",
    ),
    root_events: bool = Query(
        False,
        title="Root Events Filter",
        description="When true, only return event types with root events (parent_id IS NULL).",
    ),
    parent_id: UUID | None = Query(
        None,
        title="ParentID Filter",
        description="Filter by specific parent event ID.",
    ),
    source: EventSource | None = Query(
        None,
        title="EventSource Filter",
        description="Filter by event source (system or user).",
    ),
) -> ListResource[schemas.EventTypeWithStats]:
    """List event types with aggregated statistics."""
    # NOTE: the docstring above is kept verbatim; no `description=` is passed to
    # the route decorator, so it may be what ends up in the generated API docs.

    # Every filter is handed to the service untouched; this endpoint performs no
    # filtering of its own. The service returns a (page items, total) pair.
    event_types, total = await event_type_service.list_with_stats(
        session,
        auth_subject,
        organization_id=organization_id,
        customer_id=customer_id,
        external_customer_id=external_customer_id,
        query=query,
        root_events=root_events,
        parent_id=parent_id,
        source=source,
        pagination=pagination,
        sorting=sorting,
    )

    # Wrap the page and its total count in the paginated response envelope.
    return ListResource.from_paginated_results(event_types, total, pagination)
@router.patch(
    "/{id}",
    response_model=schemas.EventType,
    summary="Update Event Type",
    description="Update an event type's label.",
    status_code=200,
    responses={404: {}},
)
async def update(
    id: schemas.EventTypeID,
    body: schemas.EventTypeUpdate,
    auth_subject: auth.EventTypeWrite,
    session: AsyncSession = Depends(get_db_session),
) -> schemas.EventType:
    """Apply the label fields from ``body`` to the event type identified by ``id``.

    The event type is looked up through the service using the caller's auth
    subject. The service is then asked to update it with ``body.label`` and
    ``body.label_property_selector``, and the result is validated into the
    response schema.

    Raises:
        ResourceNotFound: If the service lookup returns ``None`` for ``id``
            (presumably surfaced as the 404 declared in ``responses``).
    """
    existing = await event_type_service.get(session, auth_subject, id)
    if existing is None:
        raise ResourceNotFound()

    return schemas.EventType.model_validate(
        await event_type_service.update(
            session, existing, body.label, body.label_property_selector
        )
    )