Coverage for polar/meter/endpoints.py: 55%
47 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 15:52 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 15:52 +0000
1from fastapi import Depends, Query 1a
2from pydantic import AwareDatetime 1a
4from polar.customer.schemas.customer import CustomerID 1a
5from polar.exceptions import PolarRequestValidationError, ResourceNotFound 1a
6from polar.kit.metadata import MetadataQuery, get_metadata_query_openapi_schema 1a
7from polar.kit.pagination import ListResource, PaginationParamsQuery 1a
8from polar.kit.schemas import MultipleQueryFilter 1a
9from polar.kit.time_queries import MIN_DATETIME, TimeInterval, is_under_limits 1a
10from polar.meter.aggregation import AggregationFunction 1a
11from polar.models import Meter 1a
12from polar.openapi import APITag 1a
13from polar.organization.schemas import OrganizationID 1a
14from polar.postgres import ( 1a
15 AsyncReadSession,
16 AsyncSession,
17 get_db_read_session,
18 get_db_session,
19)
20from polar.routing import APIRouter 1a
22from . import auth, sorting 1a
23from .schemas import Meter as MeterSchema 1a
24from .schemas import MeterCreate, MeterID, MeterQuantities, MeterUpdate 1a
25from .service import meter as meter_service 1a
27router = APIRouter(prefix="/meters", tags=["meters", APITag.public]) 1a
30MeterNotFound = { 1a
31 "description": "Meter not found.",
32 "model": ResourceNotFound.schema(),
33}
36@router.get( 1a
37 "/",
38 summary="List Meters",
39 response_model=ListResource[MeterSchema],
40 openapi_extra={"parameters": [get_metadata_query_openapi_schema()]},
41)
42async def list( 1a
43 auth_subject: auth.MeterRead,
44 pagination: PaginationParamsQuery,
45 sorting: sorting.ListSorting,
46 metadata: MetadataQuery,
47 organization_id: MultipleQueryFilter[OrganizationID] | None = Query(
48 None, title="OrganizationID Filter", description="Filter by organization ID."
49 ),
50 query: str | None = Query(None, description="Filter by name."),
51 is_archived: bool | None = Query(None, description="Filter on archived meters."),
52 session: AsyncReadSession = Depends(get_db_read_session),
53) -> ListResource[MeterSchema]:
54 """List meters."""
55 results, count = await meter_service.list(
56 session,
57 auth_subject,
58 organization_id=organization_id,
59 metadata=metadata,
60 query=query,
61 is_archived=is_archived,
62 pagination=pagination,
63 sorting=sorting,
64 )
66 return ListResource.from_paginated_results(
67 [MeterSchema.model_validate(result) for result in results],
68 count,
69 pagination,
70 )
73@router.get( 1a
74 "/{id}",
75 summary="Get Meter",
76 response_model=MeterSchema,
77 responses={404: MeterNotFound},
78)
79async def get( 1a
80 id: MeterID,
81 auth_subject: auth.MeterRead,
82 session: AsyncReadSession = Depends(get_db_read_session),
83) -> Meter:
84 """Get a meter by ID."""
85 meter = await meter_service.get(session, auth_subject, id)
87 if meter is None:
88 raise ResourceNotFound()
90 return meter
93@router.get( 1a
94 "/{id}/quantities",
95 summary="Get Meter Quantities",
96 response_model=MeterQuantities,
97 responses={404: MeterNotFound},
98 openapi_extra={"parameters": [get_metadata_query_openapi_schema()]},
99)
100async def quantities( 1a
101 id: MeterID,
102 auth_subject: auth.MeterRead,
103 metadata: MetadataQuery,
104 start_timestamp: AwareDatetime = Query(
105 ...,
106 description="Start timestamp.",
107 ge=MIN_DATETIME, # type: ignore
108 ),
109 end_timestamp: AwareDatetime = Query(..., description="End timestamp."),
110 interval: TimeInterval = Query(..., description="Interval between two timestamps."),
111 customer_id: MultipleQueryFilter[CustomerID] | None = Query(
112 None, title="CustomerID Filter", description="Filter by customer ID."
113 ),
114 external_customer_id: MultipleQueryFilter[str] | None = Query(
115 None,
116 title="ExternalCustomerID Filter",
117 description="Filter by external customer ID.",
118 ),
119 customer_aggregation_function: AggregationFunction | None = Query(
120 None,
121 description=(
122 "If set, will first compute the quantities per customer before aggregating "
123 "them using the given function. "
124 "If not set, the quantities will be aggregated across all events."
125 ),
126 ),
127 session: AsyncReadSession = Depends(get_db_read_session),
128) -> MeterQuantities:
129 """Get quantities of a meter over a time period."""
130 meter = await meter_service.get(session, auth_subject, id)
132 if meter is None:
133 raise ResourceNotFound()
135 if not is_under_limits(start_timestamp, end_timestamp, interval):
136 raise PolarRequestValidationError(
137 [
138 {
139 "loc": ("query",),
140 "msg": (
141 "The interval is too big. "
142 "Try to change the interval or reduce the date range."
143 ),
144 "type": "value_error",
145 "input": (start_timestamp, end_timestamp, interval),
146 }
147 ]
148 )
150 return await meter_service.get_quantities(
151 session,
152 meter,
153 start_timestamp=start_timestamp,
154 end_timestamp=end_timestamp,
155 interval=interval,
156 customer_id=customer_id,
157 external_customer_id=external_customer_id,
158 metadata=metadata,
159 customer_aggregation_function=customer_aggregation_function,
160 )
163@router.post( 1a
164 "/",
165 response_model=MeterSchema,
166 status_code=201,
167 summary="Create Meter",
168 responses={201: {"description": "Meter created."}},
169)
170async def create( 1a
171 meter_create: MeterCreate,
172 auth_subject: auth.MeterWrite,
173 session: AsyncSession = Depends(get_db_session),
174) -> Meter:
175 """Create a meter."""
176 return await meter_service.create(session, meter_create, auth_subject)
179@router.patch( 1a
180 "/{id}",
181 response_model=MeterSchema,
182 summary="Update Meter",
183 responses={
184 200: {"description": "Meter updated."},
185 404: MeterNotFound,
186 },
187)
188async def update( 1a
189 id: MeterID,
190 meter_update: MeterUpdate,
191 auth_subject: auth.MeterWrite,
192 session: AsyncSession = Depends(get_db_session),
193) -> Meter:
194 """Update a meter."""
195 meter = await meter_service.get(session, auth_subject, id)
197 if meter is None:
198 raise ResourceNotFound()
200 return await meter_service.update(session, meter, meter_update)