Coverage for polar/customer_portal/service/customer.py: 23%

97 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 17:15 +0000

1from collections.abc import Sequence 1a

2from typing import cast 1a

3from uuid import UUID 1a

4 

5import stripe as stripe_lib 1a

6 

7from polar.auth.models import AuthSubject 1a

8from polar.customer.repository import CustomerRepository 1a

9from polar.exceptions import PolarError, PolarRequestValidationError 1a

10from polar.integrations.stripe.service import stripe as stripe_service 1a

11from polar.integrations.stripe.utils import get_expandable_id 1a

12from polar.kit.pagination import PaginationParams 1a

13from polar.kit.tax import InvalidTaxID, to_stripe_tax_id, validate_tax_id 1a

14from polar.models import Customer, PaymentMethod 1a

15from polar.payment_method.service import payment_method as payment_method_service 1a

16from polar.postgres import AsyncSession 1a

17 

18from ..repository.payment_method import CustomerPaymentMethodRepository 1a

19from ..schemas.customer import ( 1a

20 CustomerPaymentMethodConfirm, 

21 CustomerPaymentMethodCreate, 

22 CustomerPaymentMethodCreateRequiresActionResponse, 

23 CustomerPaymentMethodCreateResponse, 

24 CustomerPaymentMethodCreateSucceededResponse, 

25 CustomerPortalCustomerUpdate, 

26) 

27 

28 

29class CustomerError(PolarError): ... 1a

30 

31 

32class CustomerNotReady(CustomerError): 1a

33 def __init__(self, customer: Customer) -> None: 1a

34 self.customer = customer 

35 super().__init__("Customer is not ready for this operation.", 403) 

36 

37 

38class CustomerService: 1a

39 async def update( 1a

40 self, 

41 session: AsyncSession, 

42 customer: Customer, 

43 customer_update: CustomerPortalCustomerUpdate, 

44 ) -> Customer: 

45 if customer_update.billing_name is not None: 

46 customer.billing_name = customer_update.billing_name 

47 

48 customer.billing_address = ( 

49 customer_update.billing_address or customer.billing_address 

50 ) 

51 

52 tax_id = customer_update.tax_id or ( 

53 customer.tax_id[0] if customer.tax_id else None 

54 ) 

55 if tax_id is not None: 

56 if customer.billing_address is None: 

57 raise PolarRequestValidationError( 

58 [ 

59 { 

60 "type": "missing", 

61 "loc": ("body", "billing_address"), 

62 "msg": "Country is required to validate tax ID.", 

63 "input": None, 

64 } 

65 ] 

66 ) 

67 try: 

68 customer.tax_id = validate_tax_id( 

69 tax_id, customer.billing_address.country 

70 ) 

71 except InvalidTaxID as e: 

72 raise PolarRequestValidationError( 

73 [ 

74 { 

75 "type": "invalid", 

76 "loc": ("body", "tax_id"), 

77 "msg": "Invalid tax ID.", 

78 "input": customer_update.tax_id, 

79 } 

80 ] 

81 ) from e 

82 

83 repository = CustomerRepository.from_session(session) 

84 customer = await repository.update( 

85 customer, 

86 update_dict=customer_update.model_dump( 

87 exclude_unset=True, 

88 exclude={"billing_name", "billing_address", "tax_id"}, 

89 ), 

90 ) 

91 

92 if customer.stripe_customer_id is not None: 

93 params: stripe_lib.Customer.ModifyParams = {"email": customer.email} 

94 if customer.billing_name is not None and customer.name is None: 

95 params["name"] = customer.billing_name 

96 if customer.billing_address is not None: 

97 params["address"] = customer.billing_address.to_dict() # type: ignore 

98 await stripe_service.update_customer( 

99 customer.stripe_customer_id, 

100 tax_id=to_stripe_tax_id(customer.tax_id) 

101 if customer.tax_id is not None 

102 else None, 

103 **params, 

104 ) 

105 

106 return customer 

107 

108 async def list_payment_methods( 1a

109 self, 

110 session: AsyncSession, 

111 auth_subject: AuthSubject[Customer], 

112 *, 

113 pagination: PaginationParams, 

114 ) -> tuple[Sequence[PaymentMethod], int]: 

115 repository = CustomerPaymentMethodRepository.from_session(session) 

116 statement = repository.get_readable_statement(auth_subject).order_by( 

117 PaymentMethod.created_at.desc() 

118 ) 

119 return await repository.paginate( 

120 statement, limit=pagination.limit, page=pagination.page 

121 ) 

122 

123 async def get_payment_method( 1a

124 self, session: AsyncSession, auth_subject: AuthSubject[Customer], id: UUID 

125 ) -> PaymentMethod | None: 

126 repository = CustomerPaymentMethodRepository.from_session(session) 

127 statement = repository.get_readable_statement(auth_subject).where( 

128 PaymentMethod.id == id 

129 ) 

130 return await repository.get_one_or_none(statement) 

131 

132 async def add_payment_method( 1a

133 self, 

134 session: AsyncSession, 

135 customer: Customer, 

136 payment_method_create: CustomerPaymentMethodCreate, 

137 ) -> CustomerPaymentMethodCreateResponse: 

138 if customer.stripe_customer_id is None: 

139 params: stripe_lib.Customer.CreateParams = { 

140 "email": customer.email, 

141 } 

142 if customer.name is not None: 

143 params["name"] = customer.name 

144 if customer.billing_address is not None: 

145 params["address"] = customer.billing_address.to_dict() # type: ignore 

146 if customer.tax_id is not None: 

147 params["tax_id_data"] = [to_stripe_tax_id(customer.tax_id)] 

148 stripe_customer = await stripe_service.create_customer(**params) 

149 repository = CustomerRepository.from_session(session) 

150 customer = await repository.update( 

151 customer, update_dict={"stripe_customer_id": stripe_customer.id} 

152 ) 

153 assert customer.stripe_customer_id is not None 

154 

155 setup_intent = await stripe_service.create_setup_intent( 

156 automatic_payment_methods={"enabled": True}, 

157 confirm=True, 

158 confirmation_token=payment_method_create.confirmation_token_id, 

159 customer=customer.stripe_customer_id, 

160 metadata={ 

161 "customer_id": str(customer.id), 

162 }, 

163 return_url=payment_method_create.return_url, 

164 expand=["payment_method"], 

165 payment_method_options={ # type: ignore 

166 "klarna": {"currency": "usd"}, 

167 }, 

168 ) 

169 

170 return await self._save_payment_method( 

171 session, 

172 customer, 

173 setup_intent, 

174 set_default=payment_method_create.set_default, 

175 ) 

176 

177 async def confirm_payment_method( 1a

178 self, 

179 session: AsyncSession, 

180 customer: Customer, 

181 payment_method_confirm: CustomerPaymentMethodConfirm, 

182 ) -> CustomerPaymentMethodCreateResponse: 

183 if customer.stripe_customer_id is None: 

184 raise CustomerNotReady(customer) 

185 

186 try: 

187 setup_intent = await stripe_service.get_setup_intent( 

188 payment_method_confirm.setup_intent_id, 

189 expand=["payment_method"], 

190 ) 

191 except stripe_lib.InvalidRequestError as e: 

192 raise PolarRequestValidationError( 

193 [ 

194 { 

195 "type": "invalid", 

196 "loc": ("body", "setup_intent_id"), 

197 "msg": "Invalid setup_intent_id.", 

198 "input": payment_method_confirm.setup_intent_id, 

199 } 

200 ] 

201 ) from e 

202 

203 if ( 

204 setup_intent.customer is None 

205 or get_expandable_id(setup_intent.customer) != customer.stripe_customer_id 

206 ): 

207 raise PolarRequestValidationError( 

208 [ 

209 { 

210 "type": "invalid", 

211 "loc": ("body", "setup_intent_id"), 

212 "msg": "Invalid setup_intent_id.", 

213 "input": payment_method_confirm.setup_intent_id, 

214 } 

215 ] 

216 ) 

217 

218 return await self._save_payment_method( 

219 session, 

220 customer, 

221 setup_intent, 

222 set_default=payment_method_confirm.set_default, 

223 ) 

224 

225 async def _save_payment_method( 1a

226 self, 

227 session: AsyncSession, 

228 customer: Customer, 

229 setup_intent: stripe_lib.SetupIntent, 

230 set_default: bool, 

231 ) -> CustomerPaymentMethodCreateResponse: 

232 assert customer.stripe_customer_id is not None 

233 

234 if setup_intent.status == "requires_action": 

235 assert setup_intent.client_secret is not None 

236 return CustomerPaymentMethodCreateRequiresActionResponse( 

237 status="requires_action", 

238 client_secret=setup_intent.client_secret, 

239 ) 

240 

241 if set_default: 

242 assert setup_intent.payment_method is not None 

243 await stripe_service.update_customer( 

244 customer.stripe_customer_id, 

245 invoice_settings={ 

246 "default_payment_method": get_expandable_id( 

247 setup_intent.payment_method 

248 ) 

249 }, 

250 ) 

251 

252 payment_method = await payment_method_service.upsert_from_stripe( 

253 session, 

254 customer, 

255 cast(stripe_lib.PaymentMethod, setup_intent.payment_method), 

256 flush=True, 

257 ) 

258 if set_default: 

259 repository = CustomerRepository.from_session(session) 

260 customer = await repository.update( 

261 customer, update_dict={"default_payment_method": payment_method} 

262 ) 

263 

264 return CustomerPaymentMethodCreateSucceededResponse.model_validate( 

265 { 

266 "status": "succeeded", 

267 "payment_method": payment_method, 

268 } 

269 ) 

270 

271 async def delete_payment_method( 1a

272 self, session: AsyncSession, payment_method: PaymentMethod 

273 ) -> None: 

274 # Get the customer before deletion to trigger webhooks 

275 customer_repository = CustomerRepository.from_session(session) 

276 customer = await customer_repository.get_by_id(payment_method.customer_id) 

277 assert customer is not None 

278 

279 await payment_method_service.delete(session, payment_method) 

280 

281 # Trigger customer update webhooks 

282 # This ensures customer.updated and customer.state_changed events are sent 

283 await customer_repository.update(customer, flush=True) 

284 

285 

286customer = CustomerService() 1a