Coverage for polar/integrations/stripe/tasks.py: 33%

269 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 17:15 +0000

1import functools 1a

2import uuid 1a

3from collections.abc import Awaitable, Callable 1a

4from typing import ParamSpec, cast 1a

5 

6import stripe as stripe_lib 1a

7import structlog 1a

8from dramatiq import Retry 1a

9 

10from polar.account.service import account as account_service 1a

11from polar.checkout.service import NotConfirmedCheckout 1a

12from polar.exceptions import PolarTaskError 1a

13from polar.external_event.service import external_event as external_event_service 1a

14from polar.integrations.stripe.schemas import PaymentIntentSuccessWebhook, ProductType 1a

15from polar.locker import Locker 1a

16from polar.logging import Logger 1a

17from polar.order.service import ( 1a

18 NotAnOrderInvoice, 

19 NotASubscriptionInvoice, 

20 OrderDoesNotExist, 

21) 

22from polar.order.service import ( 1a

23 SubscriptionDoesNotExist as OrderSubscriptionDoesNotExist, 

24) 

25from polar.order.service import order as order_service 1a

26from polar.payment.service import UnhandledPaymentIntent 1a

27from polar.payment.service import payment as payment_service 1a

28from polar.payment_method.service import payment_method as payment_method_service 1a

29from polar.payout.service import payout as payout_service 1a

30from polar.pledge.service import pledge as pledge_service 1a

31from polar.refund.service import refund as refund_service 1a

32from polar.subscription.service import SubscriptionDoesNotExist, SubscriptionLocked 1a

33from polar.subscription.service import subscription as subscription_service 1a

34from polar.transaction.service.dispute import ( 1a

35 DisputeClosed, 

36) 

37from polar.transaction.service.dispute import ( 1a

38 dispute_transaction as dispute_transaction_service, 

39) 

40from polar.user.service import user as user_service 1a

41from polar.worker import ( 1a

42 AsyncSessionMaker, 

43 RedisMiddleware, 

44 TaskPriority, 

45 actor, 

46 can_retry, 

47 get_retries, 

48) 

49 

50from . import payment 1a

51from .service import stripe as stripe_service 1a

52 

# Module-level structlog logger shared by the tasks defined in this file.
53log: Logger = structlog.get_logger() 1a

54 

55 

# NOTE(review): nothing in the visible code reads this module-level ParamSpec;
# stripe_api_connection_error_retry declares its own `**Params` type parameter
# inline. Presumably kept for importers elsewhere — confirm before removing.
56Params = ParamSpec("Params") 1a

57 

58 

59def stripe_api_connection_error_retry[**Params, ReturnValue]( 1a

60 func: Callable[Params, Awaitable[ReturnValue]], 

61) -> Callable[Params, Awaitable[ReturnValue]]: 

62 @functools.wraps(func) 1a

63 async def wrapper(*args: Params.args, **kwargs: Params.kwargs) -> ReturnValue: 1a

64 try: 

65 return await func(*args, **kwargs) 

66 except stripe_lib.APIConnectionError as e: 

67 log.warning( 

68 "Retry after Stripe API connection error", 

69 e=str(e), 

70 job_try=get_retries(), 

71 ) 

72 raise Retry() from e 

73 

74 return wrapper 1a

75 

76 

class StripeTaskError(PolarTaskError):
    """Stripe-specific task error; adds nothing on top of ``PolarTaskError``."""

78 

79 

@actor(actor_name="stripe.webhook.account.updated", priority=TaskPriority.HIGH)
@stripe_api_connection_error_retry
async def account_updated(event_id: uuid.UUID) -> None:
    """Handle a Stripe ``account.updated`` webhook event.

    The Stripe account carried by the stored event is passed to
    ``account_service.update_account_from_stripe``.
    """
    async with (
        AsyncSessionMaker() as session,
        external_event_service.handle_stripe(session, event_id) as event,
    ):
        account_payload = cast(stripe_lib.Account, event.stripe_data.data.object)
        await account_service.update_account_from_stripe(
            session, stripe_account=account_payload
        )

89 

90 

# Task for the "stripe.webhook.payment_intent.succeeded" actor.
# Validates the payment intent carried by the event into a
# PaymentIntentSuccessWebhook, then takes one of two paths: invoice-backed
# payments (pledges) or payments whose metadata carries an "order_id".
91@actor(actor_name="stripe.webhook.payment_intent.succeeded", priority=TaskPriority.HIGH) 1a

92@stripe_api_connection_error_retry 1a

93async def payment_intent_succeeded(event_id: uuid.UUID) -> None: 1a

94 async with AsyncSessionMaker() as session: 

95 async with external_event_service.handle_stripe(session, event_id) as event: 

96 payment_intent = cast( 

97 stripe_lib.PaymentIntent, event.stripe_data.data.object 

98 ) 

99 payload = PaymentIntentSuccessWebhook.model_validate(payment_intent) 

100 

101 # payment for pay_on_completion 

102 # metadata is on the invoice, not the payment_intent 

103 if payload.invoice: 

 # The invoice is fetched from Stripe because the "type" metadata is read from it.
104 invoice = await stripe_service.get_invoice(payload.invoice) 

105 if ( 

106 invoice.metadata 

107 and invoice.metadata.get("type") == ProductType.pledge 

108 ): 

109 await pledge_service.handle_payment_intent_success( 

110 session=session, 

111 payload=payload, 

112 ) 

 # NOTE(review): indentation is lost in this rendering, so it is not visible
 # whether this return belongs to `if payload.invoice:` or to the inner pledge
 # check — confirm against the real source before restructuring.
113 return 

114 

115 # Handle retry payments - save credit card and update subscription payment method 

116 if payment_intent.metadata and payment_intent.metadata.get("order_id"): 

 # No checkout is passed here (third argument is None).
117 order = await payment.resolve_order(session, payment_intent, None) 

118 if order is not None: 

119 payment_method = await payment_method_service.upsert_from_stripe_payment_intent_for_order( 

120 session, payment_intent, order 

121 ) 

122 

 # Only orders attached to a subscription get their payment method updated.
123 if payment_method and order.subscription: 

124 await subscription_service.update_payment_method_from_retry( 

125 session, order.subscription, payment_method 

126 ) 

 # Last statement of the function; nothing follows it either way.
127 return 

128 

129 

@actor(
    actor_name="stripe.webhook.payment_intent.payment_failed",
    priority=TaskPriority.HIGH,
)
@stripe_api_connection_error_retry
async def payment_intent_payment_failed(event_id: uuid.UUID) -> None:
    """Handle a Stripe ``payment_intent.payment_failed`` webhook event.

    The payment intent carried by the event is passed to
    ``payment.handle_failure``. ``UnhandledPaymentIntent`` is ignored. A
    missing order is retried while ``can_retry()`` allows it and re-raised
    once retries are exhausted.
    """
    async with (
        AsyncSessionMaker() as session,
        external_event_service.handle_stripe(session, event_id) as event,
    ):
        failed_intent = cast(stripe_lib.PaymentIntent, event.stripe_data.data.object)
        try:
            await payment.handle_failure(session, failed_intent)
        except UnhandledPaymentIntent:
            # Deliberately ignored: nothing to do for this payment intent.
            return
        except payment.OrderDoesNotExist as missing_order:
            if not can_retry():
                # Out of retries: surface the error so we get notified.
                raise
            # The order may not have been handled yet; try again later.
            raise Retry() from missing_order

153 

154 

@actor(actor_name="stripe.webhook.setup_intent.succeeded", priority=TaskPriority.HIGH)
@stripe_api_connection_error_retry
async def setup_intent_succeeded(event_id: uuid.UUID) -> None:
    """Handle a Stripe ``setup_intent.succeeded`` webhook event.

    The setup intent carried by the event is passed to
    ``payment.handle_success``. An unconfirmed checkout or a missing order is
    retried while ``can_retry()`` allows it and re-raised afterwards.
    Outdated checkout intents are ignored.
    """
    async with (
        AsyncSessionMaker() as session,
        external_event_service.handle_stripe(session, event_id) as event,
    ):
        confirmed_intent = cast(stripe_lib.SetupIntent, event.stripe_data.data.object)
        try:
            await payment.handle_success(session, confirmed_intent)
        except (NotConfirmedCheckout, payment.OrderDoesNotExist) as not_ready:
            if not can_retry():
                # Out of retries: surface the error so we get notified.
                raise
            # Stripe webhooks have been seen in the wild arriving *before*
            # the Checkout Session status was updated in the database.
            raise Retry() from not_ready
        except payment.OutdatedCheckoutIntent:
            # Outdated setup intents are ignored; this is the expected flow
            # after a "trial already redeemed" error.
            return

175 

176 

@actor(
    actor_name="stripe.webhook.setup_intent.setup_failed", priority=TaskPriority.HIGH
)
@stripe_api_connection_error_retry
async def setup_intent_setup_failed(event_id: uuid.UUID) -> None:
    """Handle a Stripe ``setup_intent.setup_failed`` webhook event.

    The setup intent carried by the event is passed to
    ``payment.handle_failure``. A missing order is retried while
    ``can_retry()`` allows it and re-raised once retries are exhausted.
    """
    async with (
        AsyncSessionMaker() as session,
        external_event_service.handle_stripe(session, event_id) as event,
    ):
        failed_intent = cast(stripe_lib.SetupIntent, event.stripe_data.data.object)
        try:
            await payment.handle_failure(session, failed_intent)
        except payment.OrderDoesNotExist as missing_order:
            if not can_retry():
                # Out of retries: surface the error so we get notified.
                raise
            # The order may not have been handled yet; try again later.
            raise Retry() from missing_order

194 

195 

@actor(actor_name="stripe.webhook.charge.pending", priority=TaskPriority.HIGH)
async def charge_pending(event_id: uuid.UUID) -> None:
    """Handle a Stripe ``charge.pending`` webhook event.

    Resolves the checkout and then the order for the charge carried by the
    event, and records the charge through
    ``payment_service.upsert_from_stripe_charge``. A missing order is retried
    while ``can_retry()`` allows it and re-raised once retries are exhausted.
    """
    async with (
        AsyncSessionMaker() as session,
        external_event_service.handle_stripe(session, event_id) as event,
    ):
        pending_charge = cast(stripe_lib.Charge, event.stripe_data.data.object)
        checkout = await payment.resolve_checkout(session, pending_charge)
        try:
            order = await payment.resolve_order(session, pending_charge, checkout)
        except payment.OrderDoesNotExist as missing_order:
            if not can_retry():
                # Out of retries: surface the error so we get notified.
                raise
            # The order may not have been handled yet; try again later.
            raise Retry() from missing_order
        else:
            await payment_service.upsert_from_stripe_charge(
                session, pending_charge, checkout, None, order
            )

214 

215 

@actor(actor_name="stripe.webhook.charge.failed", priority=TaskPriority.HIGH)
async def charge_failed(event_id: uuid.UUID) -> None:
    """Handle a Stripe ``charge.failed`` webhook event.

    The charge carried by the event is passed to ``payment.handle_failure``.
    A missing order is retried while ``can_retry()`` allows it and re-raised
    once retries are exhausted.
    """
    async with (
        AsyncSessionMaker() as session,
        external_event_service.handle_stripe(session, event_id) as event,
    ):
        failed_charge = cast(stripe_lib.Charge, event.stripe_data.data.object)
        try:
            await payment.handle_failure(session, failed_charge)
        except payment.OrderDoesNotExist as missing_order:
            if not can_retry():
                # Out of retries: surface the error so we get notified.
                raise
            # The order may not have been handled yet; try again later.
            raise Retry() from missing_order

230 

231 

@actor(actor_name="stripe.webhook.charge.succeeded", priority=TaskPriority.HIGH)
@stripe_api_connection_error_retry
async def charge_succeeded(event_id: uuid.UUID) -> None:
    """Handle a Stripe ``charge.succeeded`` webhook event.

    The charge carried by the event is passed to ``payment.handle_success``.
    An unconfirmed checkout or a missing order is retried while
    ``can_retry()`` allows it and re-raised once retries are exhausted.
    """
    async with (
        AsyncSessionMaker() as session,
        external_event_service.handle_stripe(session, event_id) as event,
    ):
        paid_charge = cast(stripe_lib.Charge, event.stripe_data.data.object)
        try:
            await payment.handle_success(session, paid_charge)
        except (NotConfirmedCheckout, payment.OrderDoesNotExist) as not_ready:
            if not can_retry():
                # Out of retries: surface the error so we get notified.
                raise
            # Stripe webhooks have been seen in the wild arriving *before*
            # the Checkout Session status was updated in the database.
            raise Retry() from not_ready

248 

249 

@actor(actor_name="stripe.webhook.refund.created", priority=TaskPriority.HIGH)
@stripe_api_connection_error_retry
async def refund_created(event_id: uuid.UUID) -> None:
    """Handle a Stripe ``refund.created`` webhook event.

    Logs the refund's id, charge and payment intent, then passes the refund
    to ``refund_service.create_from_stripe``.
    """
    async with (
        AsyncSessionMaker() as session,
        external_event_service.handle_stripe(session, event_id) as event,
    ):
        stripe_refund = cast(stripe_lib.Refund, event.stripe_data.data.object)
        log.info(
            "stripe.webhook.refund.created",
            refund_id=stripe_refund.id,
            charge_id=stripe_refund.charge,
            payment_intent=stripe_refund.payment_intent,
        )
        await refund_service.create_from_stripe(session, stripe_refund=stripe_refund)

263 

264 

@actor(actor_name="stripe.webhook.refund.updated", priority=TaskPriority.HIGH)
@stripe_api_connection_error_retry
async def refund_updated(event_id: uuid.UUID) -> None:
    """Handle a Stripe ``refund.updated`` webhook event.

    Logs the refund's id, charge and payment intent, then passes the refund
    to ``refund_service.upsert_from_stripe``.
    """
    async with (
        AsyncSessionMaker() as session,
        external_event_service.handle_stripe(session, event_id) as event,
    ):
        stripe_refund = cast(stripe_lib.Refund, event.stripe_data.data.object)
        log.info(
            "stripe.webhook.refund.updated",
            refund_id=stripe_refund.id,
            charge_id=stripe_refund.charge,
            payment_intent=stripe_refund.payment_intent,
        )
        await refund_service.upsert_from_stripe(session, stripe_refund=stripe_refund)

278 

279 

@actor(actor_name="stripe.webhook.refund.failed", priority=TaskPriority.HIGH)
@stripe_api_connection_error_retry
async def refund_failed(event_id: uuid.UUID) -> None:
    """Handle a Stripe ``refund.failed`` webhook event.

    Logs the refund's id, charge and payment intent, then passes the refund
    to ``refund_service.upsert_from_stripe``.
    """
    async with (
        AsyncSessionMaker() as session,
        external_event_service.handle_stripe(session, event_id) as event,
    ):
        stripe_refund = cast(stripe_lib.Refund, event.stripe_data.data.object)
        log.info(
            "stripe.webhook.refund.failed",
            refund_id=stripe_refund.id,
            charge_id=stripe_refund.charge,
            payment_intent=stripe_refund.payment_intent,
        )
        await refund_service.upsert_from_stripe(session, stripe_refund=stripe_refund)

293 

294 

@actor(actor_name="stripe.webhook.charge.dispute.closed", priority=TaskPriority.HIGH)
@stripe_api_connection_error_retry
async def charge_dispute_closed(event_id: uuid.UUID) -> None:
    """Handle a Stripe ``charge.dispute.closed`` webhook event.

    The dispute carried by the event is passed to
    ``dispute_transaction_service.create_dispute``; ``DisputeClosed`` is
    ignored.
    """
    async with (
        AsyncSessionMaker() as session,
        external_event_service.handle_stripe(session, event_id) as event,
    ):
        closed_dispute = cast(stripe_lib.Dispute, event.stripe_data.data.object)
        try:
            await dispute_transaction_service.create_dispute(
                session, dispute=closed_dispute
            )
        except DisputeClosed:
            # The dispute was closed without any action: nothing to do.
            return

309 

310 

@actor(
    actor_name="stripe.webhook.customer.subscription.updated",
    priority=TaskPriority.HIGH,
)
@stripe_api_connection_error_retry
async def customer_subscription_updated(event_id: uuid.UUID) -> None:
    """Handle a Stripe ``customer.subscription.updated`` webhook event.

    The subscription carried by the event is passed, together with a
    ``Locker`` built on the worker's Redis client, to
    ``subscription_service.update_from_stripe``. A missing or locked
    subscription is logged, retried while ``can_retry()`` allows it, and
    re-raised once retries are exhausted.
    """
    async with (
        AsyncSessionMaker() as session,
        external_event_service.handle_stripe(session, event_id) as event,
    ):
        stripe_subscription = cast(
            stripe_lib.Subscription, event.stripe_data.data.object
        )
        locker = Locker(RedisMiddleware.get())
        try:
            await subscription_service.update_from_stripe(
                session, locker, stripe_subscription=stripe_subscription
            )
        except (SubscriptionDoesNotExist, SubscriptionLocked) as not_ready:
            log.warning(not_ready.message, event_id=event.id)
            if not can_retry():
                # Out of retries: surface the error so we get notified.
                raise
            # Stripe does not guarantee webhook ordering, so
            # subscription.created may not have been handled yet.
            raise Retry() from not_ready

335 

336 

@actor(
    actor_name="stripe.webhook.customer.subscription.deleted",
    priority=TaskPriority.HIGH,
)
@stripe_api_connection_error_retry
async def customer_subscription_deleted(event_id: uuid.UUID) -> None:
    """Handle a Stripe ``customer.subscription.deleted`` webhook event.

    The subscription carried by the event is passed, together with a
    ``Locker`` built on the worker's Redis client, to
    ``subscription_service.update_from_stripe``. A missing or locked
    subscription is logged, retried while ``can_retry()`` allows it, and
    re-raised once retries are exhausted.
    """
    async with (
        AsyncSessionMaker() as session,
        external_event_service.handle_stripe(session, event_id) as event,
    ):
        stripe_subscription = cast(
            stripe_lib.Subscription, event.stripe_data.data.object
        )
        locker = Locker(RedisMiddleware.get())
        try:
            await subscription_service.update_from_stripe(
                session, locker, stripe_subscription=stripe_subscription
            )
        except (SubscriptionDoesNotExist, SubscriptionLocked) as not_ready:
            log.warning(not_ready.message, event_id=event.id)
            if not can_retry():
                # Out of retries: surface the error so we get notified.
                raise
            # Stripe does not guarantee webhook ordering, so
            # subscription.created may not have been handled yet.
            raise Retry() from not_ready

361 

362 

@actor(actor_name="stripe.webhook.invoice.created", priority=TaskPriority.HIGH)
@stripe_api_connection_error_retry
async def invoice_created(event_id: uuid.UUID) -> None:
    """Handle a Stripe ``invoice.created`` webhook event.

    The invoice carried by the event is passed to
    ``order_service.create_order_from_stripe``. A missing subscription is
    logged, retried while ``can_retry()`` allows it, and re-raised once
    retries are exhausted. ``NotAnOrderInvoice`` and
    ``NotASubscriptionInvoice`` are ignored.
    """
    async with (
        AsyncSessionMaker() as session,
        external_event_service.handle_stripe(session, event_id) as event,
    ):
        stripe_invoice = cast(stripe_lib.Invoice, event.stripe_data.data.object)
        try:
            await order_service.create_order_from_stripe(
                session, invoice=stripe_invoice
            )
        except OrderSubscriptionDoesNotExist as missing_subscription:
            log.warning(missing_subscription.message, event_id=event.id)
            if not can_retry():
                # Out of retries: surface the error so we get notified.
                raise
            # Stripe does not guarantee webhook ordering, so
            # subscription.created may not have been handled yet.
            raise Retry() from missing_subscription
        except (NotAnOrderInvoice, NotASubscriptionInvoice):
            # Invoices that are not for products (pledges) and
            # subscriptions are ignored.
            pass

383 

384 

@actor(actor_name="stripe.webhook.invoice.paid", priority=TaskPriority.HIGH)
@stripe_api_connection_error_retry
async def invoice_paid(event_id: uuid.UUID) -> None:
    """Handle a Stripe ``invoice.paid`` webhook event.

    The invoice carried by the event is passed to
    ``order_service.update_order_from_stripe``. A missing order is logged,
    retried while ``can_retry()`` allows it, and re-raised once retries are
    exhausted.
    """
    async with (
        AsyncSessionMaker() as session,
        external_event_service.handle_stripe(session, event_id) as event,
    ):
        stripe_invoice = cast(stripe_lib.Invoice, event.stripe_data.data.object)
        try:
            await order_service.update_order_from_stripe(
                session, invoice=stripe_invoice
            )
        except OrderDoesNotExist as missing_order:
            log.warning(missing_order.message, event_id=event.id)
            if not can_retry():
                # Out of retries: surface the error so we get notified.
                raise
            # Stripe does not guarantee webhook ordering, so
            # invoice.created may not have been handled yet.
            raise Retry() from missing_order

402 

403 

@actor(actor_name="stripe.webhook.payout.updated", priority=TaskPriority.LOW)
@stripe_api_connection_error_retry
async def payout_updated(event_id: uuid.UUID) -> None:
    """Handle a Stripe ``payout.updated`` webhook event.

    The payout carried by the event is passed to
    ``payout_service.update_from_stripe``.
    """
    async with (
        AsyncSessionMaker() as session,
        external_event_service.handle_stripe(session, event_id) as event,
    ):
        stripe_payout = cast(stripe_lib.Payout, event.stripe_data.data.object)
        await payout_service.update_from_stripe(session, stripe_payout)

411 

412 

@actor(actor_name="stripe.webhook.payout.paid", priority=TaskPriority.LOW)
@stripe_api_connection_error_retry
async def payout_paid(event_id: uuid.UUID) -> None:
    """Handle a Stripe ``payout.paid`` webhook event.

    The payout carried by the event is passed to
    ``payout_service.update_from_stripe``.
    """
    async with (
        AsyncSessionMaker() as session,
        external_event_service.handle_stripe(session, event_id) as event,
    ):
        stripe_payout = cast(stripe_lib.Payout, event.stripe_data.data.object)
        await payout_service.update_from_stripe(session, stripe_payout)

420 

421 

@actor(
    actor_name="stripe.webhook.identity.verification_session.verified",
    priority=TaskPriority.HIGH,
)
async def identity_verification_session_verified(event_id: uuid.UUID) -> None:
    """Handle a Stripe ``identity.verification_session.verified`` webhook event.

    The verification session carried by the event is passed to
    ``user_service.identity_verification_verified``.
    """
    async with (
        AsyncSessionMaker() as session,
        external_event_service.handle_stripe(session, event_id) as event,
    ):
        stripe_verification = cast(
            stripe_lib.identity.VerificationSession, event.stripe_data.data.object
        )
        await user_service.identity_verification_verified(
            session, stripe_verification
        )

435 

436 

@actor(
    actor_name="stripe.webhook.identity.verification_session.processing",
    priority=TaskPriority.HIGH,
)
async def identity_verification_session_processing(event_id: uuid.UUID) -> None:
    """Handle a Stripe ``identity.verification_session.processing`` webhook event.

    The verification session carried by the event is passed to
    ``user_service.identity_verification_pending``.
    """
    async with (
        AsyncSessionMaker() as session,
        external_event_service.handle_stripe(session, event_id) as event,
    ):
        stripe_verification = cast(
            stripe_lib.identity.VerificationSession, event.stripe_data.data.object
        )
        await user_service.identity_verification_pending(
            session, stripe_verification
        )

450 

451 

@actor(
    actor_name="stripe.webhook.identity.verification_session.requires_input",
    priority=TaskPriority.HIGH,
)
async def identity_verification_session_requires_input(event_id: uuid.UUID) -> None:
    """Handle a Stripe ``identity.verification_session.requires_input`` webhook event.

    The verification session carried by the event is passed to
    ``user_service.identity_verification_failed``.
    """
    async with (
        AsyncSessionMaker() as session,
        external_event_service.handle_stripe(session, event_id) as event,
    ):
        stripe_verification = cast(
            stripe_lib.identity.VerificationSession, event.stripe_data.data.object
        )
        await user_service.identity_verification_failed(
            session, stripe_verification
        )

463 session, verification_session 

464 )