Coverage for polar/customer/endpoints.py: 26%
135 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 15:52 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 15:52 +0000
1import json 1a
2from collections.abc import AsyncGenerator 1a
4from fastapi import Depends, Query, Response 1a
5from fastapi.responses import StreamingResponse 1a
7from polar.exceptions import ResourceNotFound 1a
8from polar.kit.csv import IterableCSVWriter 1a
9from polar.kit.metadata import MetadataQuery, get_metadata_query_openapi_schema 1a
10from polar.kit.pagination import ListResource, PaginationParamsQuery 1a
11from polar.kit.schemas import MultipleQueryFilter 1a
12from polar.member import member_service 1a
13from polar.member.schemas import Member as MemberSchema 1a
14from polar.openapi import APITag 1a
15from polar.organization.schemas import OrganizationID 1a
16from polar.postgres import ( 1a
17 AsyncReadSession,
18 AsyncSession,
19 get_db_read_session,
20 get_db_session,
21)
22from polar.redis import Redis, get_redis 1a
23from polar.routing import APIRouter 1a
25from . import auth, sorting 1a
26from .repository import CustomerRepository 1a
27from .schemas.customer import Customer as CustomerSchema 1a
28from .schemas.customer import ( 1a
29 CustomerCreate,
30 CustomerID,
31 CustomerUpdate,
32 CustomerUpdateExternalID,
33 CustomerWithMembers,
34 ExternalCustomerID,
35)
36from .schemas.state import CustomerState 1a
37from .service import customer as customer_service 1a
39router = APIRouter( 1a
40 prefix="/customers",
41 tags=["customers", APITag.public, APITag.mcp],
42)
45CustomerNotFound = { 1a
46 "description": "Customer not found.",
47 "model": ResourceNotFound.schema(),
48}
51@router.get( 1a
52 "/",
53 summary="List Customers",
54 response_model=ListResource[CustomerWithMembers],
55 openapi_extra={"parameters": [get_metadata_query_openapi_schema()]},
56)
57async def list( 1a
58 auth_subject: auth.CustomerRead,
59 pagination: PaginationParamsQuery,
60 sorting: sorting.ListSorting,
61 metadata: MetadataQuery,
62 organization_id: MultipleQueryFilter[OrganizationID] | None = Query(
63 None, title="OrganizationID Filter", description="Filter by organization ID."
64 ),
65 email: str | None = Query(None, description="Filter by exact email."),
66 query: str | None = Query(
67 None, description="Filter by name, email, or external ID."
68 ),
69 include_members: bool = Query(
70 False,
71 description="Include members in the response. Only populated when set to true.",
72 ),
73 session: AsyncReadSession = Depends(get_db_read_session),
74) -> ListResource[CustomerWithMembers]:
75 """List customers."""
76 results, count = await customer_service.list(
77 session,
78 auth_subject,
79 organization_id=organization_id,
80 email=email,
81 metadata=metadata,
82 query=query,
83 pagination=pagination,
84 sorting=sorting,
85 )
87 customers_with_members = []
88 if include_members and results:
89 customer_ids = [result.id for result in results]
90 all_members = await member_service.list_by_customers(session, customer_ids)
92 members_by_customer = {}
93 for member in all_members:
94 member_schema = MemberSchema.model_validate(member)
95 if member.customer_id not in members_by_customer:
96 members_by_customer[member.customer_id] = [member_schema]
97 else:
98 members_by_customer[member.customer_id].append(member_schema)
100 for result in results:
101 customer_dict = CustomerSchema.model_validate(result).model_dump()
102 customer_dict["members"] = members_by_customer.get(result.id, [])
103 customers_with_members.append(CustomerWithMembers(**customer_dict))
104 else:
105 for r in results:
106 customer_dict = CustomerSchema.model_validate(r).model_dump()
107 customer_dict["members"] = []
108 customers_with_members.append(CustomerWithMembers(**customer_dict))
110 return ListResource.from_paginated_results(
111 customers_with_members,
112 count,
113 pagination,
114 )
117@router.get("/export", summary="Export Customers") 1a
118async def export( 1a
119 auth_subject: auth.CustomerRead,
120 organization_id: MultipleQueryFilter[OrganizationID] | None = Query(
121 None, description="Filter by organization ID."
122 ),
123 session: AsyncReadSession = Depends(get_db_read_session),
124) -> Response:
125 """Export customers as a CSV file."""
127 async def create_csv() -> AsyncGenerator[str, None]:
128 csv_writer = IterableCSVWriter(dialect="excel")
130 yield csv_writer.getrow(
131 (
132 "ID",
133 "External ID",
134 "Created At",
135 "Email",
136 "Name",
137 "Tax ID",
138 "Billing Address Line 1",
139 "Billing Address Line 2",
140 "Billing Address City",
141 "Billing Address State",
142 "Billing Address Zip",
143 "Billing Address Country",
144 "Metadata",
145 )
146 )
148 repository = CustomerRepository.from_session(session)
149 stream = repository.stream_by_organization(auth_subject, organization_id)
151 async for customer in stream:
152 billing_address = customer.billing_address
154 yield csv_writer.getrow(
155 (
156 customer.id,
157 customer.external_id,
158 customer.created_at.isoformat(),
159 customer.email,
160 customer.name,
161 customer.tax_id,
162 billing_address.line1 if billing_address else None,
163 billing_address.line2 if billing_address else None,
164 billing_address.city if billing_address else None,
165 billing_address.state if billing_address else None,
166 billing_address.postal_code if billing_address else None,
167 billing_address.country if billing_address else None,
168 json.dumps(customer.user_metadata)
169 if customer.user_metadata
170 else None,
171 )
172 )
174 filename = "polar-customers.csv"
175 return StreamingResponse(
176 create_csv(),
177 media_type="text/csv",
178 headers={"Content-Disposition": f"attachment; filename={filename}"},
179 )
182@router.get( 1a
183 "/{id}",
184 summary="Get Customer",
185 response_model=CustomerWithMembers,
186 responses={404: CustomerNotFound},
187)
188async def get( 1a
189 id: CustomerID,
190 auth_subject: auth.CustomerRead,
191 include_members: bool = Query(
192 False,
193 description="Include members in the response. Only populated when set to true.",
194 ),
195 session: AsyncReadSession = Depends(get_db_read_session),
196) -> CustomerWithMembers:
197 """Get a customer by ID."""
198 customer = await customer_service.get(session, auth_subject, id)
200 if customer is None:
201 raise ResourceNotFound()
203 customer_dict = CustomerSchema.model_validate(customer).model_dump()
204 if include_members:
205 customer_dict["members"] = await customer_service.load_members(
206 session, customer.id
207 )
208 else:
209 customer_dict["members"] = []
210 return CustomerWithMembers(**customer_dict)
213@router.get( 1a
214 "/external/{external_id}",
215 summary="Get Customer by External ID",
216 response_model=CustomerWithMembers,
217 responses={404: CustomerNotFound},
218)
219async def get_external( 1a
220 external_id: ExternalCustomerID,
221 auth_subject: auth.CustomerRead,
222 include_members: bool = Query(
223 False,
224 description="Include members in the response. Only populated when set to true.",
225 ),
226 session: AsyncReadSession = Depends(get_db_read_session),
227) -> CustomerWithMembers:
228 """Get a customer by external ID."""
229 customer = await customer_service.get_external(session, auth_subject, external_id)
231 if customer is None:
232 raise ResourceNotFound()
234 customer_dict = CustomerSchema.model_validate(customer).model_dump()
235 if include_members:
236 customer_dict["members"] = await customer_service.load_members(
237 session, customer.id
238 )
239 else:
240 customer_dict["members"] = []
241 return CustomerWithMembers(**customer_dict)
244@router.get( 1a
245 "/{id}/state",
246 summary="Get Customer State",
247 response_model=CustomerState,
248 responses={404: CustomerNotFound},
249)
250async def get_state( 1a
251 id: CustomerID,
252 auth_subject: auth.CustomerRead,
253 session: AsyncReadSession = Depends(get_db_read_session),
254 redis: Redis = Depends(get_redis),
255) -> CustomerState:
256 """
257 Get a customer state by ID.
259 The customer state includes information about
260 the customer's active subscriptions and benefits.
262 It's the ideal endpoint to use when you need to get a full overview
263 of a customer's status.
264 """
265 customer = await customer_service.get(session, auth_subject, id)
267 if customer is None:
268 raise ResourceNotFound()
270 return await customer_service.get_state(session, redis, customer)
273@router.get( 1a
274 "/external/{external_id}/state",
275 summary="Get Customer State by External ID",
276 response_model=CustomerState,
277 responses={404: CustomerNotFound},
278)
279async def get_state_external( 1a
280 external_id: ExternalCustomerID,
281 auth_subject: auth.CustomerRead,
282 session: AsyncReadSession = Depends(get_db_read_session),
283 redis: Redis = Depends(get_redis),
284) -> CustomerState:
285 """
286 Get a customer state by external ID.
288 The customer state includes information about
289 the customer's active subscriptions and benefits.
291 It's the ideal endpoint to use when you need to get a full overview
292 of a customer's status.
293 """
294 customer = await customer_service.get_external(session, auth_subject, external_id)
296 if customer is None:
297 raise ResourceNotFound()
299 return await customer_service.get_state(session, redis, customer)
302@router.post( 1a
303 "/",
304 response_model=CustomerWithMembers,
305 status_code=201,
306 summary="Create Customer",
307 responses={201: {"description": "Customer created."}},
308)
309async def create( 1a
310 customer_create: CustomerCreate,
311 auth_subject: auth.CustomerWrite,
312 include_members: bool = Query(
313 False,
314 description="Include members in the response. Only populated when set to true.",
315 ),
316 session: AsyncSession = Depends(get_db_session),
317) -> CustomerWithMembers:
318 """Create a customer."""
319 created_customer = await customer_service.create(
320 session, customer_create, auth_subject
321 )
323 customer = await session.get(type(created_customer), created_customer.id)
324 if customer is None:
325 raise ResourceNotFound()
327 customer_dict = CustomerSchema.model_validate(customer).model_dump()
328 if include_members:
329 customer_dict["members"] = await customer_service.load_members(
330 session, customer.id
331 )
332 else:
333 customer_dict["members"] = []
334 return CustomerWithMembers(**customer_dict)
337@router.patch( 1a
338 "/{id}",
339 response_model=CustomerWithMembers,
340 summary="Update Customer",
341 responses={
342 200: {"description": "Customer updated."},
343 404: CustomerNotFound,
344 },
345)
346async def update( 1a
347 id: CustomerID,
348 customer_update: CustomerUpdate,
349 auth_subject: auth.CustomerWrite,
350 include_members: bool = Query(
351 False,
352 description="Include members in the response. Only populated when set to true.",
353 ),
354 session: AsyncSession = Depends(get_db_session),
355) -> CustomerWithMembers:
356 """Update a customer."""
357 customer = await customer_service.get(session, auth_subject, id)
359 if customer is None:
360 raise ResourceNotFound()
362 updated_customer = await customer_service.update(session, customer, customer_update)
364 customer_dict = CustomerSchema.model_validate(updated_customer).model_dump()
365 if include_members:
366 customer_dict["members"] = await customer_service.load_members(
367 session, updated_customer.id
368 )
369 else:
370 customer_dict["members"] = []
371 return CustomerWithMembers(**customer_dict)
374@router.patch( 1a
375 "/external/{external_id}",
376 response_model=CustomerWithMembers,
377 summary="Update Customer by External ID",
378 responses={
379 200: {"description": "Customer updated."},
380 404: CustomerNotFound,
381 },
382)
383async def update_external( 1a
384 external_id: ExternalCustomerID,
385 customer_update: CustomerUpdateExternalID,
386 auth_subject: auth.CustomerWrite,
387 include_members: bool = Query(
388 False,
389 description="Include members in the response. Only populated when set to true.",
390 ),
391 session: AsyncSession = Depends(get_db_session),
392) -> CustomerWithMembers:
393 """Update a customer by external ID."""
394 customer = await customer_service.get_external(session, auth_subject, external_id)
396 if customer is None:
397 raise ResourceNotFound()
399 updated_customer = await customer_service.update(session, customer, customer_update)
401 customer_dict = CustomerSchema.model_validate(updated_customer).model_dump()
402 if include_members:
403 customer_dict["members"] = await customer_service.load_members(
404 session, updated_customer.id
405 )
406 else:
407 customer_dict["members"] = []
408 return CustomerWithMembers(**customer_dict)
411@router.delete( 1a
412 "/{id}",
413 status_code=204,
414 summary="Delete Customer",
415 responses={
416 204: {"description": "Customer deleted."},
417 404: CustomerNotFound,
418 },
419)
420async def delete( 1a
421 id: CustomerID,
422 auth_subject: auth.CustomerWrite,
423 session: AsyncSession = Depends(get_db_session),
424) -> None:
425 """
426 Delete a customer.
428 This action cannot be undone and will immediately:
429 - Cancel any active subscriptions for the customer
430 - Revoke all their benefits
431 - Clear any `external_id`
433 Use it only in the context of deleting a user within your
434 own service. Otherwise, use more granular API endpoints to cancel
435 a specific subscription or revoke certain benefits.
437 Note: The customers information will nonetheless be retained for historic
438 orders and subscriptions.
439 """
440 customer = await customer_service.get(session, auth_subject, id)
442 if customer is None:
443 raise ResourceNotFound()
445 await customer_service.delete(session, customer)
448@router.delete( 1a
449 "/external/{external_id}",
450 status_code=204,
451 summary="Delete Customer by External ID",
452 responses={
453 204: {"description": "Customer deleted."},
454 404: CustomerNotFound,
455 },
456)
457async def delete_external( 1a
458 external_id: ExternalCustomerID,
459 auth_subject: auth.CustomerWrite,
460 session: AsyncSession = Depends(get_db_session),
461) -> None:
462 """
463 Delete a customer by external ID.
465 Immediately cancels any active subscriptions and revokes any active benefits.
466 """
467 customer = await customer_service.get_external(session, auth_subject, external_id)
469 if customer is None:
470 raise ResourceNotFound()
472 await customer_service.delete(session, customer)