Coverage for polar/customer_portal/service/customer.py: 23%
97 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 16:17 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 16:17 +0000
1from collections.abc import Sequence 1a
2from typing import cast 1a
3from uuid import UUID 1a
5import stripe as stripe_lib 1a
7from polar.auth.models import AuthSubject 1a
8from polar.customer.repository import CustomerRepository 1a
9from polar.exceptions import PolarError, PolarRequestValidationError 1a
10from polar.integrations.stripe.service import stripe as stripe_service 1a
11from polar.integrations.stripe.utils import get_expandable_id 1a
12from polar.kit.pagination import PaginationParams 1a
13from polar.kit.tax import InvalidTaxID, to_stripe_tax_id, validate_tax_id 1a
14from polar.models import Customer, PaymentMethod 1a
15from polar.payment_method.service import payment_method as payment_method_service 1a
16from polar.postgres import AsyncSession 1a
18from ..repository.payment_method import CustomerPaymentMethodRepository 1a
19from ..schemas.customer import ( 1a
20 CustomerPaymentMethodConfirm,
21 CustomerPaymentMethodCreate,
22 CustomerPaymentMethodCreateRequiresActionResponse,
23 CustomerPaymentMethodCreateResponse,
24 CustomerPaymentMethodCreateSucceededResponse,
25 CustomerPortalCustomerUpdate,
26)
29class CustomerError(PolarError): ... 1a
32class CustomerNotReady(CustomerError): 1a
33 def __init__(self, customer: Customer) -> None: 1a
34 self.customer = customer
35 super().__init__("Customer is not ready for this operation.", 403)
38class CustomerService: 1a
39 async def update( 1a
40 self,
41 session: AsyncSession,
42 customer: Customer,
43 customer_update: CustomerPortalCustomerUpdate,
44 ) -> Customer:
45 if customer_update.billing_name is not None:
46 customer.billing_name = customer_update.billing_name
48 customer.billing_address = (
49 customer_update.billing_address or customer.billing_address
50 )
52 tax_id = customer_update.tax_id or (
53 customer.tax_id[0] if customer.tax_id else None
54 )
55 if tax_id is not None:
56 if customer.billing_address is None:
57 raise PolarRequestValidationError(
58 [
59 {
60 "type": "missing",
61 "loc": ("body", "billing_address"),
62 "msg": "Country is required to validate tax ID.",
63 "input": None,
64 }
65 ]
66 )
67 try:
68 customer.tax_id = validate_tax_id(
69 tax_id, customer.billing_address.country
70 )
71 except InvalidTaxID as e:
72 raise PolarRequestValidationError(
73 [
74 {
75 "type": "invalid",
76 "loc": ("body", "tax_id"),
77 "msg": "Invalid tax ID.",
78 "input": customer_update.tax_id,
79 }
80 ]
81 ) from e
83 repository = CustomerRepository.from_session(session)
84 customer = await repository.update(
85 customer,
86 update_dict=customer_update.model_dump(
87 exclude_unset=True,
88 exclude={"billing_name", "billing_address", "tax_id"},
89 ),
90 )
92 if customer.stripe_customer_id is not None:
93 params: stripe_lib.Customer.ModifyParams = {"email": customer.email}
94 if customer.billing_name is not None and customer.name is None:
95 params["name"] = customer.billing_name
96 if customer.billing_address is not None:
97 params["address"] = customer.billing_address.to_dict() # type: ignore
98 await stripe_service.update_customer(
99 customer.stripe_customer_id,
100 tax_id=to_stripe_tax_id(customer.tax_id)
101 if customer.tax_id is not None
102 else None,
103 **params,
104 )
106 return customer
108 async def list_payment_methods( 1a
109 self,
110 session: AsyncSession,
111 auth_subject: AuthSubject[Customer],
112 *,
113 pagination: PaginationParams,
114 ) -> tuple[Sequence[PaymentMethod], int]:
115 repository = CustomerPaymentMethodRepository.from_session(session)
116 statement = repository.get_readable_statement(auth_subject).order_by(
117 PaymentMethod.created_at.desc()
118 )
119 return await repository.paginate(
120 statement, limit=pagination.limit, page=pagination.page
121 )
123 async def get_payment_method( 1a
124 self, session: AsyncSession, auth_subject: AuthSubject[Customer], id: UUID
125 ) -> PaymentMethod | None:
126 repository = CustomerPaymentMethodRepository.from_session(session)
127 statement = repository.get_readable_statement(auth_subject).where(
128 PaymentMethod.id == id
129 )
130 return await repository.get_one_or_none(statement)
132 async def add_payment_method( 1a
133 self,
134 session: AsyncSession,
135 customer: Customer,
136 payment_method_create: CustomerPaymentMethodCreate,
137 ) -> CustomerPaymentMethodCreateResponse:
138 if customer.stripe_customer_id is None:
139 params: stripe_lib.Customer.CreateParams = {
140 "email": customer.email,
141 }
142 if customer.name is not None:
143 params["name"] = customer.name
144 if customer.billing_address is not None:
145 params["address"] = customer.billing_address.to_dict() # type: ignore
146 if customer.tax_id is not None:
147 params["tax_id_data"] = [to_stripe_tax_id(customer.tax_id)]
148 stripe_customer = await stripe_service.create_customer(**params)
149 repository = CustomerRepository.from_session(session)
150 customer = await repository.update(
151 customer, update_dict={"stripe_customer_id": stripe_customer.id}
152 )
153 assert customer.stripe_customer_id is not None
155 setup_intent = await stripe_service.create_setup_intent(
156 automatic_payment_methods={"enabled": True},
157 confirm=True,
158 confirmation_token=payment_method_create.confirmation_token_id,
159 customer=customer.stripe_customer_id,
160 metadata={
161 "customer_id": str(customer.id),
162 },
163 return_url=payment_method_create.return_url,
164 expand=["payment_method"],
165 payment_method_options={ # type: ignore
166 "klarna": {"currency": "usd"},
167 },
168 )
170 return await self._save_payment_method(
171 session,
172 customer,
173 setup_intent,
174 set_default=payment_method_create.set_default,
175 )
177 async def confirm_payment_method( 1a
178 self,
179 session: AsyncSession,
180 customer: Customer,
181 payment_method_confirm: CustomerPaymentMethodConfirm,
182 ) -> CustomerPaymentMethodCreateResponse:
183 if customer.stripe_customer_id is None:
184 raise CustomerNotReady(customer)
186 try:
187 setup_intent = await stripe_service.get_setup_intent(
188 payment_method_confirm.setup_intent_id,
189 expand=["payment_method"],
190 )
191 except stripe_lib.InvalidRequestError as e:
192 raise PolarRequestValidationError(
193 [
194 {
195 "type": "invalid",
196 "loc": ("body", "setup_intent_id"),
197 "msg": "Invalid setup_intent_id.",
198 "input": payment_method_confirm.setup_intent_id,
199 }
200 ]
201 ) from e
203 if (
204 setup_intent.customer is None
205 or get_expandable_id(setup_intent.customer) != customer.stripe_customer_id
206 ):
207 raise PolarRequestValidationError(
208 [
209 {
210 "type": "invalid",
211 "loc": ("body", "setup_intent_id"),
212 "msg": "Invalid setup_intent_id.",
213 "input": payment_method_confirm.setup_intent_id,
214 }
215 ]
216 )
218 return await self._save_payment_method(
219 session,
220 customer,
221 setup_intent,
222 set_default=payment_method_confirm.set_default,
223 )
225 async def _save_payment_method( 1a
226 self,
227 session: AsyncSession,
228 customer: Customer,
229 setup_intent: stripe_lib.SetupIntent,
230 set_default: bool,
231 ) -> CustomerPaymentMethodCreateResponse:
232 assert customer.stripe_customer_id is not None
234 if setup_intent.status == "requires_action":
235 assert setup_intent.client_secret is not None
236 return CustomerPaymentMethodCreateRequiresActionResponse(
237 status="requires_action",
238 client_secret=setup_intent.client_secret,
239 )
241 if set_default:
242 assert setup_intent.payment_method is not None
243 await stripe_service.update_customer(
244 customer.stripe_customer_id,
245 invoice_settings={
246 "default_payment_method": get_expandable_id(
247 setup_intent.payment_method
248 )
249 },
250 )
252 payment_method = await payment_method_service.upsert_from_stripe(
253 session,
254 customer,
255 cast(stripe_lib.PaymentMethod, setup_intent.payment_method),
256 flush=True,
257 )
258 if set_default:
259 repository = CustomerRepository.from_session(session)
260 customer = await repository.update(
261 customer, update_dict={"default_payment_method": payment_method}
262 )
264 return CustomerPaymentMethodCreateSucceededResponse.model_validate(
265 {
266 "status": "succeeded",
267 "payment_method": payment_method,
268 }
269 )
271 async def delete_payment_method( 1a
272 self, session: AsyncSession, payment_method: PaymentMethod
273 ) -> None:
274 # Get the customer before deletion to trigger webhooks
275 customer_repository = CustomerRepository.from_session(session)
276 customer = await customer_repository.get_by_id(payment_method.customer_id)
277 assert customer is not None
279 await payment_method_service.delete(session, payment_method)
281 # Trigger customer update webhooks
282 # This ensures customer.updated and customer.state_changed events are sent
283 await customer_repository.update(customer, flush=True)
286customer = CustomerService() 1a