Coverage for polar/account/service.py: 20%
193 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 15:52 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 15:52 +0000
1from __future__ import annotations 1a
3import uuid 1a
4from collections.abc import Sequence 1a
6import stripe as stripe_lib 1a
7from sqlalchemy.orm.strategy_options import joinedload 1a
9from polar.account.repository import AccountRepository 1a
10from polar.auth.models import AuthSubject 1a
11from polar.campaign.service import campaign as campaign_service 1a
12from polar.enums import AccountType 1a
13from polar.exceptions import PolarError 1a
14from polar.integrations.loops.service import loops as loops_service 1a
15from polar.integrations.open_collective.service import open_collective 1a
16from polar.integrations.stripe.service import stripe 1a
17from polar.kit.pagination import PaginationParams 1a
18from polar.models import Account, Organization, User 1a
19from polar.models.user import IdentityVerificationStatus 1a
20from polar.postgres import AsyncReadSession, AsyncSession 1a
21from polar.user.repository import UserRepository 1a
23from .schemas import ( 1a
24 AccountCreateForOrganization,
25 AccountLink,
26 AccountUpdate,
27)
30class AccountServiceError(PolarError): 1a
31 pass 1a
34class AccountAlreadyExistsError(AccountServiceError): 1a
35 def __init__(self) -> None: 1a
36 super().__init__("An account already exists for this organization.")
39class AccountExternalIdDoesNotExist(AccountServiceError): 1a
40 def __init__(self, external_id: str) -> None: 1a
41 self.external_id = external_id
42 message = f"No associated account exists with external ID {external_id}"
43 super().__init__(message)
46class CannotChangeAdminError(AccountServiceError): 1a
47 def __init__(self, reason: str) -> None: 1a
48 super().__init__(f"Cannot change account admin: {reason}")
51class UserNotOrganizationMemberError(AccountServiceError): 1a
52 def __init__(self, user_id: uuid.UUID, organization_id: uuid.UUID) -> None: 1a
53 super().__init__(
54 f"User {user_id} is not a member of organization {organization_id}"
55 )
58class AccountService: 1a
59 async def search( 1a
60 self,
61 session: AsyncReadSession,
62 auth_subject: AuthSubject[User],
63 *,
64 pagination: PaginationParams,
65 ) -> tuple[Sequence[Account], int]:
66 repository = AccountRepository.from_session(session)
67 statement = repository.get_readable_statement(auth_subject).options(
68 joinedload(Account.users),
69 joinedload(Account.organizations),
70 )
71 accounts, count = await repository.paginate(
72 statement, limit=pagination.limit, page=pagination.page
73 )
75 return accounts, count
77 async def get( 1a
78 self,
79 session: AsyncReadSession,
80 auth_subject: AuthSubject[User | Organization],
81 id: uuid.UUID,
82 ) -> Account | None:
83 repository = AccountRepository.from_session(session)
84 statement = (
85 repository.get_readable_statement(auth_subject)
86 .where(Account.id == id)
87 .options(
88 joinedload(Account.users),
89 joinedload(Account.organizations),
90 )
91 )
92 account = await repository.get_one_or_none(statement)
94 return account
96 async def _get_unrestricted( 1a
97 self,
98 session: AsyncReadSession,
99 id: uuid.UUID,
100 ) -> Account | None:
101 repository = AccountRepository.from_session(session)
102 statement = (
103 repository.get_base_statement()
104 .where(Account.id == id)
105 .options(
106 joinedload(Account.users),
107 joinedload(Account.organizations),
108 )
109 )
110 return await repository.get_one_or_none(statement)
112 async def is_user_admin( 1a
113 self, session: AsyncReadSession, account_id: uuid.UUID, user: User
114 ) -> bool:
115 account = await self._get_unrestricted(session, account_id)
116 if account is None:
117 return False
118 return account.admin_id == user.id
120 async def update( 1a
121 self, session: AsyncSession, account: Account, account_update: AccountUpdate
122 ) -> Account:
123 repository = AccountRepository.from_session(session)
124 return await repository.update(
125 account, update_dict=account_update.model_dump(exclude_unset=True)
126 )
128 async def delete(self, session: AsyncSession, account: Account) -> Account: 1a
129 repository = AccountRepository.from_session(session)
130 return await repository.soft_delete(account)
132 async def delete_stripe_account( 1a
133 self, session: AsyncSession, account: Account
134 ) -> None:
135 """Delete Stripe account and clear related database fields."""
136 if not account.stripe_id:
137 raise AccountServiceError("Account does not have a Stripe ID")
139 # Verify the account exists on Stripe before deletion
140 if not await stripe.account_exists(account.stripe_id):
141 raise AccountServiceError(
142 f"Stripe Account ID {account.stripe_id} doesn't exist"
143 )
145 # Delete the account on Stripe
146 await stripe.delete_account(account.stripe_id)
148 # Clear Stripe account data from database
149 account.stripe_id = None
150 account.is_details_submitted = False
151 account.is_charges_enabled = False
152 account.is_payouts_enabled = False
153 session.add(account)
155 async def create_account( 1a
156 self,
157 session: AsyncSession,
158 *,
159 admin: User,
160 account_create: AccountCreateForOrganization,
161 ) -> Account:
162 assert account_create.account_type == AccountType.stripe
163 account = await self._create_stripe_account(session, admin, account_create)
164 await loops_service.user_created_account(
165 session, admin, accountType=account.account_type
166 )
167 return account
169 async def get_or_create_account_for_organization( 1a
170 self,
171 session: AsyncSession,
172 organization: Organization,
173 admin: User,
174 account_create: AccountCreateForOrganization,
175 ) -> Account:
176 """Get existing account for organization or create a new one.
178 If organization already has an account:
179 - If account has no stripe_id (deleted), create new Stripe account
180 - Otherwise return existing account
182 If organization has no account, create new one and link it.
183 """
185 # Check if organization already has an account
186 if organization.account_id:
187 repository = AccountRepository.from_session(session)
188 account = await repository.get_by_id(
189 organization.account_id,
190 options=(
191 joinedload(Account.users),
192 joinedload(Account.organizations),
193 ),
194 )
196 if account and not account.stripe_id:
197 assert account_create.account_type == AccountType.stripe
198 try:
199 stripe_account = await stripe.create_account(
200 account_create, name=None
201 )
202 except stripe_lib.StripeError as e:
203 if e.user_message:
204 raise AccountServiceError(e.user_message) from e
205 else:
206 raise AccountServiceError(
207 "An unexpected Stripe error happened"
208 ) from e
210 # Update account with new Stripe details
211 account.stripe_id = stripe_account.id
212 account.email = stripe_account.email
213 if stripe_account.country is not None:
214 account.country = stripe_account.country
215 assert stripe_account.default_currency is not None
216 account.currency = stripe_account.default_currency
217 account.is_details_submitted = stripe_account.details_submitted or False
218 account.is_charges_enabled = stripe_account.charges_enabled or False
219 account.is_payouts_enabled = stripe_account.payouts_enabled or False
220 account.business_type = stripe_account.business_type
221 account.data = stripe_account.to_dict()
223 session.add(account)
225 await loops_service.user_created_account(
226 session, admin, accountType=account.account_type
227 )
229 return account
230 elif account:
231 return account
233 # No account exists, create new one
234 account = await self.create_account(
235 session, admin=admin, account_create=account_create
236 )
238 # Link account to organization. Import happens here to avoid circular dependency
239 from polar.organization.service import organization as organization_service
241 await organization_service.set_account(
242 session,
243 auth_subject=AuthSubject(subject=admin, scopes=set(), session=None),
244 organization=organization,
245 account_id=account.id,
246 )
248 await session.refresh(account, {"users", "organizations"})
250 return account
252 async def _build_stripe_account_name( 1a
253 self, session: AsyncSession, account: Account
254 ) -> str | None:
255 # The account name is visible for users and is used to differentiate accounts
256 # from the same Platform ("Polar") in Stripe Express.
257 await session.refresh(account, {"users", "organizations"})
258 associations = []
259 for user in account.users:
260 associations.append(f"user/{user.email}")
261 for organization in account.organizations:
262 associations.append(f"org/{organization.slug}")
263 return "·".join(associations)
265 async def _create_stripe_account( 1a
266 self,
267 session: AsyncSession,
268 admin: User,
269 account_create: AccountCreateForOrganization,
270 ) -> Account:
271 try:
272 stripe_account = await stripe.create_account(
273 account_create, name=None
274 ) # TODO: name
275 except stripe_lib.StripeError as e:
276 if e.user_message:
277 raise AccountServiceError(e.user_message) from e
278 else:
279 raise AccountServiceError("An unexpected Stripe error happened") from e
281 account = Account(
282 status=Account.Status.ONBOARDING_STARTED,
283 admin_id=admin.id,
284 account_type=account_create.account_type,
285 stripe_id=stripe_account.id,
286 email=stripe_account.email,
287 country=stripe_account.country,
288 currency=stripe_account.default_currency,
289 is_details_submitted=stripe_account.details_submitted,
290 is_charges_enabled=stripe_account.charges_enabled,
291 is_payouts_enabled=stripe_account.payouts_enabled,
292 business_type=stripe_account.business_type,
293 data=stripe_account.to_dict(),
294 users=[],
295 organizations=[],
296 )
298 campaign = await campaign_service.get_eligible(session, admin)
299 if campaign:
300 account.campaign_id = campaign.id
301 account._platform_fee_percent = campaign.fee_percent
302 account._platform_fee_fixed = campaign.fee_fixed
304 session.add(account)
305 await session.flush()
306 return account
308 async def update_account_from_stripe( 1a
309 self, session: AsyncSession, *, stripe_account: stripe_lib.Account
310 ) -> Account:
311 repository = AccountRepository.from_session(session)
312 account = await repository.get_by_stripe_id(stripe_account.id)
313 if account is None:
314 raise AccountExternalIdDoesNotExist(stripe_account.id)
316 account.email = stripe_account.email
317 assert stripe_account.default_currency is not None
318 account.currency = stripe_account.default_currency
319 account.is_details_submitted = stripe_account.details_submitted or False
320 account.is_charges_enabled = stripe_account.charges_enabled or False
321 account.is_payouts_enabled = stripe_account.payouts_enabled or False
322 if stripe_account.country is not None:
323 account.country = stripe_account.country
324 account.data = stripe_account.to_dict()
326 session.add(account)
328 # Update organization status based on Stripe account capabilities
329 # Import here to avoid circular imports
330 from polar.organization.service import organization as organization_service
332 await organization_service.update_status_from_stripe_account(session, account)
334 return account
336 async def onboarding_link( 1a
337 self, account: Account, return_path: str
338 ) -> AccountLink | None:
339 if account.account_type == AccountType.stripe:
340 assert account.stripe_id is not None
341 account_link = await stripe.create_account_link(
342 account.stripe_id, return_path
343 )
344 return AccountLink(url=account_link.url)
346 return None
348 async def dashboard_link(self, account: Account) -> AccountLink | None: 1a
349 if account.account_type == AccountType.stripe:
350 assert account.stripe_id is not None
351 account_link = await stripe.create_login_link(account.stripe_id)
352 return AccountLink(url=account_link.url)
354 elif account.account_type == AccountType.open_collective:
355 assert account.open_collective_slug is not None
356 dashboard_link = open_collective.create_dashboard_link(
357 account.open_collective_slug
358 )
359 return AccountLink(url=dashboard_link)
361 return None
363 async def sync_to_upstream(self, session: AsyncSession, account: Account) -> None: 1a
364 if account.account_type != AccountType.stripe:
365 return
367 if not account.stripe_id:
368 return
370 name = await self._build_stripe_account_name(session, account)
371 await stripe.update_account(account.stripe_id, name)
373 async def change_admin( 1a
374 self,
375 session: AsyncSession,
376 account: Account,
377 new_admin_id: uuid.UUID,
378 organization_id: uuid.UUID,
379 ) -> Account:
380 if account.stripe_id:
381 raise CannotChangeAdminError(
382 "Stripe account must be deleted before changing admin"
383 )
385 user_repository = UserRepository.from_session(session)
386 is_member = await user_repository.is_organization_member(
387 new_admin_id, organization_id
388 )
390 if not is_member:
391 raise UserNotOrganizationMemberError(new_admin_id, organization_id)
393 new_admin_user = await user_repository.get_by_id(new_admin_id)
395 if new_admin_user is None:
396 raise UserNotOrganizationMemberError(new_admin_id, organization_id)
398 if (
399 new_admin_user.identity_verification_status
400 != IdentityVerificationStatus.verified
401 ):
402 raise CannotChangeAdminError(
403 f"New admin must be verified in Stripe. Current status: {new_admin_user.identity_verification_status.get_display_name()}"
404 )
406 if account.admin_id == new_admin_id:
407 raise CannotChangeAdminError("New admin is the same as current admin")
409 repository = AccountRepository.from_session(session)
410 account = await repository.update(
411 account, update_dict={"admin_id": new_admin_id}
412 )
414 return account
417account = AccountService() 1a