Coverage for polar/customer/endpoints.py: 26%

135 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 16:17 +0000

1import json 1a

2from collections.abc import AsyncGenerator 1a

3 

4from fastapi import Depends, Query, Response 1a

5from fastapi.responses import StreamingResponse 1a

6 

7from polar.exceptions import ResourceNotFound 1a

8from polar.kit.csv import IterableCSVWriter 1a

9from polar.kit.metadata import MetadataQuery, get_metadata_query_openapi_schema 1a

10from polar.kit.pagination import ListResource, PaginationParamsQuery 1a

11from polar.kit.schemas import MultipleQueryFilter 1a

12from polar.member import member_service 1a

13from polar.member.schemas import Member as MemberSchema 1a

14from polar.openapi import APITag 1a

15from polar.organization.schemas import OrganizationID 1a

16from polar.postgres import ( 1a

17 AsyncReadSession, 

18 AsyncSession, 

19 get_db_read_session, 

20 get_db_session, 

21) 

22from polar.redis import Redis, get_redis 1a

23from polar.routing import APIRouter 1a

24 

25from . import auth, sorting 1a

26from .repository import CustomerRepository 1a

27from .schemas.customer import Customer as CustomerSchema 1a

28from .schemas.customer import ( 1a

29 CustomerCreate, 

30 CustomerID, 

31 CustomerUpdate, 

32 CustomerUpdateExternalID, 

33 CustomerWithMembers, 

34 ExternalCustomerID, 

35) 

36from .schemas.state import CustomerState 1a

37from .service import customer as customer_service 1a

38 

39router = APIRouter( 1a

40 prefix="/customers", 

41 tags=["customers", APITag.public, APITag.mcp], 

42) 

43 

44 

45CustomerNotFound = { 1a

46 "description": "Customer not found.", 

47 "model": ResourceNotFound.schema(), 

48} 

49 

50 

51@router.get( 1a

52 "/", 

53 summary="List Customers", 

54 response_model=ListResource[CustomerWithMembers], 

55 openapi_extra={"parameters": [get_metadata_query_openapi_schema()]}, 

56) 

57async def list( 1a

58 auth_subject: auth.CustomerRead, 

59 pagination: PaginationParamsQuery, 

60 sorting: sorting.ListSorting, 

61 metadata: MetadataQuery, 

62 organization_id: MultipleQueryFilter[OrganizationID] | None = Query( 

63 None, title="OrganizationID Filter", description="Filter by organization ID." 

64 ), 

65 email: str | None = Query(None, description="Filter by exact email."), 

66 query: str | None = Query( 

67 None, description="Filter by name, email, or external ID." 

68 ), 

69 include_members: bool = Query( 

70 False, 

71 description="Include members in the response. Only populated when set to true.", 

72 ), 

73 session: AsyncReadSession = Depends(get_db_read_session), 

74) -> ListResource[CustomerWithMembers]: 

75 """List customers.""" 

76 results, count = await customer_service.list( 

77 session, 

78 auth_subject, 

79 organization_id=organization_id, 

80 email=email, 

81 metadata=metadata, 

82 query=query, 

83 pagination=pagination, 

84 sorting=sorting, 

85 ) 

86 

87 customers_with_members = [] 

88 if include_members and results: 

89 customer_ids = [result.id for result in results] 

90 all_members = await member_service.list_by_customers(session, customer_ids) 

91 

92 members_by_customer = {} 

93 for member in all_members: 

94 member_schema = MemberSchema.model_validate(member) 

95 if member.customer_id not in members_by_customer: 

96 members_by_customer[member.customer_id] = [member_schema] 

97 else: 

98 members_by_customer[member.customer_id].append(member_schema) 

99 

100 for result in results: 

101 customer_dict = CustomerSchema.model_validate(result).model_dump() 

102 customer_dict["members"] = members_by_customer.get(result.id, []) 

103 customers_with_members.append(CustomerWithMembers(**customer_dict)) 

104 else: 

105 for r in results: 

106 customer_dict = CustomerSchema.model_validate(r).model_dump() 

107 customer_dict["members"] = [] 

108 customers_with_members.append(CustomerWithMembers(**customer_dict)) 

109 

110 return ListResource.from_paginated_results( 

111 customers_with_members, 

112 count, 

113 pagination, 

114 ) 

115 

116 

117@router.get("/export", summary="Export Customers") 1a

118async def export( 1a

119 auth_subject: auth.CustomerRead, 

120 organization_id: MultipleQueryFilter[OrganizationID] | None = Query( 

121 None, description="Filter by organization ID." 

122 ), 

123 session: AsyncReadSession = Depends(get_db_read_session), 

124) -> Response: 

125 """Export customers as a CSV file.""" 

126 

127 async def create_csv() -> AsyncGenerator[str, None]: 

128 csv_writer = IterableCSVWriter(dialect="excel") 

129 

130 yield csv_writer.getrow( 

131 ( 

132 "ID", 

133 "External ID", 

134 "Created At", 

135 "Email", 

136 "Name", 

137 "Tax ID", 

138 "Billing Address Line 1", 

139 "Billing Address Line 2", 

140 "Billing Address City", 

141 "Billing Address State", 

142 "Billing Address Zip", 

143 "Billing Address Country", 

144 "Metadata", 

145 ) 

146 ) 

147 

148 repository = CustomerRepository.from_session(session) 

149 stream = repository.stream_by_organization(auth_subject, organization_id) 

150 

151 async for customer in stream: 

152 billing_address = customer.billing_address 

153 

154 yield csv_writer.getrow( 

155 ( 

156 customer.id, 

157 customer.external_id, 

158 customer.created_at.isoformat(), 

159 customer.email, 

160 customer.name, 

161 customer.tax_id, 

162 billing_address.line1 if billing_address else None, 

163 billing_address.line2 if billing_address else None, 

164 billing_address.city if billing_address else None, 

165 billing_address.state if billing_address else None, 

166 billing_address.postal_code if billing_address else None, 

167 billing_address.country if billing_address else None, 

168 json.dumps(customer.user_metadata) 

169 if customer.user_metadata 

170 else None, 

171 ) 

172 ) 

173 

174 filename = "polar-customers.csv" 

175 return StreamingResponse( 

176 create_csv(), 

177 media_type="text/csv", 

178 headers={"Content-Disposition": f"attachment; filename={filename}"}, 

179 ) 

180 

181 

182@router.get( 1a

183 "/{id}", 

184 summary="Get Customer", 

185 response_model=CustomerWithMembers, 

186 responses={404: CustomerNotFound}, 

187) 

188async def get( 1a

189 id: CustomerID, 

190 auth_subject: auth.CustomerRead, 

191 include_members: bool = Query( 

192 False, 

193 description="Include members in the response. Only populated when set to true.", 

194 ), 

195 session: AsyncReadSession = Depends(get_db_read_session), 

196) -> CustomerWithMembers: 

197 """Get a customer by ID.""" 

198 customer = await customer_service.get(session, auth_subject, id) 

199 

200 if customer is None: 

201 raise ResourceNotFound() 

202 

203 customer_dict = CustomerSchema.model_validate(customer).model_dump() 

204 if include_members: 

205 customer_dict["members"] = await customer_service.load_members( 

206 session, customer.id 

207 ) 

208 else: 

209 customer_dict["members"] = [] 

210 return CustomerWithMembers(**customer_dict) 

211 

212 

213@router.get( 1a

214 "/external/{external_id}", 

215 summary="Get Customer by External ID", 

216 response_model=CustomerWithMembers, 

217 responses={404: CustomerNotFound}, 

218) 

219async def get_external( 1a

220 external_id: ExternalCustomerID, 

221 auth_subject: auth.CustomerRead, 

222 include_members: bool = Query( 

223 False, 

224 description="Include members in the response. Only populated when set to true.", 

225 ), 

226 session: AsyncReadSession = Depends(get_db_read_session), 

227) -> CustomerWithMembers: 

228 """Get a customer by external ID.""" 

229 customer = await customer_service.get_external(session, auth_subject, external_id) 

230 

231 if customer is None: 

232 raise ResourceNotFound() 

233 

234 customer_dict = CustomerSchema.model_validate(customer).model_dump() 

235 if include_members: 

236 customer_dict["members"] = await customer_service.load_members( 

237 session, customer.id 

238 ) 

239 else: 

240 customer_dict["members"] = [] 

241 return CustomerWithMembers(**customer_dict) 

242 

243 

244@router.get( 1a

245 "/{id}/state", 

246 summary="Get Customer State", 

247 response_model=CustomerState, 

248 responses={404: CustomerNotFound}, 

249) 

250async def get_state( 1a

251 id: CustomerID, 

252 auth_subject: auth.CustomerRead, 

253 session: AsyncReadSession = Depends(get_db_read_session), 

254 redis: Redis = Depends(get_redis), 

255) -> CustomerState: 

256 """ 

257 Get a customer state by ID. 

258 

259 The customer state includes information about 

260 the customer's active subscriptions and benefits. 

261 

262 It's the ideal endpoint to use when you need to get a full overview 

263 of a customer's status. 

264 """ 

265 customer = await customer_service.get(session, auth_subject, id) 

266 

267 if customer is None: 

268 raise ResourceNotFound() 

269 

270 return await customer_service.get_state(session, redis, customer) 

271 

272 

273@router.get( 1a

274 "/external/{external_id}/state", 

275 summary="Get Customer State by External ID", 

276 response_model=CustomerState, 

277 responses={404: CustomerNotFound}, 

278) 

279async def get_state_external( 1a

280 external_id: ExternalCustomerID, 

281 auth_subject: auth.CustomerRead, 

282 session: AsyncReadSession = Depends(get_db_read_session), 

283 redis: Redis = Depends(get_redis), 

284) -> CustomerState: 

285 """ 

286 Get a customer state by external ID. 

287 

288 The customer state includes information about 

289 the customer's active subscriptions and benefits. 

290 

291 It's the ideal endpoint to use when you need to get a full overview 

292 of a customer's status. 

293 """ 

294 customer = await customer_service.get_external(session, auth_subject, external_id) 

295 

296 if customer is None: 

297 raise ResourceNotFound() 

298 

299 return await customer_service.get_state(session, redis, customer) 

300 

301 

302@router.post( 1a

303 "/", 

304 response_model=CustomerWithMembers, 

305 status_code=201, 

306 summary="Create Customer", 

307 responses={201: {"description": "Customer created."}}, 

308) 

309async def create( 1a

310 customer_create: CustomerCreate, 

311 auth_subject: auth.CustomerWrite, 

312 include_members: bool = Query( 

313 False, 

314 description="Include members in the response. Only populated when set to true.", 

315 ), 

316 session: AsyncSession = Depends(get_db_session), 

317) -> CustomerWithMembers: 

318 """Create a customer.""" 

319 created_customer = await customer_service.create( 

320 session, customer_create, auth_subject 

321 ) 

322 

323 customer = await session.get(type(created_customer), created_customer.id) 

324 if customer is None: 

325 raise ResourceNotFound() 

326 

327 customer_dict = CustomerSchema.model_validate(customer).model_dump() 

328 if include_members: 

329 customer_dict["members"] = await customer_service.load_members( 

330 session, customer.id 

331 ) 

332 else: 

333 customer_dict["members"] = [] 

334 return CustomerWithMembers(**customer_dict) 

335 

336 

337@router.patch( 1a

338 "/{id}", 

339 response_model=CustomerWithMembers, 

340 summary="Update Customer", 

341 responses={ 

342 200: {"description": "Customer updated."}, 

343 404: CustomerNotFound, 

344 }, 

345) 

346async def update( 1a

347 id: CustomerID, 

348 customer_update: CustomerUpdate, 

349 auth_subject: auth.CustomerWrite, 

350 include_members: bool = Query( 

351 False, 

352 description="Include members in the response. Only populated when set to true.", 

353 ), 

354 session: AsyncSession = Depends(get_db_session), 

355) -> CustomerWithMembers: 

356 """Update a customer.""" 

357 customer = await customer_service.get(session, auth_subject, id) 

358 

359 if customer is None: 

360 raise ResourceNotFound() 

361 

362 updated_customer = await customer_service.update(session, customer, customer_update) 

363 

364 customer_dict = CustomerSchema.model_validate(updated_customer).model_dump() 

365 if include_members: 

366 customer_dict["members"] = await customer_service.load_members( 

367 session, updated_customer.id 

368 ) 

369 else: 

370 customer_dict["members"] = [] 

371 return CustomerWithMembers(**customer_dict) 

372 

373 

374@router.patch( 1a

375 "/external/{external_id}", 

376 response_model=CustomerWithMembers, 

377 summary="Update Customer by External ID", 

378 responses={ 

379 200: {"description": "Customer updated."}, 

380 404: CustomerNotFound, 

381 }, 

382) 

383async def update_external( 1a

384 external_id: ExternalCustomerID, 

385 customer_update: CustomerUpdateExternalID, 

386 auth_subject: auth.CustomerWrite, 

387 include_members: bool = Query( 

388 False, 

389 description="Include members in the response. Only populated when set to true.", 

390 ), 

391 session: AsyncSession = Depends(get_db_session), 

392) -> CustomerWithMembers: 

393 """Update a customer by external ID.""" 

394 customer = await customer_service.get_external(session, auth_subject, external_id) 

395 

396 if customer is None: 

397 raise ResourceNotFound() 

398 

399 updated_customer = await customer_service.update(session, customer, customer_update) 

400 

401 customer_dict = CustomerSchema.model_validate(updated_customer).model_dump() 

402 if include_members: 

403 customer_dict["members"] = await customer_service.load_members( 

404 session, updated_customer.id 

405 ) 

406 else: 

407 customer_dict["members"] = [] 

408 return CustomerWithMembers(**customer_dict) 

409 

410 

411@router.delete( 1a

412 "/{id}", 

413 status_code=204, 

414 summary="Delete Customer", 

415 responses={ 

416 204: {"description": "Customer deleted."}, 

417 404: CustomerNotFound, 

418 }, 

419) 

420async def delete( 1a

421 id: CustomerID, 

422 auth_subject: auth.CustomerWrite, 

423 session: AsyncSession = Depends(get_db_session), 

424) -> None: 

425 """ 

426 Delete a customer. 

427 

428 This action cannot be undone and will immediately: 

429 - Cancel any active subscriptions for the customer 

430 - Revoke all their benefits 

431 - Clear any `external_id` 

432 

433 Use it only in the context of deleting a user within your 

434 own service. Otherwise, use more granular API endpoints to cancel 

435 a specific subscription or revoke certain benefits. 

436 

437 Note: The customers information will nonetheless be retained for historic 

438 orders and subscriptions. 

439 """ 

440 customer = await customer_service.get(session, auth_subject, id) 

441 

442 if customer is None: 

443 raise ResourceNotFound() 

444 

445 await customer_service.delete(session, customer) 

446 

447 

448@router.delete( 1a

449 "/external/{external_id}", 

450 status_code=204, 

451 summary="Delete Customer by External ID", 

452 responses={ 

453 204: {"description": "Customer deleted."}, 

454 404: CustomerNotFound, 

455 }, 

456) 

457async def delete_external( 1a

458 external_id: ExternalCustomerID, 

459 auth_subject: auth.CustomerWrite, 

460 session: AsyncSession = Depends(get_db_session), 

461) -> None: 

462 """ 

463 Delete a customer by external ID. 

464 

465 Immediately cancels any active subscriptions and revokes any active benefits. 

466 """ 

467 customer = await customer_service.get_external(session, auth_subject, external_id) 

468 

469 if customer is None: 

470 raise ResourceNotFound() 

471 

472 await customer_service.delete(session, customer)