Coverage for polar/wallet/service.py: 29%
103 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 16:17 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 16:17 +0000
1import uuid 1a
2from collections.abc import Sequence 1a
4import stripe as stripe_lib 1a
6from polar.auth.models import AuthSubject, Organization, User 1a
7from polar.enums import PaymentProcessor 1a
8from polar.exceptions import PolarError 1a
9from polar.integrations.stripe.service import stripe as stripe_service 1a
10from polar.kit.pagination import PaginationParams 1a
11from polar.kit.sorting import Sorting 1a
12from polar.kit.tax import TaxCode, calculate_tax 1a
13from polar.models import Customer, Order, Wallet, WalletTransaction 1a
14from polar.models.payment_method import PaymentMethod 1a
15from polar.models.wallet import WalletType 1a
16from polar.payment_method.service import payment_method as payment_method_service 1a
17from polar.postgres import AsyncReadSession, AsyncSession 1a
19from .repository import WalletRepository, WalletTransactionRepository 1a
20from .sorting import WalletSortProperty 1a
23class WalletError(PolarError): ... 1a
26class WalletAlreadyExistsError(WalletError): 1a
27 def __init__(self, customer: Customer) -> None: 1a
28 self.customer = customer
29 message = "A wallet already exists for this customer."
30 super().__init__(message, 409)
33class MissingPaymentMethodError(WalletError): 1a
34 def __init__(self, wallet: Wallet) -> None: 1a
35 self.wallet = wallet
36 message = "No payment method available for the wallet's customer."
37 super().__init__(message, 402)
40class InvalidPaymentMethodError(WalletError): 1a
41 def __init__(self, wallet: Wallet, payment_method: PaymentMethod) -> None: 1a
42 self.wallet = wallet
43 self.payment_method = payment_method
44 message = "The payment method does not belong to the wallet's customer."
45 super().__init__(message, 403)
48class PaymentIntentFailedError(WalletError): 1a
49 def __init__( 1a
50 self, wallet: Wallet, payment_intent: stripe_lib.PaymentIntent
51 ) -> None:
52 self.wallet = wallet
53 self.payment_intent = payment_intent
54 message = "Payment failed."
55 super().__init__(message, 400)
58class WalletService: 1a
59 async def list( 1a
60 self,
61 session: AsyncReadSession,
62 auth_subject: AuthSubject[User | Organization],
63 *,
64 organization_id: Sequence[uuid.UUID] | None = None,
65 type: Sequence[WalletType] | None = None,
66 customer_id: Sequence[uuid.UUID] | None = None,
67 pagination: PaginationParams,
68 sorting: list[Sorting[WalletSortProperty]] = [
69 (WalletSortProperty.created_at, True)
70 ],
71 ) -> tuple[Sequence[Wallet], int]:
72 repository = WalletRepository.from_session(session)
73 statement = repository.get_readable_statement(auth_subject)
75 if organization_id is not None:
76 statement = statement.where(Customer.organization_id.in_(organization_id))
78 if type is not None:
79 statement = statement.where(Wallet.type.in_(type))
81 if customer_id is not None:
82 statement = statement.where(Customer.id.in_(customer_id))
84 statement = repository.apply_sorting(statement, sorting)
86 return await repository.paginate(
87 statement, limit=pagination.limit, page=pagination.page
88 )
90 async def get( 1a
91 self,
92 session: AsyncReadSession,
93 auth_subject: AuthSubject[User | Organization],
94 id: uuid.UUID,
95 ) -> Wallet | None:
96 repository = WalletRepository.from_session(session)
97 statement = repository.get_readable_statement(auth_subject).where(
98 Wallet.id == id
99 )
100 return await repository.get_one_or_none(statement)
102 async def top_up( 1a
103 self,
104 session: AsyncSession,
105 wallet: Wallet,
106 amount: int,
107 payment_method: PaymentMethod | None = None,
108 ) -> WalletTransaction:
109 if payment_method is None:
110 payment_method = await payment_method_service.get_customer_payment_method(
111 session, wallet.customer
112 )
113 if payment_method is None:
114 raise MissingPaymentMethodError(wallet)
116 if payment_method.customer != wallet.customer:
117 raise InvalidPaymentMethodError(wallet, payment_method)
119 customer = wallet.customer
120 billing_address = customer.billing_address
122 # Calculate tax
123 tax_amount = 0
124 tax_calculation_processor_id: str | None = None
125 if billing_address is not None:
126 tax_id = customer.tax_id
127 tax_calculation = await calculate_tax(
128 f"top_up:{wallet.id}:{uuid.uuid4()}",
129 wallet.currency,
130 amount,
131 TaxCode.general_electronically_supplied_services,
132 billing_address,
133 [tax_id] if tax_id is not None else [],
134 False,
135 )
136 tax_calculation_processor_id = tax_calculation["processor_id"]
137 tax_amount = tax_calculation["amount"]
139 transaction = await self.create_transaction(
140 session,
141 wallet,
142 amount,
143 tax_amount=tax_amount,
144 tax_calculation_processor_id=tax_calculation_processor_id,
145 flush=True,
146 )
147 total_amount = amount + tax_amount
149 if payment_method.processor == PaymentProcessor.stripe:
150 organization = wallet.organization
151 assert customer.stripe_customer_id is not None
152 payment_intent = await stripe_service.create_payment_intent(
153 amount=total_amount,
154 currency=wallet.currency,
155 payment_method=payment_method.processor_id,
156 customer=customer.stripe_customer_id,
157 confirm=True,
158 off_session=True,
159 statement_descriptor_suffix=organization.statement_descriptor(),
160 description=f"{organization.name} — Wallet Top-Up",
161 metadata={
162 "customer_id": str(wallet.customer.id),
163 "wallet_id": str(wallet.id),
164 "wallet_transaction_id": str(transaction.id),
165 },
166 )
168 if payment_intent.status != "succeeded":
169 raise PaymentIntentFailedError(wallet, payment_intent)
171 # Refresh wallet balance
172 await session.flush()
173 await session.refresh(wallet, {"balance"})
175 return transaction
177 async def create_transaction( 1a
178 self,
179 session: AsyncSession,
180 wallet: Wallet,
181 amount: int,
182 *,
183 tax_amount: int | None = None,
184 tax_calculation_processor_id: str | None = None,
185 order: Order | None = None,
186 flush: bool = False,
187 ) -> WalletTransaction:
188 repository = WalletTransactionRepository(session)
189 return await repository.create(
190 WalletTransaction(
191 currency=wallet.currency,
192 amount=amount,
193 wallet=wallet,
194 tax_amount=tax_amount,
195 tax_calculation_processor_id=tax_calculation_processor_id,
196 order=order,
197 ),
198 flush=flush,
199 )
201 async def get_billing_wallet_balance( 1a
202 self, session: AsyncSession, customer: Customer, currency: str
203 ) -> int:
204 repository = WalletRepository.from_session(session)
205 wallet = await repository.get_by_type_currency_customer(
206 WalletType.billing, currency, customer.id
207 )
208 # Small optimization to avoid creating wallet if not existing
209 if wallet is None:
210 return 0
211 await session.refresh(wallet, {"balance"})
212 return wallet.balance
214 async def get_or_create_billing_wallet( 1a
215 self, session: AsyncSession, customer: Customer, currency: str
216 ) -> Wallet:
217 repository = WalletRepository.from_session(session)
218 wallet = await repository.get_by_type_currency_customer(
219 WalletType.billing, currency, customer.id
220 )
221 if wallet is None:
222 wallet = await repository.create(
223 Wallet(
224 type=WalletType.billing,
225 currency=currency,
226 customer=customer,
227 )
228 )
229 return wallet
231 async def create_balance_transaction( 1a
232 self,
233 session: AsyncSession,
234 customer: Customer,
235 amount: int,
236 currency: str,
237 *,
238 order: Order | None = None,
239 ) -> WalletTransaction:
240 wallet = await self.get_or_create_billing_wallet(session, customer, currency)
241 return await self.create_transaction(session, wallet, amount, order=order)
244wallet = WalletService() 1a