Coverage for polar/pledge/service.py: 15%
218 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 16:17 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 16:17 +0000
1from __future__ import annotations 1a
3import datetime 1a
4from collections.abc import Sequence 1a
5from datetime import timedelta 1a
6from uuid import UUID 1a
8import structlog 1a
9from sqlalchemy import func, or_ 1a
10from sqlalchemy.orm import ( 1a
11 joinedload,
12)
14from polar.account.repository import AccountRepository 1a
15from polar.exceptions import ( 1a
16 NotPermitted,
17 ResourceNotFound,
18)
19from polar.integrations.stripe.schemas import PaymentIntentSuccessWebhook 1a
20from polar.kit.services import ResourceServiceReader 1a
21from polar.kit.utils import generate_uuid, utc_now 1a
22from polar.models.issue_reward import IssueReward 1a
23from polar.models.pledge import Pledge, PledgeState, PledgeType 1a
24from polar.models.pledge_transaction import PledgeTransaction, PledgeTransactionType 1a
25from polar.models.user import User 1a
26from polar.models.user_organization import UserOrganization 1a
27from polar.postgres import AsyncSession, sql 1a
28from polar.transaction.service.balance import ( 1a
29 balance_transaction as balance_transaction_service,
30)
31from polar.transaction.service.platform_fee import ( 1a
32 platform_fee_transaction as platform_fee_transaction_service,
33)
35from .hooks import ( 1a
36 PledgeHook,
37 pledge_disputed,
38 pledge_updated,
39)
41log = structlog.get_logger() 1a
44class PledgeService(ResourceServiceReader[Pledge]): 1a
45 async def get_with_loaded( 1a
46 self,
47 session: AsyncSession,
48 pledge_id: UUID,
49 ) -> Pledge | None:
50 statement = (
51 sql.select(Pledge)
52 .options(
53 joinedload(Pledge.user),
54 joinedload(Pledge.by_organization),
55 joinedload(Pledge.on_behalf_of_organization),
56 joinedload(Pledge.created_by_user),
57 )
58 .filter(Pledge.id == pledge_id)
59 )
60 res = await session.execute(statement)
61 return res.scalars().unique().one_or_none()
63 async def get_by_payment_id( 1a
64 self, session: AsyncSession, payment_id: str
65 ) -> Pledge | None:
66 statement = sql.select(Pledge).filter(Pledge.payment_id == payment_id)
67 res = await session.execute(statement)
68 return res.scalars().unique().one_or_none()
70 async def list_by( 1a
71 self,
72 session: AsyncSession,
73 organization_ids: list[UUID] | None = None,
74 pledging_user: UUID | None = None,
75 pledging_organization: UUID | None = None,
76 load_pledger: bool = False,
77 all_states: bool = False,
78 ) -> Sequence[Pledge]:
79 statement = sql.select(Pledge)
81 if not all_states:
82 statement = statement.where(
83 Pledge.state.in_(PledgeState.active_states()),
84 )
86 if organization_ids:
87 statement = statement.where(Pledge.organization_id.in_(organization_ids))
89 if pledging_user:
90 statement = statement.where(Pledge.by_user_id == pledging_user)
92 if pledging_organization:
93 statement = statement.where(
94 or_(
95 Pledge.by_organization_id == pledging_organization,
96 Pledge.on_behalf_of_organization_id == pledging_organization,
97 )
98 )
100 if load_pledger:
101 statement = statement.options(
102 joinedload(Pledge.by_organization),
103 joinedload(Pledge.user),
104 joinedload(Pledge.on_behalf_of_organization),
105 joinedload(Pledge.created_by_user),
106 )
108 statement = statement.order_by(Pledge.created_at)
110 res = await session.execute(statement)
111 return res.scalars().unique().all()
113 async def get_by_issue_reference( 1a
114 self,
115 session: AsyncSession,
116 issue_reference: str,
117 ) -> Sequence[Pledge]:
118 statement = (
119 sql.select(Pledge)
120 .options(
121 joinedload(Pledge.organization),
122 )
123 .where(
124 Pledge.state.in_(PledgeState.active_states()),
125 Pledge.issue_reference == issue_reference,
126 Pledge.payment_id.is_not(None),
127 )
128 .order_by(Pledge.created_at.desc())
129 )
130 res = await session.execute(statement)
131 return res.scalars().unique().all()
133 async def list_by_pledging_user( 1a
134 self, session: AsyncSession, user_id: UUID
135 ) -> Sequence[Pledge]:
136 # Deprecated, please use list_by directly
137 return await self.list_by(session, pledging_user=user_id)
139 async def connect_backer( 1a
140 self,
141 session: AsyncSession,
142 payment_intent_id: str,
143 backer: User,
144 ) -> None:
145 pledge = await self.get_by_payment_id(session, payment_id=payment_intent_id)
146 if not pledge:
147 raise ResourceNotFound(
148 f"Pledge not found with payment_id: {payment_intent_id}"
149 )
151 # This pledge is already connected
152 if pledge.by_user_id or pledge.by_organization_id:
153 return None
155 pledge.by_user_id = backer.id
156 session.add(pledge)
158 async def handle_payment_intent_success( 1a
159 self,
160 session: AsyncSession,
161 payload: PaymentIntentSuccessWebhook,
162 ) -> None:
163 pledge = await self.get_by_payment_id(session, payload.id)
164 if not pledge:
165 raise ResourceNotFound(f"Pledge not found with payment_id: {payload.id}")
167 log.info(
168 "handle_payment_intent_success",
169 payment_id=payload.id,
170 )
172 # Log Transaction
173 session.add(
174 PledgeTransaction(
175 pledge_id=pledge.id,
176 type=PledgeTransactionType.pledge,
177 amount=payload.amount_received,
178 transaction_id=payload.latest_charge,
179 )
180 )
181 await session.commit()
183 if pledge.type == PledgeType.pay_on_completion:
184 return await self.handle_paid_invoice(
185 session,
186 payment_id=payload.id,
187 amount_received=payload.amount_received,
188 transaction_id=payload.latest_charge,
189 )
191 raise Exception(f"unhandeled pledge type type: {pledge.type}")
193 async def handle_paid_invoice( 1a
194 self,
195 session: AsyncSession,
196 payment_id: str,
197 amount_received: int,
198 transaction_id: str,
199 ) -> None:
200 pledge = await self.get_by_payment_id(session, payment_id)
201 if not pledge:
202 raise ResourceNotFound(f"Pledge not found with payment_id: {payment_id}")
204 if pledge.state not in PledgeState.to_pending_states():
205 raise Exception(f"pledge is in unexpected state: {pledge.state}")
207 if pledge.type != PledgeType.pay_on_completion:
208 raise Exception(f"pledge is of unexpected type: {pledge.type}")
210 stmt = (
211 sql.Update(Pledge)
212 .where(
213 Pledge.id == pledge.id,
214 Pledge.state.in_(PledgeState.to_pending_states()),
215 )
216 .values(
217 state=PledgeState.pending,
218 amount_received=amount_received,
219 )
220 )
221 await session.execute(stmt)
223 session.add(
224 PledgeTransaction(
225 pledge_id=pledge.id,
226 type=PledgeTransactionType.pledge,
227 amount=amount_received,
228 transaction_id=transaction_id,
229 )
230 )
231 await session.commit()
232 await self.after_pledge_updated(session, pledge)
234 async def refund_by_payment_id( 1a
235 self, session: AsyncSession, payment_id: str, amount: int, transaction_id: str
236 ) -> None:
237 pledge = await self.get_by_payment_id(session, payment_id)
238 if not pledge:
239 raise ResourceNotFound(f"Pledge not found with payment_id: {payment_id}")
241 if pledge.state not in PledgeState.to_refunded_states():
242 raise NotPermitted("Refunding error, unexpected pledge status")
244 pledge.refunded_at = utc_now()
245 if amount == pledge.amount:
246 pledge.state = PledgeState.refunded
247 elif amount < pledge.amount:
248 pledge.amount -= amount
249 else:
250 raise NotPermitted("Refunding error, unexpected amount!")
251 session.add(pledge)
252 session.add(
253 PledgeTransaction(
254 pledge_id=pledge.id,
255 type=PledgeTransactionType.refund,
256 amount=amount,
257 transaction_id=transaction_id,
258 )
259 )
260 await self.after_pledge_updated(session, pledge)
262 async def mark_charge_disputed_by_payment_id( 1a
263 self, session: AsyncSession, payment_id: str, amount: int, transaction_id: str
264 ) -> None:
265 pledge = await self.get_by_payment_id(session, payment_id)
266 if not pledge:
267 raise ResourceNotFound(f"Pledge not found with payment_id: {payment_id}")
269 # charge_disputed (aka chargebacks) can be triggered from _any_ pledge state
270 # not checking existing state here
272 pledge.state = PledgeState.charge_disputed
273 session.add(pledge)
274 session.add(
275 PledgeTransaction(
276 pledge_id=pledge.id,
277 type=PledgeTransactionType.disputed,
278 amount=amount,
279 transaction_id=transaction_id,
280 )
281 )
282 await session.commit()
283 await self.after_pledge_updated(session, pledge)
285 async def get_reward( 1a
286 self, session: AsyncSession, split_id: UUID
287 ) -> IssueReward | None:
288 stmt = sql.select(IssueReward).where(IssueReward.id == split_id)
289 res = await session.execute(stmt)
290 return res.scalars().unique().one_or_none()
292 async def get_transaction( 1a
293 self,
294 session: AsyncSession,
295 type: PledgeTransactionType | None = None,
296 pledge_id: UUID | None = None,
297 issue_reward_id: UUID | None = None,
298 ) -> PledgeTransaction | None:
299 stmt = sql.select(PledgeTransaction)
301 if type:
302 stmt = stmt.where(PledgeTransaction.type == type)
303 if pledge_id:
304 stmt = stmt.where(PledgeTransaction.pledge_id == pledge_id)
305 if issue_reward_id:
306 stmt = stmt.where(PledgeTransaction.issue_reward_id == issue_reward_id)
308 res = await session.execute(stmt)
309 return res.scalars().unique().one_or_none()
311 async def transfer( 1a
312 self, session: AsyncSession, pledge_id: UUID, issue_reward_id: UUID
313 ) -> None:
314 pledge = await self.get(session, id=pledge_id)
315 if not pledge:
316 raise ResourceNotFound(f"Pledge not found with id: {pledge_id}")
317 if pledge.state not in PledgeState.to_paid_states():
318 raise NotPermitted("Pledge is not in pending state")
319 if pledge.scheduled_payout_at and pledge.scheduled_payout_at > utc_now():
320 raise NotPermitted(
321 "Pledge is not ready for payput (still in dispute window)"
322 )
324 # get receiver
325 split = await self.get_reward(session, issue_reward_id)
326 if not split:
327 raise ResourceNotFound(f"IssueReward not found with id: {issue_reward_id}")
329 if not split.user_id and not split.organization_id:
330 raise NotPermitted(
331 "Either user_id or organization_id must be set on the split to create a transfer" # noqa: E501
332 )
334 # sanity check
335 if split.share_thousands < 0 or split.share_thousands > 1000:
336 raise NotPermitted("unexpected split share")
338 # check that this transfer hasn't already been made!
339 existing_trx = await self.get_transaction(
340 session, pledge_id=pledge.id, issue_reward_id=split.id
341 )
342 if existing_trx:
343 raise NotPermitted(
344 "A transfer for this pledge_id and issue_reward_id already exists, refusing to make another one" # noqa: E501
345 )
347 # pledge amount * the users share
348 payout_amount = split.get_share_amount(pledge)
350 account_repository = AccountRepository.from_session(session)
351 if split.user_id:
352 pay_to_account = await account_repository.get_by_user(split.user_id)
353 if pay_to_account is None:
354 raise NotPermitted("Receiving user has no account")
356 elif split.organization_id:
357 pay_to_account = await account_repository.get_by_organization(
358 split.organization_id
359 )
360 if pay_to_account is None:
361 raise NotPermitted("Receiving organization has no account")
362 else:
363 raise NotPermitted("Unexpected split receiver")
365 assert pledge.payment_id is not None
367 balance_transactions = (
368 await balance_transaction_service.create_balance_from_payment_intent(
369 session,
370 source_account=None,
371 destination_account=pay_to_account,
372 payment_intent_id=pledge.payment_id,
373 amount=payout_amount,
374 pledge=pledge,
375 issue_reward=split,
376 )
377 )
378 await platform_fee_transaction_service.create_fees_reversal_balances(
379 session, balance_transactions=balance_transactions
380 )
382 transaction = PledgeTransaction(
383 pledge_id=pledge.id,
384 type=PledgeTransactionType.transfer,
385 amount=payout_amount,
386 issue_reward_id=split.id,
387 )
389 session.add(transaction)
390 await session.commit()
392 async def admin_transfer( 1a
393 self,
394 session: AsyncSession,
395 pledge_id: UUID,
396 ) -> None:
397 """
398 Transfer a pledge directly to the organization (100% of amount minus fees).
399 Similar to the regular transfer method but without reward split logic.
401 Args:
402 pledge_id: The pledge to transfer
404 Raises:
405 PledgeError: If pledge is not in valid state for transfer
406 """
407 pledge = await self.get(session, id=pledge_id)
408 if not pledge:
409 raise ResourceNotFound(f"Pledge not found with id: {pledge_id}")
411 # Check if pledge is pay_upfront type
412 if pledge.type != PledgeType.pay_upfront:
413 raise NotPermitted(f"Pledge is not pay_upfront type: {pledge.type}")
415 # Update state to mimic old automatic state transfer from GitHub events
416 if pledge.state == PledgeState.created:
417 pledge.state = PledgeState.pending
419 if not pledge.scheduled_payout_at:
420 pledge.scheduled_payout_at = utc_now() - timedelta(seconds=10)
422 session.add(pledge)
424 # Create a 100% reward in admin to the receiving organization (unless it exists already)
425 stmt = sql.select(IssueReward).where(
426 IssueReward.organization_id == pledge.organization_id,
427 IssueReward.issue_reference == pledge.issue_reference,
428 )
429 res = await session.execute(stmt)
430 reward = res.scalars().unique().one_or_none()
431 if not reward:
432 reward = IssueReward(
433 id=generate_uuid(),
434 issue_reference=pledge.issue_reference,
435 share_thousands=1000, # 100%
436 organization_id=pledge.organization_id,
437 )
438 session.add(reward)
440 # Now we can proceed with a regular old-school transfer
441 return await self.transfer(
442 session,
443 pledge_id=pledge.id,
444 issue_reward_id=reward.id,
445 )
447 async def mark_disputed( 1a
448 self,
449 session: AsyncSession,
450 pledge_id: UUID,
451 by_user_id: UUID,
452 reason: str,
453 ) -> None:
454 pledge = await self.get(session, id=pledge_id)
455 if not pledge:
456 raise ResourceNotFound(f"Pledge not found with id: {pledge_id}")
457 if pledge.state not in PledgeState.to_disputed_states():
458 raise NotPermitted(f"Pledge is unexpected state: {pledge.state}")
460 stmt = (
461 sql.update(Pledge)
462 .where(Pledge.id == pledge_id)
463 .values(
464 state=PledgeState.disputed,
465 dispute_reason=reason,
466 disputed_at=datetime.datetime.now(),
467 disputed_by_user_id=by_user_id,
468 )
469 )
470 await session.execute(stmt)
471 await session.commit()
473 await pledge_disputed.call(PledgeHook(session, pledge))
474 await self.after_pledge_updated(session, pledge)
476 async def after_pledge_updated( 1a
477 self,
478 session: AsyncSession,
479 pledge: Pledge,
480 ) -> None:
481 full_pledge = await self.get_with_loaded(session, pledge.id)
482 assert full_pledge
484 await pledge_updated.call(PledgeHook(session, full_pledge))
486 def user_can_admin_sender_pledge( 1a
487 self, user: User, pledge: Pledge, memberships: Sequence[UserOrganization]
488 ) -> bool:
489 """
490 Returns true if the User can modify the pledge on behalf of the entity that sent
491 the pledge, such as disputing it.
492 """
494 if pledge.by_user_id == user.id:
495 return True
497 if pledge.by_organization_id:
498 for m in memberships:
499 if m.organization_id == pledge.by_organization_id:
500 return True
502 return False
504 async def sum_pledges_period( 1a
505 self,
506 session: AsyncSession,
507 organization_id: UUID,
508 user_id: UUID | None = None,
509 ) -> int:
510 stmt = sql.select(func.sum(Pledge.amount)).where(
511 Pledge.by_organization_id == organization_id
512 )
514 if user_id:
515 stmt = stmt.where(Pledge.created_by_user_id == user_id)
517 now = utc_now()
518 (start, end) = self.month_range(now)
519 stmt = stmt.where(
520 Pledge.created_at >= start,
521 Pledge.created_at <= end,
522 )
524 ret = await session.execute(stmt)
525 res = ret.scalars().one_or_none()
527 if not res:
528 return 0
530 return res
532 """ 1a
533 month_range returns the first and the last second of the month that ts is in
534 """
536 def month_range( 1a
537 self, ts: datetime.datetime
538 ) -> tuple[datetime.datetime, datetime.datetime]:
539 # go to first day and second of the month
540 start = ts.replace(day=1, hour=0, minute=0, second=0, microsecond=0)
542 # add 35 days to skip to the next month
543 end = start + timedelta(days=35)
544 # go to the first day of the next month
545 end = end.replace(day=1)
546 # go back one second to find the last second of the "current" month
547 end = end - timedelta(seconds=1)
549 return (start, end)
552pledge = PledgeService(Pledge) 1a