Coverage for polar/payout/service.py: 24%
243 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 15:52 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 15:52 +0000
1import datetime 1a
2import uuid 1a
3from collections.abc import AsyncIterable, Sequence 1a
4from typing import Any, cast 1a
6import stripe as stripe_lib 1a
7import structlog 1a
9from polar.auth.models import AuthSubject, User 1a
10from polar.config import settings 1a
11from polar.enums import AccountType 1a
12from polar.eventstream.service import publish as eventstream_publish 1a
13from polar.exceptions import PolarError, PolarRequestValidationError 1a
14from polar.integrations.stripe.service import stripe as stripe_service 1a
15from polar.integrations.stripe.utils import get_expandable_id 1a
16from polar.invoice.service import invoice as invoice_service 1a
17from polar.kit.csv import IterableCSVWriter 1a
18from polar.kit.db.postgres import AsyncSessionMaker 1a
19from polar.kit.pagination import PaginationParams 1a
20from polar.kit.sorting import Sorting 1a
21from polar.locker import Locker 1a
22from polar.logging import Logger 1a
23from polar.models import Account, Payout 1a
24from polar.models.payout import PayoutStatus 1a
25from polar.organization.repository import OrganizationRepository 1a
26from polar.postgres import AsyncSession 1a
27from polar.transaction.repository import ( 1a
28 PayoutTransactionRepository,
29 TransactionRepository,
30)
31from polar.transaction.service.payout import ( 1a
32 payout_transaction as payout_transaction_service,
33)
34from polar.transaction.service.platform_fee import PayoutAmountTooLow 1a
35from polar.transaction.service.platform_fee import ( 1a
36 platform_fee_transaction as platform_fee_transaction_service,
37)
38from polar.transaction.service.transaction import transaction as transaction_service 1a
39from polar.worker import enqueue_job 1a
41from .repository import PayoutRepository 1a
42from .schemas import PayoutEstimate, PayoutGenerateInvoice, PayoutInvoice 1a
43from .sorting import PayoutSortProperty 1a
# Module-level structlog logger; used below by PayoutService to report payouts
# that cannot be triggered yet.
log: Logger = structlog.get_logger()
class PayoutError(PolarError):
    """Base class of every payout-specific error defined in this module."""
class InsufficientBalance(PayoutError):
    """
    The account balance is too low to make a payout.

    Keeps the offending account and the balance that was computed on the
    instance. 400 is forwarded as the second argument of the base error
    (presumably the HTTP status code -- confirm against PolarError).
    """

    def __init__(self, account: Account, balance: int) -> None:
        self.account = account
        self.balance = balance
        super().__init__(
            "Your account has an insufficient balance to make a payout.", 400
        )
class UnderReviewAccount(PayoutError):
    """
    The account is under review and therefore can't receive payouts.

    403 is forwarded as the second argument of the base error
    (presumably the HTTP status code -- confirm against PolarError).
    """

    def __init__(self, account: Account) -> None:
        self.account = account
        super().__init__(
            "Your account is under review and can't receive payouts.", 403
        )
class NotReadyAccount(PayoutError):
    """
    The account does not report itself as ready for payouts.

    403 is forwarded as the second argument of the base error
    (presumably the HTTP status code -- confirm against PolarError).
    """

    def __init__(self, account: Account) -> None:
        self.account = account
        super().__init__("Your account is not ready.", 403)
class PendingPayoutCreation(PayoutError):
    """
    A payout creation is already in progress for the account.

    409 is forwarded as the second argument of the base error
    (presumably the HTTP status code -- confirm against PolarError).
    """

    def __init__(self, account: Account) -> None:
        self.account = account
        super().__init__(
            f"A payout is already being created for the account {account.id}.", 409
        )
class PayoutDoesNotExist(PayoutError):
    """
    A payout identifier received from Stripe matches no Payout record.

    404 is forwarded as the second argument of the base error
    (presumably the HTTP status code -- confirm against PolarError).
    """

    def __init__(self, payout_id: str) -> None:
        self.payout_id = payout_id
        super().__init__(
            f"Received payout {payout_id} from Stripe, "
            "but it's not associated to a Payout.",
            404,
        )
class InvoiceAlreadyExists(PayoutError):
    """
    An invoice was requested for a payout that already has one.

    409 is forwarded as the second argument of the base error
    (presumably the HTTP status code -- confirm against PolarError).
    """

    def __init__(self, payout: Payout) -> None:
        self.payout = payout
        super().__init__(f"An invoice already exists for payout {payout.id}.", 409)
class PayoutNotSucceeded(PayoutError):
    """
    An invoice was requested for a payout whose status is not "succeeded".

    400 is forwarded as the second argument of the base error
    (presumably the HTTP status code -- confirm against PolarError).
    """

    def __init__(self, payout: Payout) -> None:
        self.payout = payout
        super().__init__(
            f"Can't generate an invoice for payout {payout.id} because "
            "it has not succeeded yet.",
            400,
        )
class MissingInvoiceBillingDetails(PayoutError):
    """
    The payout's account lacks the billing details needed for an invoice.

    400 is forwarded as the second argument of the base error
    (presumably the HTTP status code -- confirm against PolarError).
    """

    def __init__(self, payout: Payout) -> None:
        self.payout = payout
        super().__init__(
            "You must provide billing details for the account to generate an invoice.",
            400,
        )
class InvoiceDoesNotExist(PayoutError):
    """
    The invoice of a payout was requested but has not been generated.

    404 is forwarded as the second argument of the base error
    (presumably the HTTP status code -- confirm against PolarError).
    """

    def __init__(self, payout: Payout) -> None:
        self.payout = payout
        super().__init__(f"Invoice does not exist for payout {payout.id}.", 404)
class PayoutAlreadyTriggered(PayoutError):
    """
    A payout that already has a processor ID was asked to be triggered again.

    Unlike the sibling errors, no second argument is given to the base error,
    so whatever default PolarError defines applies.
    NOTE(review): confirm that relying on the default is intentional.
    """

    def __init__(self, payout: Payout) -> None:
        self.payout = payout
        super().__init__(f"Payout {payout.id} has already been triggered.")
class PayoutService:
    """
    Operations on payouts: listing, estimating, creating, driving the two-step
    Stripe transfer/payout flow, invoice generation and CSV export.

    The class keeps no state of its own; every method receives the database
    session (and other collaborators) it needs as arguments.
    """

    async def list(
        self,
        session: AsyncSession,
        auth_subject: AuthSubject[User],
        *,
        account_id: Sequence[uuid.UUID] | None = None,
        status: Sequence[PayoutStatus] | None = None,
        pagination: PaginationParams,
        sorting: list[Sorting[PayoutSortProperty]] | None = None,
    ) -> tuple[Sequence[Payout], int]:
        """
        Return one page of the payouts readable by `auth_subject`.

        Args:
            session: Database session used to build the repository.
            auth_subject: Subject whose readable payouts are queried.
            account_id: When given, only payouts of these accounts are kept.
            status: When given, only payouts in one of these statuses are kept.
            pagination: Supplies the `limit` and `page` passed to the repository.
            sorting: Sort criteria. Defaults to
                `[(PayoutSortProperty.created_at, False)]` when omitted.

        Returns:
            Whatever `PayoutRepository.paginate` returns: the payouts of the
            page and an integer (presumably the total count -- confirm).
        """
        # A fresh list per call instead of a mutable default argument that
        # would be shared by every invocation.
        if sorting is None:
            sorting = [(PayoutSortProperty.created_at, False)]

        repository = PayoutRepository.from_session(session)
        statement = repository.get_readable_statement(auth_subject).options(
            *repository.get_eager_options()
        )

        if account_id is not None:
            statement = statement.where(Payout.account_id.in_(account_id))

        if status is not None:
            statement = statement.where(Payout.status.in_(status))

        statement = repository.apply_sorting(statement, sorting)

        return await repository.paginate(
            statement, limit=pagination.limit, page=pagination.page
        )

    async def get(
        self,
        session: AsyncSession,
        auth_subject: AuthSubject[User],
        id: uuid.UUID,
    ) -> Payout | None:
        """
        Return the payout with the given ID if `auth_subject` can read it.

        Returns:
            The payout, or `None` when the readable statement yields no row
            for this ID.
        """
        repository = PayoutRepository.from_session(session)
        statement = (
            repository.get_readable_statement(auth_subject)
            .where(Payout.id == id)
            .options(*repository.get_eager_options())
        )
        return await repository.get_one_or_none(statement)

    async def estimate(
        self, session: AsyncSession, *, account: Account
    ) -> PayoutEstimate:
        """
        Compute the gross, fees and net amounts of a payout without creating it.

        Raises:
            UnderReviewAccount: The account is under review.
            NotReadyAccount: The account is not payout-ready.
            InsufficientBalance: The balance is below the minimum payout for
                the account currency, or the fee computation raised
                `PayoutAmountTooLow`.
        """
        if account.is_under_review():
            raise UnderReviewAccount(account)
        if not account.is_payout_ready():
            raise NotReadyAccount(account)

        balance_amount = await transaction_service.get_transactions_sum(
            session, account.id
        )
        if balance_amount < settings.get_minimum_payout_for_currency(account.currency):
            raise InsufficientBalance(account, balance_amount)

        try:
            payout_fees = await platform_fee_transaction_service.get_payout_fees(
                session, account=account, balance_amount=balance_amount
            )
        except PayoutAmountTooLow as e:
            raise InsufficientBalance(account, balance_amount) from e

        # Sum the fees once: both the fees and the net amount derive from it.
        fees_amount = sum(fee for _, fee in payout_fees)

        return PayoutEstimate(
            account_id=account.id,
            gross_amount=balance_amount,
            fees_amount=fees_amount,
            net_amount=balance_amount - fees_amount,
        )

    async def create(
        self, session: AsyncSession, locker: Locker, *, account: Account
    ) -> Payout:
        """
        Create a payout for the whole current balance of `account`.

        The work happens under a `payout:{account.id}` lock. Once the payout
        and its transaction are created, a `payout.created` job is enqueued.

        Raises:
            PendingPayoutCreation: The lock is already held for this account.
            UnderReviewAccount: The account is under review.
            NotReadyAccount: The account is not payout-ready.
            InsufficientBalance: The balance is below the minimum payout for
                the account currency, or the fee computation raised
                `PayoutAmountTooLow`.
        """
        lock_name = f"payout:{account.id}"
        # Fail fast with a dedicated error rather than waiting on the lock.
        if await locker.is_locked(lock_name):
            raise PendingPayoutCreation(account)

        async with locker.lock(lock_name, timeout=60, blocking_timeout=1):
            if account.is_under_review():
                raise UnderReviewAccount(account)
            if not account.is_payout_ready():
                raise NotReadyAccount(account)

            balance_amount = await transaction_service.get_transactions_sum(
                session, account.id
            )
            if balance_amount < settings.get_minimum_payout_for_currency(
                account.currency
            ):
                raise InsufficientBalance(account, balance_amount)

            try:
                (
                    balance_amount_after_fees,
                    payout_fees_balances,
                ) = await platform_fee_transaction_service.create_payout_fees_balances(
                    session, account=account, balance_amount=balance_amount
                )
            except PayoutAmountTooLow as e:
                raise InsufficientBalance(account, balance_amount) from e

            repository = PayoutRepository.from_session(session)
            payout = await repository.create(
                Payout(
                    processor=account.account_type,
                    currency="usd",  # FIXME: Main Polar currency
                    amount=balance_amount_after_fees,
                    fees_amount=balance_amount - balance_amount_after_fees,
                    account_currency=account.currency,
                    account_amount=balance_amount_after_fees,
                    account=account,
                    invoice_number=await self._get_next_invoice_number(
                        session, account
                    ),
                )
            )
            transaction = await payout_transaction_service.create(
                session, payout, payout_fees_balances
            )

            # When the account currency differs, take the account amount from
            # the payout transaction (sign flipped) instead of the USD amount.
            if payout.currency != payout.account_currency:
                await repository.update(
                    payout,
                    update_dict={"account_amount": -transaction.account_amount},
                )

            enqueue_job("payout.created", payout_id=payout.id)

            return payout

    async def transfer_stripe(self, session: AsyncSession, payout: Payout) -> Payout:
        """
        The Stripe payout is a two-steps process:

        1. Make the transfer to the Stripe Connect account
        2. Trigger a payout on the Stripe Connect account,
        but later once the balance is actually available.

        This function performs the first step.

        It stores the Stripe transfer ID on the payout transaction and, when
        the transaction currency differs from the account currency, replaces
        the account amount with the one reported by Stripe for the destination
        charge. Returns the updated payout.
        """
        account = payout.account
        assert account.stripe_id is not None

        payout_transaction_repository = PayoutTransactionRepository.from_session(
            session
        )
        transaction = await payout_transaction_repository.get_by_payout_id(payout.id)
        assert transaction is not None

        stripe_transfer = await stripe_service.transfer(
            account.stripe_id,
            payout.amount,
            metadata={
                "payout_id": str(payout.id),
                "payout_transaction_id": str(transaction.id),
            },
            idempotency_key=f"payout-{payout.id}",
        )

        transaction.transfer_id = stripe_transfer.id

        # Different source and destination currencies: get the converted amount
        account_amount = payout.account_amount
        if transaction.currency != transaction.account_currency:
            assert stripe_transfer.destination_payment is not None
            stripe_destination_charge = await stripe_service.get_charge(
                get_expandable_id(stripe_transfer.destination_payment),
                stripe_account=account.stripe_id,
                expand=["balance_transaction"],
            )
            # Case where the charge doesn't lead to a balance transaction,
            # e.g. when the converted amount is 0
            if stripe_destination_charge.balance_transaction is None:
                account_amount = 0
            else:
                stripe_destination_balance_transaction = cast(
                    stripe_lib.BalanceTransaction,
                    stripe_destination_charge.balance_transaction,
                )
                account_amount = stripe_destination_balance_transaction.amount

        # The transaction stores the amount with the opposite sign of the payout.
        await payout_transaction_repository.update(
            transaction,
            update_dict={
                "account_amount": -account_amount,
                "transfer_id": stripe_transfer.id,
            },
        )

        payout_repository = PayoutRepository.from_session(session)
        payout = await payout_repository.update(
            payout, update_dict={"account_amount": account_amount}
        )

        return payout

    async def update_from_stripe(
        self, session: AsyncSession, stripe_payout: stripe_lib.Payout
    ) -> Payout:
        """
        Mirror the status of a Stripe payout onto the matching Payout record.

        When the mapped status is `succeeded` and Stripe provides an arrival
        date, `paid_at` is set to that timestamp interpreted as UTC.

        Raises:
            PayoutDoesNotExist: No Stripe-processed payout has this
                processor ID.
        """
        repository = PayoutRepository.from_session(session)
        payout = await repository.get_by_processor_id(
            AccountType.stripe, stripe_payout.id
        )
        if payout is None:
            raise PayoutDoesNotExist(stripe_payout.id)

        status = PayoutStatus.from_stripe(stripe_payout.status)
        update_dict: dict[str, Any] = {"status": status}
        if status == PayoutStatus.succeeded and stripe_payout.arrival_date is not None:
            update_dict["paid_at"] = datetime.datetime.fromtimestamp(
                stripe_payout.arrival_date, datetime.UTC
            )
        return await repository.update(payout, update_dict=update_dict)

    async def trigger_stripe_payouts(self, session: AsyncSession) -> None:
        """
        The Stripe payout is a two-steps process:

        1. Make the transfer to the Stripe Connect account.
        2. Trigger a payout on the Stripe Connect account,
        but later once our safety delay is passed and the balance is actually available.

        This function performs the second step and tries to trigger pending payouts,
        if balance is available.

        It only enqueues one `payout.trigger_stripe_payout` job per payout
        returned by the repository; the balance check is done by
        `trigger_stripe_payout`.
        """
        repository = PayoutRepository.from_session(session)
        for payout in await repository.get_all_stripe_pending():
            enqueue_job("payout.trigger_stripe_payout", payout_id=payout.id)

    async def trigger_stripe_payout(
        self, session: AsyncSession, payout: Payout
    ) -> Payout:
        """
        Trigger the payout on the Stripe Connect account if balance allows.

        When the balance retrieved from Stripe is lower than the payout's
        account amount, the situation is logged and the payout is returned
        unchanged. Otherwise a Stripe payout is created and its ID is stored
        as the payout's `processor_id`.

        Raises:
            PayoutAlreadyTriggered: The payout already has a `processor_id`.
        """
        if payout.processor_id is not None:
            raise PayoutAlreadyTriggered(payout)

        account = payout.account
        assert account.stripe_id is not None
        # Only the second element of the returned pair is used here.
        _, balance = await stripe_service.retrieve_balance(account.stripe_id)

        if balance < payout.account_amount:
            log.info(
                (
                    "The Stripe Connect account doesn't have enough balance "
                    "to make the payout yet"
                ),
                payout_id=str(payout.id),
                account_id=str(account.id),
                balance=balance,
                payout_amount=payout.account_amount,
            )
            return payout

        # Trigger a payout on the Stripe Connect account
        stripe_payout = await stripe_service.create_payout(
            stripe_account=account.stripe_id,
            amount=payout.account_amount,
            currency=payout.account_currency,
            metadata={
                "payout_id": str(payout.id),
            },
        )

        repository = PayoutRepository.from_session(session)
        return await repository.update(
            payout,
            update_dict={"processor_id": stripe_payout.id},
        )

    async def trigger_invoice_generation(
        self,
        session: AsyncSession,
        payout: Payout,
        payout_generate_invoice: PayoutGenerateInvoice,
    ) -> Payout:
        """
        Validate an invoice request and enqueue the `payout.invoice` job.

        When the request carries an invoice number, it must not be used by
        another payout of the same account; it is then saved on the payout.

        Raises:
            InvoiceAlreadyExists: The payout already has a generated invoice.
            PayoutNotSucceeded: The payout status is not `succeeded`.
            MissingInvoiceBillingDetails: The account has no billing name or
                no billing address.
            PolarRequestValidationError: The requested invoice number belongs
                to a different payout of the account.
        """
        if payout.is_invoice_generated:
            raise InvoiceAlreadyExists(payout)

        if payout.status != PayoutStatus.succeeded:
            raise PayoutNotSucceeded(payout)

        account = payout.account
        if account.billing_name is None or account.billing_address is None:
            raise MissingInvoiceBillingDetails(payout)

        repository = PayoutRepository.from_session(session)
        if payout_generate_invoice.invoice_number is not None:
            existing_payout = await repository.get_by_account_and_invoice_number(
                account.id, payout_generate_invoice.invoice_number
            )
            # Re-submitting the payout's own number is not a conflict.
            if existing_payout is not None and existing_payout.id != payout.id:
                raise PolarRequestValidationError(
                    [
                        {
                            "type": "value_error",
                            "loc": ("body", "invoice_number"),
                            "msg": "An invoice with this number already exists.",
                            "input": payout_generate_invoice.invoice_number,
                        }
                    ]
                )
            payout = await repository.update(
                payout,
                update_dict={"invoice_number": payout_generate_invoice.invoice_number},
            )

        enqueue_job("payout.invoice", payout_id=payout.id)

        return payout

    async def generate_invoice(self, session: AsyncSession, payout: Payout) -> Payout:
        """
        Generate the payout invoice, store its path and notify organizations.

        A `payout.invoice_generated` event is published for every
        organization attached to the payout's account. Returns the updated
        payout.
        """
        invoice_path = await invoice_service.create_payout_invoice(session, payout)
        repository = PayoutRepository.from_session(session)
        payout = await repository.update(
            payout, update_dict={"invoice_path": invoice_path}
        )

        organization_repository = OrganizationRepository.from_session(session)
        account_organizations = await organization_repository.get_all_by_account(
            payout.account_id
        )
        for organization in account_organizations:
            await eventstream_publish(
                "payout.invoice_generated",
                {"payout_id": payout.id},
                organization_id=organization.id,
            )

        return payout

    async def get_invoice(self, payout: Payout) -> PayoutInvoice:
        """
        Return the URL of the payout's invoice.

        Raises:
            InvoiceDoesNotExist: The invoice has not been generated.
        """
        if not payout.is_invoice_generated:
            raise InvoiceDoesNotExist(payout)

        # Only the first element of the returned pair (the URL) is needed.
        url, _ = await invoice_service.get_payout_invoice_url(payout)
        return PayoutInvoice(url=url)

    async def get_csv(
        self, session: AsyncSession, sessionmaker: AsyncSessionMaker, payout: Payout
    ) -> AsyncIterable[str]:
        """
        Yield the CSV export of the transactions paid by `payout`, row by row.

        The first yielded row is the header. Each following row describes one
        transaction; amounts are divided by 100 before being written.

        Args:
            session: Session used to look up the payout transaction and to
                build the transactions statement.
            sessionmaker: Factory of the separate session used to stream the
                transactions.
            payout: The payout to export.
        """
        payout_transaction_repository = PayoutTransactionRepository.from_session(
            session
        )
        payout_transaction = await payout_transaction_repository.get_by_payout_id(
            payout.id
        )
        assert payout_transaction is not None

        transaction_repository = TransactionRepository.from_session(session)
        statement = transaction_repository.get_paid_transactions_statement(
            payout_transaction.id
        )

        csv_writer = IterableCSVWriter(dialect="excel")
        yield csv_writer.getrow(
            (
                "Date",
                "Payout ID",
                "Transaction ID",
                "Description",
                "Currency",
                "Amount",
                "Payout Total",
                "Account Currency",
                "Account Payout Total",
            )
        )

        # StreamingResponse is running its own async task to exhaust the iterator.
        # Thus, relying on the main session generated by the FastAPI dependency
        # leads to garbage collection problems.
        # We create a new session to avoid this.
        async with sessionmaker() as sub_session:
            transactions = await sub_session.stream_scalars(
                statement,
                execution_options={"yield_per": settings.DATABASE_STREAM_YIELD_PER},
            )
            async for transaction in transactions:
                description = ""
                if transaction.platform_fee_type is not None:
                    if transaction.platform_fee_type == "platform":
                        description = "Polar fee"
                    else:
                        description = (
                            f"Payment processor fee ({transaction.platform_fee_type})"
                        )
                elif transaction.pledge is not None:
                    description = f"Pledge to {transaction.pledge.issue_reference}"
                elif transaction.order is not None:
                    description = transaction.order.description

                # Report the ID of the transaction that incurred this one,
                # when there is one, instead of the transaction's own ID.
                transaction_id = (
                    str(transaction.id)
                    if transaction.incurred_by_transaction_id is None
                    else str(transaction.incurred_by_transaction_id)
                )

                yield csv_writer.getrow(
                    (
                        transaction.created_at.isoformat(),
                        str(payout.id),
                        transaction_id,
                        description,
                        transaction.currency,
                        transaction.amount / 100,
                        abs(payout.amount / 100),
                        payout.account_currency,
                        abs(payout.account_amount / 100),
                    )
                )

    async def _get_next_invoice_number(
        self, session: AsyncSession, account: Account, increment: int = 1
    ) -> str:
        """
        Return the first unused invoice number for `account`.

        Candidates are `settings.PAYOUT_INVOICES_PREFIX` followed by the
        account's payout count plus `increment`, zero-padded to four digits.
        `increment` is bumped until no payout of the account uses the
        candidate.

        Args:
            session: Database session used to build the repository.
            account: Account the invoice number is generated for.
            increment: Offset added to the payout count for the first
                candidate.
        """
        repository = PayoutRepository.from_session(session)
        # Count once; probing further candidates only needs a larger offset,
        # so a loop replaces recursion and avoids re-running the count query.
        payouts_count = await repository.count_by_account(account.id)
        while True:
            invoice_number = (
                f"{settings.PAYOUT_INVOICES_PREFIX}{payouts_count + increment:04d}"
            )
            existing_payout = await repository.get_by_account_and_invoice_number(
                account.id, invoice_number
            )
            if existing_payout is None:
                return invoice_number
            increment += 1
# Module-level PayoutService instance (presumably the object other modules
# import to use the service -- confirm against callers).
payout = PayoutService()