Coverage for polar/order/service.py: 14%
812 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 17:15 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 17:15 +0000
1import uuid 1a
2from collections.abc import AsyncIterator, Sequence 1a
3from contextlib import asynccontextmanager 1a
4from datetime import UTC, datetime 1a
5from typing import Any, Literal 1a
6from urllib.parse import urlencode 1a
8import stripe as stripe_lib 1a
9import structlog 1a
10from sqlalchemy import select 1a
11from sqlalchemy.orm import contains_eager, joinedload 1a
13from polar.account.repository import AccountRepository 1a
14from polar.auth.models import AuthSubject 1a
15from polar.billing_entry.service import billing_entry as billing_entry_service 1a
16from polar.checkout.eventstream import CheckoutEvent, publish_checkout_event 1a
17from polar.checkout.guard import has_product_checkout 1a
18from polar.checkout.repository import CheckoutRepository 1a
19from polar.config import settings 1a
20from polar.customer.repository import CustomerRepository 1a
21from polar.customer_portal.schemas.order import ( 1a
22 CustomerOrderPaymentConfirmation,
23 CustomerOrderUpdate,
24)
25from polar.customer_session.service import customer_session as customer_session_service 1a
26from polar.discount.service import discount as discount_service 1a
27from polar.email.react import render_email_template 1a
28from polar.email.schemas import ( 1a
29 EmailAdapter,
30)
31from polar.email.sender import Attachment, enqueue_email 1a
32from polar.enums import PaymentProcessor 1a
33from polar.event.service import event as event_service 1a
34from polar.event.system import OrderPaidMetadata, SystemEvent, build_system_event 1a
35from polar.eventstream.service import publish as eventstream_publish 1a
36from polar.exceptions import PolarError 1a
37from polar.held_balance.service import held_balance as held_balance_service 1a
38from polar.integrations.stripe.schemas import ProductType 1a
39from polar.integrations.stripe.service import stripe as stripe_service 1a
40from polar.integrations.stripe.utils import get_expandable_id 1a
41from polar.invoice.service import invoice as invoice_service 1a
42from polar.kit.address import Address, AddressInput 1a
43from polar.kit.db.postgres import AsyncReadSession, AsyncSession 1a
44from polar.kit.metadata import MetadataQuery, apply_metadata_clause 1a
45from polar.kit.pagination import PaginationParams 1a
46from polar.kit.sorting import Sorting 1a
47from polar.kit.tax import ( 1a
48 TaxabilityReason,
49 TaxRate,
50 calculate_tax,
51 from_stripe_tax_rate,
52 from_stripe_tax_rate_details,
53)
54from polar.kit.utils import utc_now 1a
55from polar.logging import Logger 1a
56from polar.models import ( 1a
57 Checkout,
58 Customer,
59 Discount,
60 Order,
61 OrderItem,
62 Organization,
63 Payment,
64 PaymentMethod,
65 Product,
66 ProductPrice,
67 Subscription,
68 Transaction,
69 User,
70 WalletTransaction,
71)
72from polar.models.held_balance import HeldBalance 1a
73from polar.models.order import ( 1a
74 OrderBillingReason,
75 OrderBillingReasonInternal,
76 OrderStatus,
77)
78from polar.models.product import ProductBillingType 1a
79from polar.models.subscription import SubscriptionStatus 1a
80from polar.models.subscription_meter import SubscriptionMeter 1a
81from polar.models.transaction import TransactionType 1a
82from polar.models.webhook_endpoint import WebhookEventType 1a
83from polar.notifications.notification import ( 1a
84 MaintainerNewProductSaleNotificationPayload,
85 NotificationType,
86)
87from polar.notifications.service import PartialNotification 1a
88from polar.notifications.service import notifications as notifications_service 1a
89from polar.organization.repository import OrganizationRepository 1a
90from polar.organization.service import organization as organization_service 1a
91from polar.payment.repository import PaymentRepository 1a
92from polar.payment_method.repository import PaymentMethodRepository 1a
93from polar.product.guard import is_custom_price, is_seat_price, is_static_price 1a
94from polar.product.repository import ProductPriceRepository 1a
95from polar.subscription.repository import SubscriptionRepository 1a
96from polar.subscription.service import subscription as subscription_service 1a
97from polar.transaction.service.balance import PaymentTransactionForChargeDoesNotExist 1a
98from polar.transaction.service.balance import ( 1a
99 balance_transaction as balance_transaction_service,
100)
101from polar.transaction.service.platform_fee import ( 1a
102 platform_fee_transaction as platform_fee_transaction_service,
103)
104from polar.wallet.repository import WalletTransactionRepository 1a
105from polar.wallet.service import wallet as wallet_service 1a
106from polar.webhook.service import webhook as webhook_service 1a
107from polar.worker import enqueue_job 1a
109from .repository import OrderRepository 1a
110from .schemas import OrderInvoice, OrderUpdate 1a
111from .sorting import OrderSortProperty 1a
113log: Logger = structlog.get_logger() 1a
class OrderError(PolarError):
    """Base class for the order-related errors defined in this module."""
class RecurringProduct(OrderError):
    """Raised when a one-time order is requested for a recurring product."""

    def __init__(self, checkout: Checkout, product: Product) -> None:
        # Keep both objects on the exception so handlers can inspect them.
        self.checkout = checkout
        self.product = product
        super().__init__(
            f"Checkout {checkout.id} is for product {product.id}, "
            "which is a recurring product."
        )
class NotRecurringProduct(OrderError):
    """Raised when a subscription order is requested for a non-recurring product."""

    def __init__(self, checkout: Checkout, product: Product) -> None:
        # Keep both objects on the exception so handlers can inspect them.
        self.checkout = checkout
        self.product = product
        super().__init__(
            f"Checkout {checkout.id} is for product {product.id}, "
            "which is not a recurring product."
        )
class MissingCheckoutCustomer(OrderError):
    """Raised when an order is created from a checkout that has no customer."""

    def __init__(self, checkout: Checkout) -> None:
        self.checkout = checkout
        super().__init__(f"Checkout {checkout.id} is missing a customer.")
class MissingStripeCustomerID(OrderError):
    """Error for a checkout whose customer has no Stripe customer ID."""

    def __init__(self, checkout: Checkout, customer: Customer) -> None:
        # Keep both objects on the exception so handlers can inspect them.
        self.checkout = checkout
        self.customer = customer
        super().__init__(
            f"Checkout {checkout.id}'s customer {customer.id} "
            "is missing a Stripe customer ID."
        )
class NotAnOrderInvoice(OrderError):
    """Error for a Stripe invoice that does not correspond to an order."""

    def __init__(self, invoice_id: str) -> None:
        self.invoice_id = invoice_id
        super().__init__(
            f"Received invoice {invoice_id} from Stripe, but it is not an order."
            " Check if it's an issue pledge."
        )
class NotASubscriptionInvoice(OrderError):
    """Error for a Stripe invoice that is not linked to a subscription."""

    def __init__(self, invoice_id: str) -> None:
        self.invoice_id = invoice_id
        # Message wording fixed: "it it not" -> "it is not",
        # "purchases invoices" -> "purchase invoices".
        message = (
            f"Received invoice {invoice_id} from Stripe, but it is not linked to a subscription."
            " One-time purchase invoices are handled directly upon creation."
        )
        super().__init__(message)
class OrderDoesNotExist(OrderError):
    """Error for a Stripe invoice that has no matching Order."""

    def __init__(self, invoice_id: str) -> None:
        self.invoice_id = invoice_id
        super().__init__(
            f"Received invoice {invoice_id} from Stripe, "
            "but no associated Order exists."
        )
class DiscountDoesNotExist(OrderError):
    """Error for a Stripe invoice whose coupon has no matching Discount."""

    def __init__(self, invoice_id: str, coupon_id: str) -> None:
        # Keep both identifiers on the exception so handlers can inspect them.
        self.invoice_id = invoice_id
        self.coupon_id = coupon_id
        super().__init__(
            f"Received invoice {invoice_id} from Stripe with coupon {coupon_id}, "
            f"but no associated Discount exists."
        )
class CheckoutDoesNotExist(OrderError):
    """Error for a Stripe invoice whose checkout ID has no matching Checkout."""

    def __init__(self, invoice_id: str, checkout_id: str) -> None:
        # Keep both identifiers on the exception so handlers can inspect them.
        self.invoice_id = invoice_id
        self.checkout_id = checkout_id
        super().__init__(
            f"Received invoice {invoice_id} from Stripe with checkout {checkout_id}, "
            f"but no associated Checkout exists."
        )
class SubscriptionDoesNotExist(OrderError):
    """Error for a Stripe invoice whose subscription has no matching Subscription."""

    def __init__(self, invoice_id: str, stripe_subscription_id: str) -> None:
        # Keep both identifiers on the exception so handlers can inspect them.
        self.invoice_id = invoice_id
        self.stripe_subscription_id = stripe_subscription_id
        super().__init__(
            f"Received invoice {invoice_id} from Stripe "
            f"for subscription {stripe_subscription_id}, "
            f"but no associated Subscription exists."
        )
class AlreadyBalancedOrder(OrderError):
    """Error for an order/payment transaction pair that was already balanced."""

    def __init__(self, order: Order, payment_transaction: Transaction) -> None:
        # Keep both objects on the exception so handlers can inspect them.
        self.order = order
        self.payment_transaction = payment_transaction
        super().__init__(
            f"The order {order.id} with payment {payment_transaction.id} "
            "has already been balanced."
        )
class NotPaidOrder(OrderError):
    """Raised when invoice generation is requested for an order that is not paid."""

    def __init__(self, order: Order) -> None:
        self.order = order
        # 422 is forwarded as the second base-class argument
        # (presumably the HTTP status code -- confirm against PolarError).
        super().__init__(
            f"Order {order.id} is not paid, so an invoice cannot be generated.", 422
        )
class MissingInvoiceBillingDetails(OrderError):
    """Raised when an order lacks the billing name or address needed for an invoice."""

    def __init__(self, order: Order) -> None:
        self.order = order
        # 422 is forwarded as the second base-class argument
        # (presumably the HTTP status code -- confirm against PolarError).
        super().__init__(
            "Billing name and address are required "
            "to generate an invoice for this order.",
            422,
        )
class InvoiceDoesNotExist(OrderError):
    """Raised when an invoice is requested for an order that has none."""

    def __init__(self, order: Order) -> None:
        self.order = order
        # 404 is forwarded as the second base-class argument
        # (presumably the HTTP status code -- confirm against PolarError).
        super().__init__(f"No invoice exists for order {order.id}.", 404)
class OrderNotEligibleForRetry(OrderError):
    """Raised when a payment retry is requested for an order that cannot be retried."""

    def __init__(self, order: Order) -> None:
        self.order = order
        # 422 is forwarded as the second base-class argument
        # (presumably the HTTP status code -- confirm against PolarError).
        super().__init__(f"Order {order.id} is not eligible for payment retry.", 422)
class NoPendingBillingEntries(OrderError):
    """Raised when a subscription order would contain no items."""

    def __init__(self, subscription: Subscription) -> None:
        self.subscription = subscription
        super().__init__(
            f"No pending billing entries found for subscription {subscription.id}."
        )
class OrderNotPending(OrderError):
    """Raised when a payment is triggered for an order whose status is not pending."""

    def __init__(self, order: Order) -> None:
        self.order = order
        super().__init__(f"Order {order.id} is not pending")
class PaymentAlreadyInProgress(OrderError):
    """Raised when the payment lock of an order is already held."""

    def __init__(self, order: Order) -> None:
        self.order = order
        # 409 is forwarded as the second base-class argument
        # (presumably the HTTP status code -- confirm against PolarError).
        super().__init__(f"Payment for order {order.id} is already in progress", 409)
class CardPaymentFailed(OrderError):
    """Exception for card-related payment failures that should not be retried."""

    def __init__(self, order: Order, stripe_error: stripe_lib.CardError) -> None:
        self.order = order
        self.stripe_error = stripe_error
        # Prefer Stripe's user-facing message; fall back to the error code.
        reason = stripe_error.user_message or stripe_error.code
        # 402 is forwarded as the second base-class argument
        # (presumably the HTTP status code -- confirm against PolarError).
        super().__init__(f"Card payment failed for order {order.id}: {reason}", 402)
class InvalidPaymentProcessor(OrderError):
    """Error for a payment processor value that is not accepted."""

    def __init__(self, payment_processor: PaymentProcessor) -> None:
        self.payment_processor = payment_processor
        # 422 is forwarded as the second base-class argument
        # (presumably the HTTP status code -- confirm against PolarError).
        super().__init__(f"Invalid payment processor: {payment_processor}", 422)
class PaymentRetryValidationError(OrderError):
    """Validation error for a payment retry, carrying a caller-supplied message."""

    def __init__(self, message: str) -> None:
        # 422 is forwarded as the second base-class argument
        # (presumably the HTTP status code -- confirm against PolarError).
        super().__init__(message, 422)
class SubscriptionNotTrialing(OrderError):
    """Raised when a trial order is requested for a subscription that is not trialing."""

    def __init__(self, subscription: Subscription) -> None:
        self.subscription = subscription
        super().__init__(f"Subscription {subscription.id} is not in trialing status.")
317def _is_empty_customer_address(customer_address: dict[str, Any] | None) -> bool: 1a
318 return customer_address is None or customer_address["country"] is None
321class OrderService: 1a
@asynccontextmanager
async def acquire_payment_lock(
    self, session: AsyncSession, order: Order, *, release_on_success: bool = True
) -> AsyncIterator[None]:
    """
    Hold the payment lock of an order around the body of an `async with`.

    Raises `PaymentAlreadyInProgress` when the lock cannot be acquired.
    If the body raises an `Exception`, the lock is released and the error
    re-raised. On a clean exit the lock is released only when
    `release_on_success` is true.
    """
    order_repository = OrderRepository.from_session(session)

    if not await order_repository.acquire_payment_lock_by_id(order.id):
        raise PaymentAlreadyInProgress(order)

    try:
        yield
    except Exception:
        # The body failed: free the lock before propagating the error.
        await order_repository.release_payment_lock(order, flush=True)
        raise

    # Only reached when the body completed without raising.
    if release_on_success:
        await order_repository.release_payment_lock(order, flush=True)
async def list(
    self,
    session: AsyncReadSession,
    auth_subject: AuthSubject[User | Organization],
    *,
    organization_id: Sequence[uuid.UUID] | None = None,
    product_id: Sequence[uuid.UUID] | None = None,
    product_billing_type: Sequence[ProductBillingType] | None = None,
    discount_id: Sequence[uuid.UUID] | None = None,
    customer_id: Sequence[uuid.UUID] | None = None,
    checkout_id: Sequence[uuid.UUID] | None = None,
    metadata: MetadataQuery | None = None,
    pagination: PaginationParams,
    sorting: list[Sorting[OrderSortProperty]] | None = None,
) -> tuple[Sequence[Order], int]:
    """
    Return one page of the orders readable by `auth_subject`.

    Each filter that is not `None` narrows the result to orders matching
    one of the given values; `metadata` is applied through
    `apply_metadata_clause`. When `sorting` is omitted, the result is sorted
    with `(OrderSortProperty.created_at, True)`.

    Returns whatever `OrderRepository.paginate` returns: the orders of the
    requested page and an integer (presumably the total count -- confirm
    against the repository).
    """
    # Build the default per call rather than sharing one mutable list
    # object across every invocation.
    if sorting is None:
        sorting = [(OrderSortProperty.created_at, True)]

    repository = OrderRepository.from_session(session)
    statement = repository.get_readable_statement(auth_subject)

    # Outer joins keep orders without a discount or product in the result
    # while letting the eager options reuse the joined rows.
    statement = (
        statement.join(Order.discount, isouter=True)
        .join(Order.product, isouter=True)
        .options(
            *repository.get_eager_options(
                customer_load=contains_eager(Order.customer),
                product_load=contains_eager(Order.product),
                discount_load=contains_eager(Order.discount),
            )
        )
    )

    if organization_id is not None:
        statement = statement.where(Customer.organization_id.in_(organization_id))

    if product_id is not None:
        statement = statement.where(Order.product_id.in_(product_id))

    if product_billing_type is not None:
        statement = statement.where(Product.billing_type.in_(product_billing_type))

    if discount_id is not None:
        statement = statement.where(Order.discount_id.in_(discount_id))

    # TODO:
    # Once we add `external_customer_id` be sure to filter for non-deleted.
    # Since it could be shared across soft deleted records whereas the unique ID cannot.
    if customer_id is not None:
        statement = statement.where(Order.customer_id.in_(customer_id))

    if checkout_id is not None:
        statement = statement.where(Order.checkout_id.in_(checkout_id))

    if metadata is not None:
        statement = apply_metadata_clause(Order, statement, metadata)

    statement = repository.apply_sorting(statement, sorting)

    return await repository.paginate(
        statement, limit=pagination.limit, page=pagination.page
    )
async def get(
    self,
    session: AsyncReadSession,
    auth_subject: AuthSubject[User | Organization],
    id: uuid.UUID,
) -> Order | None:
    """
    Fetch the order with the given ID among those readable by `auth_subject`.

    Returns `None` when no such order is found.
    """
    repository = OrderRepository.from_session(session)
    readable_orders = repository.get_readable_statement(auth_subject)
    eager_options = repository.get_eager_options(
        customer_load=contains_eager(Order.customer),
        product_load=joinedload(Order.product),
    )
    statement = readable_orders.options(*eager_options).where(Order.id == id)
    return await repository.get_one_or_none(statement)
async def update(
    self,
    session: AsyncSession,
    order: Order,
    order_update: OrderUpdate | CustomerOrderUpdate,
) -> Order:
    """
    Apply the explicitly-set fields of `order_update` to the order.

    Sends the `order_updated` webhook afterwards and returns the updated order.
    """
    repository = OrderRepository.from_session(session)
    # Only fields the caller actually set are written.
    changes = order_update.model_dump(exclude_unset=True)
    updated_order = await repository.update(order, update_dict=changes)

    await self.send_webhook(session, updated_order, WebhookEventType.order_updated)

    return updated_order
async def trigger_invoice_generation(
    self, session: AsyncSession, order: Order
) -> None:
    """
    Enqueue the `order.invoice` job for a paid order with billing details.

    Raises `NotPaidOrder` when the order is not paid, and
    `MissingInvoiceBillingDetails` when the billing name or the billing
    address is missing.
    """
    if not order.paid:
        raise NotPaidOrder(order)

    has_billing_details = (
        order.billing_name is not None and order.billing_address is not None
    )
    if not has_billing_details:
        raise MissingInvoiceBillingDetails(order)

    enqueue_job("order.invoice", order_id=order.id)
async def generate_invoice(self, session: AsyncSession, order: Order) -> Order:
    """
    Create the invoice of an order and store its path on the order.

    Publishes an `order.invoice_generated` event and sends the
    `order_updated` webhook, then returns the updated order.
    """
    invoice_path = await invoice_service.create_order_invoice(order)

    order = await OrderRepository.from_session(session).update(
        order, update_dict={"invoice_path": invoice_path}
    )

    event_payload = {"order_id": order.id}
    await eventstream_publish(
        "order.invoice_generated",
        event_payload,
        customer_id=order.customer_id,
        organization_id=order.organization.id,
    )
    await self.send_webhook(session, order, WebhookEventType.order_updated)

    return order
async def get_order_invoice(self, order: Order) -> OrderInvoice:
    """
    Return the invoice URL of an order wrapped in an `OrderInvoice`.

    Raises `InvoiceDoesNotExist` when the order has no invoice path.
    """
    if order.invoice_path is None:
        raise InvoiceDoesNotExist(order)

    # Only the first element of the returned pair is used here.
    invoice_url, _ = await invoice_service.get_order_invoice_url(order)
    return OrderInvoice(url=invoice_url)
async def create_from_checkout_one_time(
    self, session: AsyncSession, checkout: Checkout, payment: Payment | None = None
) -> Order:
    """
    Create the order of a one-time purchase from a checkout.

    Raises `RecurringProduct` when the checkout's product is recurring.
    Benefit grants are enqueued right away unless one of the product's
    prices is a seat price. The admin notification is sent in every case.
    """
    assert has_product_checkout(checkout)

    product = checkout.product
    if product.is_recurring:
        raise RecurringProduct(checkout, product)

    order = await self._create_order_from_checkout(
        session, checkout, OrderBillingReasonInternal.purchase, payment
    )

    # Seat-based orders get their benefits when seats are claimed;
    # every other order gets them immediately.
    is_seat_based = any(
        is_seat_price(price) for price in checkout.prices[product.id]
    )
    if not is_seat_based:
        enqueue_job(
            "benefit.enqueue_benefits_grants",
            task="grant",
            customer_id=order.customer.id,
            product_id=product.id,
            order_id=order.id,
        )

    # Trigger notifications
    await self.send_admin_notification(session, checkout.organization, order)

    return order
async def create_from_checkout_subscription(
    self,
    session: AsyncSession,
    checkout: Checkout,
    subscription: Subscription,
    billing_reason: Literal[
        OrderBillingReasonInternal.subscription_create,
        OrderBillingReasonInternal.subscription_update,
    ],
    payment: Payment | None = None,
) -> Order:
    """
    Create the order of a subscription checkout.

    Raises `NotRecurringProduct` when the checkout's product is not
    recurring. A trialing subscription gets a trial order; any other
    subscription gets a regular order built from the checkout.
    """
    assert has_product_checkout(checkout)

    product = checkout.product
    if not product.is_recurring:
        raise NotRecurringProduct(checkout, product)

    if subscription.trialing:
        order = await self.create_trial_order(
            session, subscription, billing_reason, checkout
        )
    else:
        order = await self._create_order_from_checkout(
            session, checkout, billing_reason, payment, subscription
        )
    return order
async def _create_order_from_checkout(
    self,
    session: AsyncSession,
    checkout: Checkout,
    billing_reason: OrderBillingReasonInternal,
    payment: Payment | None = None,
    subscription: Subscription | None = None,
) -> Order:
    """
    Create a paid order from a checkout and run the order-created hook.

    Steps, in order:
    - build one order item per static price of the checkout's product;
    - read tax details from the Stripe tax calculation referenced by the
      checkout, if any;
    - reserve the next invoice number and persist the order as `paid`;
    - if `payment` is given, link it to the order and enqueue `order.balance`;
    - if the checkout references a tax calculation, create the Stripe tax
      transaction and store its ID on the order.

    Raises `MissingCheckoutCustomer` when the checkout has no customer.
    """
    customer = checkout.customer
    if customer is None:
        raise MissingCheckoutCustomer(checkout)

    items: list[OrderItem] = []
    if has_product_checkout(checkout):
        prices = checkout.prices[checkout.product_id]
        for price in prices:
            # Don't create an item for metered prices
            if not is_static_price(price):
                continue
            if is_custom_price(price):
                # Custom prices take the checkout amount as the item amount.
                item = OrderItem.from_price(price, 0, checkout.amount)
            else:
                item = OrderItem.from_price(price, 0, seats=checkout.seats)
            items.append(item)

    discount_amount = checkout.discount_amount

    # Retrieve tax data
    tax_amount = checkout.tax_amount or 0
    taxability_reason = None
    tax_rate: TaxRate | None = None
    tax_id = customer.tax_id
    if checkout.tax_processor_id is not None:
        calculation = await stripe_service.get_tax_calculation(
            checkout.tax_processor_id
        )
        # The amount stored on the checkout must agree with Stripe's calculation.
        assert tax_amount == calculation.tax_amount_exclusive
        assert len(calculation.tax_breakdown) > 0
        if len(calculation.tax_breakdown) > 1:
            log.warning(
                "Multiple tax breakdowns found for checkout",
                checkout_id=checkout.id,
                calculation_id=calculation.id,
            )
        # Only the first breakdown is used, even when several exist.
        breakdown = calculation.tax_breakdown[0]
        taxability_reason = TaxabilityReason.from_stripe(
            breakdown.taxability_reason, tax_amount
        )
        tax_rate = from_stripe_tax_rate_details(breakdown.tax_rate_details)

    organization = checkout.organization
    invoice_number = await organization_service.get_next_invoice_number(
        session, organization, customer
    )

    repository = OrderRepository.from_session(session)
    order = await repository.create(
        Order(
            status=OrderStatus.paid,
            subtotal_amount=checkout.amount,
            discount_amount=discount_amount,
            tax_amount=tax_amount,
            currency=checkout.currency,
            billing_reason=billing_reason,
            billing_name=customer.billing_name,
            billing_address=customer.billing_address,
            taxability_reason=taxability_reason,
            tax_id=tax_id,
            tax_rate=tax_rate,
            invoice_number=invoice_number,
            customer=customer,
            product=checkout.product,
            discount=checkout.discount,
            subscription=subscription,
            checkout=checkout,
            user_metadata=checkout.user_metadata,
            custom_field_data=checkout.custom_field_data,
            items=items,
            seats=checkout.seats,
        ),
        flush=True,
    )

    # Link payment and balance transaction to the order
    if payment is not None:
        payment_repository = PaymentRepository.from_session(session)
        assert payment.amount == order.total_amount
        await payment_repository.update(payment, update_dict={"order": order})
        enqueue_job(
            "order.balance", order_id=order.id, charge_id=payment.processor_id
        )

    # Record tax transaction
    if checkout.tax_processor_id is not None:
        transaction = await stripe_service.create_tax_transaction(
            checkout.tax_processor_id, str(order.id)
        )
        await repository.update(
            order, update_dict={"tax_transaction_processor_id": transaction.id}
        )

    await self._on_order_created(session, order)

    return order
async def create_subscription_order(
    self,
    session: AsyncSession,
    subscription: Subscription,
    billing_reason: OrderBillingReasonInternal,
) -> Order:
    """
    Create a pending order from the pending billing entries of a subscription.

    Steps, in order:
    - turn the pending billing entries into order items;
    - apply the subscription's discount to the discountable items;
    - calculate tax when the taxable amount is non-zero, the product is
      tax-applicable and the customer has a billing address;
    - reserve the next invoice number;
    - work out how the customer's wallet balance changes and persist the order;
    - reset the subscription meters for cycle and update billing reasons;
    - mark the order paid when nothing is due, otherwise enqueue
      `order.trigger_payment`.

    Raises `NoPendingBillingEntries` when there are no items to bill.
    """
    items = await billing_entry_service.create_order_items_from_pending(
        session, subscription
    )
    if len(items) == 0:
        raise NoPendingBillingEntries(subscription)

    # The ID is generated up front because it is passed to `calculate_tax`
    # before the order itself is created.
    order_id = uuid.uuid4()
    customer = subscription.customer
    billing_address = customer.billing_address
    product = subscription.product

    subtotal_amount = sum(item.amount for item in items)

    discount = subscription.discount
    discount_amount = 0
    if discount is not None:
        # Discount only applies to cycle and meter items, as prorations
        # use "last month's" discount and so this month's discount
        # shouldn't apply to those.
        discountable_amount = sum(
            item.amount for item in items if item.discountable
        )
        discount_amount = discount.get_discount_amount(discountable_amount)

    # Calculate tax
    taxable_amount = subtotal_amount - discount_amount
    tax_amount = 0
    taxability_reason: TaxabilityReason | None = None
    tax_rate: TaxRate | None = None
    tax_id = customer.tax_id
    tax_calculation_processor_id: str | None = None

    if (
        taxable_amount != 0
        and product.is_tax_applicable
        and billing_address is not None
    ):
        tax_calculation = await calculate_tax(
            order_id,
            subscription.currency,
            # Stripe doesn't support calculating negative tax amounts
            taxable_amount if taxable_amount >= 0 else -taxable_amount,
            product.tax_code,
            billing_address,
            [tax_id] if tax_id is not None else [],
            subscription.tax_exempted,
        )
        if taxable_amount >= 0:
            tax_calculation_processor_id = tax_calculation["processor_id"]
            tax_amount = tax_calculation["amount"]
        else:
            # When the taxable amount is negative it's usually due to a credit proration
            # this means we "owe" the customer money -- but we don't pay it back at this
            # point. This also means that there's no money transaction going on, and we
            # don't have to record the tax transaction either.
            tax_calculation_processor_id = None
            # Tax was computed on the absolute amount, so restore the sign.
            tax_amount = -tax_calculation["amount"]

        taxability_reason = tax_calculation["taxability_reason"]
        tax_rate = tax_calculation["tax_rate"]

    invoice_number = await organization_service.get_next_invoice_number(
        session, subscription.organization, customer
    )

    total_amount = subtotal_amount - discount_amount + tax_amount
    customer_balance = await wallet_service.get_billing_wallet_balance(
        session, customer, subscription.currency
    )

    # Calculate balance change and applied amount
    if total_amount >= 0:
        # Order is a charge: use customer balance if available
        balance_change = -min(total_amount, customer_balance)
        applied_balance_amount = balance_change
    else:
        # Order is a credit: always add to balance
        balance_change = -total_amount
        # Track how much existing debt was cleared
        if customer_balance < 0:
            applied_balance_amount = min(-total_amount, -customer_balance)
        else:
            applied_balance_amount = 0

    repository = OrderRepository.from_session(session)
    order = await repository.create(
        Order(
            id=order_id,
            status=OrderStatus.pending,
            subtotal_amount=subtotal_amount,
            discount_amount=discount_amount,
            tax_amount=tax_amount,
            applied_balance_amount=applied_balance_amount,
            currency=subscription.currency,
            billing_reason=billing_reason,
            billing_name=customer.billing_name,
            billing_address=billing_address,
            taxability_reason=taxability_reason,
            tax_id=tax_id,
            tax_rate=tax_rate,
            tax_calculation_processor_id=tax_calculation_processor_id,
            invoice_number=invoice_number,
            customer=customer,
            product=subscription.product,
            discount=discount,
            subscription=subscription,
            checkout=None,
            items=items,
            user_metadata=subscription.user_metadata,
            custom_field_data=subscription.custom_field_data,
        ),
        flush=True,
    )

    # Impact customer's balance
    if balance_change != 0:
        await wallet_service.create_balance_transaction(
            session, customer, balance_change, subscription.currency, order=order
        )

    # Reset the associated meters, if any
    if billing_reason in {
        OrderBillingReasonInternal.subscription_cycle,
        OrderBillingReasonInternal.subscription_cycle_after_trial,
        OrderBillingReasonInternal.subscription_update,
    }:
        await subscription_service.reset_meters(session, subscription)

    # If the due amount is less or equal than zero, mark it as paid immediately
    if order.due_amount <= 0:
        order = await repository.update(
            order, update_dict={"status": OrderStatus.paid}
        )
    else:
        enqueue_job(
            "order.trigger_payment",
            order_id=order.id,
            payment_method_id=subscription.payment_method_id,
        )

    await self._on_order_created(session, order)

    return order
async def create_trial_order(
    self,
    session: AsyncSession,
    subscription: Subscription,
    billing_reason: Literal[
        OrderBillingReasonInternal.subscription_create,
        OrderBillingReasonInternal.subscription_update,
    ],
    checkout: Checkout | None = None,
) -> Order:
    """
    Create the paid order that records the trial period of a subscription.

    The order has a single item built from the product and the trial
    start/end dates, with no discount and no tax.

    Raises `SubscriptionNotTrialing` when the subscription is not trialing.
    """
    if not subscription.trialing:
        raise SubscriptionNotTrialing(subscription)
    assert subscription.trial_start is not None
    assert subscription.trial_end is not None

    product = subscription.product
    customer = subscription.customer

    trial_item = OrderItem.from_trial(
        product, subscription.trial_start, subscription.trial_end
    )
    items: list[OrderItem] = [trial_item]

    invoice_number = await organization_service.get_next_invoice_number(
        session, subscription.organization, customer
    )

    repository = OrderRepository.from_session(session)
    trial_order = Order(
        status=OrderStatus.paid,
        subtotal_amount=sum(item.amount for item in items),
        discount_amount=0,
        tax_amount=0,
        currency=subscription.currency,
        billing_reason=billing_reason,
        billing_name=customer.billing_name,
        billing_address=customer.billing_address,
        taxability_reason=None,
        tax_id=customer.tax_id,
        tax_rate=None,
        invoice_number=invoice_number,
        customer=customer,
        product=product,
        discount=None,
        subscription=subscription,
        checkout=checkout,
        user_metadata=subscription.user_metadata,
        custom_field_data=subscription.custom_field_data,
        items=items,
    )
    order = await repository.create(trial_order, flush=True)

    await self._on_order_created(session, order)

    return order
async def create_wallet_order(
    self,
    session: AsyncSession,
    wallet_transaction: WalletTransaction,
    payment: Payment | None,
) -> Order:
    """
    Create a paid order for a wallet transaction and run the order-created hook.

    Steps, in order:
    - build a single order item from the wallet and the transaction amount;
    - read tax details from the Stripe tax calculation referenced by the
      wallet transaction, if any;
    - reserve the next invoice number and persist the order as `paid`;
    - if `payment` is given, link it to the order and enqueue `order.balance`;
    - link the wallet transaction to the order;
    - if the transaction references a tax calculation, create the Stripe tax
      transaction and store its ID on the order.
    """
    wallet = wallet_transaction.wallet
    items: list[OrderItem] = [
        OrderItem.from_wallet(wallet, wallet_transaction.amount)
    ]

    customer = wallet.customer
    billing_address = customer.billing_address

    subtotal_amount = sum(item.amount for item in items)

    # Retrieve tax data
    tax_amount = wallet_transaction.tax_amount or 0
    taxability_reason = None
    tax_rate: TaxRate | None = None
    tax_id = customer.tax_id
    if wallet_transaction.tax_calculation_processor_id is not None:
        calculation = await stripe_service.get_tax_calculation(
            wallet_transaction.tax_calculation_processor_id
        )
        # The amount stored on the transaction must agree with Stripe's calculation.
        assert tax_amount == calculation.tax_amount_exclusive
        assert len(calculation.tax_breakdown) > 0
        # Only the first breakdown is used.
        breakdown = calculation.tax_breakdown[0]
        taxability_reason = TaxabilityReason.from_stripe(
            breakdown.taxability_reason, tax_amount
        )
        tax_rate = from_stripe_tax_rate_details(breakdown.tax_rate_details)

    invoice_number = await organization_service.get_next_invoice_number(
        session, wallet.organization, wallet.customer
    )

    repository = OrderRepository.from_session(session)
    order = await repository.create(
        Order(
            status=OrderStatus.paid,
            subtotal_amount=subtotal_amount,
            discount_amount=0,
            tax_amount=tax_amount,
            applied_balance_amount=0,
            currency=wallet.currency,
            billing_reason=OrderBillingReasonInternal.purchase,
            billing_name=customer.billing_name,
            billing_address=billing_address,
            taxability_reason=taxability_reason,
            tax_id=tax_id,
            tax_rate=tax_rate,
            invoice_number=invoice_number,
            customer=customer,
            items=items,
            product=None,
            discount=None,
            subscription=None,
            checkout=None,
        ),
        flush=True,
    )

    # Link payment and balance transaction to the order
    if payment is not None:
        payment_repository = PaymentRepository.from_session(session)
        assert payment.amount == order.total_amount
        await payment_repository.update(payment, update_dict={"order": order})
        enqueue_job(
            "order.balance", order_id=order.id, charge_id=payment.processor_id
        )

    # Link wallet transaction to the order
    wallet_transaction_repository = WalletTransactionRepository.from_session(
        session
    )
    await wallet_transaction_repository.update(
        wallet_transaction, update_dict={"order": order}
    )

    # Record tax transaction
    if wallet_transaction.tax_calculation_processor_id is not None:
        transaction = await stripe_service.create_tax_transaction(
            wallet_transaction.tax_calculation_processor_id, str(order.id)
        )
        await repository.update(
            order, update_dict={"tax_transaction_processor_id": transaction.id}
        )

    await self._on_order_created(session, order)

    return order
async def trigger_payment(
    self, session: AsyncSession, order: Order, payment_method: PaymentMethod
) -> None:
    """
    Charge the due amount of a pending order on the given payment method.

    For a Stripe payment method with a due amount below 50, no charge is
    made: the order is marked paid, a balance transaction of `-due_amount`
    is recorded for the customer and the order-updated hook runs.

    Otherwise the payment lock is acquired and, for Stripe, a confirmed
    off-session PaymentIntent is created. The lock is kept after a
    successful call (`release_on_success=False`) and released when the
    call raises.

    Raises:
        OrderNotPending: the order status is not `pending`.
        PaymentAlreadyInProgress: the payment lock is already held.
        CardPaymentFailed: Stripe rejected the charge with a card error.
    """
    if order.status != OrderStatus.pending:
        raise OrderNotPending(order)

    if order.payment_lock_acquired_at is not None:
        # `warning` matches the logging calls used elsewhere in this module.
        log.warning("Payment already in progress", order_id=order.id)
        raise PaymentAlreadyInProgress(order)

    if (
        payment_method.processor == PaymentProcessor.stripe
        and order.due_amount < 50
    ):
        # Stripe requires a minimum amount of 50 cents, mark it as paid
        repository = OrderRepository.from_session(session)
        previous_status = order.status
        order = await repository.update(
            order, update_dict={"status": OrderStatus.paid}
        )

        # Add to the customer's balance
        await wallet_service.create_balance_transaction(
            session,
            order.customer,
            -order.due_amount,
            order.currency,
            order=order,
        )

        await self._on_order_updated(session, order, previous_status)
        return

    async with self.acquire_payment_lock(session, order, release_on_success=False):
        if payment_method.processor == PaymentProcessor.stripe:
            metadata: dict[str, Any] = {"order_id": str(order.id)}

            if order.tax_rate is not None:
                metadata["tax_amount"] = order.tax_amount
                metadata["tax_country"] = order.tax_rate["country"]
                metadata["tax_state"] = order.tax_rate["state"]

            stripe_customer_id = order.customer.stripe_customer_id
            assert stripe_customer_id is not None

            try:
                await stripe_service.create_payment_intent(
                    amount=order.due_amount,
                    currency=order.currency,
                    payment_method=payment_method.processor_id,
                    customer=stripe_customer_id,
                    confirm=True,
                    off_session=True,
                    statement_descriptor_suffix=order.statement_descriptor_suffix,
                    description=f"{order.organization.name} — {order.description}",
                    metadata=metadata,
                )
            except stripe_lib.CardError as e:
                # Card errors (declines, expired cards, etc.) should not be retried
                # They will be handled by the dunning process
                log.info(
                    "Card payment failed",
                    order_id=order.id,
                    error_code=e.code,
                    error_message=e.user_message,
                )
                raise CardPaymentFailed(order, e) from e
async def process_retry_payment(
    self,
    session: AsyncSession,
    order: Order,
    confirmation_token_id: str | None,
    payment_processor: PaymentProcessor,
    payment_method_id: uuid.UUID | None = None,
) -> CustomerOrderPaymentConfirmation:
    """
    Process retry payment with direct confirmation (confirm=True).
    Follows checkout flow pattern - creates PaymentIntent and lets webhooks handle everything else.

    Exactly one of `confirmation_token_id` (new payment method) or
    `payment_method_id` (saved payment method) must be provided.

    Returns:
        A confirmation whose status is `succeeded`, `requires_action` (with the
        PaymentIntent client secret) or `failed` (with an error message). Any
        exception raised while creating the PaymentIntent is logged and turned
        into a `failed` confirmation instead of propagating.

    Raises:
        OrderNotEligibleForRetry: The order is not pending, has no scheduled
            retry, is not tied to a subscription, the processor is not Stripe,
            or the customer has no Stripe customer ID.
        PaymentAlreadyInProgress: A payment lock is already set on the order.
        PaymentRetryValidationError: The payment method arguments are invalid.
    """

    if order.status != OrderStatus.pending:
        log.warning("Order is not pending", order_id=order.id, status=order.status)
        raise OrderNotEligibleForRetry(order)

    if order.next_payment_attempt_at is None:
        log.warning("Order is not eligible for retry", order_id=order.id)
        raise OrderNotEligibleForRetry(order)

    if order.subscription is None:
        log.warning("Order is not a subscription", order_id=order.id)
        raise OrderNotEligibleForRetry(order)

    if order.payment_lock_acquired_at is not None:
        log.warning(
            "Payment already in progress",
            order_id=order.id,
            lock_acquired_at=order.payment_lock_acquired_at,
        )
        raise PaymentAlreadyInProgress(order)

    if payment_processor != PaymentProcessor.stripe:
        log.warning(
            "Invalid payment processor", payment_processor=payment_processor
        )
        # The exception is built from the order, like every other call site here
        raise OrderNotEligibleForRetry(order)

    if confirmation_token_id is None and payment_method_id is None:
        raise PaymentRetryValidationError(
            "Either confirmation_token_id or payment_method_id must be provided"
        )
    if confirmation_token_id is not None and payment_method_id is not None:
        raise PaymentRetryValidationError(
            "Only one of confirmation_token_id or payment_method_id can be provided"
        )

    customer_repository = CustomerRepository.from_session(session)
    customer = await customer_repository.get_by_id(order.customer_id)
    assert customer is not None, "Customer must exist"

    org_repository = OrganizationRepository.from_session(session)
    organization = await org_repository.get_by_id(customer.organization_id)
    assert organization is not None, "Organization must exist"

    if customer.stripe_customer_id is None:
        log.warning("Customer is not a Stripe customer", customer_id=customer.id)
        raise OrderNotEligibleForRetry(order)

    saved_payment_method: PaymentMethod | None = None
    if payment_method_id is not None:
        payment_method_repository = PaymentMethodRepository.from_session(session)
        saved_payment_method = await payment_method_repository.get_by_id(
            payment_method_id
        )
        # Refuse unknown payment methods and those owned by another customer
        if (
            saved_payment_method is None
            or saved_payment_method.customer_id != customer.id
        ):
            raise PaymentRetryValidationError(
                "Payment method does not belong to customer"
            )

    metadata: dict[str, Any] = {
        "order_id": str(order.id),
    }
    if order.tax_rate is not None:
        metadata["tax_amount"] = str(order.tax_amount)
        metadata["tax_country"] = order.tax_rate["country"]
        metadata["tax_state"] = order.tax_rate["state"]

    # NOTE(review): `trigger_payment` charges `order.due_amount` while this
    # method charges `order.total_amount` — confirm this is intended.
    try:
        async with self.acquire_payment_lock(
            session, order, release_on_success=True
        ):
            if saved_payment_method is not None:
                # Using saved payment method
                payment_intent = await stripe_service.create_payment_intent(
                    amount=order.total_amount,
                    currency=order.currency,
                    payment_method=saved_payment_method.processor_id,
                    customer=customer.stripe_customer_id,
                    confirm=True,
                    statement_descriptor_suffix=order.statement_descriptor_suffix,
                    description=f"{order.organization.name} — {order.description}",
                    metadata=metadata,
                    return_url=settings.generate_frontend_url(
                        f"/portal/orders/{str(order.id)}"
                    ),
                )
            else:
                # Using confirmation token (new payment method)
                assert confirmation_token_id is not None
                payment_intent = await stripe_service.create_payment_intent(
                    amount=order.total_amount,
                    currency=order.currency,
                    automatic_payment_methods={"enabled": True},
                    confirm=True,
                    confirmation_token=confirmation_token_id,
                    customer=customer.stripe_customer_id,
                    setup_future_usage="off_session",
                    statement_descriptor_suffix=order.statement_descriptor_suffix,
                    description=f"{order.organization.name} — {order.description}",
                    metadata=metadata,
                    return_url=settings.generate_frontend_url(
                        f"/portal/orders/{str(order.id)}"
                    ),
                )

            if payment_intent.status == "succeeded":
                log.info(
                    "Retry payment succeeded immediately",
                    order_id=order.id,
                    payment_intent_id=payment_intent.id,
                )

                return CustomerOrderPaymentConfirmation(
                    status="succeeded",
                    client_secret=None,
                    error=None,
                )

            elif payment_intent.status == "requires_action":
                log.info(
                    "Retry payment requires additional action",
                    order_id=order.id,
                    payment_intent_id=payment_intent.id,
                    status=payment_intent.status,
                )

                return CustomerOrderPaymentConfirmation(
                    status="requires_action",
                    client_secret=payment_intent.client_secret,
                    error=None,
                )

            else:
                error_message = "Payment failed"
                if (
                    payment_intent.last_payment_error
                    and payment_intent.last_payment_error.message
                ):
                    error_message = payment_intent.last_payment_error.message

                log.warning(
                    "Retry payment failed",
                    order_id=order.id,
                    payment_intent_id=payment_intent.id,
                    status=payment_intent.status,
                    error=error_message,
                )

                return CustomerOrderPaymentConfirmation(
                    status="failed",
                    client_secret=None,
                    error=error_message,
                )

    except stripe_lib.StripeError as stripe_exc:
        log.warning(
            "Stripe error during retry payment",
            order_id=order.id,
            stripe_error_code=stripe_exc.code,
            stripe_error_message=str(stripe_exc),
        )

        error_message = (
            stripe_exc.error.message
            if stripe_exc.error and stripe_exc.error.message
            else "Payment failed. Please try again."
        )

        return CustomerOrderPaymentConfirmation(
            status="failed",
            client_secret=None,
            error=error_message,
        )

    except Exception as exc:
        log.error(
            "Exception during retry payment",
            order_id=order.id,
            error=str(exc),
            exc_info=True,  # Include full traceback
        )

        return CustomerOrderPaymentConfirmation(
            status="failed",
            client_secret=None,
            error="Payment failed. Please try again.",
        )
async def handle_payment(
    self, session: AsyncSession, order: Order, payment: Payment | None
) -> Order:
    """
    Apply a successful payment to an order.

    Marks a pending order as paid, clears the retry date and the payment lock
    when set, enqueues the `order.balance` job when a payment is given, records
    the tax transaction when one is still missing, and reactivates a past-due
    subscription when the order was pending.

    Raises:
        OrderNotPending: The order has no Stripe invoice and is not pending.
    """
    status_before = order.status
    was_pending = status_before == OrderStatus.pending

    # Stripe invoices may already have been marked as paid, so ignore the check
    if order.stripe_invoice_id is None and not was_pending:
        raise OrderNotPending(order)

    changes: dict[str, Any] = {}

    if was_pending:
        changes["status"] = OrderStatus.paid

    # A successful payment cancels any scheduled retry
    if order.next_payment_attempt_at is not None:
        changes["next_payment_attempt_at"] = None

    # A successful payment also frees the payment lock
    if order.payment_lock_acquired_at is not None:
        log.info(
            "Clearing payment lock on order due to successful payment",
            order_id=order.id,
        )
        changes["payment_lock_acquired_at"] = None

    # Balance the order in the ledger
    if payment is not None:
        enqueue_job(
            "order.balance", order_id=order.id, charge_id=payment.processor_id
        )

    # Record the tax transaction, unless it was already recorded
    needs_tax_transaction = (
        order.tax_calculation_processor_id is not None
        and order.tax_transaction_processor_id is None
    )
    if needs_tax_transaction:
        tax_transaction = await stripe_service.create_tax_transaction(
            order.tax_calculation_processor_id, str(order.id)
        )
        changes["tax_transaction_processor_id"] = tax_transaction.id

    order_repository = OrderRepository.from_session(session)
    order = await order_repository.update(order, update_dict=changes)

    # If this was a subscription retry success, reactivate the subscription
    if (
        was_pending
        and order.subscription is not None
        and order.subscription.status == SubscriptionStatus.past_due
    ):
        await subscription_service.mark_active(session, order.subscription)

    if changes:
        await self._on_order_updated(session, order, status_before)

    return order
async def create_order_from_stripe(
    self, session: AsyncSession, invoice: stripe_lib.Invoice
) -> Order:
    """
    Create an order from a Stripe subscription invoice.

    Resolves the subscription, customer, billing address, discount, checkout
    and line items referenced by the invoice, computes tax data, then persists
    the order and triggers the order creation hooks.

    Raises:
        NotAnOrderInvoice: The invoice metadata marks it as a pledge invoice.
        NotASubscriptionInvoice: The invoice is not tied to a subscription.
        SubscriptionDoesNotExist: No subscription matches the Stripe subscription ID.
        DiscountDoesNotExist: The invoice coupon can't be mapped to a discount.
        CheckoutDoesNotExist: The referenced checkout doesn't exist.
    """
    assert invoice.id is not None

    if invoice.metadata and invoice.metadata.get("type") in {ProductType.pledge}:
        raise NotAnOrderInvoice(invoice.id)

    if invoice.subscription is None:
        raise NotASubscriptionInvoice(invoice.id)

    # Get subscription
    stripe_subscription_id = get_expandable_id(invoice.subscription)
    subscription_repository = SubscriptionRepository.from_session(session)
    subscription = await subscription_repository.get_by_stripe_subscription_id(
        stripe_subscription_id,
        options=(
            joinedload(Subscription.product).joinedload(Product.organization),
            joinedload(Subscription.customer),
            joinedload(Subscription.meters).joinedload(SubscriptionMeter.meter),
        ),
    )
    if subscription is None:
        raise SubscriptionDoesNotExist(invoice.id, stripe_subscription_id)

    # Get customer
    customer = subscription.customer

    # Retrieve billing address
    billing_address: Address | None = None
    if customer.billing_address is not None:
        billing_address = customer.billing_address
    elif not _is_empty_customer_address(invoice.customer_address):
        billing_address = AddressInput.model_validate(invoice.customer_address)
    # Try to retrieve the country from the payment method
    elif invoice.charge is not None:
        charge = await stripe_service.get_charge(get_expandable_id(invoice.charge))
        if payment_method_details := charge.payment_method_details:
            if card := getattr(payment_method_details, "card", None):
                billing_address = Address.model_validate({"country": card.country})

    # Get Discount if available
    discount: Discount | None = None
    if invoice.discount is not None:
        coupon = invoice.discount.coupon
        # A coupon without a `discount_id` metadata entry can't be mapped to a
        # discount: report it as such instead of failing with a bare KeyError.
        metadata = coupon.metadata
        discount_id = metadata.get("discount_id") if metadata else None
        if discount_id is None:
            raise DiscountDoesNotExist(invoice.id, coupon.id)
        discount = await discount_service.get(
            session, uuid.UUID(discount_id), allow_deleted=True
        )
        if discount is None:
            raise DiscountDoesNotExist(invoice.id, coupon.id)

    # Get Checkout if available
    checkout: Checkout | None = None
    invoice_metadata = invoice.metadata or {}
    subscription_metadata = (
        (invoice.subscription_details.metadata or {})
        if invoice.subscription_details
        else {}
    )
    # The invoice metadata takes precedence over the subscription metadata
    checkout_id = invoice_metadata.get("checkout_id") or subscription_metadata.get(
        "checkout_id"
    )
    if checkout_id is not None:
        checkout_repository = CheckoutRepository.from_session(session)
        checkout = await checkout_repository.get_by_id(uuid.UUID(checkout_id))
        if checkout is None:
            raise CheckoutDoesNotExist(invoice.id, checkout_id)

    # Handle items
    product_price_repository = ProductPriceRepository.from_session(session)
    items: list[OrderItem] = []
    for line in invoice.lines:
        tax_amount = sum(tax.amount for tax in line.tax_amounts)
        product_price: ProductPrice | None = None
        price = line.price
        if price is not None:
            # Prefer the price ID stored in metadata, fall back to the Stripe price ID
            if price.metadata and price.metadata.get("product_price_id"):
                product_price = await product_price_repository.get_by_id(
                    uuid.UUID(price.metadata["product_price_id"]),
                    options=product_price_repository.get_eager_options(),
                )
            else:
                product_price = (
                    await product_price_repository.get_by_stripe_price_id(
                        price.id,
                        options=product_price_repository.get_eager_options(),
                    )
                )

        items.append(
            OrderItem(
                label=line.description or "",
                amount=line.amount,
                tax_amount=tax_amount,
                proration=line.proration,
                product_price=product_price,
            )
        )

    if invoice.status == "draft":
        # Add pending billing entries
        stripe_customer_id = customer.stripe_customer_id
        assert stripe_customer_id is not None
        pending_items = await billing_entry_service.create_order_items_from_pending(
            session,
            subscription,
            stripe_invoice_id=invoice.id,
            stripe_customer_id=stripe_customer_id,
        )
        items.extend(pending_items)
        # Reload the invoice to get totals with added pending items
        if len(pending_items) > 0:
            invoice = await stripe_service.get_invoice(invoice.id)

        # Update statement descriptor
        # Stripe doesn't allow to set statement descriptor on the subscription itself,
        # so we need to set it manually on each new invoice.
        assert invoice.id is not None
        await stripe_service.update_invoice(
            invoice.id,
            statement_descriptor=subscription.organization.statement_descriptor_prefixed,
        )

    # Determine billing reason
    billing_reason = OrderBillingReason.subscription_cycle
    if invoice.billing_reason is not None:
        try:
            billing_reason = OrderBillingReason(invoice.billing_reason)
        except ValueError:
            log.error(
                "Unknown billing reason, fallback to 'subscription_cycle'",
                invoice_id=invoice.id,
                billing_reason=invoice.billing_reason,
            )

    # Calculate discount amount
    discount_amount = 0
    if invoice.total_discount_amounts:
        for stripe_discount_amount in invoice.total_discount_amounts:
            discount_amount += stripe_discount_amount.amount

    # Retrieve tax data
    tax_id = customer.tax_id
    tax_calculation_processor_id: str | None = None
    tax_amount = invoice.tax or 0
    taxability_reason: TaxabilityReason | None = None
    tax_rate: TaxRate | None = None

    # If the subscription is tax-exempted, we need to retrieve tax rate manually:
    # we don't apply tax on the invoice, but we need to know the rate for our
    # accounting and fulfillment purposes.
    if subscription.tax_exempted:
        product = subscription.product
        assert invoice.id is not None
        assert customer.billing_address is not None
        tax_calculation = await calculate_tax(
            invoice.id,
            invoice.currency,
            invoice.subtotal,
            product.tax_code,
            customer.billing_address,
            [customer.tax_id] if customer.tax_id is not None else [],
            subscription.tax_exempted,
        )
        tax_calculation_processor_id = tax_calculation["processor_id"]
        tax_amount = tax_calculation["amount"]
        taxability_reason = tax_calculation["taxability_reason"]
        tax_rate = tax_calculation["tax_rate"]
    # Automatic tax is enabled, so we can directly take the data from Stripe
    else:
        for total_tax_amount in invoice.total_tax_amounts:
            taxability_reason = TaxabilityReason.from_stripe(
                total_tax_amount.taxability_reason, tax_amount
            )
            stripe_tax_rate = await stripe_service.get_tax_rate(
                get_expandable_id(total_tax_amount.tax_rate)
            )
            # Stop at the first tax rate we are able to convert
            try:
                tax_rate = from_stripe_tax_rate(stripe_tax_rate)
            except ValueError:
                continue
            else:
                break

    # Ensure it inherits original metadata and custom fields
    user_metadata = (
        checkout.user_metadata
        if checkout is not None
        else subscription.user_metadata
    )
    custom_field_data = (
        checkout.custom_field_data
        if checkout is not None
        else subscription.custom_field_data
    )

    invoice_number = await organization_service.get_next_invoice_number(
        session, subscription.organization, customer
    )

    repository = OrderRepository.from_session(session)
    order = await repository.create(
        Order(
            status=OrderStatus.paid
            if invoice.status == "paid"
            else OrderStatus.pending,
            subtotal_amount=invoice.subtotal,
            discount_amount=discount_amount,
            tax_amount=tax_amount,
            currency=invoice.currency,
            billing_reason=billing_reason,
            billing_name=customer.billing_name,
            billing_address=billing_address,
            stripe_invoice_id=invoice.id,
            taxability_reason=taxability_reason,
            tax_id=tax_id,
            tax_rate=tax_rate,
            tax_calculation_processor_id=tax_calculation_processor_id,
            invoice_number=invoice_number,
            customer=customer,
            product=subscription.product,
            discount=discount,
            subscription=subscription,
            checkout=checkout,
            items=items,
            user_metadata=user_metadata,
            custom_field_data=custom_field_data,
            created_at=datetime.fromtimestamp(invoice.created, tz=UTC),
        ),
        flush=True,
    )

    # Reset the associated meters, if any
    if billing_reason == OrderBillingReason.subscription_cycle:
        await subscription_service.reset_meters(session, subscription)

    await self._on_order_created(session, order)

    return order
async def send_admin_notification(
    self, session: AsyncSession, organization: Organization, order: Order
) -> None:
    """
    Notify the organization members about a new product sale.

    Nothing is sent when the order has no product or when the organization's
    `new_order` notification setting is falsy.
    """
    sold_product = order.product
    if sold_product is None:
        return

    if not organization.notification_settings["new_order"]:
        return

    payload = MaintainerNewProductSaleNotificationPayload(
        customer_name=order.customer.email,
        product_name=sold_product.name,
        product_price_amount=order.net_amount,
        organization_name=organization.slug,
    )
    notification = PartialNotification(
        type=NotificationType.maintainer_new_product_sale,
        payload=payload,
    )
    await notifications_service.send_to_org_members(
        session,
        org_id=organization.id,
        notif=notification,
    )
async def update_order_from_stripe(
    self, session: AsyncSession, invoice: stripe_lib.Invoice
) -> Order:
    """
    Synchronize the status of an existing order with its Stripe invoice.

    The order becomes `paid` when the invoice status is `"paid"`, `pending`
    otherwise. The update hooks are then triggered with the previous status.

    Raises:
        OrderDoesNotExist: No order is linked to the invoice.
    """
    repository = OrderRepository.from_session(session)
    assert invoice.id is not None
    order = await repository.get_by_stripe_invoice_id(
        invoice.id, options=repository.get_eager_options()
    )
    if order is None:
        raise OrderDoesNotExist(invoice.id)

    status_before = order.status
    if invoice.status == "paid":
        new_status = OrderStatus.paid
    else:
        new_status = OrderStatus.pending
    order = await repository.update(order, update_dict={"status": new_status})

    # Enqueue the balance creation for out-of-band subscription creation orders
    if order.paid and invoice.metadata:
        charge_id = invoice.metadata.get("charge_id")
        if charge_id:
            enqueue_job("order.balance", order_id=order.id, charge_id=charge_id)

    await self._on_order_updated(session, order, status_before)
    return order
1564 async def send_confirmation_email( 1a
1565 self, session: AsyncSession, order: Order
1566 ) -> None:
1567 organization_repository = OrganizationRepository.from_session(session)
1568 organization = await organization_repository.get_by_customer(order.customer_id)
1570 template_name: Literal[
1571 "order_confirmation",
1572 "subscription_confirmation",
1573 "subscription_cycled",
1574 "subscription_updated",
1575 ]
1576 subject_template: str
1577 url_path_template: str
1579 match order.billing_reason:
1580 case OrderBillingReasonInternal.purchase:
1581 template_name = "order_confirmation"
1582 subject_template = "Your {description} order confirmation"
1583 url_path_template = "/{organization}/portal"
1584 url_params = {
1585 "customer_session_token": "{token}",
1586 "id": "{order}",
1587 "email": "{email}",
1588 }
1589 case OrderBillingReasonInternal.subscription_create:
1590 template_name = "subscription_confirmation"
1591 subject_template = "Your {description} subscription"
1592 url_path_template = "/{organization}/portal"
1593 url_params = {
1594 "customer_session_token": "{token}",
1595 "id": "{subscription}",
1596 "email": "{email}",
1597 }
1598 case (
1599 OrderBillingReasonInternal.subscription_cycle
1600 | OrderBillingReasonInternal.subscription_cycle_after_trial
1601 ):
1602 template_name = "subscription_cycled"
1603 subject_template = "Your {description} subscription has been renewed"
1604 url_path_template = "/{organization}/portal"
1605 url_params = {
1606 "customer_session_token": "{token}",
1607 "id": "{subscription}",
1608 "email": "{email}",
1609 }
1610 case OrderBillingReasonInternal.subscription_update:
1611 template_name = "subscription_updated"
1612 subject_template = "Your subscription has changed to {description}"
1613 url_path_template = "/{organization}/portal"
1614 url_params = {
1615 "customer_session_token": "{token}",
1616 "id": "{subscription}",
1617 "email": "{email}",
1618 }
1620 if not organization.customer_email_settings[template_name]:
1621 return
1623 product = order.product
1624 customer = order.customer
1625 subscription = order.subscription
1626 token, _ = await customer_session_service.create_customer_session(
1627 session, customer
1628 )
1630 # Build query parameters with proper URL encoding
1631 params = {
1632 key: value.format(
1633 token=token,
1634 order=order.id,
1635 subscription=subscription.id if subscription else "",
1636 email=customer.email,
1637 )
1638 for key, value in url_params.items()
1639 }
1640 query_string = urlencode(params)
1641 url_path = url_path_template.format(organization=organization.slug)
1642 url = settings.generate_frontend_url(f"{url_path}?{query_string}")
1643 subject = subject_template.format(description=order.description)
1644 email = EmailAdapter.validate_python(
1645 {
1646 "template": template_name,
1647 "props": {
1648 "email": customer.email,
1649 "organization": organization,
1650 "product": product,
1651 "order": order,
1652 "subscription": subscription,
1653 "url": url,
1654 },
1655 }
1656 )
1658 # Generate invoice to attach to the email
1659 invoice_path: str | None = None
1660 if invoice_path is None:
1661 if order.billing_name is None or order.billing_address is None:
1662 log.warning(
1663 "Cannot generate invoice, missing billing info", order_id=order.id
1664 )
1665 else:
1666 order = await self.generate_invoice(session, order)
1667 invoice_path = order.invoice_path
1669 attachments: list[Attachment] = []
1670 if invoice_path is not None:
1671 invoice = await self.get_order_invoice(order)
1672 attachments = [
1673 {"remote_url": invoice.url, "filename": order.invoice_filename}
1674 ]
1676 body = render_email_template(email)
1677 enqueue_email(
1678 **organization.email_from_reply,
1679 to_email_addr=customer.email,
1680 subject=subject,
1681 html_content=body,
1682 attachments=attachments,
1683 )
async def update_product_benefits_grants(
    self, session: AsyncSession, product: Product
) -> None:
    """
    Enqueue a benefits grant job for each one-time order of the product.

    Only non-deleted orders without a subscription are considered, and orders
    are streamed from the database rather than loaded at once.
    """
    one_time_orders = select(Order).where(
        Order.product_id == product.id,
        Order.deleted_at.is_(None),
        Order.subscription_id.is_(None),
    )
    stream = await session.stream_scalars(
        one_time_orders,
        execution_options={"yield_per": settings.DATABASE_STREAM_YIELD_PER},
    )
    async for order in stream:
        # Seat-based orders are skipped: benefits are granted when seats are claimed
        if order.seats is None:
            enqueue_job(
                "benefit.enqueue_benefits_grants",
                task="grant",
                customer_id=order.customer_id,
                product_id=product.id,
                order_id=order.id,
            )
async def update_refunds(
    self,
    session: AsyncSession,
    order: Order,
    *,
    refunded_amount: int,
    refunded_tax_amount: int,
) -> Order:
    """
    Record refunded amounts on the order and add it to the session.

    The computation itself is delegated to `Order.update_refunds`.
    Returns the same order instance.
    """
    order.update_refunds(
        refunded_amount,
        refunded_tax_amount=refunded_tax_amount,
    )
    session.add(order)

    return order
async def create_order_balance(
    self, session: AsyncSession, order: Order, charge_id: str
) -> None:
    """
    Create the balance for an order from the payment transaction of a charge.

    The payment transaction matching `charge_id` is linked to the order and its
    customer. If the organization has no account, a held balance is created;
    otherwise the balance and the platform fee reversal transactions are
    created, and the resulting platform fee amount is stored on the order.

    Raises:
        PaymentTransactionForChargeDoesNotExist: No payment transaction exists
            for `charge_id`.
        AlreadyBalancedOrder: A held balance or a balance transaction already
            exists for this payment transaction.
    """
    organization = order.organization
    account_repository = AccountRepository.from_session(session)
    account = await account_repository.get_by_organization(organization.id)

    # Retrieve the payment transaction and link it to the order
    payment_transaction = await balance_transaction_service.get_by(
        session, type=TransactionType.payment, charge_id=charge_id
    )
    if payment_transaction is None:
        raise PaymentTransactionForChargeDoesNotExist(charge_id)

    # Make sure to take the amount from the payment transaction and not the order
    # Orders invoices may apply customer balances which won't reflect the actual payment amount
    transfer_amount = payment_transaction.amount

    payment_transaction.order = order
    payment_transaction.payment_customer = order.customer
    session.add(payment_transaction)

    # Prepare an held balance
    # It'll be used if the account is not created yet
    # NOTE(review): the instance is built even when an account exists and it
    # ends up unused — confirm this has no ORM side effect.
    held_balance = HeldBalance(
        amount=transfer_amount, order=order, payment_transaction=payment_transaction
    )

    # No account, create the held balance
    if account is None:
        held_balance.organization = organization

        # Sanity check: make sure we didn't already create a held balance for this order
        existing_held_balance = await held_balance_service.get_by(
            session,
            payment_transaction_id=payment_transaction.id,
            organization_id=organization.id,
        )
        if existing_held_balance is not None:
            raise AlreadyBalancedOrder(order, payment_transaction)

        await held_balance_service.create(session, held_balance=held_balance)

        return

    # Sanity check: make sure we didn't already create a balance for this order
    existing_balance_transaction = await balance_transaction_service.get_by(
        session,
        type=TransactionType.balance,
        payment_transaction_id=payment_transaction.id,
        account_id=account.id,
    )
    if existing_balance_transaction is not None:
        raise AlreadyBalancedOrder(order, payment_transaction)

    # Account created, create the balance immediately
    balance_transactions = (
        await balance_transaction_service.create_balance_from_charge(
            session,
            source_account=None,
            destination_account=account,
            charge_id=charge_id,
            amount=transfer_amount,
            order=order,
        )
    )
    platform_fee_transactions = (
        await platform_fee_transaction_service.create_fees_reversal_balances(
            session, balance_transactions=balance_transactions
        )
    )
    # Sum the second transaction of each fee pair into the order's platform fee
    order.platform_fee_amount = sum(
        incoming.amount for _, incoming in platform_fee_transactions
    )
    session.add(order)
async def send_webhook(
    self,
    session: AsyncSession,
    order: Order,
    event_type: Literal[
        WebhookEventType.order_created,
        WebhookEventType.order_updated,
        WebhookEventType.order_paid,
    ],
) -> None:
    """
    Send an order webhook event to the order's organization.

    The relationships read during webhook serialization are refreshed first.
    """
    await session.refresh(order.customer, {"organization"})

    product = order.product
    if product is not None:
        await session.refresh(product, {"prices"})

    # Webhook serialization accesses `legacy_product_price.product`, so each
    # item's product price needs its `product` relationship loaded.
    for product_price in (item.product_price for item in order.items):
        if product_price:
            await session.refresh(product_price, {"product"})

    await webhook_service.send(session, order.organization, event_type, order)
async def _on_order_created(self, session: AsyncSession, order: Order) -> None:
    """
    Run the side effects of an order creation.

    Enqueues the confirmation email, sends the `order_created` webhook,
    triggers the update hooks when the order is already paid and notifies
    the checkout channel when the order originates from a checkout.
    """
    enqueue_job("order.confirmation_email", order.id)
    await self.send_webhook(session, order, WebhookEventType.order_created)

    if order.paid:
        # Pretend the previous status was pending to trigger the paid event
        status_before = OrderStatus.pending
        await self._on_order_updated(session, order, status_before)

    # Notify checkout channel that an order has been created from it
    checkout = order.checkout
    if checkout:
        await publish_checkout_event(
            checkout.client_secret, CheckoutEvent.order_created
        )
async def _on_order_updated(
    self, session: AsyncSession, order: Order, previous_status: OrderStatus
) -> None:
    """
    Run the side effects of an order update.

    Always sends the `order_updated` webhook; additionally runs the paid hooks
    when the order moved to `paid` from another status.
    """
    await self.send_webhook(session, order, WebhookEventType.order_updated)

    # Already paid before the update: nothing more to do
    if previous_status == OrderStatus.paid:
        return

    if order.status == OrderStatus.paid:
        await self._on_order_paid(session, order)
async def _on_order_paid(self, session: AsyncSession, order: Order) -> None:
    """
    Run the side effects of an order being paid.

    Sends the `order_paid` webhook, records the `order_paid` system event and,
    for subscription cycle orders, enqueues the benefit grant cycles job.
    """
    assert order.paid

    await self.send_webhook(session, order, WebhookEventType.order_paid)

    paid_metadata = OrderPaidMetadata(
        order_id=str(order.id),
        amount=order.total_amount,
        currency=order.currency,
    )
    paid_event = build_system_event(
        SystemEvent.order_paid,
        customer=order.customer,
        organization=order.organization,
        metadata=paid_metadata,
    )
    await event_service.create_event(session, paid_event)

    cycle_reasons = (
        OrderBillingReasonInternal.subscription_cycle,
        OrderBillingReasonInternal.subscription_cycle_after_trial,
    )
    if order.subscription_id is not None and order.billing_reason in cycle_reasons:
        enqueue_job(
            "benefit.enqueue_benefit_grant_cycles",
            subscription_id=order.subscription_id,
        )
async def handle_payment_failure(
    self, session: AsyncSession, order: Order
) -> Order:
    """
    React to a failed payment on an order, starting or continuing dunning.

    Already paid orders are left untouched. Otherwise the payment lock is
    released when set, and dunning is handled here only for orders whose
    subscription is not managed by Stripe.
    """
    # A failure reported for an already paid order is ignored
    if order.status == OrderStatus.paid:
        log.warning(
            "Ignoring payment failure for already paid order",
            order_id=order.id,
        )
        return order

    # The failed attempt no longer holds the payment lock
    if order.payment_lock_acquired_at is not None:
        log.info(
            "Clearing payment lock on order due to payment failure",
            order_id=order.id,
        )
        order_repository = OrderRepository.from_session(session)
        order = await order_repository.release_payment_lock(order)

    subscription = order.subscription
    # No subscription, or a subscription managed by Stripe (which handles
    # dunning itself): nothing to schedule on our side.
    if subscription is None or subscription.stripe_subscription_id is not None:
        return order

    if order.next_payment_attempt_at is None:
        return await self._handle_first_dunning_attempt(session, order)
    return await self._handle_consecutive_dunning_attempts(session, order)
async def _handle_first_dunning_attempt(
    self, session: AsyncSession, order: Order
) -> Order:
    """
    Start dunning for an order: schedule the first payment retry using the
    first configured interval and mark the subscription as past due.
    """
    retry_at = utc_now() + settings.DUNNING_RETRY_INTERVALS[0]

    order_repository = OrderRepository.from_session(session)
    order = await order_repository.update(
        order, update_dict={"next_payment_attempt_at": retry_at}
    )

    subscription = order.subscription
    assert subscription is not None
    await subscription_service.mark_past_due(session, subscription)

    return order
async def _handle_consecutive_dunning_attempts(
    self, session: AsyncSession, order: Order
) -> Order:
    """Continue dunning for an order that already had a retry scheduled.

    The number of failed payments recorded for the order selects the next
    retry interval from ``settings.DUNNING_RETRY_INTERVALS``. Once that count
    reaches the number of configured intervals, the retry date is cleared and
    the subscription is revoked (when it can be cancelled immediately).
    Otherwise the next retry date is stored and the subscription's benefit
    grants are enqueued again.

    Returns the updated order.
    """
    failure_count = await PaymentRepository.from_session(
        session
    ).count_failed_payments_for_order(order.id)

    order_repository = OrderRepository.from_session(session)
    intervals = settings.DUNNING_RETRY_INTERVALS
    retries_exhausted = failure_count >= len(intervals)

    if retries_exhausted:
        # No interval left: clear the scheduled retry.
        retry_at = None
    else:
        # Pick the interval matching the number of failures so far.
        delay = intervals[failure_count]
        retry_at = utc_now() + delay

    order = await order_repository.update(
        order, update_dict={"next_payment_attempt_at": retry_at}
    )

    subscription = order.subscription
    if subscription is None:
        return order

    if not retries_exhausted:
        # Enqueue benefit grants again so they are re-evaluated after this
        # failed attempt.
        await subscription_service.enqueue_benefits_grants(session, subscription)
    elif subscription.can_cancel(immediately=True):
        await subscription_service.revoke(session, subscription)

    return order
async def process_dunning_order(self, session: AsyncSession, order: Order) -> Order:
    """Retry payment for one order that is due in the dunning process.

    - No subscription: logs a warning and returns the order unchanged.
    - Subscription status is ``canceled``: clears
      ``next_payment_attempt_at`` on the order and returns the updated order.
    - Subscription has no payment method: logs a warning and returns the
      order unchanged.
    - Otherwise: enqueues an ``order.trigger_payment`` job using the
      subscription's payment method and returns the order.
    """
    subscription = order.subscription
    if subscription is None:
        log.warning(
            "Order has no subscription, skipping dunning",
            order_id=order.id,
        )
        return order

    if subscription.status == SubscriptionStatus.canceled:
        log.info(
            "Order subscription is cancelled, removing order from dunning process",
            order_id=order.id,
            subscription_id=subscription.id,
        )
        # Clearing the retry date takes the order out of dunning.
        return await OrderRepository.from_session(session).update(
            order, update_dict={"next_payment_attempt_at": None}
        )

    payment_method_id = subscription.payment_method_id
    if payment_method_id is None:
        log.warning(
            "Order subscription has no payment method, skipping dunning",
            order_id=order.id,
            subscription_id=subscription.id,
        )
        return order

    log.info(
        "Processing dunning order",
        order_id=order.id,
        subscription_id=subscription.id,
    )

    # The actual charge happens in the enqueued job, not here.
    enqueue_job(
        "order.trigger_payment",
        order_id=order.id,
        payment_method_id=payment_method_id,
    )

    return order
# Module-level OrderService instance, bound to the name `order`.
# NOTE: the methods above take a parameter that is also called `order`; inside
# those methods the parameter shadows this module-level name.
2012order = OrderService() 1a