Coverage for polar/order/service.py: 14%

812 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 17:15 +0000

1import uuid 1a

2from collections.abc import AsyncIterator, Sequence 1a

3from contextlib import asynccontextmanager 1a

4from datetime import UTC, datetime 1a

5from typing import Any, Literal 1a

6from urllib.parse import urlencode 1a

7 

8import stripe as stripe_lib 1a

9import structlog 1a

10from sqlalchemy import select 1a

11from sqlalchemy.orm import contains_eager, joinedload 1a

12 

13from polar.account.repository import AccountRepository 1a

14from polar.auth.models import AuthSubject 1a

15from polar.billing_entry.service import billing_entry as billing_entry_service 1a

16from polar.checkout.eventstream import CheckoutEvent, publish_checkout_event 1a

17from polar.checkout.guard import has_product_checkout 1a

18from polar.checkout.repository import CheckoutRepository 1a

19from polar.config import settings 1a

20from polar.customer.repository import CustomerRepository 1a

21from polar.customer_portal.schemas.order import ( 1a

22 CustomerOrderPaymentConfirmation, 

23 CustomerOrderUpdate, 

24) 

25from polar.customer_session.service import customer_session as customer_session_service 1a

26from polar.discount.service import discount as discount_service 1a

27from polar.email.react import render_email_template 1a

28from polar.email.schemas import ( 1a

29 EmailAdapter, 

30) 

31from polar.email.sender import Attachment, enqueue_email 1a

32from polar.enums import PaymentProcessor 1a

33from polar.event.service import event as event_service 1a

34from polar.event.system import OrderPaidMetadata, SystemEvent, build_system_event 1a

35from polar.eventstream.service import publish as eventstream_publish 1a

36from polar.exceptions import PolarError 1a

37from polar.held_balance.service import held_balance as held_balance_service 1a

38from polar.integrations.stripe.schemas import ProductType 1a

39from polar.integrations.stripe.service import stripe as stripe_service 1a

40from polar.integrations.stripe.utils import get_expandable_id 1a

41from polar.invoice.service import invoice as invoice_service 1a

42from polar.kit.address import Address, AddressInput 1a

43from polar.kit.db.postgres import AsyncReadSession, AsyncSession 1a

44from polar.kit.metadata import MetadataQuery, apply_metadata_clause 1a

45from polar.kit.pagination import PaginationParams 1a

46from polar.kit.sorting import Sorting 1a

47from polar.kit.tax import ( 1a

48 TaxabilityReason, 

49 TaxRate, 

50 calculate_tax, 

51 from_stripe_tax_rate, 

52 from_stripe_tax_rate_details, 

53) 

54from polar.kit.utils import utc_now 1a

55from polar.logging import Logger 1a

56from polar.models import ( 1a

57 Checkout, 

58 Customer, 

59 Discount, 

60 Order, 

61 OrderItem, 

62 Organization, 

63 Payment, 

64 PaymentMethod, 

65 Product, 

66 ProductPrice, 

67 Subscription, 

68 Transaction, 

69 User, 

70 WalletTransaction, 

71) 

72from polar.models.held_balance import HeldBalance 1a

73from polar.models.order import ( 1a

74 OrderBillingReason, 

75 OrderBillingReasonInternal, 

76 OrderStatus, 

77) 

78from polar.models.product import ProductBillingType 1a

79from polar.models.subscription import SubscriptionStatus 1a

80from polar.models.subscription_meter import SubscriptionMeter 1a

81from polar.models.transaction import TransactionType 1a

82from polar.models.webhook_endpoint import WebhookEventType 1a

83from polar.notifications.notification import ( 1a

84 MaintainerNewProductSaleNotificationPayload, 

85 NotificationType, 

86) 

87from polar.notifications.service import PartialNotification 1a

88from polar.notifications.service import notifications as notifications_service 1a

89from polar.organization.repository import OrganizationRepository 1a

90from polar.organization.service import organization as organization_service 1a

91from polar.payment.repository import PaymentRepository 1a

92from polar.payment_method.repository import PaymentMethodRepository 1a

93from polar.product.guard import is_custom_price, is_seat_price, is_static_price 1a

94from polar.product.repository import ProductPriceRepository 1a

95from polar.subscription.repository import SubscriptionRepository 1a

96from polar.subscription.service import subscription as subscription_service 1a

97from polar.transaction.service.balance import PaymentTransactionForChargeDoesNotExist 1a

98from polar.transaction.service.balance import ( 1a

99 balance_transaction as balance_transaction_service, 

100) 

101from polar.transaction.service.platform_fee import ( 1a

102 platform_fee_transaction as platform_fee_transaction_service, 

103) 

104from polar.wallet.repository import WalletTransactionRepository 1a

105from polar.wallet.service import wallet as wallet_service 1a

106from polar.webhook.service import webhook as webhook_service 1a

107from polar.worker import enqueue_job 1a

108 

109from .repository import OrderRepository 1a

110from .schemas import OrderInvoice, OrderUpdate 1a

111from .sorting import OrderSortProperty 1a

112 

113log: Logger = structlog.get_logger() 1a

114 

115 

class OrderError(PolarError):
    """Common base class of the order errors defined in this module."""

117 

118 

class RecurringProduct(OrderError):
    """Error for a checkout whose product is recurring.

    The offending checkout and product are kept on the instance.
    """

    def __init__(self, checkout: Checkout, product: Product) -> None:
        self.checkout, self.product = checkout, product
        super().__init__(
            f"Checkout {checkout.id} is for product {product.id}, "
            "which is a recurring product."
        )

128 

129 

class NotRecurringProduct(OrderError):
    """Error for a checkout whose product is not recurring.

    The offending checkout and product are kept on the instance.
    """

    def __init__(self, checkout: Checkout, product: Product) -> None:
        self.checkout, self.product = checkout, product
        super().__init__(
            f"Checkout {checkout.id} is for product {product.id}, "
            "which is not a recurring product."
        )

139 

140 

class MissingCheckoutCustomer(OrderError):
    """Error for a checkout that has no customer attached."""

    def __init__(self, checkout: Checkout) -> None:
        self.checkout = checkout
        super().__init__(f"Checkout {checkout.id} is missing a customer.")

146 

147 

class MissingStripeCustomerID(OrderError):
    """Error for a checkout customer that has no Stripe customer ID."""

    def __init__(self, checkout: Checkout, customer: Customer) -> None:
        self.checkout, self.customer = checkout, customer
        super().__init__(
            f"Checkout {checkout.id}'s customer {customer.id} "
            "is missing a Stripe customer ID."
        )

157 

158 

class NotAnOrderInvoice(OrderError):
    """Error for a Stripe invoice that does not correspond to an order."""

    def __init__(self, invoice_id: str) -> None:
        self.invoice_id = invoice_id
        super().__init__(
            f"Received invoice {invoice_id} from Stripe, but it is not an order."
            " Check if it's an issue pledge."
        )

167 

168 

class NotASubscriptionInvoice(OrderError):
    """Error for a Stripe invoice that is not linked to a subscription.

    The Stripe invoice ID is kept on the instance as ``invoice_id``.
    """

    def __init__(self, invoice_id: str) -> None:
        self.invoice_id = invoice_id
        # The message previously read "but it it not linked"; the typo is fixed.
        message = (
            f"Received invoice {invoice_id} from Stripe, but it is not linked to a subscription."
            " One-time purchases invoices are handled directly upon creation."
        )
        super().__init__(message)

177 

178 

class OrderDoesNotExist(OrderError):
    """Error for a Stripe invoice that has no associated order."""

    def __init__(self, invoice_id: str) -> None:
        self.invoice_id = invoice_id
        super().__init__(
            f"Received invoice {invoice_id} from Stripe, "
            "but no associated Order exists."
        )

187 

188 

class DiscountDoesNotExist(OrderError):
    """Error for a Stripe invoice whose coupon has no associated discount."""

    def __init__(self, invoice_id: str, coupon_id: str) -> None:
        self.invoice_id, self.coupon_id = invoice_id, coupon_id
        super().__init__(
            f"Received invoice {invoice_id} from Stripe with coupon {coupon_id}, "
            "but no associated Discount exists."
        )

198 

199 

class CheckoutDoesNotExist(OrderError):
    """Error for a Stripe invoice whose checkout ID matches no checkout."""

    def __init__(self, invoice_id: str, checkout_id: str) -> None:
        self.invoice_id, self.checkout_id = invoice_id, checkout_id
        super().__init__(
            f"Received invoice {invoice_id} from Stripe with checkout {checkout_id}, "
            "but no associated Checkout exists."
        )

209 

210 

class SubscriptionDoesNotExist(OrderError):
    """Error for a Stripe invoice whose subscription matches no subscription."""

    def __init__(self, invoice_id: str, stripe_subscription_id: str) -> None:
        self.invoice_id = invoice_id
        self.stripe_subscription_id = stripe_subscription_id
        super().__init__(
            f"Received invoice {invoice_id} from Stripe "
            f"for subscription {stripe_subscription_id}, "
            "but no associated Subscription exists."
        )

221 

222 

class AlreadyBalancedOrder(OrderError):
    """Error for an order that has already been balanced for a payment."""

    def __init__(self, order: Order, payment_transaction: Transaction) -> None:
        self.order, self.payment_transaction = order, payment_transaction
        super().__init__(
            f"The order {order.id} with payment {payment_transaction.id} "
            "has already been balanced."
        )

232 

233 

class NotPaidOrder(OrderError):
    """Error for an invoice request on an order that is not paid."""

    def __init__(self, order: Order) -> None:
        self.order = order
        super().__init__(
            f"Order {order.id} is not paid, so an invoice cannot be generated.",
            422,
        )

239 

240 

class MissingInvoiceBillingDetails(OrderError):
    """Error for an invoice request on an order lacking billing name or address."""

    def __init__(self, order: Order) -> None:
        self.order = order
        super().__init__(
            "Billing name and address are required to generate an invoice for this order.",
            422,
        )

249 

250 

class InvoiceDoesNotExist(OrderError):
    """Error for an order that has no invoice."""

    def __init__(self, order: Order) -> None:
        self.order = order
        super().__init__(f"No invoice exists for order {order.id}.", 404)

256 

257 

class OrderNotEligibleForRetry(OrderError):
    """Error for an order on which a payment retry is not allowed."""

    def __init__(self, order: Order) -> None:
        self.order = order
        super().__init__(f"Order {order.id} is not eligible for payment retry.", 422)

263 

264 

class NoPendingBillingEntries(OrderError):
    """Error for a subscription that has no pending billing entries."""

    def __init__(self, subscription: Subscription) -> None:
        self.subscription = subscription
        super().__init__(
            f"No pending billing entries found for subscription {subscription.id}."
        )

272 

273 

class OrderNotPending(OrderError):
    """Error for an order that is not in the pending status."""

    def __init__(self, order: Order) -> None:
        self.order = order
        super().__init__(f"Order {order.id} is not pending")

279 

280 

class PaymentAlreadyInProgress(OrderError):
    """Error for an order whose payment is already being processed."""

    def __init__(self, order: Order) -> None:
        self.order = order
        super().__init__(f"Payment for order {order.id} is already in progress", 409)

286 

287 

class CardPaymentFailed(OrderError):
    """Exception for card-related payment failures that should not be retried."""

    def __init__(self, order: Order, stripe_error: stripe_lib.CardError) -> None:
        self.order, self.stripe_error = order, stripe_error
        # Prefer the user-facing message; fall back to the Stripe error code.
        reason = stripe_error.user_message or stripe_error.code
        super().__init__(f"Card payment failed for order {order.id}: {reason}", 402)

296 

297 

class InvalidPaymentProcessor(OrderError):
    """Error for a payment processor that is not valid for the operation."""

    def __init__(self, payment_processor: PaymentProcessor) -> None:
        self.payment_processor = payment_processor
        super().__init__(f"Invalid payment processor: {payment_processor}", 422)

303 

304 

class PaymentRetryValidationError(OrderError):
    """Order error carrying a caller-supplied message, forwarded with 422."""

    def __init__(self, message: str) -> None:
        super().__init__(message, 422)

308 

309 

class SubscriptionNotTrialing(OrderError):
    """Error for a subscription that is not in trialing status."""

    def __init__(self, subscription: Subscription) -> None:
        self.subscription = subscription
        super().__init__(
            f"Subscription {subscription.id} is not in trialing status."
        )

315 

316 

317def _is_empty_customer_address(customer_address: dict[str, Any] | None) -> bool: 1a

318 return customer_address is None or customer_address["country"] is None 

319 

320 

321class OrderService: 1a

@asynccontextmanager
async def acquire_payment_lock(
    self, session: AsyncSession, order: Order, *, release_on_success: bool = True
) -> AsyncIterator[None]:
    """
    Hold the payment lock of an order while the managed block runs.

    Raises PaymentAlreadyInProgress when the lock cannot be acquired.
    The lock is released when the managed block raises an Exception, and,
    if `release_on_success` is true, when it completes normally.
    """
    order_repository = OrderRepository.from_session(session)

    if not await order_repository.acquire_payment_lock_by_id(order.id):
        raise PaymentAlreadyInProgress(order)

    try:
        yield
    except Exception:
        # Failure path: always give the lock back, then propagate.
        await order_repository.release_payment_lock(order, flush=True)
        raise

    # Success path: the caller may ask to keep the lock.
    if release_on_success:
        await order_repository.release_payment_lock(order, flush=True)

345 

async def list(
    self,
    session: AsyncReadSession,
    auth_subject: AuthSubject[User | Organization],
    *,
    organization_id: Sequence[uuid.UUID] | None = None,
    product_id: Sequence[uuid.UUID] | None = None,
    product_billing_type: Sequence[ProductBillingType] | None = None,
    discount_id: Sequence[uuid.UUID] | None = None,
    customer_id: Sequence[uuid.UUID] | None = None,
    checkout_id: Sequence[uuid.UUID] | None = None,
    metadata: MetadataQuery | None = None,
    pagination: PaginationParams,
    sorting: list[Sorting[OrderSortProperty]] = [
        (OrderSortProperty.created_at, True)
    ],
) -> tuple[Sequence[Order], int]:
    """
    Return one page of the orders readable by `auth_subject`.

    Each filter that is not None narrows the query with an IN clause;
    `metadata` is applied through `apply_metadata_clause`. The result is
    whatever the repository's `paginate` returns for the sorted query.
    """
    order_repository = OrderRepository.from_session(session)
    query = (
        order_repository.get_readable_statement(auth_subject)
        .join(Order.discount, isouter=True)
        .join(Order.product, isouter=True)
        .options(
            *order_repository.get_eager_options(
                customer_load=contains_eager(Order.customer),
                product_load=contains_eager(Order.product),
                discount_load=contains_eager(Order.discount),
            )
        )
    )

    # TODO: once `external_customer_id` is added, make sure the customer
    # filter only matches non-deleted records: that ID could be shared
    # across soft-deleted records whereas the unique ID cannot.
    membership_filters = (
        (organization_id, Customer.organization_id),
        (product_id, Order.product_id),
        (product_billing_type, Product.billing_type),
        (discount_id, Order.discount_id),
        (customer_id, Order.customer_id),
        (checkout_id, Order.checkout_id),
    )
    for accepted_values, column in membership_filters:
        if accepted_values is not None:
            query = query.where(column.in_(accepted_values))

    if metadata is not None:
        query = apply_metadata_clause(Order, query, metadata)

    query = order_repository.apply_sorting(query, sorting)

    return await order_repository.paginate(
        query, limit=pagination.limit, page=pagination.page
    )

407 

async def get(
    self,
    session: AsyncReadSession,
    auth_subject: AuthSubject[User | Organization],
    id: uuid.UUID,
) -> Order | None:
    """
    Fetch the order with the given ID among those readable by `auth_subject`.

    Returns None when the repository finds no matching order.
    """
    order_repository = OrderRepository.from_session(session)
    readable_orders = order_repository.get_readable_statement(auth_subject)
    eager_options = order_repository.get_eager_options(
        customer_load=contains_eager(Order.customer),
        product_load=joinedload(Order.product),
    )
    query = readable_orders.options(*eager_options).where(Order.id == id)
    return await order_repository.get_one_or_none(query)

426 

async def update(
    self,
    session: AsyncSession,
    order: Order,
    order_update: OrderUpdate | CustomerOrderUpdate,
) -> Order:
    """
    Apply the explicitly set fields of `order_update` to the order.

    Sends the `order_updated` webhook afterwards and returns the updated order.
    """
    order_repository = OrderRepository.from_session(session)
    # Only fields the caller actually provided are written.
    changes = order_update.model_dump(exclude_unset=True)
    order = await order_repository.update(order, update_dict=changes)

    await self.send_webhook(session, order, WebhookEventType.order_updated)
    return order

441 

async def trigger_invoice_generation(
    self, session: AsyncSession, order: Order
) -> None:
    """
    Enqueue the `order.invoice` job for a paid order with billing details.

    Raises NotPaidOrder when the order is not paid, and
    MissingInvoiceBillingDetails when its billing name or address is None.
    """
    if not order.paid:
        raise NotPaidOrder(order)

    has_billing_details = (
        order.billing_name is not None and order.billing_address is not None
    )
    if not has_billing_details:
        raise MissingInvoiceBillingDetails(order)

    enqueue_job("order.invoice", order_id=order.id)

452 

async def generate_invoice(self, session: AsyncSession, order: Order) -> Order:
    """
    Create the invoice of an order and store its path on the order.

    Publishes `order.invoice_generated` on the event stream, sends the
    `order_updated` webhook, and returns the updated order.
    """
    new_invoice_path = await invoice_service.create_order_invoice(order)
    order = await OrderRepository.from_session(session).update(
        order, update_dict={"invoice_path": new_invoice_path}
    )

    event_payload = {"order_id": order.id}
    await eventstream_publish(
        "order.invoice_generated",
        event_payload,
        customer_id=order.customer_id,
        organization_id=order.organization.id,
    )

    await self.send_webhook(session, order, WebhookEventType.order_updated)
    return order

470 

async def get_order_invoice(self, order: Order) -> OrderInvoice:
    """
    Return the invoice URL of an order wrapped in an OrderInvoice.

    Raises InvoiceDoesNotExist when the order has no invoice path.
    """
    if order.invoice_path is None:
        raise InvoiceDoesNotExist(order)

    # The invoice service returns a pair; only its first element is used.
    invoice_url, _ = await invoice_service.get_order_invoice_url(order)
    return OrderInvoice(url=invoice_url)

477 

async def create_from_checkout_one_time(
    self, session: AsyncSession, checkout: Checkout, payment: Payment | None = None
) -> Order:
    """
    Create the order of a one-time purchase checkout.

    Raises RecurringProduct when the checkout's product is recurring.
    Benefit grants are enqueued unless one of the product's checkout prices
    is a seat price, and an admin notification is sent for the order.
    """
    assert has_product_checkout(checkout)

    product = checkout.product
    if product.is_recurring:
        raise RecurringProduct(checkout, product)

    order = await self._create_order_from_checkout(
        session, checkout, OrderBillingReasonInternal.purchase, payment
    )

    # Seat-based orders grant benefits when seats are claimed;
    # every other order grants them right away.
    seat_based = any(is_seat_price(price) for price in checkout.prices[product.id])
    if not seat_based:
        enqueue_job(
            "benefit.enqueue_benefits_grants",
            task="grant",
            customer_id=order.customer.id,
            product_id=product.id,
            order_id=order.id,
        )

    # Notify the organization about the sale.
    await self.send_admin_notification(session, checkout.organization, order)

    return order

509 

async def create_from_checkout_subscription(
    self,
    session: AsyncSession,
    checkout: Checkout,
    subscription: Subscription,
    billing_reason: Literal[
        OrderBillingReasonInternal.subscription_create,
        OrderBillingReasonInternal.subscription_update,
    ],
    payment: Payment | None = None,
) -> Order:
    """
    Create the order of a subscription checkout.

    Raises NotRecurringProduct when the checkout's product is not recurring.
    A trialing subscription gets a trial order; any other subscription gets
    a regular order built from the checkout.
    """
    assert has_product_checkout(checkout)

    product = checkout.product
    if not product.is_recurring:
        raise NotRecurringProduct(checkout, product)

    if subscription.trialing:
        return await self.create_trial_order(
            session, subscription, billing_reason, checkout=checkout
        )

    return await self._create_order_from_checkout(
        session,
        checkout,
        billing_reason,
        payment=payment,
        subscription=subscription,
    )

535 

async def _create_order_from_checkout(
    self,
    session: AsyncSession,
    checkout: Checkout,
    billing_reason: OrderBillingReasonInternal,
    payment: Payment | None = None,
    subscription: Subscription | None = None,
) -> Order:
    """
    Build and persist a paid order from a checkout.

    Raises MissingCheckoutCustomer when the checkout has no customer.
    Order items are created from the checkout's static prices, tax data is
    read from the Stripe tax calculation when the checkout has one, the
    payment (if any) is linked to the order and an `order.balance` job is
    enqueued, and the tax transaction is recorded on the order.
    """
    customer = checkout.customer
    if customer is None:
        raise MissingCheckoutCustomer(checkout)

    order_items: list[OrderItem] = []
    if has_product_checkout(checkout):
        for price in checkout.prices[checkout.product_id]:
            # Metered prices do not produce an item.
            if not is_static_price(price):
                continue
            order_items.append(
                OrderItem.from_price(price, 0, checkout.amount)
                if is_custom_price(price)
                else OrderItem.from_price(price, 0, seats=checkout.seats)
            )

    discount_amount = checkout.discount_amount

    # Tax data: defaults first, refined from the Stripe calculation if any.
    tax_amount = checkout.tax_amount or 0
    taxability_reason = None
    tax_rate: TaxRate | None = None
    tax_id = customer.tax_id
    tax_processor_id = checkout.tax_processor_id
    if tax_processor_id is not None:
        calculation = await stripe_service.get_tax_calculation(tax_processor_id)
        assert tax_amount == calculation.tax_amount_exclusive
        breakdowns = calculation.tax_breakdown
        assert len(breakdowns) > 0
        if len(breakdowns) > 1:
            log.warning(
                "Multiple tax breakdowns found for checkout",
                checkout_id=checkout.id,
                calculation_id=calculation.id,
            )
        # Only the first breakdown is used.
        first_breakdown = breakdowns[0]
        taxability_reason = TaxabilityReason.from_stripe(
            first_breakdown.taxability_reason, tax_amount
        )
        tax_rate = from_stripe_tax_rate_details(first_breakdown.tax_rate_details)

    invoice_number = await organization_service.get_next_invoice_number(
        session, checkout.organization, customer
    )

    order_repository = OrderRepository.from_session(session)
    new_order = Order(
        status=OrderStatus.paid,
        subtotal_amount=checkout.amount,
        discount_amount=discount_amount,
        tax_amount=tax_amount,
        currency=checkout.currency,
        billing_reason=billing_reason,
        billing_name=customer.billing_name,
        billing_address=customer.billing_address,
        taxability_reason=taxability_reason,
        tax_id=tax_id,
        tax_rate=tax_rate,
        invoice_number=invoice_number,
        customer=customer,
        product=checkout.product,
        discount=checkout.discount,
        subscription=subscription,
        checkout=checkout,
        user_metadata=checkout.user_metadata,
        custom_field_data=checkout.custom_field_data,
        items=order_items,
        seats=checkout.seats,
    )
    order = await order_repository.create(new_order, flush=True)

    # Attach the payment to the order and schedule its balancing.
    if payment is not None:
        payment_repository = PaymentRepository.from_session(session)
        assert payment.amount == order.total_amount
        await payment_repository.update(payment, update_dict={"order": order})
        enqueue_job(
            "order.balance", order_id=order.id, charge_id=payment.processor_id
        )

    # Record the tax transaction on Stripe and keep its ID on the order.
    if tax_processor_id is not None:
        tax_transaction = await stripe_service.create_tax_transaction(
            tax_processor_id, str(order.id)
        )
        await order_repository.update(
            order, update_dict={"tax_transaction_processor_id": tax_transaction.id}
        )

    await self._on_order_created(session, order)

    return order

640 

async def create_subscription_order(
    self,
    session: AsyncSession,
    subscription: Subscription,
    billing_reason: OrderBillingReasonInternal,
) -> Order:
    """
    Create a pending order from a subscription's pending billing entries.

    Raises NoPendingBillingEntries when there is nothing to bill.
    The subscription discount is applied to discountable items, tax is
    calculated when applicable, and the customer's wallet balance is applied.
    The order is marked paid when nothing is due; otherwise an
    `order.trigger_payment` job is enqueued.
    """
    items = await billing_entry_service.create_order_items_from_pending(
        session, subscription
    )
    if not items:
        raise NoPendingBillingEntries(subscription)

    # The ID is generated up front because the tax calculation needs it.
    order_id = uuid.uuid4()
    customer = subscription.customer
    billing_address = customer.billing_address
    product = subscription.product

    subtotal_amount = sum(item.amount for item in items)

    discount = subscription.discount
    discount_amount = 0
    if discount is not None:
        # Only cycle and meter items are discounted: prorations use
        # "last month's" discount, so this month's discount must not
        # apply to them.
        discount_amount = discount.get_discount_amount(
            sum(item.amount for item in items if item.discountable)
        )

    # Tax calculation
    taxable_amount = subtotal_amount - discount_amount
    tax_amount = 0
    taxability_reason: TaxabilityReason | None = None
    tax_rate: TaxRate | None = None
    tax_id = customer.tax_id
    tax_calculation_processor_id: str | None = None

    should_calculate_tax = (
        taxable_amount != 0
        and product.is_tax_applicable
        and billing_address is not None
    )
    if should_calculate_tax:
        tax_calculation = await calculate_tax(
            order_id,
            subscription.currency,
            # Stripe doesn't support calculating negative tax amounts
            abs(taxable_amount),
            product.tax_code,
            billing_address,
            [] if tax_id is None else [tax_id],
            subscription.tax_exempted,
        )
        if taxable_amount < 0:
            # A negative taxable amount is usually due to a credit
            # proration: we "owe" the customer money but don't pay it back
            # at this point. No money moves, so there is no tax transaction
            # to record and the processor ID stays None.
            tax_amount = -tax_calculation["amount"]
        else:
            tax_calculation_processor_id = tax_calculation["processor_id"]
            tax_amount = tax_calculation["amount"]

        taxability_reason = tax_calculation["taxability_reason"]
        tax_rate = tax_calculation["tax_rate"]

    invoice_number = await organization_service.get_next_invoice_number(
        session, subscription.organization, customer
    )

    total_amount = subtotal_amount - discount_amount + tax_amount
    customer_balance = await wallet_service.get_billing_wallet_balance(
        session, customer, subscription.currency
    )

    # Work out the balance change and the applied balance amount.
    if total_amount >= 0:
        # Charge: use the customer balance if available.
        balance_change = -min(total_amount, customer_balance)
        applied_balance_amount = balance_change
    elif customer_balance < 0:
        # Credit with existing debt: track how much debt was cleared.
        balance_change = -total_amount
        applied_balance_amount = min(-total_amount, -customer_balance)
    else:
        # Credit without debt: the whole amount goes to the balance.
        balance_change = -total_amount
        applied_balance_amount = 0

    order_repository = OrderRepository.from_session(session)
    new_order = Order(
        id=order_id,
        status=OrderStatus.pending,
        subtotal_amount=subtotal_amount,
        discount_amount=discount_amount,
        tax_amount=tax_amount,
        applied_balance_amount=applied_balance_amount,
        currency=subscription.currency,
        billing_reason=billing_reason,
        billing_name=customer.billing_name,
        billing_address=billing_address,
        taxability_reason=taxability_reason,
        tax_id=tax_id,
        tax_rate=tax_rate,
        tax_calculation_processor_id=tax_calculation_processor_id,
        invoice_number=invoice_number,
        customer=customer,
        product=subscription.product,
        discount=discount,
        subscription=subscription,
        checkout=None,
        items=items,
        user_metadata=subscription.user_metadata,
        custom_field_data=subscription.custom_field_data,
    )
    order = await order_repository.create(new_order, flush=True)

    # Reflect the order on the customer's balance.
    if balance_change != 0:
        await wallet_service.create_balance_transaction(
            session, customer, balance_change, subscription.currency, order=order
        )

    # Reset the associated meters, if any.
    meter_resetting_reasons = (
        OrderBillingReasonInternal.subscription_cycle,
        OrderBillingReasonInternal.subscription_cycle_after_trial,
        OrderBillingReasonInternal.subscription_update,
    )
    if billing_reason in meter_resetting_reasons:
        await subscription_service.reset_meters(session, subscription)

    if order.due_amount > 0:
        enqueue_job(
            "order.trigger_payment",
            order_id=order.id,
            payment_method_id=subscription.payment_method_id,
        )
    else:
        # Nothing is due: the order is paid immediately.
        order = await order_repository.update(
            order, update_dict={"status": OrderStatus.paid}
        )

    await self._on_order_created(session, order)

    return order

790 

async def create_trial_order(
    self,
    session: AsyncSession,
    subscription: Subscription,
    billing_reason: Literal[
        OrderBillingReasonInternal.subscription_create,
        OrderBillingReasonInternal.subscription_update,
    ],
    checkout: Checkout | None = None,
) -> Order:
    """
    Create a paid order for a trialing subscription.

    Raises SubscriptionNotTrialing when the subscription is not trialing.
    The order holds a single trial item and has no discount and no tax.
    """
    if not subscription.trialing:
        raise SubscriptionNotTrialing(subscription)
    assert subscription.trial_start is not None
    assert subscription.trial_end is not None

    product = subscription.product
    customer = subscription.customer

    trial_item = OrderItem.from_trial(
        product, subscription.trial_start, subscription.trial_end
    )
    items: list[OrderItem] = [trial_item]

    invoice_number = await organization_service.get_next_invoice_number(
        session, subscription.organization, customer
    )

    order_repository = OrderRepository.from_session(session)
    new_order = Order(
        status=OrderStatus.paid,
        subtotal_amount=sum(item.amount for item in items),
        discount_amount=0,
        tax_amount=0,
        currency=subscription.currency,
        billing_reason=billing_reason,
        billing_name=customer.billing_name,
        billing_address=customer.billing_address,
        taxability_reason=None,
        tax_id=customer.tax_id,
        tax_rate=None,
        invoice_number=invoice_number,
        customer=customer,
        product=product,
        discount=None,
        subscription=subscription,
        checkout=checkout,
        user_metadata=subscription.user_metadata,
        custom_field_data=subscription.custom_field_data,
        items=items,
    )
    order = await order_repository.create(new_order, flush=True)

    await self._on_order_created(session, order)

    return order

850 

async def create_wallet_order(
    self,
    session: AsyncSession,
    wallet_transaction: WalletTransaction,
    payment: Payment | None,
) -> Order:
    """
    Create a paid order for a wallet transaction.

    The order holds a single wallet item. Tax data is read from the Stripe
    tax calculation when the transaction has one. The payment (if any) and
    the wallet transaction are linked to the order, and the tax transaction
    is recorded on it.
    """
    wallet = wallet_transaction.wallet
    items: list[OrderItem] = [
        OrderItem.from_wallet(wallet, wallet_transaction.amount)
    ]

    customer = wallet.customer
    billing_address = customer.billing_address

    subtotal_amount = sum(item.amount for item in items)

    # Tax data: defaults first, refined from the Stripe calculation if any.
    tax_amount = wallet_transaction.tax_amount or 0
    taxability_reason = None
    tax_rate: TaxRate | None = None
    tax_id = customer.tax_id
    tax_calculation_id = wallet_transaction.tax_calculation_processor_id
    if tax_calculation_id is not None:
        calculation = await stripe_service.get_tax_calculation(tax_calculation_id)
        assert tax_amount == calculation.tax_amount_exclusive
        assert len(calculation.tax_breakdown) > 0
        # Only the first breakdown is used.
        first_breakdown = calculation.tax_breakdown[0]
        taxability_reason = TaxabilityReason.from_stripe(
            first_breakdown.taxability_reason, tax_amount
        )
        tax_rate = from_stripe_tax_rate_details(first_breakdown.tax_rate_details)

    invoice_number = await organization_service.get_next_invoice_number(
        session, wallet.organization, wallet.customer
    )

    order_repository = OrderRepository.from_session(session)
    new_order = Order(
        status=OrderStatus.paid,
        subtotal_amount=subtotal_amount,
        discount_amount=0,
        tax_amount=tax_amount,
        applied_balance_amount=0,
        currency=wallet.currency,
        billing_reason=OrderBillingReasonInternal.purchase,
        billing_name=customer.billing_name,
        billing_address=billing_address,
        taxability_reason=taxability_reason,
        tax_id=tax_id,
        tax_rate=tax_rate,
        invoice_number=invoice_number,
        customer=customer,
        items=items,
        product=None,
        discount=None,
        subscription=None,
        checkout=None,
    )
    order = await order_repository.create(new_order, flush=True)

    # Attach the payment to the order and schedule its balancing.
    if payment is not None:
        payment_repository = PaymentRepository.from_session(session)
        assert payment.amount == order.total_amount
        await payment_repository.update(payment, update_dict={"order": order})
        enqueue_job(
            "order.balance", order_id=order.id, charge_id=payment.processor_id
        )

    # Attach the wallet transaction to the order.
    await WalletTransactionRepository.from_session(session).update(
        wallet_transaction, update_dict={"order": order}
    )

    # Record the tax transaction on Stripe and keep its ID on the order.
    if tax_calculation_id is not None:
        tax_transaction = await stripe_service.create_tax_transaction(
            tax_calculation_id, str(order.id)
        )
        await order_repository.update(
            order, update_dict={"tax_transaction_processor_id": tax_transaction.id}
        )

    await self._on_order_created(session, order)

    return order

943 

async def trigger_payment(
    self, session: AsyncSession, order: Order, payment_method: PaymentMethod
) -> None:
    """
    Trigger the payment of a pending order with the given payment method.

    For a Stripe payment method, an order whose due amount is below 50 is not
    charged: it is marked as paid and the negated due amount is recorded as a
    balance transaction for the customer. Otherwise, a confirmed off-session
    PaymentIntent is created while holding the order's payment lock.

    Raises:
        OrderNotPending: The order status is not `pending`.
        PaymentAlreadyInProgress: The order already has a payment lock.
        CardPaymentFailed: Stripe raised a card error while creating the
            PaymentIntent.
    """
    if order.status != OrderStatus.pending:
        raise OrderNotPending(order)

    if order.payment_lock_acquired_at is not None:
        log.warning("Payment already in progress", order_id=order.id)
        raise PaymentAlreadyInProgress(order)

    if (
        payment_method.processor == PaymentProcessor.stripe
        and order.due_amount < 50
    ):
        # Stripe requires a minimum amount of 50 cents, mark it as paid
        repository = OrderRepository.from_session(session)
        previous_status = order.status
        order = await repository.update(
            order, update_dict={"status": OrderStatus.paid}
        )

        # Add to the customer's balance
        await wallet_service.create_balance_transaction(
            session,
            order.customer,
            -order.due_amount,
            order.currency,
            order=order,
        )

        await self._on_order_updated(session, order, previous_status)
        return

    async with self.acquire_payment_lock(session, order, release_on_success=False):
        if payment_method.processor == PaymentProcessor.stripe:
            metadata: dict[str, Any] = {"order_id": str(order.id)}

            if order.tax_rate is not None:
                # Stringified to match the metadata built by process_retry_payment
                metadata["tax_amount"] = str(order.tax_amount)
                metadata["tax_country"] = order.tax_rate["country"]
                metadata["tax_state"] = order.tax_rate["state"]

            stripe_customer_id = order.customer.stripe_customer_id
            assert stripe_customer_id is not None

            try:
                await stripe_service.create_payment_intent(
                    amount=order.due_amount,
                    currency=order.currency,
                    payment_method=payment_method.processor_id,
                    customer=stripe_customer_id,
                    confirm=True,
                    off_session=True,
                    statement_descriptor_suffix=order.statement_descriptor_suffix,
                    description=f"{order.organization.name}{order.description}",
                    metadata=metadata,
                )
            except stripe_lib.CardError as e:
                # Card errors (declines, expired cards, etc.) should not be retried
                # They will be handled by the dunning process
                log.info(
                    "Card payment failed",
                    order_id=order.id,
                    error_code=e.code,
                    error_message=e.user_message,
                )
                raise CardPaymentFailed(order, e) from e

1011 

async def process_retry_payment(
    self,
    session: AsyncSession,
    order: Order,
    confirmation_token_id: str | None,
    payment_processor: PaymentProcessor,
    payment_method_id: uuid.UUID | None = None,
) -> CustomerOrderPaymentConfirmation:
    """
    Process retry payment with direct confirmation (confirm=True).
    Follows checkout flow pattern - creates PaymentIntent and lets webhooks handle everything else.

    Exactly one of `confirmation_token_id` (new payment method) or
    `payment_method_id` (saved payment method) must be provided.

    Returns:
        A confirmation whose status is `succeeded`, `requires_action`
        (with the PaymentIntent client secret) or `failed` (with an error
        message). Errors raised while creating the PaymentIntent are
        reported as `failed` rather than propagated.

    Raises:
        OrderNotEligibleForRetry: The order is not pending, has no next
            payment attempt date, has no subscription, the processor is not
            Stripe, or the customer has no Stripe customer ID.
        PaymentAlreadyInProgress: The order already has a payment lock.
        PaymentRetryValidationError: The payment method arguments are
            invalid or the saved payment method is not the customer's.
    """

    if order.status != OrderStatus.pending:
        log.warning("Order is not pending", order_id=order.id, status=order.status)
        raise OrderNotEligibleForRetry(order)

    if order.next_payment_attempt_at is None:
        log.warning("Order is not eligible for retry", order_id=order.id)
        raise OrderNotEligibleForRetry(order)

    if order.subscription is None:
        log.warning("Order is not a subscription", order_id=order.id)
        raise OrderNotEligibleForRetry(order)

    if order.payment_lock_acquired_at is not None:
        log.warning(
            "Payment already in progress",
            order_id=order.id,
            lock_acquired_at=order.payment_lock_acquired_at,
        )
        raise PaymentAlreadyInProgress(order)

    if payment_processor != PaymentProcessor.stripe:
        log.warning(
            "Invalid payment processor", payment_processor=payment_processor
        )
        # The exception is built from the order, like every other raise here
        raise OrderNotEligibleForRetry(order)

    if confirmation_token_id is None and payment_method_id is None:
        raise PaymentRetryValidationError(
            "Either confirmation_token_id or payment_method_id must be provided"
        )
    if confirmation_token_id is not None and payment_method_id is not None:
        raise PaymentRetryValidationError(
            "Only one of confirmation_token_id or payment_method_id can be provided"
        )

    customer_repository = CustomerRepository.from_session(session)
    customer = await customer_repository.get_by_id(order.customer_id)
    assert customer is not None, "Customer must exist"

    org_repository = OrganizationRepository.from_session(session)
    organization = await org_repository.get_by_id(customer.organization_id)
    assert organization is not None, "Organization must exist"

    if customer.stripe_customer_id is None:
        log.warning("Customer is not a Stripe customer", customer_id=customer.id)
        raise OrderNotEligibleForRetry(order)

    saved_payment_method: PaymentMethod | None = None
    if payment_method_id is not None:
        payment_method_repository = PaymentMethodRepository.from_session(session)
        saved_payment_method = await payment_method_repository.get_by_id(
            payment_method_id
        )
        if (
            saved_payment_method is None
            or saved_payment_method.customer_id != customer.id
        ):
            raise PaymentRetryValidationError(
                "Payment method does not belong to customer"
            )

    metadata: dict[str, Any] = {
        "order_id": str(order.id),
    }
    if order.tax_rate is not None:
        metadata["tax_amount"] = str(order.tax_amount)
        metadata["tax_country"] = order.tax_rate["country"]
        metadata["tax_state"] = order.tax_rate["state"]

    try:
        async with self.acquire_payment_lock(
            session, order, release_on_success=True
        ):
            if saved_payment_method is not None:
                # Using saved payment method
                payment_intent = await stripe_service.create_payment_intent(
                    amount=order.total_amount,
                    currency=order.currency,
                    payment_method=saved_payment_method.processor_id,
                    customer=customer.stripe_customer_id,
                    confirm=True,
                    statement_descriptor_suffix=order.statement_descriptor_suffix,
                    description=f"{order.organization.name}{order.description}",
                    metadata=metadata,
                    return_url=settings.generate_frontend_url(
                        f"/portal/orders/{str(order.id)}"
                    ),
                )
            else:
                # Using confirmation token (new payment method)
                assert confirmation_token_id is not None
                payment_intent = await stripe_service.create_payment_intent(
                    amount=order.total_amount,
                    currency=order.currency,
                    automatic_payment_methods={"enabled": True},
                    confirm=True,
                    confirmation_token=confirmation_token_id,
                    customer=customer.stripe_customer_id,
                    setup_future_usage="off_session",
                    statement_descriptor_suffix=order.statement_descriptor_suffix,
                    description=f"{order.organization.name}{order.description}",
                    metadata=metadata,
                    return_url=settings.generate_frontend_url(
                        f"/portal/orders/{str(order.id)}"
                    ),
                )

            if payment_intent.status == "succeeded":
                log.info(
                    "Retry payment succeeded immediately",
                    order_id=order.id,
                    payment_intent_id=payment_intent.id,
                )

                return CustomerOrderPaymentConfirmation(
                    status="succeeded",
                    client_secret=None,
                    error=None,
                )

            elif payment_intent.status == "requires_action":
                log.info(
                    "Retry payment requires additional action",
                    order_id=order.id,
                    payment_intent_id=payment_intent.id,
                    status=payment_intent.status,
                )

                return CustomerOrderPaymentConfirmation(
                    status="requires_action",
                    client_secret=payment_intent.client_secret,
                    error=None,
                )

            else:
                # Any other status is reported as a failure, preferring
                # the message attached to the PaymentIntent when present
                error_message = "Payment failed"
                if (
                    payment_intent.last_payment_error
                    and payment_intent.last_payment_error.message
                ):
                    error_message = payment_intent.last_payment_error.message

                log.warning(
                    "Retry payment failed",
                    order_id=order.id,
                    payment_intent_id=payment_intent.id,
                    status=payment_intent.status,
                    error=error_message,
                )

                return CustomerOrderPaymentConfirmation(
                    status="failed",
                    client_secret=None,
                    error=error_message,
                )

    except stripe_lib.StripeError as stripe_exc:
        log.warning(
            "Stripe error during retry payment",
            order_id=order.id,
            stripe_error_code=stripe_exc.code,
            stripe_error_message=str(stripe_exc),
        )

        error_message = (
            stripe_exc.error.message
            if stripe_exc.error and stripe_exc.error.message
            else "Payment failed. Please try again."
        )

        return CustomerOrderPaymentConfirmation(
            status="failed",
            client_secret=None,
            error=error_message,
        )

    except Exception as exc:
        # Deliberate catch-all: the caller always gets a confirmation object
        log.error(
            "Exception during retry payment",
            order_id=order.id,
            error=str(exc),
            exc_info=True,  # Include full traceback
        )

        return CustomerOrderPaymentConfirmation(
            status="failed",
            client_secret=None,
            error="Payment failed. Please try again.",
        )

1214 

async def handle_payment(
    self, session: AsyncSession, order: Order, payment: Payment | None
) -> Order:
    """
    Apply the effects of a successful payment to an order.

    A pending order becomes paid and its retry date and payment lock are
    cleared. When a payment is given, the ledger balance job is enqueued.
    A tax transaction is recorded if the order has a tax calculation but no
    tax transaction yet. A past-due subscription attached to an order that
    was pending is marked active again.

    Raises:
        OrderNotPending: The order has no Stripe invoice ID and is not
            pending.
    """
    status_before = order.status
    was_pending = status_before == OrderStatus.pending

    # Stripe invoices may already have been marked as paid, so ignore the check
    if order.stripe_invoice_id is None and not was_pending:
        raise OrderNotPending(order)

    changes: dict[str, Any] = {}

    if was_pending:
        changes["status"] = OrderStatus.paid

    # A successful payment makes any scheduled retry obsolete
    if order.next_payment_attempt_at is not None:
        changes["next_payment_attempt_at"] = None

    # A successful payment releases the payment lock
    if order.payment_lock_acquired_at is not None:
        log.info(
            "Clearing payment lock on order due to successful payment",
            order_id=order.id,
        )
        changes["payment_lock_acquired_at"] = None

    # Balance the order in the ledger
    if payment is not None:
        enqueue_job(
            "order.balance", order_id=order.id, charge_id=payment.processor_id
        )

    # Record the tax transaction, unless one was already recorded
    calculation_id = order.tax_calculation_processor_id
    if calculation_id is not None and order.tax_transaction_processor_id is None:
        tax_transaction = await stripe_service.create_tax_transaction(
            calculation_id, str(order.id)
        )
        changes["tax_transaction_processor_id"] = tax_transaction.id

    repository = OrderRepository.from_session(session)
    order = await repository.update(order, update_dict=changes)

    # If this was a subscription retry success, reactivate the subscription
    if was_pending:
        subscription = order.subscription
        if (
            subscription is not None
            and subscription.status == SubscriptionStatus.past_due
        ):
            await subscription_service.mark_active(session, subscription)

    # Only notify when something actually changed
    if changes:
        await self._on_order_updated(session, order, status_before)

    return order

1271 

async def create_order_from_stripe(
    self, session: AsyncSession, invoice: stripe_lib.Invoice
) -> Order:
    """
    Create an order from a Stripe subscription invoice.

    Resolves the subscription, customer, billing address, discount and
    checkout referenced by the invoice, converts invoice lines into order
    items, and computes discount and tax data before persisting the order.
    Draft invoices additionally receive pending billing entries and the
    organization's statement descriptor.

    Raises:
        NotAnOrderInvoice: The invoice metadata marks it as a pledge.
        NotASubscriptionInvoice: The invoice has no subscription.
        SubscriptionDoesNotExist: No subscription matches the Stripe ID.
        DiscountDoesNotExist: The coupon carries no usable `discount_id`
            metadata, or no discount matches it.
        CheckoutDoesNotExist: The referenced checkout cannot be found.
    """
    assert invoice.id is not None

    if invoice.metadata and invoice.metadata.get("type") in {ProductType.pledge}:
        raise NotAnOrderInvoice(invoice.id)

    if invoice.subscription is None:
        raise NotASubscriptionInvoice(invoice.id)

    # Get subscription
    stripe_subscription_id = get_expandable_id(invoice.subscription)
    subscription_repository = SubscriptionRepository.from_session(session)
    subscription = await subscription_repository.get_by_stripe_subscription_id(
        stripe_subscription_id,
        options=(
            joinedload(Subscription.product).joinedload(Product.organization),
            joinedload(Subscription.customer),
            joinedload(Subscription.meters).joinedload(SubscriptionMeter.meter),
        ),
    )
    if subscription is None:
        raise SubscriptionDoesNotExist(invoice.id, stripe_subscription_id)

    # Get customer
    customer = subscription.customer

    # Retrieve billing address
    billing_address: Address | None = None
    if customer.billing_address is not None:
        billing_address = customer.billing_address
    elif not _is_empty_customer_address(invoice.customer_address):
        billing_address = AddressInput.model_validate(invoice.customer_address)
    # Try to retrieve the country from the payment method
    elif invoice.charge is not None:
        charge = await stripe_service.get_charge(get_expandable_id(invoice.charge))
        if payment_method_details := charge.payment_method_details:
            if card := getattr(payment_method_details, "card", None):
                billing_address = Address.model_validate({"country": card.country})

    # Get Discount if available
    discount: Discount | None = None
    if invoice.discount is not None:
        coupon = invoice.discount.coupon
        coupon_metadata = coupon.metadata
        # Missing metadata and a missing key are both reported as an unknown
        # discount, rather than letting a bare KeyError escape
        discount_id = (
            coupon_metadata.get("discount_id")
            if coupon_metadata is not None
            else None
        )
        if discount_id is None:
            raise DiscountDoesNotExist(invoice.id, coupon.id)
        discount = await discount_service.get(
            session, uuid.UUID(discount_id), allow_deleted=True
        )
        if discount is None:
            raise DiscountDoesNotExist(invoice.id, coupon.id)

    # Get Checkout if available
    checkout: Checkout | None = None
    invoice_metadata = invoice.metadata or {}
    subscription_metadata = (
        invoice.subscription_details.metadata or {}
        if invoice.subscription_details
        else {}
    )
    # Invoice metadata takes precedence over subscription metadata
    checkout_id = invoice_metadata.get("checkout_id") or subscription_metadata.get(
        "checkout_id"
    )
    if checkout_id is not None:
        checkout_repository = CheckoutRepository.from_session(session)
        checkout = await checkout_repository.get_by_id(uuid.UUID(checkout_id))
        if checkout is None:
            raise CheckoutDoesNotExist(invoice.id, checkout_id)

    # Handle items
    product_price_repository = ProductPriceRepository.from_session(session)
    items: list[OrderItem] = []
    for line in invoice.lines:
        tax_amount = sum(tax.amount for tax in line.tax_amounts)
        product_price: ProductPrice | None = None
        price = line.price
        if price is not None:
            # Prefer the explicit price ID from metadata, else match on the
            # Stripe price ID
            if price.metadata and price.metadata.get("product_price_id"):
                product_price = await product_price_repository.get_by_id(
                    uuid.UUID(price.metadata["product_price_id"]),
                    options=product_price_repository.get_eager_options(),
                )
            else:
                product_price = (
                    await product_price_repository.get_by_stripe_price_id(
                        price.id,
                        options=product_price_repository.get_eager_options(),
                    )
                )

        items.append(
            OrderItem(
                label=line.description or "",
                amount=line.amount,
                tax_amount=tax_amount,
                proration=line.proration,
                product_price=product_price,
            )
        )

    if invoice.status == "draft":
        # Add pending billing entries
        stripe_customer_id = customer.stripe_customer_id
        assert stripe_customer_id is not None
        pending_items = await billing_entry_service.create_order_items_from_pending(
            session,
            subscription,
            stripe_invoice_id=invoice.id,
            stripe_customer_id=stripe_customer_id,
        )
        items.extend(pending_items)
        # Reload the invoice to get totals with added pending items
        if len(pending_items) > 0:
            invoice = await stripe_service.get_invoice(invoice.id)

        # Update statement descriptor
        # Stripe doesn't allow to set statement descriptor on the subscription itself,
        # so we need to set it manually on each new invoice.
        assert invoice.id is not None
        await stripe_service.update_invoice(
            invoice.id,
            statement_descriptor=subscription.organization.statement_descriptor_prefixed,
        )

    # Determine billing reason
    billing_reason = OrderBillingReason.subscription_cycle
    if invoice.billing_reason is not None:
        try:
            billing_reason = OrderBillingReason(invoice.billing_reason)
        except ValueError:
            log.error(
                "Unknown billing reason, fallback to 'subscription_cycle'",
                invoice_id=invoice.id,
                billing_reason=invoice.billing_reason,
            )

    # Calculate discount amount
    discount_amount = 0
    if invoice.total_discount_amounts:
        for stripe_discount_amount in invoice.total_discount_amounts:
            discount_amount += stripe_discount_amount.amount

    # Retrieve tax data
    tax_id = customer.tax_id
    tax_calculation_processor_id: str | None = None
    tax_amount = invoice.tax or 0
    taxability_reason: TaxabilityReason | None = None
    tax_rate: TaxRate | None = None

    # If the subscription is tax-exempted, we need to retrieve tax rate manually:
    # we don't apply tax on the invoice, but we need to know the rate for our
    # accounting and fulfillment purposes.
    if subscription.tax_exempted:
        product = subscription.product
        assert invoice.id is not None
        assert customer.billing_address is not None
        tax_calculation = await calculate_tax(
            invoice.id,
            invoice.currency,
            invoice.subtotal,
            product.tax_code,
            customer.billing_address,
            [customer.tax_id] if customer.tax_id is not None else [],
            subscription.tax_exempted,
        )
        tax_calculation_processor_id = tax_calculation["processor_id"]
        tax_amount = tax_calculation["amount"]
        taxability_reason = tax_calculation["taxability_reason"]
        tax_rate = tax_calculation["tax_rate"]
    # Automatic tax is enabled, so we can directly take the data from Stripe
    else:
        # Use the first tax amount whose rate can be converted
        for total_tax_amount in invoice.total_tax_amounts:
            taxability_reason = TaxabilityReason.from_stripe(
                total_tax_amount.taxability_reason, tax_amount
            )
            stripe_tax_rate = await stripe_service.get_tax_rate(
                get_expandable_id(total_tax_amount.tax_rate)
            )
            try:
                tax_rate = from_stripe_tax_rate(stripe_tax_rate)
            except ValueError:
                continue
            else:
                break

    # Ensure it inherits original metadata and custom fields
    user_metadata = (
        checkout.user_metadata
        if checkout is not None
        else subscription.user_metadata
    )
    custom_field_data = (
        checkout.custom_field_data
        if checkout is not None
        else subscription.custom_field_data
    )

    invoice_number = await organization_service.get_next_invoice_number(
        session, subscription.organization, customer
    )

    repository = OrderRepository.from_session(session)
    order = await repository.create(
        Order(
            status=OrderStatus.paid
            if invoice.status == "paid"
            else OrderStatus.pending,
            subtotal_amount=invoice.subtotal,
            discount_amount=discount_amount,
            tax_amount=tax_amount,
            currency=invoice.currency,
            billing_reason=billing_reason,
            billing_name=customer.billing_name,
            billing_address=billing_address,
            stripe_invoice_id=invoice.id,
            taxability_reason=taxability_reason,
            tax_id=tax_id,
            tax_rate=tax_rate,
            tax_calculation_processor_id=tax_calculation_processor_id,
            invoice_number=invoice_number,
            customer=customer,
            product=subscription.product,
            discount=discount,
            subscription=subscription,
            checkout=checkout,
            items=items,
            user_metadata=user_metadata,
            custom_field_data=custom_field_data,
            created_at=datetime.fromtimestamp(invoice.created, tz=UTC),
        ),
        flush=True,
    )

    # Reset the associated meters, if any
    if billing_reason == OrderBillingReason.subscription_cycle:
        await subscription_service.reset_meters(session, subscription)

    await self._on_order_created(session, order)

    return order

1514 

async def send_admin_notification(
    self, session: AsyncSession, organization: Organization, order: Order
) -> None:
    """
    Notify the organization members about a new product sale.

    Nothing is sent when the order has no product, or when the
    organization's `new_order` notification setting is falsy.
    """
    product = order.product
    if product is None or not organization.notification_settings["new_order"]:
        return

    payload = MaintainerNewProductSaleNotificationPayload(
        customer_name=order.customer.email,
        product_name=product.name,
        product_price_amount=order.net_amount,
        organization_name=organization.slug,
    )
    notification = PartialNotification(
        type=NotificationType.maintainer_new_product_sale,
        payload=payload,
    )
    await notifications_service.send_to_org_members(
        session,
        org_id=organization.id,
        notif=notification,
    )

1537 

async def update_order_from_stripe(
    self, session: AsyncSession, invoice: stripe_lib.Invoice
) -> Order:
    """
    Synchronize an order's status with its Stripe invoice.

    The order becomes `paid` when the invoice status is `paid`, and
    `pending` otherwise. When the order is paid and the invoice metadata
    carries a `charge_id`, the ledger balance job is enqueued.

    Raises:
        OrderDoesNotExist: No order matches the invoice ID.
    """
    assert invoice.id is not None

    repository = OrderRepository.from_session(session)
    order = await repository.get_by_stripe_invoice_id(
        invoice.id, options=repository.get_eager_options()
    )
    if order is None:
        raise OrderDoesNotExist(invoice.id)

    status_before = order.status
    if invoice.status == "paid":
        new_status = OrderStatus.paid
    else:
        new_status = OrderStatus.pending
    order = await repository.update(order, update_dict={"status": new_status})

    # Enqueue the balance creation for out-of-band subscription creation orders
    if order.paid and invoice.metadata:
        if charge_id := invoice.metadata.get("charge_id"):
            enqueue_job("order.balance", order_id=order.id, charge_id=charge_id)

    await self._on_order_updated(session, order, status_before)
    return order

1563 

1564 async def send_confirmation_email( 1a

1565 self, session: AsyncSession, order: Order 

1566 ) -> None: 

1567 organization_repository = OrganizationRepository.from_session(session) 

1568 organization = await organization_repository.get_by_customer(order.customer_id) 

1569 

1570 template_name: Literal[ 

1571 "order_confirmation", 

1572 "subscription_confirmation", 

1573 "subscription_cycled", 

1574 "subscription_updated", 

1575 ] 

1576 subject_template: str 

1577 url_path_template: str 

1578 

1579 match order.billing_reason: 

1580 case OrderBillingReasonInternal.purchase: 

1581 template_name = "order_confirmation" 

1582 subject_template = "Your {description} order confirmation" 

1583 url_path_template = "/{organization}/portal" 

1584 url_params = { 

1585 "customer_session_token": "{token}", 

1586 "id": "{order}", 

1587 "email": "{email}", 

1588 } 

1589 case OrderBillingReasonInternal.subscription_create: 

1590 template_name = "subscription_confirmation" 

1591 subject_template = "Your {description} subscription" 

1592 url_path_template = "/{organization}/portal" 

1593 url_params = { 

1594 "customer_session_token": "{token}", 

1595 "id": "{subscription}", 

1596 "email": "{email}", 

1597 } 

1598 case ( 

1599 OrderBillingReasonInternal.subscription_cycle 

1600 | OrderBillingReasonInternal.subscription_cycle_after_trial 

1601 ): 

1602 template_name = "subscription_cycled" 

1603 subject_template = "Your {description} subscription has been renewed" 

1604 url_path_template = "/{organization}/portal" 

1605 url_params = { 

1606 "customer_session_token": "{token}", 

1607 "id": "{subscription}", 

1608 "email": "{email}", 

1609 } 

1610 case OrderBillingReasonInternal.subscription_update: 

1611 template_name = "subscription_updated" 

1612 subject_template = "Your subscription has changed to {description}" 

1613 url_path_template = "/{organization}/portal" 

1614 url_params = { 

1615 "customer_session_token": "{token}", 

1616 "id": "{subscription}", 

1617 "email": "{email}", 

1618 } 

1619 

1620 if not organization.customer_email_settings[template_name]: 

1621 return 

1622 

1623 product = order.product 

1624 customer = order.customer 

1625 subscription = order.subscription 

1626 token, _ = await customer_session_service.create_customer_session( 

1627 session, customer 

1628 ) 

1629 

1630 # Build query parameters with proper URL encoding 

1631 params = { 

1632 key: value.format( 

1633 token=token, 

1634 order=order.id, 

1635 subscription=subscription.id if subscription else "", 

1636 email=customer.email, 

1637 ) 

1638 for key, value in url_params.items() 

1639 } 

1640 query_string = urlencode(params) 

1641 url_path = url_path_template.format(organization=organization.slug) 

1642 url = settings.generate_frontend_url(f"{url_path}?{query_string}") 

1643 subject = subject_template.format(description=order.description) 

1644 email = EmailAdapter.validate_python( 

1645 { 

1646 "template": template_name, 

1647 "props": { 

1648 "email": customer.email, 

1649 "organization": organization, 

1650 "product": product, 

1651 "order": order, 

1652 "subscription": subscription, 

1653 "url": url, 

1654 }, 

1655 } 

1656 ) 

1657 

1658 # Generate invoice to attach to the email 

1659 invoice_path: str | None = None 

1660 if invoice_path is None: 

1661 if order.billing_name is None or order.billing_address is None: 

1662 log.warning( 

1663 "Cannot generate invoice, missing billing info", order_id=order.id 

1664 ) 

1665 else: 

1666 order = await self.generate_invoice(session, order) 

1667 invoice_path = order.invoice_path 

1668 

1669 attachments: list[Attachment] = [] 

1670 if invoice_path is not None: 

1671 invoice = await self.get_order_invoice(order) 

1672 attachments = [ 

1673 {"remote_url": invoice.url, "filename": order.invoice_filename} 

1674 ] 

1675 

1676 body = render_email_template(email) 

1677 enqueue_email( 

1678 **organization.email_from_reply, 

1679 to_email_addr=customer.email, 

1680 subject=subject, 

1681 html_content=body, 

1682 attachments=attachments, 

1683 ) 

1684 

async def update_product_benefits_grants(
    self, session: AsyncSession, product: Product
) -> None:
    """
    Enqueue a benefits grant job for each non-subscription order of a product.

    Orders are streamed from the database; deleted orders and orders tied to
    a subscription are excluded by the query.
    """
    query = select(Order).where(
        Order.product_id == product.id,
        Order.deleted_at.is_(None),
        Order.subscription_id.is_(None),
    )
    stream = await session.stream_scalars(
        query,
        execution_options={"yield_per": settings.DATABASE_STREAM_YIELD_PER},
    )
    async for order in stream:
        # Seat-based orders are skipped - benefits are granted when seats are claimed
        if order.seats is None:
            enqueue_job(
                "benefit.enqueue_benefits_grants",
                task="grant",
                customer_id=order.customer_id,
                product_id=product.id,
                order_id=order.id,
            )

1709 

async def update_refunds(
    self,
    session: AsyncSession,
    order: Order,
    *,
    refunded_amount: int,
    refunded_tax_amount: int,
) -> Order:
    """
    Apply refunded amounts to the order and add it to the session.

    Returns the same order instance.
    """
    order.update_refunds(
        refunded_amount,
        refunded_tax_amount=refunded_tax_amount,
    )
    session.add(order)
    return order

1721 

async def create_order_balance(
    self, session: AsyncSession, order: Order, charge_id: str
) -> None:
    """
    Create the ledger balance for an order from its payment transaction.

    The payment transaction matching `charge_id` is linked to the order.
    If the organization has no account yet, a held balance is created;
    otherwise the balance is created right away, followed by the platform
    fee reversals, whose total is stored on the order.

    Raises:
        PaymentTransactionForChargeDoesNotExist: No payment transaction
            matches the charge.
        AlreadyBalancedOrder: A held balance or balance transaction already
            exists for the payment transaction.
    """
    organization = order.organization
    account = await AccountRepository.from_session(session).get_by_organization(
        organization.id
    )

    # Retrieve the payment transaction and link it to the order
    payment_transaction = await balance_transaction_service.get_by(
        session, type=TransactionType.payment, charge_id=charge_id
    )
    if payment_transaction is None:
        raise PaymentTransactionForChargeDoesNotExist(charge_id)

    # The amount comes from the payment transaction, not from the order:
    # order invoices may apply customer balances, which wouldn't reflect the
    # actual payment amount
    amount = payment_transaction.amount

    payment_transaction.order = order
    payment_transaction.payment_customer = order.customer
    session.add(payment_transaction)

    # Prepared up front; only persisted when the account doesn't exist yet
    held_balance = HeldBalance(
        amount=amount, order=order, payment_transaction=payment_transaction
    )

    if account is None:
        # No account: park the funds in a held balance
        held_balance.organization = organization

        # Sanity check: make sure we didn't already create a held balance for this order
        already_held = await held_balance_service.get_by(
            session,
            payment_transaction_id=payment_transaction.id,
            organization_id=organization.id,
        )
        if already_held is not None:
            raise AlreadyBalancedOrder(order, payment_transaction)

        await held_balance_service.create(session, held_balance=held_balance)
        return

    # Sanity check: make sure we didn't already create a balance for this order
    already_balanced = await balance_transaction_service.get_by(
        session,
        type=TransactionType.balance,
        payment_transaction_id=payment_transaction.id,
        account_id=account.id,
    )
    if already_balanced is not None:
        raise AlreadyBalancedOrder(order, payment_transaction)

    # Account exists: create the balance immediately
    balance_transactions = (
        await balance_transaction_service.create_balance_from_charge(
            session,
            source_account=None,
            destination_account=account,
            charge_id=charge_id,
            amount=amount,
            order=order,
        )
    )
    fee_transactions = (
        await platform_fee_transaction_service.create_fees_reversal_balances(
            session, balance_transactions=balance_transactions
        )
    )
    # Each entry is a pair; the second element's amount is summed
    order.platform_fee_amount = sum(
        incoming.amount for _, incoming in fee_transactions
    )
    session.add(order)

1797 

async def send_webhook(
    self,
    session: AsyncSession,
    order: Order,
    event_type: Literal[
        WebhookEventType.order_created,
        WebhookEventType.order_updated,
        WebhookEventType.order_paid,
    ],
) -> None:
    """Refresh the relationships of ``order`` and send it as a webhook.

    The customer's ``organization``, the product's ``prices`` (when the
    order has a product) and each item's ``product_price.product`` are
    refreshed on ``session`` before the order is handed to
    ``webhook_service.send`` for the order's organization.

    Args:
        session: Session used to refresh the related objects.
        order: Order sent as the webhook payload.
        event_type: One of the order webhook event types.
    """
    await session.refresh(order.customer, {"organization"})

    product = order.product
    if product is not None:
        await session.refresh(product, {"prices"})

    # Webhook serialization accesses `legacy_product_price.product`, so the
    # `product` relationship of every item's price has to be loaded.
    for order_item in order.items:
        price = order_item.product_price
        if not price:
            continue
        await session.refresh(price, {"product"})

    await webhook_service.send(session, order.organization, event_type, order)

1820 

async def _on_order_created(self, session: AsyncSession, order: Order) -> None:
    """Run the side effects that follow the creation of ``order``.

    Enqueues the ``order.confirmation_email`` job, sends the
    ``order_created`` webhook, runs the update hook when the order is
    already paid, and publishes an ``order_created`` event on the checkout
    channel when the order has a checkout.
    """
    enqueue_job("order.confirmation_email", order.id)
    await self.send_webhook(session, order, WebhookEventType.order_created)

    if order.paid:
        # Pretend the previous status was pending to trigger the paid event
        await self._on_order_updated(session, order, OrderStatus.pending)

    # Notify checkout channel that an order has been created from it
    checkout = order.checkout
    if checkout:
        await publish_checkout_event(
            checkout.client_secret, CheckoutEvent.order_created
        )

1837 

async def _on_order_updated(
    self, session: AsyncSession, order: Order, previous_status: OrderStatus
) -> None:
    """Send the ``order_updated`` webhook and detect a transition to paid.

    The paid hook runs only when the order's status is now ``paid`` and
    ``previous_status`` was anything else.
    """
    await self.send_webhook(session, order, WebhookEventType.order_updated)

    # Nothing more to do unless the order just moved into the paid status.
    if order.status != OrderStatus.paid or previous_status == OrderStatus.paid:
        return

    await self._on_order_paid(session, order)

1848 

async def _on_order_paid(self, session: AsyncSession, order: Order) -> None:
    """Run the side effects that follow ``order`` becoming paid.

    Sends the ``order_paid`` webhook, records an ``order_paid`` system event
    carrying the order id, total amount and currency, and, for subscription
    cycle orders, enqueues the ``benefit.enqueue_benefit_grant_cycles`` job.
    """
    assert order.paid

    await self.send_webhook(session, order, WebhookEventType.order_paid)

    paid_event = build_system_event(
        SystemEvent.order_paid,
        customer=order.customer,
        organization=order.organization,
        metadata=OrderPaidMetadata(
            order_id=str(order.id),
            amount=order.total_amount,
            currency=order.currency,
        ),
    )
    await event_service.create_event(session, paid_event)

    # Only orders billed for a subscription cycle trigger the grant cycles job.
    cycle_reasons = (
        OrderBillingReasonInternal.subscription_cycle,
        OrderBillingReasonInternal.subscription_cycle_after_trial,
    )
    is_cycle_order = (
        order.subscription_id is not None and order.billing_reason in cycle_reasons
    )
    if is_cycle_order:
        enqueue_job(
            "benefit.enqueue_benefit_grant_cycles",
            subscription_id=order.subscription_id,
        )

1876 

async def handle_payment_failure(
    self, session: AsyncSession, order: Order
) -> Order:
    """React to a failed payment on ``order`` and start or continue dunning.

    Already paid orders are returned untouched. Otherwise any payment lock is
    released, and dunning is run only for orders whose subscription has no
    ``stripe_subscription_id``.

    Returns:
        The order, possibly updated by the lock release or a dunning step.
    """
    # Don't process payment failure if the order is already paid
    if order.status == OrderStatus.paid:
        log.warning(
            "Ignoring payment failure for already paid order",
            order_id=order.id,
        )
        return order

    # Clear payment lock on failure
    if order.payment_lock_acquired_at is not None:
        log.info(
            "Clearing payment lock on order due to payment failure",
            order_id=order.id,
        )
        order_repository = OrderRepository.from_session(session)
        order = await order_repository.release_payment_lock(order)

    subscription = order.subscription
    # No subscription: nothing to dun. Stripe-managed subscription: Stripe
    # handles dunning itself.
    if subscription is None or subscription.stripe_subscription_id is not None:
        return order

    # No retry scheduled yet means this is the first failure.
    dunning_step = (
        self._handle_first_dunning_attempt
        if order.next_payment_attempt_at is None
        else self._handle_consecutive_dunning_attempts
    )
    return await dunning_step(session, order)

1909 

async def _handle_first_dunning_attempt(
    self, session: AsyncSession, order: Order
) -> Order:
    """Schedule the first payment retry and mark the subscription past due.

    The retry date is the current UTC time plus the first entry of
    ``settings.DUNNING_RETRY_INTERVALS``.

    Returns:
        The order with ``next_payment_attempt_at`` set.
    """
    retry_at = utc_now() + settings.DUNNING_RETRY_INTERVALS[0]

    order_repository = OrderRepository.from_session(session)
    order = await order_repository.update(
        order, update_dict={"next_payment_attempt_at": retry_at}
    )

    subscription = order.subscription
    assert subscription is not None
    await subscription_service.mark_past_due(session, subscription)

    return order

1928 

async def _handle_consecutive_dunning_attempts(
    self, session: AsyncSession, order: Order
) -> Order:
    """Schedule the next payment retry, or stop retrying when none are left.

    The number of failed payments recorded for the order selects the entry of
    ``settings.DUNNING_RETRY_INTERVALS`` used for the next retry. Once that
    count reaches the number of configured intervals, the retry date is
    cleared and the subscription is revoked if it can be cancelled
    immediately.

    Returns:
        The order with ``next_payment_attempt_at`` updated.
    """
    payment_repository = PaymentRepository.from_session(session)
    failed_count = await payment_repository.count_failed_payments_for_order(
        order.id
    )

    order_repository = OrderRepository.from_session(session)
    intervals = settings.DUNNING_RETRY_INTERVALS

    if failed_count < len(intervals):
        # Schedule next retry using the appropriate interval
        retry_at = utc_now() + intervals[failed_count]
        order = await order_repository.update(
            order, update_dict={"next_payment_attempt_at": retry_at}
        )

        # Re-enqueue benefit revocation to check if grace period has expired
        subscription = order.subscription
        if subscription is not None:
            await subscription_service.enqueue_benefits_grants(session, subscription)

        return order

    # No more retries: clear the retry date and revoke the subscription
    order = await order_repository.update(
        order, update_dict={"next_payment_attempt_at": None}
    )

    subscription = order.subscription
    if subscription is not None and subscription.can_cancel(immediately=True):
        await subscription_service.revoke(session, subscription)

    return order

1966 

async def process_dunning_order(self, session: AsyncSession, order: Order) -> Order:
    """Enqueue a payment retry for one order that is due for dunning.

    Orders without a subscription, or whose subscription has no payment
    method, are logged and returned unchanged. When the subscription is
    canceled, the order's ``next_payment_attempt_at`` is cleared instead.

    Returns:
        The order, updated only in the canceled-subscription case.
    """
    subscription = order.subscription

    if subscription is None:
        log.warning(
            "Order has no subscription, skipping dunning",
            order_id=order.id,
        )
        return order

    if subscription.status == SubscriptionStatus.canceled:
        log.info(
            "Order subscription is cancelled, removing order from dunning process",
            order_id=order.id,
            subscription_id=subscription.id,
        )

        order_repository = OrderRepository.from_session(session)
        return await order_repository.update(
            order, update_dict={"next_payment_attempt_at": None}
        )

    payment_method_id = subscription.payment_method_id
    if payment_method_id is None:
        log.warning(
            "Order subscription has no payment method, skipping dunning",
            order_id=order.id,
            subscription_id=subscription.id,
        )
        return order

    log.info(
        "Processing dunning order",
        order_id=order.id,
        subscription_id=subscription.id,
    )

    # Enqueue a payment retry for this order
    enqueue_job(
        "order.trigger_payment",
        order_id=order.id,
        payment_method_id=payment_method_id,
    )

    return order

2010 

2011 

# Module-level OrderService instance, created once at import time.
order = OrderService()