Coverage for polar/pledge/service.py: 15%

218 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 16:17 +0000

1from __future__ import annotations 1a

2 

3import datetime 1a

4from collections.abc import Sequence 1a

5from datetime import timedelta 1a

6from uuid import UUID 1a

7 

8import structlog 1a

9from sqlalchemy import func, or_ 1a

10from sqlalchemy.orm import ( 1a

11 joinedload, 

12) 

13 

14from polar.account.repository import AccountRepository 1a

15from polar.exceptions import ( 1a

16 NotPermitted, 

17 ResourceNotFound, 

18) 

19from polar.integrations.stripe.schemas import PaymentIntentSuccessWebhook 1a

20from polar.kit.services import ResourceServiceReader 1a

21from polar.kit.utils import generate_uuid, utc_now 1a

22from polar.models.issue_reward import IssueReward 1a

23from polar.models.pledge import Pledge, PledgeState, PledgeType 1a

24from polar.models.pledge_transaction import PledgeTransaction, PledgeTransactionType 1a

25from polar.models.user import User 1a

26from polar.models.user_organization import UserOrganization 1a

27from polar.postgres import AsyncSession, sql 1a

28from polar.transaction.service.balance import ( 1a

29 balance_transaction as balance_transaction_service, 

30) 

31from polar.transaction.service.platform_fee import ( 1a

32 platform_fee_transaction as platform_fee_transaction_service, 

33) 

34 

35from .hooks import ( 1a

36 PledgeHook, 

37 pledge_disputed, 

38 pledge_updated, 

39) 

40 

41log = structlog.get_logger() 1a

42 

43 

44class PledgeService(ResourceServiceReader[Pledge]): 1a

45 async def get_with_loaded( 1a

46 self, 

47 session: AsyncSession, 

48 pledge_id: UUID, 

49 ) -> Pledge | None: 

50 statement = ( 

51 sql.select(Pledge) 

52 .options( 

53 joinedload(Pledge.user), 

54 joinedload(Pledge.by_organization), 

55 joinedload(Pledge.on_behalf_of_organization), 

56 joinedload(Pledge.created_by_user), 

57 ) 

58 .filter(Pledge.id == pledge_id) 

59 ) 

60 res = await session.execute(statement) 

61 return res.scalars().unique().one_or_none() 

62 

63 async def get_by_payment_id( 1a

64 self, session: AsyncSession, payment_id: str 

65 ) -> Pledge | None: 

66 statement = sql.select(Pledge).filter(Pledge.payment_id == payment_id) 

67 res = await session.execute(statement) 

68 return res.scalars().unique().one_or_none() 

69 

70 async def list_by( 1a

71 self, 

72 session: AsyncSession, 

73 organization_ids: list[UUID] | None = None, 

74 pledging_user: UUID | None = None, 

75 pledging_organization: UUID | None = None, 

76 load_pledger: bool = False, 

77 all_states: bool = False, 

78 ) -> Sequence[Pledge]: 

79 statement = sql.select(Pledge) 

80 

81 if not all_states: 

82 statement = statement.where( 

83 Pledge.state.in_(PledgeState.active_states()), 

84 ) 

85 

86 if organization_ids: 

87 statement = statement.where(Pledge.organization_id.in_(organization_ids)) 

88 

89 if pledging_user: 

90 statement = statement.where(Pledge.by_user_id == pledging_user) 

91 

92 if pledging_organization: 

93 statement = statement.where( 

94 or_( 

95 Pledge.by_organization_id == pledging_organization, 

96 Pledge.on_behalf_of_organization_id == pledging_organization, 

97 ) 

98 ) 

99 

100 if load_pledger: 

101 statement = statement.options( 

102 joinedload(Pledge.by_organization), 

103 joinedload(Pledge.user), 

104 joinedload(Pledge.on_behalf_of_organization), 

105 joinedload(Pledge.created_by_user), 

106 ) 

107 

108 statement = statement.order_by(Pledge.created_at) 

109 

110 res = await session.execute(statement) 

111 return res.scalars().unique().all() 

112 

113 async def get_by_issue_reference( 1a

114 self, 

115 session: AsyncSession, 

116 issue_reference: str, 

117 ) -> Sequence[Pledge]: 

118 statement = ( 

119 sql.select(Pledge) 

120 .options( 

121 joinedload(Pledge.organization), 

122 ) 

123 .where( 

124 Pledge.state.in_(PledgeState.active_states()), 

125 Pledge.issue_reference == issue_reference, 

126 Pledge.payment_id.is_not(None), 

127 ) 

128 .order_by(Pledge.created_at.desc()) 

129 ) 

130 res = await session.execute(statement) 

131 return res.scalars().unique().all() 

132 

133 async def list_by_pledging_user( 1a

134 self, session: AsyncSession, user_id: UUID 

135 ) -> Sequence[Pledge]: 

136 # Deprecated, please use list_by directly 

137 return await self.list_by(session, pledging_user=user_id) 

138 

139 async def connect_backer( 1a

140 self, 

141 session: AsyncSession, 

142 payment_intent_id: str, 

143 backer: User, 

144 ) -> None: 

145 pledge = await self.get_by_payment_id(session, payment_id=payment_intent_id) 

146 if not pledge: 

147 raise ResourceNotFound( 

148 f"Pledge not found with payment_id: {payment_intent_id}" 

149 ) 

150 

151 # This pledge is already connected 

152 if pledge.by_user_id or pledge.by_organization_id: 

153 return None 

154 

155 pledge.by_user_id = backer.id 

156 session.add(pledge) 

157 

158 async def handle_payment_intent_success( 1a

159 self, 

160 session: AsyncSession, 

161 payload: PaymentIntentSuccessWebhook, 

162 ) -> None: 

163 pledge = await self.get_by_payment_id(session, payload.id) 

164 if not pledge: 

165 raise ResourceNotFound(f"Pledge not found with payment_id: {payload.id}") 

166 

167 log.info( 

168 "handle_payment_intent_success", 

169 payment_id=payload.id, 

170 ) 

171 

172 # Log Transaction 

173 session.add( 

174 PledgeTransaction( 

175 pledge_id=pledge.id, 

176 type=PledgeTransactionType.pledge, 

177 amount=payload.amount_received, 

178 transaction_id=payload.latest_charge, 

179 ) 

180 ) 

181 await session.commit() 

182 

183 if pledge.type == PledgeType.pay_on_completion: 

184 return await self.handle_paid_invoice( 

185 session, 

186 payment_id=payload.id, 

187 amount_received=payload.amount_received, 

188 transaction_id=payload.latest_charge, 

189 ) 

190 

191 raise Exception(f"unhandeled pledge type type: {pledge.type}") 

192 

193 async def handle_paid_invoice( 1a

194 self, 

195 session: AsyncSession, 

196 payment_id: str, 

197 amount_received: int, 

198 transaction_id: str, 

199 ) -> None: 

200 pledge = await self.get_by_payment_id(session, payment_id) 

201 if not pledge: 

202 raise ResourceNotFound(f"Pledge not found with payment_id: {payment_id}") 

203 

204 if pledge.state not in PledgeState.to_pending_states(): 

205 raise Exception(f"pledge is in unexpected state: {pledge.state}") 

206 

207 if pledge.type != PledgeType.pay_on_completion: 

208 raise Exception(f"pledge is of unexpected type: {pledge.type}") 

209 

210 stmt = ( 

211 sql.Update(Pledge) 

212 .where( 

213 Pledge.id == pledge.id, 

214 Pledge.state.in_(PledgeState.to_pending_states()), 

215 ) 

216 .values( 

217 state=PledgeState.pending, 

218 amount_received=amount_received, 

219 ) 

220 ) 

221 await session.execute(stmt) 

222 

223 session.add( 

224 PledgeTransaction( 

225 pledge_id=pledge.id, 

226 type=PledgeTransactionType.pledge, 

227 amount=amount_received, 

228 transaction_id=transaction_id, 

229 ) 

230 ) 

231 await session.commit() 

232 await self.after_pledge_updated(session, pledge) 

233 

234 async def refund_by_payment_id( 1a

235 self, session: AsyncSession, payment_id: str, amount: int, transaction_id: str 

236 ) -> None: 

237 pledge = await self.get_by_payment_id(session, payment_id) 

238 if not pledge: 

239 raise ResourceNotFound(f"Pledge not found with payment_id: {payment_id}") 

240 

241 if pledge.state not in PledgeState.to_refunded_states(): 

242 raise NotPermitted("Refunding error, unexpected pledge status") 

243 

244 pledge.refunded_at = utc_now() 

245 if amount == pledge.amount: 

246 pledge.state = PledgeState.refunded 

247 elif amount < pledge.amount: 

248 pledge.amount -= amount 

249 else: 

250 raise NotPermitted("Refunding error, unexpected amount!") 

251 session.add(pledge) 

252 session.add( 

253 PledgeTransaction( 

254 pledge_id=pledge.id, 

255 type=PledgeTransactionType.refund, 

256 amount=amount, 

257 transaction_id=transaction_id, 

258 ) 

259 ) 

260 await self.after_pledge_updated(session, pledge) 

261 

262 async def mark_charge_disputed_by_payment_id( 1a

263 self, session: AsyncSession, payment_id: str, amount: int, transaction_id: str 

264 ) -> None: 

265 pledge = await self.get_by_payment_id(session, payment_id) 

266 if not pledge: 

267 raise ResourceNotFound(f"Pledge not found with payment_id: {payment_id}") 

268 

269 # charge_disputed (aka chargebacks) can be triggered from _any_ pledge state 

270 # not checking existing state here 

271 

272 pledge.state = PledgeState.charge_disputed 

273 session.add(pledge) 

274 session.add( 

275 PledgeTransaction( 

276 pledge_id=pledge.id, 

277 type=PledgeTransactionType.disputed, 

278 amount=amount, 

279 transaction_id=transaction_id, 

280 ) 

281 ) 

282 await session.commit() 

283 await self.after_pledge_updated(session, pledge) 

284 

285 async def get_reward( 1a

286 self, session: AsyncSession, split_id: UUID 

287 ) -> IssueReward | None: 

288 stmt = sql.select(IssueReward).where(IssueReward.id == split_id) 

289 res = await session.execute(stmt) 

290 return res.scalars().unique().one_or_none() 

291 

292 async def get_transaction( 1a

293 self, 

294 session: AsyncSession, 

295 type: PledgeTransactionType | None = None, 

296 pledge_id: UUID | None = None, 

297 issue_reward_id: UUID | None = None, 

298 ) -> PledgeTransaction | None: 

299 stmt = sql.select(PledgeTransaction) 

300 

301 if type: 

302 stmt = stmt.where(PledgeTransaction.type == type) 

303 if pledge_id: 

304 stmt = stmt.where(PledgeTransaction.pledge_id == pledge_id) 

305 if issue_reward_id: 

306 stmt = stmt.where(PledgeTransaction.issue_reward_id == issue_reward_id) 

307 

308 res = await session.execute(stmt) 

309 return res.scalars().unique().one_or_none() 

310 

311 async def transfer( 1a

312 self, session: AsyncSession, pledge_id: UUID, issue_reward_id: UUID 

313 ) -> None: 

314 pledge = await self.get(session, id=pledge_id) 

315 if not pledge: 

316 raise ResourceNotFound(f"Pledge not found with id: {pledge_id}") 

317 if pledge.state not in PledgeState.to_paid_states(): 

318 raise NotPermitted("Pledge is not in pending state") 

319 if pledge.scheduled_payout_at and pledge.scheduled_payout_at > utc_now(): 

320 raise NotPermitted( 

321 "Pledge is not ready for payput (still in dispute window)" 

322 ) 

323 

324 # get receiver 

325 split = await self.get_reward(session, issue_reward_id) 

326 if not split: 

327 raise ResourceNotFound(f"IssueReward not found with id: {issue_reward_id}") 

328 

329 if not split.user_id and not split.organization_id: 

330 raise NotPermitted( 

331 "Either user_id or organization_id must be set on the split to create a transfer" # noqa: E501 

332 ) 

333 

334 # sanity check 

335 if split.share_thousands < 0 or split.share_thousands > 1000: 

336 raise NotPermitted("unexpected split share") 

337 

338 # check that this transfer hasn't already been made! 

339 existing_trx = await self.get_transaction( 

340 session, pledge_id=pledge.id, issue_reward_id=split.id 

341 ) 

342 if existing_trx: 

343 raise NotPermitted( 

344 "A transfer for this pledge_id and issue_reward_id already exists, refusing to make another one" # noqa: E501 

345 ) 

346 

347 # pledge amount * the users share 

348 payout_amount = split.get_share_amount(pledge) 

349 

350 account_repository = AccountRepository.from_session(session) 

351 if split.user_id: 

352 pay_to_account = await account_repository.get_by_user(split.user_id) 

353 if pay_to_account is None: 

354 raise NotPermitted("Receiving user has no account") 

355 

356 elif split.organization_id: 

357 pay_to_account = await account_repository.get_by_organization( 

358 split.organization_id 

359 ) 

360 if pay_to_account is None: 

361 raise NotPermitted("Receiving organization has no account") 

362 else: 

363 raise NotPermitted("Unexpected split receiver") 

364 

365 assert pledge.payment_id is not None 

366 

367 balance_transactions = ( 

368 await balance_transaction_service.create_balance_from_payment_intent( 

369 session, 

370 source_account=None, 

371 destination_account=pay_to_account, 

372 payment_intent_id=pledge.payment_id, 

373 amount=payout_amount, 

374 pledge=pledge, 

375 issue_reward=split, 

376 ) 

377 ) 

378 await platform_fee_transaction_service.create_fees_reversal_balances( 

379 session, balance_transactions=balance_transactions 

380 ) 

381 

382 transaction = PledgeTransaction( 

383 pledge_id=pledge.id, 

384 type=PledgeTransactionType.transfer, 

385 amount=payout_amount, 

386 issue_reward_id=split.id, 

387 ) 

388 

389 session.add(transaction) 

390 await session.commit() 

391 

392 async def admin_transfer( 1a

393 self, 

394 session: AsyncSession, 

395 pledge_id: UUID, 

396 ) -> None: 

397 """ 

398 Transfer a pledge directly to the organization (100% of amount minus fees). 

399 Similar to the regular transfer method but without reward split logic. 

400 

401 Args: 

402 pledge_id: The pledge to transfer 

403 

404 Raises: 

405 PledgeError: If pledge is not in valid state for transfer 

406 """ 

407 pledge = await self.get(session, id=pledge_id) 

408 if not pledge: 

409 raise ResourceNotFound(f"Pledge not found with id: {pledge_id}") 

410 

411 # Check if pledge is pay_upfront type 

412 if pledge.type != PledgeType.pay_upfront: 

413 raise NotPermitted(f"Pledge is not pay_upfront type: {pledge.type}") 

414 

415 # Update state to mimic old automatic state transfer from GitHub events 

416 if pledge.state == PledgeState.created: 

417 pledge.state = PledgeState.pending 

418 

419 if not pledge.scheduled_payout_at: 

420 pledge.scheduled_payout_at = utc_now() - timedelta(seconds=10) 

421 

422 session.add(pledge) 

423 

424 # Create a 100% reward in admin to the receiving organization (unless it exists already) 

425 stmt = sql.select(IssueReward).where( 

426 IssueReward.organization_id == pledge.organization_id, 

427 IssueReward.issue_reference == pledge.issue_reference, 

428 ) 

429 res = await session.execute(stmt) 

430 reward = res.scalars().unique().one_or_none() 

431 if not reward: 

432 reward = IssueReward( 

433 id=generate_uuid(), 

434 issue_reference=pledge.issue_reference, 

435 share_thousands=1000, # 100% 

436 organization_id=pledge.organization_id, 

437 ) 

438 session.add(reward) 

439 

440 # Now we can proceed with a regular old-school transfer 

441 return await self.transfer( 

442 session, 

443 pledge_id=pledge.id, 

444 issue_reward_id=reward.id, 

445 ) 

446 

447 async def mark_disputed( 1a

448 self, 

449 session: AsyncSession, 

450 pledge_id: UUID, 

451 by_user_id: UUID, 

452 reason: str, 

453 ) -> None: 

454 pledge = await self.get(session, id=pledge_id) 

455 if not pledge: 

456 raise ResourceNotFound(f"Pledge not found with id: {pledge_id}") 

457 if pledge.state not in PledgeState.to_disputed_states(): 

458 raise NotPermitted(f"Pledge is unexpected state: {pledge.state}") 

459 

460 stmt = ( 

461 sql.update(Pledge) 

462 .where(Pledge.id == pledge_id) 

463 .values( 

464 state=PledgeState.disputed, 

465 dispute_reason=reason, 

466 disputed_at=datetime.datetime.now(), 

467 disputed_by_user_id=by_user_id, 

468 ) 

469 ) 

470 await session.execute(stmt) 

471 await session.commit() 

472 

473 await pledge_disputed.call(PledgeHook(session, pledge)) 

474 await self.after_pledge_updated(session, pledge) 

475 

476 async def after_pledge_updated( 1a

477 self, 

478 session: AsyncSession, 

479 pledge: Pledge, 

480 ) -> None: 

481 full_pledge = await self.get_with_loaded(session, pledge.id) 

482 assert full_pledge 

483 

484 await pledge_updated.call(PledgeHook(session, full_pledge)) 

485 

486 def user_can_admin_sender_pledge( 1a

487 self, user: User, pledge: Pledge, memberships: Sequence[UserOrganization] 

488 ) -> bool: 

489 """ 

490 Returns true if the User can modify the pledge on behalf of the entity that sent 

491 the pledge, such as disputing it. 

492 """ 

493 

494 if pledge.by_user_id == user.id: 

495 return True 

496 

497 if pledge.by_organization_id: 

498 for m in memberships: 

499 if m.organization_id == pledge.by_organization_id: 

500 return True 

501 

502 return False 

503 

504 async def sum_pledges_period( 1a

505 self, 

506 session: AsyncSession, 

507 organization_id: UUID, 

508 user_id: UUID | None = None, 

509 ) -> int: 

510 stmt = sql.select(func.sum(Pledge.amount)).where( 

511 Pledge.by_organization_id == organization_id 

512 ) 

513 

514 if user_id: 

515 stmt = stmt.where(Pledge.created_by_user_id == user_id) 

516 

517 now = utc_now() 

518 (start, end) = self.month_range(now) 

519 stmt = stmt.where( 

520 Pledge.created_at >= start, 

521 Pledge.created_at <= end, 

522 ) 

523 

524 ret = await session.execute(stmt) 

525 res = ret.scalars().one_or_none() 

526 

527 if not res: 

528 return 0 

529 

530 return res 

531 

532 """ 1a

533 month_range returns the first and the last second of the month that ts is in 

534 """ 

535 

536 def month_range( 1a

537 self, ts: datetime.datetime 

538 ) -> tuple[datetime.datetime, datetime.datetime]: 

539 # go to first day and second of the month 

540 start = ts.replace(day=1, hour=0, minute=0, second=0, microsecond=0) 

541 

542 # add 35 days to skip to the next month 

543 end = start + timedelta(days=35) 

544 # go to the first day of the next month 

545 end = end.replace(day=1) 

546 # go back one second to find the last second of the "current" month 

547 end = end - timedelta(seconds=1) 

548 

549 return (start, end) 

550 

551 

552pledge = PledgeService(Pledge) 1a