Coverage for polar/transaction/service/transaction.py: 27%
78 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 15:52 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 15:52 +0000
1import uuid 1a
2from collections.abc import Sequence 1a
3from enum import StrEnum 1a
4from typing import Any, cast 1a
6from sqlalchemy import Select, UnaryExpression, asc, desc, func, or_, select 1a
7from sqlalchemy.exc import NoResultFound 1a
8from sqlalchemy.orm import aliased, joinedload, subqueryload 1a
10from polar.exceptions import ResourceNotFound 1a
11from polar.kit.pagination import PaginationParams, paginate 1a
12from polar.kit.sorting import Sorting 1a
13from polar.models import ( 1a
14 Account,
15 Order,
16 Product,
17 Transaction,
18 User,
19)
20from polar.models.organization import Organization 1a
21from polar.models.transaction import TransactionType 1a
22from polar.models.user_organization import UserOrganization 1a
23from polar.postgres import AsyncReadSession, AsyncSession 1a
25from ..schemas import ( 1a
26 TransactionsBalance,
27 TransactionsSummary,
28)
29from .base import BaseTransactionService 1a
32class TransactionSortProperty(StrEnum): 1a
33 created_at = "created_at" 1a
34 amount = "amount" 1a
37class TransactionService(BaseTransactionService): 1a
38 async def search( 1a
39 self,
40 session: AsyncReadSession,
41 user: User,
42 *,
43 type: TransactionType | None = None,
44 account_id: uuid.UUID | None = None,
45 payment_customer_id: uuid.UUID | None = None,
46 payment_organization_id: uuid.UUID | None = None,
47 payment_user_id: uuid.UUID | None = None,
48 exclude_platform_fees: bool = False,
49 pagination: PaginationParams,
50 sorting: list[Sorting[TransactionSortProperty]] = [
51 (TransactionSortProperty.created_at, True)
52 ],
53 ) -> tuple[Sequence[Transaction], int]:
54 statement = self._get_readable_transactions_statement(user)
56 statement = statement.options(
57 # Incurred transactions
58 subqueryload(Transaction.account_incurred_transactions),
59 # Pledge
60 subqueryload(Transaction.pledge),
61 # IssueReward
62 subqueryload(Transaction.issue_reward),
63 # Order
64 subqueryload(Transaction.order).options(
65 joinedload(Order.product).options(joinedload(Product.organization)),
66 ),
67 )
69 if type is not None:
70 statement = statement.where(Transaction.type == type)
71 if account_id is not None:
72 statement = statement.where(Transaction.account_id == account_id)
73 if payment_customer_id is not None:
74 statement = statement.where(
75 Transaction.payment_customer_id == payment_customer_id
76 )
77 if payment_organization_id is not None:
78 statement = statement.where(
79 Transaction.payment_organization_id == payment_organization_id
80 )
81 if payment_user_id is not None:
82 statement = statement.where(Transaction.payment_user_id == payment_user_id)
83 if exclude_platform_fees:
84 statement = statement.where(Transaction.platform_fee_type.is_(None))
86 order_by_clauses: list[UnaryExpression[Any]] = []
87 for criterion, is_desc in sorting:
88 clause_function = desc if is_desc else asc
89 if criterion == TransactionSortProperty.created_at:
90 order_by_clauses.append(clause_function(Transaction.created_at))
91 elif criterion == TransactionSortProperty.amount:
92 order_by_clauses.append(clause_function(Transaction.amount))
93 statement = statement.order_by(*order_by_clauses)
95 results, count = await paginate(session, statement, pagination=pagination)
97 return results, count
99 async def lookup( 1a
100 self, session: AsyncReadSession, id: uuid.UUID, user: User
101 ) -> Transaction:
102 statement = (
103 self._get_readable_transactions_statement(user)
104 .options(
105 # Incurred transactions
106 subqueryload(Transaction.account_incurred_transactions),
107 # Pledge
108 subqueryload(Transaction.pledge),
109 # IssueReward
110 subqueryload(Transaction.issue_reward),
111 # Order
112 subqueryload(Transaction.order).options(
113 joinedload(Order.product).options(joinedload(Product.organization)),
114 ),
115 # Paid transactions (joining on itself)
116 subqueryload(Transaction.paid_transactions).subqueryload(
117 Transaction.pledge
118 ),
119 subqueryload(Transaction.paid_transactions).subqueryload(
120 Transaction.issue_reward
121 ),
122 subqueryload(Transaction.paid_transactions)
123 .subqueryload(Transaction.order)
124 .options(
125 joinedload(Order.product),
126 ),
127 subqueryload(Transaction.paid_transactions).subqueryload(
128 Transaction.account_incurred_transactions
129 ),
130 subqueryload(Transaction.paid_transactions).subqueryload(
131 Transaction.order
132 ),
133 subqueryload(Transaction.paid_transactions),
134 )
135 .where(Transaction.id == id)
136 )
137 result = await session.execute(statement)
138 transaction = result.scalar_one_or_none()
139 if transaction is None:
140 raise ResourceNotFound()
142 return transaction
144 async def get_summary( 1a
145 self, session: AsyncReadSession, account: Account
146 ) -> TransactionsSummary:
147 statement = select(
148 cast(type[int], func.coalesce(func.sum(Transaction.amount), 0)),
149 cast(type[int], func.coalesce(func.sum(Transaction.account_amount), 0)),
150 cast(
151 type[int],
152 func.coalesce(
153 func.sum(Transaction.amount).filter(
154 Transaction.type == TransactionType.payout
155 ),
156 0,
157 ),
158 ),
159 cast(
160 type[int],
161 func.coalesce(
162 func.sum(Transaction.account_amount).filter(
163 Transaction.type == TransactionType.payout
164 ),
165 0,
166 ),
167 ),
168 ).where(Transaction.account_id == account.id)
170 result = await session.execute(statement)
172 currency = "usd" # FIXME: Main Polar currency
173 account_currency = account.currency
174 assert account_currency is not None
176 try:
177 (
178 amount,
179 account_amount,
180 payout_amount,
181 account_payout_amount,
182 ) = result.one()._tuple()
183 except NoResultFound:
184 amount = 0
185 account_amount = 0
186 payout_amount = 0
187 account_payout_amount = 0
189 return TransactionsSummary(
190 balance=TransactionsBalance(
191 currency=currency,
192 amount=amount,
193 account_currency=account_currency,
194 account_amount=account_amount,
195 ),
196 payout=TransactionsBalance(
197 currency=currency,
198 amount=payout_amount,
199 account_currency=account_currency,
200 account_amount=account_payout_amount,
201 ),
202 )
204 async def get_transactions_sum( 1a
205 self,
206 session: AsyncSession,
207 account_id: uuid.UUID | None,
208 *,
209 type: TransactionType | None = None,
210 ) -> int:
211 statement = select(func.coalesce(func.sum(Transaction.amount), 0)).where(
212 Transaction.account_id == account_id
213 )
215 if type is not None:
216 statement = statement.where(Transaction.type == type)
218 result = await session.execute(statement)
219 return int(result.scalar_one())
221 def _get_readable_transactions_statement(self, user: User) -> Select[Any]: 1a
222 PaymentUserOrganization = aliased(UserOrganization)
223 statement = (
224 select(Transaction)
225 .join(Transaction.account, isouter=True)
226 .join(
227 Organization,
228 onclause=Organization.account_id == Account.id,
229 isouter=True,
230 )
231 .join(
232 UserOrganization,
233 onclause=Organization.id == UserOrganization.organization_id,
234 isouter=True,
235 )
236 .join(User, onclause=User.account_id == Account.id, isouter=True)
237 .join(
238 PaymentUserOrganization,
239 onclause=Transaction.payment_organization_id
240 == PaymentUserOrganization.organization_id,
241 isouter=True,
242 )
243 .where(
244 or_(
245 User.id == user.id,
246 UserOrganization.user_id == user.id,
247 Transaction.payment_user_id == user.id,
248 PaymentUserOrganization.user_id == user.id,
249 )
250 )
251 )
253 return statement
256transaction = TransactionService(Transaction) 1a