Coverage for polar/meter/endpoints.py: 55%

47 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 16:17 +0000

1from fastapi import Depends, Query 1a

2from pydantic import AwareDatetime 1a

3 

4from polar.customer.schemas.customer import CustomerID 1a

5from polar.exceptions import PolarRequestValidationError, ResourceNotFound 1a

6from polar.kit.metadata import MetadataQuery, get_metadata_query_openapi_schema 1a

7from polar.kit.pagination import ListResource, PaginationParamsQuery 1a

8from polar.kit.schemas import MultipleQueryFilter 1a

9from polar.kit.time_queries import MIN_DATETIME, TimeInterval, is_under_limits 1a

10from polar.meter.aggregation import AggregationFunction 1a

11from polar.models import Meter 1a

12from polar.openapi import APITag 1a

13from polar.organization.schemas import OrganizationID 1a

14from polar.postgres import ( 1a

15 AsyncReadSession, 

16 AsyncSession, 

17 get_db_read_session, 

18 get_db_session, 

19) 

20from polar.routing import APIRouter 1a

21 

22from . import auth, sorting 1a

23from .schemas import Meter as MeterSchema 1a

24from .schemas import MeterCreate, MeterID, MeterQuantities, MeterUpdate 1a

25from .service import meter as meter_service 1a

26 

# Router collecting every meter endpoint declared in this module; all routes
# are served under the "/meters" prefix and tagged "meters" plus APITag.public.
router = APIRouter(
    prefix="/meters",
    tags=["meters", APITag.public],
)

28 

29 

# Shared OpenAPI description of the "not found" response. The endpoints below
# that look a meter up by ID attach it as their documented 404 response.
MeterNotFound = {
    "description": "Meter not found.",
    "model": ResourceNotFound.schema(),
}

34 

35 

@router.get(
    "/",
    summary="List Meters",
    response_model=ListResource[MeterSchema],
    openapi_extra={"parameters": [get_metadata_query_openapi_schema()]},
)
async def list(
    auth_subject: auth.MeterRead,
    pagination: PaginationParamsQuery,
    sorting: sorting.ListSorting,
    metadata: MetadataQuery,
    organization_id: MultipleQueryFilter[OrganizationID] | None = Query(
        None, title="OrganizationID Filter", description="Filter by organization ID."
    ),
    query: str | None = Query(None, description="Filter by name."),
    is_archived: bool | None = Query(None, description="Filter on archived meters."),
    session: AsyncReadSession = Depends(get_db_read_session),
) -> ListResource[MeterSchema]:
    """List meters."""
    # NOTE: the docstring above is kept verbatim on purpose; FastAPI publishes
    # an endpoint's docstring as its OpenAPI description.
    #
    # Inside this body the name `sorting` is the request parameter, not the
    # `sorting` module imported at the top of the file.
    #
    # All filtering, sorting and pagination is delegated to the service, which
    # hands back the rows of the requested page together with a count.
    meters, total = await meter_service.list(
        session,
        auth_subject,
        organization_id=organization_id,
        metadata=metadata,
        query=query,
        is_archived=is_archived,
        pagination=pagination,
        sorting=sorting,
    )

    # Convert each returned row into the public schema before wrapping the
    # page in the paginated envelope.
    items = [MeterSchema.model_validate(row) for row in meters]
    return ListResource.from_paginated_results(items, total, pagination)

71 

72 

@router.get(
    "/{id}",
    summary="Get Meter",
    response_model=MeterSchema,
    responses={404: MeterNotFound},
)
async def get(
    id: MeterID,
    auth_subject: auth.MeterRead,
    session: AsyncReadSession = Depends(get_db_read_session),
) -> Meter:
    """Get a meter by ID."""
    # NOTE: the docstring above is kept verbatim on purpose; FastAPI publishes
    # an endpoint's docstring as its OpenAPI description.
    found = await meter_service.get(session, auth_subject, id)

    # The service yields None when it has no meter to return for this ID and
    # subject; that case is reported through ResourceNotFound (documented as
    # the 404 response in the decorator above).
    if found is not None:
        return found

    raise ResourceNotFound()

91 

92 

@router.get(
    "/{id}/quantities",
    summary="Get Meter Quantities",
    response_model=MeterQuantities,
    responses={404: MeterNotFound},
    openapi_extra={"parameters": [get_metadata_query_openapi_schema()]},
)
async def quantities(
    id: MeterID,
    auth_subject: auth.MeterRead,
    metadata: MetadataQuery,
    start_timestamp: AwareDatetime = Query(
        ...,
        description="Start timestamp.",
        ge=MIN_DATETIME,  # type: ignore
    ),
    end_timestamp: AwareDatetime = Query(..., description="End timestamp."),
    interval: TimeInterval = Query(..., description="Interval between two timestamps."),
    customer_id: MultipleQueryFilter[CustomerID] | None = Query(
        None, title="CustomerID Filter", description="Filter by customer ID."
    ),
    external_customer_id: MultipleQueryFilter[str] | None = Query(
        None,
        title="ExternalCustomerID Filter",
        description="Filter by external customer ID.",
    ),
    customer_aggregation_function: AggregationFunction | None = Query(
        None,
        description=(
            "If set, will first compute the quantities per customer before aggregating "
            "them using the given function. "
            "If not set, the quantities will be aggregated across all events."
        ),
    ),
    session: AsyncReadSession = Depends(get_db_read_session),
) -> MeterQuantities:
    """Get quantities of a meter over a time period."""
    # NOTE: the docstring above is kept verbatim on purpose; FastAPI publishes
    # an endpoint's docstring as its OpenAPI description.
    #
    # Step 1: resolve the meter. A missing meter is reported before the time
    # range is validated, so ResourceNotFound takes precedence over the
    # validation error below.
    target = await meter_service.get(session, auth_subject, id)
    if target is None:
        raise ResourceNotFound()

    # Step 2: when the (start, end, interval) combination is within limits,
    # delegate the computation to the service.
    if is_under_limits(start_timestamp, end_timestamp, interval):
        return await meter_service.get_quantities(
            session,
            target,
            start_timestamp=start_timestamp,
            end_timestamp=end_timestamp,
            interval=interval,
            customer_id=customer_id,
            external_customer_id=external_customer_id,
            metadata=metadata,
            customer_aggregation_function=customer_aggregation_function,
        )

    # Step 3: otherwise reject the request. The error is located on the query
    # string as a whole because three separate parameters contribute to it.
    too_big = {
        "loc": ("query",),
        "msg": (
            "The interval is too big. "
            "Try to change the interval or reduce the date range."
        ),
        "type": "value_error",
        "input": (start_timestamp, end_timestamp, interval),
    }
    raise PolarRequestValidationError([too_big])

161 

162 

@router.post(
    "/",
    response_model=MeterSchema,
    status_code=201,
    summary="Create Meter",
    responses={201: {"description": "Meter created."}},
)
async def create(
    meter_create: MeterCreate,
    auth_subject: auth.MeterWrite,
    session: AsyncSession = Depends(get_db_session),
) -> Meter:
    """Create a meter."""
    # NOTE: the docstring above is kept verbatim on purpose; FastAPI publishes
    # an endpoint's docstring as its OpenAPI description.
    #
    # Creation is fully delegated to the service; the returned model is
    # serialized through `response_model=MeterSchema`.
    created = await meter_service.create(session, meter_create, auth_subject)
    return created

177 

178 

@router.patch(
    "/{id}",
    response_model=MeterSchema,
    summary="Update Meter",
    responses={
        200: {"description": "Meter updated."},
        404: MeterNotFound,
    },
)
async def update(
    id: MeterID,
    meter_update: MeterUpdate,
    auth_subject: auth.MeterWrite,
    session: AsyncSession = Depends(get_db_session),
) -> Meter:
    """Update a meter."""
    # NOTE: the docstring above is kept verbatim on purpose; FastAPI publishes
    # an endpoint's docstring as its OpenAPI description.
    #
    # Look the meter up first, using the write-scoped subject; nothing is
    # updated when the lookup yields no meter.
    existing = await meter_service.get(session, auth_subject, id)
    if existing is None:
        raise ResourceNotFound()

    # Apply the patch payload through the service and return its result.
    updated = await meter_service.update(session, existing, meter_update)
    return updated