Coverage for polar/integrations/stripe/tasks.py: 33%
269 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 17:15 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 17:15 +0000
1import functools 1a
2import uuid 1a
3from collections.abc import Awaitable, Callable 1a
4from typing import ParamSpec, cast 1a
6import stripe as stripe_lib 1a
7import structlog 1a
8from dramatiq import Retry 1a
10from polar.account.service import account as account_service 1a
11from polar.checkout.service import NotConfirmedCheckout 1a
12from polar.exceptions import PolarTaskError 1a
13from polar.external_event.service import external_event as external_event_service 1a
14from polar.integrations.stripe.schemas import PaymentIntentSuccessWebhook, ProductType 1a
15from polar.locker import Locker 1a
16from polar.logging import Logger 1a
17from polar.order.service import ( 1a
18 NotAnOrderInvoice,
19 NotASubscriptionInvoice,
20 OrderDoesNotExist,
21)
22from polar.order.service import ( 1a
23 SubscriptionDoesNotExist as OrderSubscriptionDoesNotExist,
24)
25from polar.order.service import order as order_service 1a
26from polar.payment.service import UnhandledPaymentIntent 1a
27from polar.payment.service import payment as payment_service 1a
28from polar.payment_method.service import payment_method as payment_method_service 1a
29from polar.payout.service import payout as payout_service 1a
30from polar.pledge.service import pledge as pledge_service 1a
31from polar.refund.service import refund as refund_service 1a
32from polar.subscription.service import SubscriptionDoesNotExist, SubscriptionLocked 1a
33from polar.subscription.service import subscription as subscription_service 1a
34from polar.transaction.service.dispute import ( 1a
35 DisputeClosed,
36)
37from polar.transaction.service.dispute import ( 1a
38 dispute_transaction as dispute_transaction_service,
39)
40from polar.user.service import user as user_service 1a
41from polar.worker import ( 1a
42 AsyncSessionMaker,
43 RedisMiddleware,
44 TaskPriority,
45 actor,
46 can_retry,
47 get_retries,
48)
50from . import payment 1a
51from .service import stripe as stripe_service 1a
# Module-level structlog logger; the retry decorator and the refund/subscription/
# invoice tasks in this module log through it.
log: Logger = structlog.get_logger()
# NOTE(review): `stripe_api_connection_error_retry` declares its own `**Params`
# type parameter inline, so this module-level ParamSpec does not appear to be
# referenced in this file — confirm there is no external user before removing.
Params = ParamSpec("Params")
59def stripe_api_connection_error_retry[**Params, ReturnValue]( 1a
60 func: Callable[Params, Awaitable[ReturnValue]],
61) -> Callable[Params, Awaitable[ReturnValue]]:
62 @functools.wraps(func) 1a
63 async def wrapper(*args: Params.args, **kwargs: Params.kwargs) -> ReturnValue: 1a
64 try:
65 return await func(*args, **kwargs)
66 except stripe_lib.APIConnectionError as e:
67 log.warning(
68 "Retry after Stripe API connection error",
69 e=str(e),
70 job_try=get_retries(),
71 )
72 raise Retry() from e
74 return wrapper 1a
class StripeTaskError(PolarTaskError):
    """Task error type declared for the Stripe integration tasks.

    Adds nothing on top of ``PolarTaskError``.
    """
@actor(actor_name="stripe.webhook.account.updated", priority=TaskPriority.HIGH)
@stripe_api_connection_error_retry
async def account_updated(event_id: uuid.UUID) -> None:
    """Pass the Stripe account carried by an event to the account service.

    Args:
        event_id: Identifier given to ``external_event_service.handle_stripe``
            to obtain the event.
    """
    async with (
        AsyncSessionMaker() as session,
        external_event_service.handle_stripe(session, event_id) as event,
    ):
        account_payload = cast(stripe_lib.Account, event.stripe_data.data.object)
        await account_service.update_account_from_stripe(
            session, stripe_account=account_payload
        )
@actor(actor_name="stripe.webhook.payment_intent.succeeded", priority=TaskPriority.HIGH)
@stripe_api_connection_error_retry
async def payment_intent_succeeded(event_id: uuid.UUID) -> None:
    """Process the Stripe PaymentIntent carried by a succeeded-payment event.

    Two cases are considered, in order:

    1. The validated payload references an invoice: the invoice is fetched
       through ``stripe_service.get_invoice`` and, when its metadata ``type``
       equals ``ProductType.pledge``, the payload is passed to
       ``pledge_service.handle_payment_intent_success``.
    2. The payment intent metadata has an ``order_id``: the order is resolved,
       the payment method is upserted for it and, when both a payment method
       and an order subscription exist, the subscription's payment method is
       updated.

    Args:
        event_id: Identifier given to ``external_event_service.handle_stripe``
            to obtain the event.
    """
    async with AsyncSessionMaker() as session:
        async with external_event_service.handle_stripe(session, event_id) as event:
            payment_intent = cast(
                stripe_lib.PaymentIntent, event.stripe_data.data.object
            )
            payload = PaymentIntentSuccessWebhook.model_validate(payment_intent)
            # payment for pay_on_completion
            # metadata is on the invoice, not the payment_intent
            if payload.invoice:
                invoice = await stripe_service.get_invoice(payload.invoice)
                if (
                    invoice.metadata
                    and invoice.metadata.get("type") == ProductType.pledge
                ):
                    await pledge_service.handle_payment_intent_success(
                        session=session,
                        payload=payload,
                    )
                # NOTE(review): indentation was lost in the rendering this was
                # read from; this `return` is assumed to sit at the
                # `if payload.invoice:` level (any invoice-backed payment stops
                # here) — confirm against the real file.
                return
            # Handle retry payments - save credit card and update subscription payment method
            if payment_intent.metadata and payment_intent.metadata.get("order_id"):
                # No checkout is passed (third argument is None).
                order = await payment.resolve_order(session, payment_intent, None)
                if order is not None:
                    payment_method = await payment_method_service.upsert_from_stripe_payment_intent_for_order(
                        session, payment_intent, order
                    )
                    if payment_method and order.subscription:
                        await subscription_service.update_payment_method_from_retry(
                            session, order.subscription, payment_method
                        )
                return
@actor(
    actor_name="stripe.webhook.payment_intent.payment_failed",
    priority=TaskPriority.HIGH,
)
@stripe_api_connection_error_retry
async def payment_intent_payment_failed(event_id: uuid.UUID) -> None:
    """Run the payment failure handling for the PaymentIntent of an event.

    ``UnhandledPaymentIntent`` raised by the handler is ignored.

    Args:
        event_id: Identifier given to ``external_event_service.handle_stripe``
            to obtain the event.

    Raises:
        Retry: ``payment.OrderDoesNotExist`` was raised and ``can_retry()``
            is true.
        payment.OrderDoesNotExist: Re-raised when ``can_retry()`` is false.
    """
    async with (
        AsyncSessionMaker() as session,
        external_event_service.handle_stripe(session, event_id) as event,
    ):
        failed_intent = cast(stripe_lib.PaymentIntent, event.stripe_data.data.object)
        try:
            await payment.handle_failure(session, failed_intent)
        except UnhandledPaymentIntent:
            pass
        except payment.OrderDoesNotExist as missing_order:
            # The order may simply not have been handled yet, so try again
            # later; once retries run out, re-raise to be notified about it.
            if not can_retry():
                raise
            raise Retry() from missing_order
@actor(actor_name="stripe.webhook.setup_intent.succeeded", priority=TaskPriority.HIGH)
@stripe_api_connection_error_retry
async def setup_intent_succeeded(event_id: uuid.UUID) -> None:
    """Run the payment success handling for the SetupIntent of an event.

    ``payment.OutdatedCheckoutIntent`` raised by the handler is ignored.

    Args:
        event_id: Identifier given to ``external_event_service.handle_stripe``
            to obtain the event.

    Raises:
        Retry: ``NotConfirmedCheckout`` or ``payment.OrderDoesNotExist`` was
            raised and ``can_retry()`` is true.
        NotConfirmedCheckout: Re-raised when ``can_retry()`` is false.
        payment.OrderDoesNotExist: Re-raised when ``can_retry()`` is false.
    """
    async with (
        AsyncSessionMaker() as session,
        external_event_service.handle_stripe(session, event_id) as event,
    ):
        intent = cast(stripe_lib.SetupIntent, event.stripe_data.data.object)
        try:
            await payment.handle_success(session, intent)
        except (NotConfirmedCheckout, payment.OrderDoesNotExist) as not_ready:
            # Stripe webhooks have been seen in the wild arriving *before* the
            # Checkout Session status was updated in the database, so try
            # again later; once retries run out, re-raise to be notified.
            if not can_retry():
                raise
            raise Retry() from not_ready
        except payment.OutdatedCheckoutIntent:
            # Outdated setup intents are ignored: this is the expected flow
            # after a "trial already redeemed" error.
            pass
@actor(
    actor_name="stripe.webhook.setup_intent.setup_failed", priority=TaskPriority.HIGH
)
@stripe_api_connection_error_retry
async def setup_intent_setup_failed(event_id: uuid.UUID) -> None:
    """Run the payment failure handling for the SetupIntent of an event.

    Args:
        event_id: Identifier given to ``external_event_service.handle_stripe``
            to obtain the event.

    Raises:
        Retry: ``payment.OrderDoesNotExist`` was raised and ``can_retry()``
            is true.
        payment.OrderDoesNotExist: Re-raised when ``can_retry()`` is false.
    """
    async with (
        AsyncSessionMaker() as session,
        external_event_service.handle_stripe(session, event_id) as event,
    ):
        failed_intent = cast(stripe_lib.SetupIntent, event.stripe_data.data.object)
        try:
            await payment.handle_failure(session, failed_intent)
        except payment.OrderDoesNotExist as missing_order:
            # The order may simply not have been handled yet, so try again
            # later; once retries run out, re-raise to be notified about it.
            if not can_retry():
                raise
            raise Retry() from missing_order
@actor(actor_name="stripe.webhook.charge.pending", priority=TaskPriority.HIGH)
@stripe_api_connection_error_retry
async def charge_pending(event_id: uuid.UUID) -> None:
    """Record the pending Stripe charge carried by an event as a payment.

    The checkout and the order are resolved from the charge, then the charge
    is upserted through ``payment_service.upsert_from_stripe_charge``.

    Stripe API connection errors are turned into a task retry by
    ``stripe_api_connection_error_retry``, matching the other payment
    handlers in this module that call ``payment.resolve_order``.

    Args:
        event_id: Identifier given to ``external_event_service.handle_stripe``
            to obtain the event.

    Raises:
        Retry: ``payment.OrderDoesNotExist`` was raised and ``can_retry()``
            is true, or a Stripe API connection error occurred.
        payment.OrderDoesNotExist: Re-raised when ``can_retry()`` is false.
    """
    async with AsyncSessionMaker() as session:
        async with external_event_service.handle_stripe(session, event_id) as event:
            charge = cast(stripe_lib.Charge, event.stripe_data.data.object)
            checkout = await payment.resolve_checkout(session, charge)
            try:
                order = await payment.resolve_order(session, charge, checkout)
            except payment.OrderDoesNotExist as e:
                # Retry because we may not have been able to handle the order yet
                if can_retry():
                    raise Retry() from e
                # Raise the exception to be notified about it
                else:
                    raise
            await payment_service.upsert_from_stripe_charge(
                session, charge, checkout, None, order
            )
@actor(actor_name="stripe.webhook.charge.failed", priority=TaskPriority.HIGH)
@stripe_api_connection_error_retry
async def charge_failed(event_id: uuid.UUID) -> None:
    """Run the payment failure handling for the Stripe charge of an event.

    Stripe API connection errors are turned into a task retry by
    ``stripe_api_connection_error_retry``, matching the other handlers in
    this module that call ``payment.handle_failure``.

    Args:
        event_id: Identifier given to ``external_event_service.handle_stripe``
            to obtain the event.

    Raises:
        Retry: ``payment.OrderDoesNotExist`` was raised and ``can_retry()``
            is true, or a Stripe API connection error occurred.
        payment.OrderDoesNotExist: Re-raised when ``can_retry()`` is false.
    """
    async with AsyncSessionMaker() as session:
        async with external_event_service.handle_stripe(session, event_id) as event:
            charge = cast(stripe_lib.Charge, event.stripe_data.data.object)
            try:
                await payment.handle_failure(session, charge)
            except payment.OrderDoesNotExist as e:
                # Retry because we may not have been able to handle the order yet
                if can_retry():
                    raise Retry() from e
                # Raise the exception to be notified about it
                else:
                    raise
@actor(actor_name="stripe.webhook.charge.succeeded", priority=TaskPriority.HIGH)
@stripe_api_connection_error_retry
async def charge_succeeded(event_id: uuid.UUID) -> None:
    """Run the payment success handling for the Stripe charge of an event.

    Args:
        event_id: Identifier given to ``external_event_service.handle_stripe``
            to obtain the event.

    Raises:
        Retry: ``NotConfirmedCheckout`` or ``payment.OrderDoesNotExist`` was
            raised and ``can_retry()`` is true.
        NotConfirmedCheckout: Re-raised when ``can_retry()`` is false.
        payment.OrderDoesNotExist: Re-raised when ``can_retry()`` is false.
    """
    async with (
        AsyncSessionMaker() as session,
        external_event_service.handle_stripe(session, event_id) as event,
    ):
        paid_charge = cast(stripe_lib.Charge, event.stripe_data.data.object)
        try:
            await payment.handle_success(session, paid_charge)
        except (NotConfirmedCheckout, payment.OrderDoesNotExist) as not_ready:
            # Stripe webhooks have been seen in the wild arriving *before* the
            # Checkout Session status was updated in the database, so try
            # again later; once retries run out, re-raise to be notified.
            if not can_retry():
                raise
            raise Retry() from not_ready
@actor(actor_name="stripe.webhook.refund.created", priority=TaskPriority.HIGH)
@stripe_api_connection_error_retry
async def refund_created(event_id: uuid.UUID) -> None:
    """Log the Stripe refund of an event and create it via the refund service.

    Args:
        event_id: Identifier given to ``external_event_service.handle_stripe``
            to obtain the event.
    """
    async with (
        AsyncSessionMaker() as session,
        external_event_service.handle_stripe(session, event_id) as event,
    ):
        stripe_refund = cast(stripe_lib.Refund, event.stripe_data.data.object)
        log.info(
            "stripe.webhook.refund.created",
            refund_id=stripe_refund.id,
            charge_id=stripe_refund.charge,
            payment_intent=stripe_refund.payment_intent,
        )
        await refund_service.create_from_stripe(session, stripe_refund=stripe_refund)
@actor(actor_name="stripe.webhook.refund.updated", priority=TaskPriority.HIGH)
@stripe_api_connection_error_retry
async def refund_updated(event_id: uuid.UUID) -> None:
    """Log the Stripe refund of an event and upsert it via the refund service.

    Args:
        event_id: Identifier given to ``external_event_service.handle_stripe``
            to obtain the event.
    """
    async with (
        AsyncSessionMaker() as session,
        external_event_service.handle_stripe(session, event_id) as event,
    ):
        stripe_refund = cast(stripe_lib.Refund, event.stripe_data.data.object)
        log.info(
            "stripe.webhook.refund.updated",
            refund_id=stripe_refund.id,
            charge_id=stripe_refund.charge,
            payment_intent=stripe_refund.payment_intent,
        )
        await refund_service.upsert_from_stripe(session, stripe_refund=stripe_refund)
@actor(actor_name="stripe.webhook.refund.failed", priority=TaskPriority.HIGH)
@stripe_api_connection_error_retry
async def refund_failed(event_id: uuid.UUID) -> None:
    """Log the failed Stripe refund of an event and upsert it via the refund service.

    Args:
        event_id: Identifier given to ``external_event_service.handle_stripe``
            to obtain the event.
    """
    async with (
        AsyncSessionMaker() as session,
        external_event_service.handle_stripe(session, event_id) as event,
    ):
        stripe_refund = cast(stripe_lib.Refund, event.stripe_data.data.object)
        log.info(
            "stripe.webhook.refund.failed",
            refund_id=stripe_refund.id,
            charge_id=stripe_refund.charge,
            payment_intent=stripe_refund.payment_intent,
        )
        await refund_service.upsert_from_stripe(session, stripe_refund=stripe_refund)
@actor(actor_name="stripe.webhook.charge.dispute.closed", priority=TaskPriority.HIGH)
@stripe_api_connection_error_retry
async def charge_dispute_closed(event_id: uuid.UUID) -> None:
    """Create a dispute transaction from the Stripe dispute of an event.

    ``DisputeClosed`` raised by the dispute transaction service is ignored.

    Args:
        event_id: Identifier given to ``external_event_service.handle_stripe``
            to obtain the event.
    """
    async with (
        AsyncSessionMaker() as session,
        external_event_service.handle_stripe(session, event_id) as event,
    ):
        closed_dispute = cast(stripe_lib.Dispute, event.stripe_data.data.object)
        try:
            await dispute_transaction_service.create_dispute(
                session, dispute=closed_dispute
            )
        except DisputeClosed:
            # Closed without any action being taken: nothing to record.
            pass
@actor(
    actor_name="stripe.webhook.customer.subscription.updated",
    priority=TaskPriority.HIGH,
)
@stripe_api_connection_error_retry
async def customer_subscription_updated(event_id: uuid.UUID) -> None:
    """Update a subscription from the Stripe subscription carried by an event.

    Args:
        event_id: Identifier given to ``external_event_service.handle_stripe``
            to obtain the event.

    Raises:
        Retry: ``SubscriptionDoesNotExist`` or ``SubscriptionLocked`` was
            raised and ``can_retry()`` is true.
        SubscriptionDoesNotExist: Re-raised when ``can_retry()`` is false.
        SubscriptionLocked: Re-raised when ``can_retry()`` is false.
    """
    async with (
        AsyncSessionMaker() as session,
        external_event_service.handle_stripe(session, event_id) as event,
    ):
        stripe_subscription = cast(
            stripe_lib.Subscription, event.stripe_data.data.object
        )
        locker = Locker(RedisMiddleware.get())
        try:
            await subscription_service.update_from_stripe(
                session, locker, stripe_subscription=stripe_subscription
            )
        except (SubscriptionDoesNotExist, SubscriptionLocked) as not_ready:
            log.warning(not_ready.message, event_id=event.id)
            # Stripe does not guarantee webhook ordering, so
            # subscription.created may not have been handled yet: try again
            # later; once retries run out, re-raise to be notified.
            if not can_retry():
                raise
            raise Retry() from not_ready
@actor(
    actor_name="stripe.webhook.customer.subscription.deleted",
    priority=TaskPriority.HIGH,
)
@stripe_api_connection_error_retry
async def customer_subscription_deleted(event_id: uuid.UUID) -> None:
    """Update a subscription from the deleted Stripe subscription of an event.

    Goes through the same ``subscription_service.update_from_stripe`` call as
    the subscription-updated task.

    Args:
        event_id: Identifier given to ``external_event_service.handle_stripe``
            to obtain the event.

    Raises:
        Retry: ``SubscriptionDoesNotExist`` or ``SubscriptionLocked`` was
            raised and ``can_retry()`` is true.
        SubscriptionDoesNotExist: Re-raised when ``can_retry()`` is false.
        SubscriptionLocked: Re-raised when ``can_retry()`` is false.
    """
    async with (
        AsyncSessionMaker() as session,
        external_event_service.handle_stripe(session, event_id) as event,
    ):
        stripe_subscription = cast(
            stripe_lib.Subscription, event.stripe_data.data.object
        )
        locker = Locker(RedisMiddleware.get())
        try:
            await subscription_service.update_from_stripe(
                session, locker, stripe_subscription=stripe_subscription
            )
        except (SubscriptionDoesNotExist, SubscriptionLocked) as not_ready:
            log.warning(not_ready.message, event_id=event.id)
            # Stripe does not guarantee webhook ordering, so
            # subscription.created may not have been handled yet: try again
            # later; once retries run out, re-raise to be notified.
            if not can_retry():
                raise
            raise Retry() from not_ready
@actor(actor_name="stripe.webhook.invoice.created", priority=TaskPriority.HIGH)
@stripe_api_connection_error_retry
async def invoice_created(event_id: uuid.UUID) -> None:
    """Create an order from the Stripe invoice carried by an event.

    ``NotAnOrderInvoice`` and ``NotASubscriptionInvoice`` raised by the order
    service are ignored.

    Args:
        event_id: Identifier given to ``external_event_service.handle_stripe``
            to obtain the event.

    Raises:
        Retry: The order service reported a missing subscription and
            ``can_retry()`` is true.
        OrderSubscriptionDoesNotExist: Re-raised when ``can_retry()`` is false.
    """
    async with (
        AsyncSessionMaker() as session,
        external_event_service.handle_stripe(session, event_id) as event,
    ):
        stripe_invoice = cast(stripe_lib.Invoice, event.stripe_data.data.object)
        try:
            await order_service.create_order_from_stripe(
                session, invoice=stripe_invoice
            )
        except OrderSubscriptionDoesNotExist as missing_subscription:
            log.warning(missing_subscription.message, event_id=event.id)
            # Stripe does not guarantee webhook ordering, so
            # subscription.created may not have been handled yet: try again
            # later; once retries run out, re-raise to be notified.
            if not can_retry():
                raise
            raise Retry() from missing_subscription
        except (NotAnOrderInvoice, NotASubscriptionInvoice):
            # Invoices that are not for products (pledges) or subscriptions
            # are not ours to handle.
            return
@actor(actor_name="stripe.webhook.invoice.paid", priority=TaskPriority.HIGH)
@stripe_api_connection_error_retry
async def invoice_paid(event_id: uuid.UUID) -> None:
    """Update an order from the paid Stripe invoice carried by an event.

    Args:
        event_id: Identifier given to ``external_event_service.handle_stripe``
            to obtain the event.

    Raises:
        Retry: ``OrderDoesNotExist`` was raised and ``can_retry()`` is true.
        OrderDoesNotExist: Re-raised when ``can_retry()`` is false.
    """
    async with (
        AsyncSessionMaker() as session,
        external_event_service.handle_stripe(session, event_id) as event,
    ):
        stripe_invoice = cast(stripe_lib.Invoice, event.stripe_data.data.object)
        try:
            await order_service.update_order_from_stripe(
                session, invoice=stripe_invoice
            )
        except OrderDoesNotExist as missing_order:
            log.warning(missing_order.message, event_id=event.id)
            # Stripe does not guarantee webhook ordering, so invoice.created
            # may not have been handled yet: try again later; once retries
            # run out, re-raise to be notified.
            if not can_retry():
                raise
            raise Retry() from missing_order
@actor(actor_name="stripe.webhook.payout.updated", priority=TaskPriority.LOW)
@stripe_api_connection_error_retry
async def payout_updated(event_id: uuid.UUID) -> None:
    """Pass the Stripe payout carried by an event to the payout service.

    Args:
        event_id: Identifier given to ``external_event_service.handle_stripe``
            to obtain the event.
    """
    async with (
        AsyncSessionMaker() as session,
        external_event_service.handle_stripe(session, event_id) as event,
    ):
        stripe_payout = cast(stripe_lib.Payout, event.stripe_data.data.object)
        await payout_service.update_from_stripe(session, stripe_payout)
@actor(actor_name="stripe.webhook.payout.paid", priority=TaskPriority.LOW)
@stripe_api_connection_error_retry
async def payout_paid(event_id: uuid.UUID) -> None:
    """Pass the paid Stripe payout carried by an event to the payout service.

    Goes through the same ``payout_service.update_from_stripe`` call as the
    payout-updated task.

    Args:
        event_id: Identifier given to ``external_event_service.handle_stripe``
            to obtain the event.
    """
    async with (
        AsyncSessionMaker() as session,
        external_event_service.handle_stripe(session, event_id) as event,
    ):
        stripe_payout = cast(stripe_lib.Payout, event.stripe_data.data.object)
        await payout_service.update_from_stripe(session, stripe_payout)
@actor(
    actor_name="stripe.webhook.identity.verification_session.verified",
    priority=TaskPriority.HIGH,
)
async def identity_verification_session_verified(event_id: uuid.UUID) -> None:
    """Report the verification session of an event to the user service as verified.

    Args:
        event_id: Identifier given to ``external_event_service.handle_stripe``
            to obtain the event.
    """
    async with (
        AsyncSessionMaker() as session,
        external_event_service.handle_stripe(session, event_id) as event,
    ):
        identity_session = cast(
            stripe_lib.identity.VerificationSession, event.stripe_data.data.object
        )
        await user_service.identity_verification_verified(session, identity_session)
@actor(
    actor_name="stripe.webhook.identity.verification_session.processing",
    priority=TaskPriority.HIGH,
)
async def identity_verification_session_processing(event_id: uuid.UUID) -> None:
    """Report the verification session of an event to the user service as pending.

    Args:
        event_id: Identifier given to ``external_event_service.handle_stripe``
            to obtain the event.
    """
    async with (
        AsyncSessionMaker() as session,
        external_event_service.handle_stripe(session, event_id) as event,
    ):
        identity_session = cast(
            stripe_lib.identity.VerificationSession, event.stripe_data.data.object
        )
        await user_service.identity_verification_pending(session, identity_session)
@actor(
    actor_name="stripe.webhook.identity.verification_session.requires_input",
    priority=TaskPriority.HIGH,
)
async def identity_verification_session_requires_input(event_id: uuid.UUID) -> None:
    """Report the verification session of an event to the user service as failed.

    Args:
        event_id: Identifier given to ``external_event_service.handle_stripe``
            to obtain the event.
    """
    async with (
        AsyncSessionMaker() as session,
        external_event_service.handle_stripe(session, event_id) as event,
    ):
        identity_session = cast(
            stripe_lib.identity.VerificationSession, event.stripe_data.data.object
        )
        await user_service.identity_verification_failed(session, identity_session)