Coverage for polar/order/tasks.py: 31%
126 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 15:52 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 15:52 +0000
1import uuid 1a
3import stripe as stripe_lib 1a
4import structlog 1a
5from dramatiq import Retry 1a
6from sqlalchemy.orm import joinedload 1a
8from polar.exceptions import PolarTaskError 1a
9from polar.logging import Logger 1a
10from polar.models import Customer, Order 1a
11from polar.models.order import OrderBillingReasonInternal 1a
12from polar.payment_method.repository import PaymentMethodRepository 1a
13from polar.product.repository import ProductRepository 1a
14from polar.subscription.repository import SubscriptionRepository 1a
15from polar.transaction.service.balance import PaymentTransactionForChargeDoesNotExist 1a
16from polar.worker import ( 1a
17 AsyncSessionMaker,
18 CronTrigger,
19 TaskPriority,
20 actor,
21 can_retry,
22 enqueue_job,
23)
25from .repository import OrderRepository 1a
26from .service import CardPaymentFailed, NoPendingBillingEntries 1a
27from .service import order as order_service 1a
# Structured logger shared by the tasks defined in this module.
log: Logger = structlog.get_logger()

# Retry ceiling constant.
# NOTE(review): not referenced in the visible part of this module — confirm
# where it is consumed before changing or removing it.
MAX_RETRIES = 10
class OrderTaskError(PolarTaskError):
    """Base class for the errors raised by the order background tasks."""
class SubscriptionDoesNotExist(OrderTaskError):
    """Raised when a task is given a subscription id that cannot be loaded.

    The offending id is kept on ``subscription_id`` for callers and logs.
    """

    def __init__(self, subscription_id: uuid.UUID) -> None:
        self.subscription_id = subscription_id
        super().__init__(
            f"The subscription with id {subscription_id} does not exist."
        )
class ProductDoesNotExist(OrderTaskError):
    """Raised when a task is given a product id that cannot be loaded.

    The offending id is kept on ``product_id`` for callers and logs.
    """

    def __init__(self, product_id: uuid.UUID) -> None:
        self.product_id = product_id
        super().__init__(f"The product with id {product_id} does not exist.")
class OrderDoesNotExist(OrderTaskError):
    """Raised when a task is given an order id that cannot be loaded.

    The offending id is kept on ``order_id`` for callers and logs.
    """

    def __init__(self, order_id: uuid.UUID) -> None:
        self.order_id = order_id
        super().__init__(f"The order with id {order_id} does not exist.")
class PaymentMethodDoesNotExist(OrderTaskError):
    """Error describing a payment method id that has no matching record.

    The offending id is kept on ``payment_method_id`` for callers and logs.
    """

    def __init__(self, payment_method_id: uuid.UUID) -> None:
        self.payment_method_id = payment_method_id
        super().__init__(
            f"The payment method with id {payment_method_id} does not exist."
        )
@actor(actor_name="order.create_subscription_order", priority=TaskPriority.LOW)
async def create_subscription_order(
    subscription_id: uuid.UUID, order_reason: OrderBillingReasonInternal
) -> None:
    """Create an order for a subscription.

    Loads the subscription with the repository's eager options and delegates
    to ``order_service.create_subscription_order``.

    Args:
        subscription_id: Id of the subscription to bill.
        order_reason: Billing reason forwarded to the order service.

    Raises:
        SubscriptionDoesNotExist: No subscription was found for the id.
    """
    async with AsyncSessionMaker() as session:
        subscription_repository = SubscriptionRepository.from_session(session)
        eager_options = subscription_repository.get_eager_options()
        subscription = await subscription_repository.get_by_id(
            subscription_id, options=eager_options
        )
        if subscription is None:
            raise SubscriptionDoesNotExist(subscription_id)

        try:
            await order_service.create_subscription_order(
                session, subscription, order_reason
            )
        except NoPendingBillingEntries:
            # Nothing to bill, so no order is created. Typically the
            # subscription has been canceled and no usage-based billing
            # is pending.
            return
@actor(actor_name="order.trigger_payment", priority=TaskPriority.LOW)
async def trigger_payment(order_id: uuid.UUID, payment_method_id: uuid.UUID) -> None:
    """Attempt to pay an order with one of the customer's payment methods.

    The payment method is looked up by its id together with the order's
    customer id. If it cannot be found, the order is passed to
    ``order_service.handle_payment_failure`` and the task ends without error.

    Args:
        order_id: Id of the order to pay.
        payment_method_id: Id of the payment method to charge.

    Raises:
        OrderDoesNotExist: No order was found for ``order_id``.
        Retry: A Stripe connection, API or rate-limit error occurred and
            ``can_retry()`` allows another attempt.
        stripe_lib.APIConnectionError, stripe_lib.APIError,
        stripe_lib.RateLimitError: Re-raised when no retry is allowed.
    """
    async with AsyncSessionMaker() as session:
        order_repository = OrderRepository.from_session(session)
        order = await order_repository.get_by_id(
            order_id, options=order_repository.get_eager_options()
        )
        if order is None:
            raise OrderDoesNotExist(order_id)

        payment_methods = PaymentMethodRepository.from_session(session)
        payment_method = await payment_methods.get_by_id_and_customer(
            payment_method_id, order.customer_id
        )
        if payment_method is None:
            log.info(
                "Payment method not found, triggering dunning process",
                order_id=order_id,
                payment_method_id=payment_method_id,
            )
            await order_service.handle_payment_failure(session, order)
            return

        try:
            await order_service.trigger_payment(session, order, payment_method)
        except CardPaymentFailed:
            # Card failures are left to the dunning process, so the task
            # finishes normally instead of being retried.
            log.info(
                "Card payment failed, not retrying - will be handled by dunning",
                order_id=order_id,
            )
            return
        except (
            stripe_lib.APIConnectionError,
            stripe_lib.APIError,
            stripe_lib.RateLimitError,
        ) as exc:
            # Network/availability problems on Stripe's side are worth
            # another attempt.
            log.error(
                "Stripe service error during payment trigger, retrying",
                order_id=order_id,
                error_type=type(exc).__name__,
                error_message=str(exc),
            )
            if not can_retry():
                # No retry allowed: surface the original Stripe error.
                raise
            raise Retry() from exc
@actor(actor_name="order.balance", priority=TaskPriority.LOW)
async def create_order_balance(order_id: uuid.UUID, charge_id: str) -> None:
    """Create the balance for an order from a charge.

    Loads the order together with its customer and the customer's
    organization, then delegates to ``order_service.create_order_balance``.

    Args:
        order_id: Id of the order.
        charge_id: Charge identifier forwarded to the order service.

    Raises:
        OrderDoesNotExist: No order was found for ``order_id``.
        Retry: The payment transaction for the charge is missing and
            ``can_retry()`` allows another attempt.
        PaymentTransactionForChargeDoesNotExist: Re-raised when no retry
            is allowed.
    """
    async with AsyncSessionMaker() as session:
        order_repository = OrderRepository.from_session(session)
        customer_with_organization = joinedload(Order.customer).joinedload(
            Customer.organization
        )
        order = await order_repository.get_by_id(
            order_id, options=(customer_with_organization,)
        )
        if order is None:
            raise OrderDoesNotExist(order_id)

        try:
            await order_service.create_order_balance(session, order, charge_id)
        except PaymentTransactionForChargeDoesNotExist as exc:
            # Stripe does not guarantee webhook ordering, so
            # subscription.created or charge.succeeded may not have been
            # handled yet. Retry while allowed; otherwise re-raise so the
            # failure gets noticed.
            if not can_retry():
                raise
            raise Retry() from exc
@actor(actor_name="order.update_product_benefits_grants", priority=TaskPriority.MEDIUM)
async def update_product_benefits_grants(product_id: uuid.UUID) -> None:
    """Update the benefit grants of a product.

    Loads the product and delegates to
    ``order_service.update_product_benefits_grants``.

    Args:
        product_id: Id of the product.

    Raises:
        ProductDoesNotExist: No product was found for ``product_id``.
    """
    async with AsyncSessionMaker() as session:
        product = await ProductRepository.from_session(session).get_by_id(
            product_id
        )
        if product is None:
            raise ProductDoesNotExist(product_id)

        await order_service.update_product_benefits_grants(session, product)
@actor(actor_name="order.confirmation_email", priority=TaskPriority.LOW)
async def order_confirmation_email(order_id: uuid.UUID) -> None:
    """Send the confirmation email for an order.

    Loads the order with the repository's eager options and delegates to
    ``order_service.send_confirmation_email``.

    Args:
        order_id: Id of the order.

    Raises:
        OrderDoesNotExist: No order was found for ``order_id``.
    """
    async with AsyncSessionMaker() as session:
        order_repository = OrderRepository.from_session(session)
        eager_options = order_repository.get_eager_options()
        order = await order_repository.get_by_id(order_id, options=eager_options)
        if order is None:
            raise OrderDoesNotExist(order_id)

        await order_service.send_confirmation_email(session, order)
@actor(actor_name="order.invoice", priority=TaskPriority.LOW)
async def order_invoice(order_id: uuid.UUID) -> None:
    """Generate the invoice for an order.

    Loads the order with the repository's eager options and delegates to
    ``order_service.generate_invoice``.

    Args:
        order_id: Id of the order.

    Raises:
        OrderDoesNotExist: No order was found for ``order_id``.
    """
    async with AsyncSessionMaker() as session:
        order_repository = OrderRepository.from_session(session)
        eager_options = order_repository.get_eager_options()
        order = await order_repository.get_by_id(order_id, options=eager_options)
        if order is None:
            raise OrderDoesNotExist(order_id)

        await order_service.generate_invoice(session, order)
@actor(
    actor_name="order.process_dunning",
    cron_trigger=CronTrigger.from_crontab("0 * * * *"),
    priority=TaskPriority.MEDIUM,
)
async def process_dunning() -> None:
    """Fan out dunning (payment retry) work for every order that is due.

    Scheduled with the crontab expression ``0 * * * *``. Each order returned
    by ``get_due_dunning_orders`` gets its own
    ``order.process_dunning_order`` job.
    """
    async with AsyncSessionMaker() as session:
        order_repository = OrderRepository.from_session(session)

        for due_order in await order_repository.get_due_dunning_orders():
            enqueue_job("order.process_dunning_order", due_order.id)
@actor(actor_name="order.process_dunning_order", priority=TaskPriority.MEDIUM)
async def process_dunning_order(order_id: uuid.UUID) -> None:
    """Run dunning (payment retry) for one order.

    Loads the order with the repository's eager options and delegates to
    ``order_service.process_dunning_order``.

    Args:
        order_id: Id of the order due for dunning.

    Raises:
        OrderDoesNotExist: No order was found for ``order_id``.
    """
    async with AsyncSessionMaker() as session:
        repository = OrderRepository.from_session(session)
        eager_options = repository.get_eager_options()
        order = await repository.get_by_id(order_id, options=eager_options)
        if order is None:
            raise OrderDoesNotExist(order_id)

        await order_service.process_dunning_order(session, order)