Coverage for polar/payout/service.py: 24%

243 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 16:17 +0000

1import datetime 1a

2import uuid 1a

3from collections.abc import AsyncIterable, Sequence 1a

4from typing import Any, cast 1a

5 

6import stripe as stripe_lib 1a

7import structlog 1a

8 

9from polar.auth.models import AuthSubject, User 1a

10from polar.config import settings 1a

11from polar.enums import AccountType 1a

12from polar.eventstream.service import publish as eventstream_publish 1a

13from polar.exceptions import PolarError, PolarRequestValidationError 1a

14from polar.integrations.stripe.service import stripe as stripe_service 1a

15from polar.integrations.stripe.utils import get_expandable_id 1a

16from polar.invoice.service import invoice as invoice_service 1a

17from polar.kit.csv import IterableCSVWriter 1a

18from polar.kit.db.postgres import AsyncSessionMaker 1a

19from polar.kit.pagination import PaginationParams 1a

20from polar.kit.sorting import Sorting 1a

21from polar.locker import Locker 1a

22from polar.logging import Logger 1a

23from polar.models import Account, Payout 1a

24from polar.models.payout import PayoutStatus 1a

25from polar.organization.repository import OrganizationRepository 1a

26from polar.postgres import AsyncSession 1a

27from polar.transaction.repository import ( 1a

28 PayoutTransactionRepository, 

29 TransactionRepository, 

30) 

31from polar.transaction.service.payout import ( 1a

32 payout_transaction as payout_transaction_service, 

33) 

34from polar.transaction.service.platform_fee import PayoutAmountTooLow 1a

35from polar.transaction.service.platform_fee import ( 1a

36 platform_fee_transaction as platform_fee_transaction_service, 

37) 

38from polar.transaction.service.transaction import transaction as transaction_service 1a

39from polar.worker import enqueue_job 1a

40 

41from .repository import PayoutRepository 1a

42from .schemas import PayoutEstimate, PayoutGenerateInvoice, PayoutInvoice 1a

43from .sorting import PayoutSortProperty 1a

44 

45log: Logger = structlog.get_logger() 1a

46 

47 

48class PayoutError(PolarError): ... 1a

49 

50 

51class InsufficientBalance(PayoutError): 1a

52 def __init__(self, account: Account, balance: int) -> None: 1a

53 self.account = account 

54 self.balance = balance 

55 message = "Your account has an insufficient balance to make a payout." 

56 super().__init__(message, 400) 

57 

58 

59class UnderReviewAccount(PayoutError): 1a

60 def __init__(self, account: Account) -> None: 1a

61 self.account = account 

62 message = "Your account is under review and can't receive payouts." 

63 super().__init__(message, 403) 

64 

65 

66class NotReadyAccount(PayoutError): 1a

67 def __init__(self, account: Account) -> None: 1a

68 self.account = account 

69 message = "Your account is not ready." 

70 super().__init__(message, 403) 

71 

72 

73class PendingPayoutCreation(PayoutError): 1a

74 def __init__(self, account: Account) -> None: 1a

75 self.account = account 

76 message = f"A payout is already being created for the account {account.id}." 

77 super().__init__(message, 409) 

78 

79 

80class PayoutDoesNotExist(PayoutError): 1a

81 def __init__(self, payout_id: str) -> None: 1a

82 self.payout_id = payout_id 

83 message = ( 

84 f"Received payout {payout_id} from Stripe, " 

85 "but it's not associated to a Payout." 

86 ) 

87 super().__init__(message, 404) 

88 

89 

90class InvoiceAlreadyExists(PayoutError): 1a

91 def __init__(self, payout: Payout) -> None: 1a

92 self.payout = payout 

93 message = f"An invoice already exists for payout {payout.id}." 

94 super().__init__(message, 409) 

95 

96 

97class PayoutNotSucceeded(PayoutError): 1a

98 def __init__(self, payout: Payout) -> None: 1a

99 self.payout = payout 

100 message = ( 

101 f"Can't generate an invoice for payout {payout.id} because " 

102 "it has not succeeded yet." 

103 ) 

104 super().__init__(message, 400) 

105 

106 

107class MissingInvoiceBillingDetails(PayoutError): 1a

108 def __init__(self, payout: Payout) -> None: 1a

109 self.payout = payout 

110 message = ( 

111 "You must provide billing details for the account to generate an invoice." 

112 ) 

113 super().__init__(message, 400) 

114 

115 

116class InvoiceDoesNotExist(PayoutError): 1a

117 def __init__(self, payout: Payout) -> None: 1a

118 self.payout = payout 

119 message = f"Invoice does not exist for payout {payout.id}." 

120 super().__init__(message, 404) 

121 

122 

123class PayoutAlreadyTriggered(PayoutError): 1a

124 def __init__(self, payout: Payout) -> None: 1a

125 self.payout = payout 

126 message = f"Payout {payout.id} has already been triggered." 

127 super().__init__(message) 

128 

129 

130class PayoutService: 1a

131 async def list( 1a

132 self, 

133 session: AsyncSession, 

134 auth_subject: AuthSubject[User], 

135 *, 

136 account_id: Sequence[uuid.UUID] | None = None, 

137 status: Sequence[PayoutStatus] | None = None, 

138 pagination: PaginationParams, 

139 sorting: list[Sorting[PayoutSortProperty]] = [ 

140 (PayoutSortProperty.created_at, False) 

141 ], 

142 ) -> tuple[Sequence[Payout], int]: 

143 repository = PayoutRepository.from_session(session) 

144 statement = repository.get_readable_statement(auth_subject).options( 

145 *repository.get_eager_options() 

146 ) 

147 

148 if account_id is not None: 

149 statement = statement.where(Payout.account_id.in_(account_id)) 

150 

151 if status is not None: 

152 statement = statement.where(Payout.status.in_(status)) 

153 

154 statement = repository.apply_sorting(statement, sorting) 

155 

156 return await repository.paginate( 

157 statement, limit=pagination.limit, page=pagination.page 

158 ) 

159 

160 async def get( 1a

161 self, 

162 session: AsyncSession, 

163 auth_subject: AuthSubject[User], 

164 id: uuid.UUID, 

165 ) -> Payout | None: 

166 repository = PayoutRepository.from_session(session) 

167 statement = ( 

168 repository.get_readable_statement(auth_subject) 

169 .where(Payout.id == id) 

170 .options(*repository.get_eager_options()) 

171 ) 

172 return await repository.get_one_or_none(statement) 

173 

174 async def estimate( 1a

175 self, session: AsyncSession, *, account: Account 

176 ) -> PayoutEstimate: 

177 if account.is_under_review(): 

178 raise UnderReviewAccount(account) 

179 if not account.is_payout_ready(): 

180 raise NotReadyAccount(account) 

181 

182 balance_amount = await transaction_service.get_transactions_sum( 

183 session, account.id 

184 ) 

185 if balance_amount < settings.get_minimum_payout_for_currency(account.currency): 

186 raise InsufficientBalance(account, balance_amount) 

187 

188 try: 

189 payout_fees = await platform_fee_transaction_service.get_payout_fees( 

190 session, account=account, balance_amount=balance_amount 

191 ) 

192 except PayoutAmountTooLow as e: 

193 raise InsufficientBalance(account, balance_amount) from e 

194 

195 return PayoutEstimate( 

196 account_id=account.id, 

197 gross_amount=balance_amount, 

198 fees_amount=sum(fee for _, fee in payout_fees), 

199 net_amount=balance_amount - sum(fee for _, fee in payout_fees), 

200 ) 

201 

202 async def create( 1a

203 self, session: AsyncSession, locker: Locker, *, account: Account 

204 ) -> Payout: 

205 lock_name = f"payout:{account.id}" 

206 if await locker.is_locked(lock_name): 

207 raise PendingPayoutCreation(account) 

208 

209 async with locker.lock(lock_name, timeout=60, blocking_timeout=1): 

210 if account.is_under_review(): 

211 raise UnderReviewAccount(account) 

212 if not account.is_payout_ready(): 

213 raise NotReadyAccount(account) 

214 

215 balance_amount = await transaction_service.get_transactions_sum( 

216 session, account.id 

217 ) 

218 if balance_amount < settings.get_minimum_payout_for_currency( 

219 account.currency 

220 ): 

221 raise InsufficientBalance(account, balance_amount) 

222 

223 try: 

224 ( 

225 balance_amount_after_fees, 

226 payout_fees_balances, 

227 ) = await platform_fee_transaction_service.create_payout_fees_balances( 

228 session, account=account, balance_amount=balance_amount 

229 ) 

230 except PayoutAmountTooLow as e: 

231 raise InsufficientBalance(account, balance_amount) from e 

232 

233 repository = PayoutRepository.from_session(session) 

234 payout = await repository.create( 

235 Payout( 

236 processor=account.account_type, 

237 currency="usd", # FIXME: Main Polar currency 

238 amount=balance_amount_after_fees, 

239 fees_amount=balance_amount - balance_amount_after_fees, 

240 account_currency=account.currency, 

241 account_amount=balance_amount_after_fees, 

242 account=account, 

243 invoice_number=await self._get_next_invoice_number( 

244 session, account 

245 ), 

246 ) 

247 ) 

248 transaction = await payout_transaction_service.create( 

249 session, payout, payout_fees_balances 

250 ) 

251 

252 if payout.currency != payout.account_currency: 

253 await repository.update( 

254 payout, 

255 update_dict={"account_amount": -transaction.account_amount}, 

256 ) 

257 

258 enqueue_job("payout.created", payout_id=payout.id) 

259 

260 return payout 

261 

262 async def transfer_stripe(self, session: AsyncSession, payout: Payout) -> Payout: 1a

263 """ 

264 The Stripe payout is a two-steps process: 

265 

266 1. Make the transfer to the Stripe Connect account 

267 2. Trigger a payout on the Stripe Connect account, 

268 but later once the balance is actually available. 

269 

270 This function performs the first step. 

271 """ 

272 account = payout.account 

273 assert account.stripe_id is not None 

274 

275 payout_transaction_repository = PayoutTransactionRepository.from_session( 

276 session 

277 ) 

278 transaction = await payout_transaction_repository.get_by_payout_id(payout.id) 

279 assert transaction is not None 

280 

281 stripe_transfer = await stripe_service.transfer( 

282 account.stripe_id, 

283 payout.amount, 

284 metadata={ 

285 "payout_id": str(payout.id), 

286 "payout_transaction_id": str(transaction.id), 

287 }, 

288 idempotency_key=f"payout-{payout.id}", 

289 ) 

290 

291 transaction.transfer_id = stripe_transfer.id 

292 

293 # Different source and destination currencies: get the converted amount 

294 account_amount = payout.account_amount 

295 if transaction.currency != transaction.account_currency: 

296 assert stripe_transfer.destination_payment is not None 

297 stripe_destination_charge = await stripe_service.get_charge( 

298 get_expandable_id(stripe_transfer.destination_payment), 

299 stripe_account=account.stripe_id, 

300 expand=["balance_transaction"], 

301 ) 

302 # Case where the charge don't lead to a balance transaction, 

303 # e.g. when the converted amount is 0 

304 if stripe_destination_charge.balance_transaction is None: 

305 account_amount = 0 

306 else: 

307 stripe_destination_balance_transaction = cast( 

308 stripe_lib.BalanceTransaction, 

309 stripe_destination_charge.balance_transaction, 

310 ) 

311 account_amount = stripe_destination_balance_transaction.amount 

312 

313 await payout_transaction_repository.update( 

314 transaction, 

315 update_dict={ 

316 "account_amount": -account_amount, 

317 "transfer_id": stripe_transfer.id, 

318 }, 

319 ) 

320 

321 payout_repository = PayoutRepository.from_session(session) 

322 payout = await payout_repository.update( 

323 payout, update_dict={"account_amount": account_amount} 

324 ) 

325 

326 return payout 

327 

328 async def update_from_stripe( 1a

329 self, session: AsyncSession, stripe_payout: stripe_lib.Payout 

330 ) -> Payout: 

331 repository = PayoutRepository.from_session(session) 

332 payout = await repository.get_by_processor_id( 

333 AccountType.stripe, stripe_payout.id 

334 ) 

335 if payout is None: 

336 raise PayoutDoesNotExist(stripe_payout.id) 

337 

338 status = PayoutStatus.from_stripe(stripe_payout.status) 

339 update_dict: dict[str, Any] = {"status": status} 

340 if status == PayoutStatus.succeeded and stripe_payout.arrival_date is not None: 

341 update_dict["paid_at"] = datetime.datetime.fromtimestamp( 

342 stripe_payout.arrival_date, datetime.UTC 

343 ) 

344 return await repository.update(payout, update_dict=update_dict) 

345 

346 async def trigger_stripe_payouts(self, session: AsyncSession) -> None: 1a

347 """ 

348 The Stripe payout is a two-steps process: 

349 

350 1. Make the transfer to the Stripe Connect account. 

351 2. Trigger a payout on the Stripe Connect account, 

352 but later once our safety delay is passed and the balance is actually available. 

353 

354 This function performs the second step and tries to trigger pending payouts, 

355 if balance is available. 

356 """ 

357 repository = PayoutRepository.from_session(session) 

358 for payout in await repository.get_all_stripe_pending(): 

359 enqueue_job("payout.trigger_stripe_payout", payout_id=payout.id) 

360 

361 async def trigger_stripe_payout( 1a

362 self, session: AsyncSession, payout: Payout 

363 ) -> Payout: 

364 if payout.processor_id is not None: 

365 raise PayoutAlreadyTriggered(payout) 

366 

367 account = payout.account 

368 assert account.stripe_id is not None 

369 _, balance = await stripe_service.retrieve_balance(account.stripe_id) 

370 

371 if balance < payout.account_amount: 

372 log.info( 

373 ( 

374 "The Stripe Connect account doesn't have enough balance " 

375 "to make the payout yet" 

376 ), 

377 payout_id=str(payout.id), 

378 account_id=str(account.id), 

379 balance=balance, 

380 payout_amount=payout.account_amount, 

381 ) 

382 return payout 

383 

384 # Trigger a payout on the Stripe Connect account 

385 stripe_payout = await stripe_service.create_payout( 

386 stripe_account=account.stripe_id, 

387 amount=payout.account_amount, 

388 currency=payout.account_currency, 

389 metadata={ 

390 "payout_id": str(payout.id), 

391 }, 

392 ) 

393 

394 repository = PayoutRepository.from_session(session) 

395 return await repository.update( 

396 payout, 

397 update_dict={"processor_id": stripe_payout.id}, 

398 ) 

399 

400 async def trigger_invoice_generation( 1a

401 self, 

402 session: AsyncSession, 

403 payout: Payout, 

404 payout_generate_invoice: PayoutGenerateInvoice, 

405 ) -> Payout: 

406 if payout.is_invoice_generated: 

407 raise InvoiceAlreadyExists(payout) 

408 

409 if payout.status != PayoutStatus.succeeded: 

410 raise PayoutNotSucceeded(payout) 

411 

412 account = payout.account 

413 if account.billing_name is None or account.billing_address is None: 

414 raise MissingInvoiceBillingDetails(payout) 

415 

416 repository = PayoutRepository.from_session(session) 

417 if payout_generate_invoice.invoice_number is not None: 

418 existing_payout = await repository.get_by_account_and_invoice_number( 

419 account.id, payout_generate_invoice.invoice_number 

420 ) 

421 if existing_payout is not None and existing_payout.id != payout.id: 

422 raise PolarRequestValidationError( 

423 [ 

424 { 

425 "type": "value_error", 

426 "loc": ("body", "invoice_number"), 

427 "msg": "An invoice with this number already exists.", 

428 "input": payout_generate_invoice.invoice_number, 

429 } 

430 ] 

431 ) 

432 payout = await repository.update( 

433 payout, 

434 update_dict={"invoice_number": payout_generate_invoice.invoice_number}, 

435 ) 

436 

437 enqueue_job("payout.invoice", payout_id=payout.id) 

438 

439 return payout 

440 

441 async def generate_invoice(self, session: AsyncSession, payout: Payout) -> Payout: 1a

442 invoice_path = await invoice_service.create_payout_invoice(session, payout) 

443 repository = PayoutRepository.from_session(session) 

444 payout = await repository.update( 

445 payout, update_dict={"invoice_path": invoice_path} 

446 ) 

447 

448 organization_repository = OrganizationRepository.from_session(session) 

449 account_organizations = await organization_repository.get_all_by_account( 

450 payout.account_id 

451 ) 

452 for organization in account_organizations: 

453 await eventstream_publish( 

454 "payout.invoice_generated", 

455 {"payout_id": payout.id}, 

456 organization_id=organization.id, 

457 ) 

458 

459 return payout 

460 

461 async def get_invoice(self, payout: Payout) -> PayoutInvoice: 1a

462 if not payout.is_invoice_generated: 

463 raise InvoiceDoesNotExist(payout) 

464 

465 url, _ = await invoice_service.get_payout_invoice_url(payout) 

466 return PayoutInvoice(url=url) 

467 

468 async def get_csv( 1a

469 self, session: AsyncSession, sessionmaker: AsyncSessionMaker, payout: Payout 

470 ) -> AsyncIterable[str]: 

471 payout_transaction_repository = PayoutTransactionRepository.from_session( 

472 session 

473 ) 

474 payout_transaction = await payout_transaction_repository.get_by_payout_id( 

475 payout.id 

476 ) 

477 assert payout_transaction is not None 

478 

479 transaction_repository = TransactionRepository.from_session(session) 

480 statement = transaction_repository.get_paid_transactions_statement( 

481 payout_transaction.id 

482 ) 

483 

484 csv_writer = IterableCSVWriter(dialect="excel") 

485 yield csv_writer.getrow( 

486 ( 

487 "Date", 

488 "Payout ID", 

489 "Transaction ID", 

490 "Description", 

491 "Currency", 

492 "Amount", 

493 "Payout Total", 

494 "Account Currency", 

495 "Account Payout Total", 

496 ) 

497 ) 

498 

499 # StreamingResponse is running its own async task to exhaust the iterator 

500 # Thus, rely on the main session generated by the FastAPI dependency leads to 

501 # garbage collection problems. 

502 # We create a new session to avoid this. 

503 async with sessionmaker() as sub_session: 

504 transactions = await sub_session.stream_scalars( 

505 statement, 

506 execution_options={"yield_per": settings.DATABASE_STREAM_YIELD_PER}, 

507 ) 

508 async for transaction in transactions: 

509 description = "" 

510 if transaction.platform_fee_type is not None: 

511 if transaction.platform_fee_type == "platform": 

512 description = "Polar fee" 

513 else: 

514 description = ( 

515 f"Payment processor fee ({transaction.platform_fee_type})" 

516 ) 

517 elif transaction.pledge is not None: 

518 description = f"Pledge to {transaction.pledge.issue_reference}" 

519 elif transaction.order is not None: 

520 description = transaction.order.description 

521 

522 transaction_id = ( 

523 str(transaction.id) 

524 if transaction.incurred_by_transaction_id is None 

525 else str(transaction.incurred_by_transaction_id) 

526 ) 

527 

528 yield csv_writer.getrow( 

529 ( 

530 transaction.created_at.isoformat(), 

531 str(payout.id), 

532 transaction_id, 

533 description, 

534 transaction.currency, 

535 transaction.amount / 100, 

536 abs(payout.amount / 100), 

537 payout.account_currency, 

538 abs(payout.account_amount / 100), 

539 ) 

540 ) 

541 

542 async def _get_next_invoice_number( 1a

543 self, session: AsyncSession, account: Account, increment: int = 1 

544 ) -> str: 

545 repository = PayoutRepository.from_session(session) 

546 payouts_count = await repository.count_by_account(account.id) 

547 invoice_number = ( 

548 f"{settings.PAYOUT_INVOICES_PREFIX}{payouts_count + increment:04d}" 

549 ) 

550 existing_payout = await repository.get_by_account_and_invoice_number( 

551 account.id, invoice_number 

552 ) 

553 if existing_payout is not None: 

554 return await self._get_next_invoice_number( 

555 session, account, increment=increment + 1 

556 ) 

557 return invoice_number 

558 

559 

560payout = PayoutService() 1a