Coverage for polar/account/service.py: 20%

193 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 17:15 +0000

1from __future__ import annotations 1a

2 

3import uuid 1a

4from collections.abc import Sequence 1a

5 

6import stripe as stripe_lib 1a

7from sqlalchemy.orm.strategy_options import joinedload 1a

8 

9from polar.account.repository import AccountRepository 1a

10from polar.auth.models import AuthSubject 1a

11from polar.campaign.service import campaign as campaign_service 1a

12from polar.enums import AccountType 1a

13from polar.exceptions import PolarError 1a

14from polar.integrations.loops.service import loops as loops_service 1a

15from polar.integrations.open_collective.service import open_collective 1a

16from polar.integrations.stripe.service import stripe 1a

17from polar.kit.pagination import PaginationParams 1a

18from polar.models import Account, Organization, User 1a

19from polar.models.user import IdentityVerificationStatus 1a

20from polar.postgres import AsyncReadSession, AsyncSession 1a

21from polar.user.repository import UserRepository 1a

22 

23from .schemas import ( 1a

24 AccountCreateForOrganization, 

25 AccountLink, 

26 AccountUpdate, 

27) 

28 

29 

class AccountServiceError(PolarError):
    """Base class for every error raised by the account service."""

32 

33 

class AccountAlreadyExistsError(AccountServiceError):
    """Raised when an organization already has an account attached."""

    def __init__(self) -> None:
        message = "An account already exists for this organization."
        super().__init__(message)

37 

38 

class AccountExternalIdDoesNotExist(AccountServiceError):
    """Raised when no account matches an external ID.

    The offending ID is kept on the instance as ``external_id``.
    """

    def __init__(self, external_id: str) -> None:
        self.external_id = external_id
        super().__init__(
            f"No associated account exists with external ID {external_id}"
        )

44 

45 

class CannotChangeAdminError(AccountServiceError):
    """Raised when an account's admin cannot be changed.

    ``reason`` is appended to a fixed "Cannot change account admin" prefix.
    """

    def __init__(self, reason: str) -> None:
        message = f"Cannot change account admin: {reason}"
        super().__init__(message)

49 

50 

class UserNotOrganizationMemberError(AccountServiceError):
    """Raised when a user is not a member of the given organization."""

    def __init__(self, user_id: uuid.UUID, organization_id: uuid.UUID) -> None:
        message = f"User {user_id} is not a member of organization {organization_id}"
        super().__init__(message)

56 

57 

class AccountService:
    """Operations on ``Account`` records and their upstream payout accounts.

    Methods take the database session explicitly; the service keeps no state
    of its own.
    """

    async def search(
        self,
        session: AsyncReadSession,
        auth_subject: AuthSubject[User],
        *,
        pagination: PaginationParams,
    ) -> tuple[Sequence[Account], int]:
        """Return one page of the accounts readable by ``auth_subject``.

        Users and organizations are eagerly loaded on each account.

        Returns:
            The accounts of the requested page and the count reported by the
            repository's ``paginate``.
        """
        repository = AccountRepository.from_session(session)
        statement = repository.get_readable_statement(auth_subject).options(
            joinedload(Account.users),
            joinedload(Account.organizations),
        )
        accounts, count = await repository.paginate(
            statement, limit=pagination.limit, page=pagination.page
        )

        return accounts, count

    async def get(
        self,
        session: AsyncReadSession,
        auth_subject: AuthSubject[User | Organization],
        id: uuid.UUID,
    ) -> Account | None:
        """Return the account ``id`` if the readable statement yields it.

        The lookup is restricted by the repository's readable statement for
        ``auth_subject``; ``None`` is returned when nothing matches.
        """
        repository = AccountRepository.from_session(session)
        statement = (
            repository.get_readable_statement(auth_subject)
            .where(Account.id == id)
            .options(
                joinedload(Account.users),
                joinedload(Account.organizations),
            )
        )
        return await repository.get_one_or_none(statement)

    async def _get_unrestricted(
        self,
        session: AsyncReadSession,
        id: uuid.UUID,
    ) -> Account | None:
        """Return the account ``id`` without any auth-subject filtering.

        Built on the repository's base statement rather than the readable
        one, so callers are responsible for their own access checks.
        """
        repository = AccountRepository.from_session(session)
        statement = (
            repository.get_base_statement()
            .where(Account.id == id)
            .options(
                joinedload(Account.users),
                joinedload(Account.organizations),
            )
        )
        return await repository.get_one_or_none(statement)

    async def is_user_admin(
        self, session: AsyncReadSession, account_id: uuid.UUID, user: User
    ) -> bool:
        """Tell whether ``user`` is the admin of the account ``account_id``.

        Returns ``False`` when the account cannot be found.
        """
        account = await self._get_unrestricted(session, account_id)
        if account is None:
            return False
        return account.admin_id == user.id

    async def update(
        self, session: AsyncSession, account: Account, account_update: AccountUpdate
    ) -> Account:
        """Apply the explicitly set fields of ``account_update`` to ``account``."""
        repository = AccountRepository.from_session(session)
        return await repository.update(
            account, update_dict=account_update.model_dump(exclude_unset=True)
        )

    async def delete(self, session: AsyncSession, account: Account) -> Account:
        """Soft-delete ``account`` through the repository."""
        repository = AccountRepository.from_session(session)
        return await repository.soft_delete(account)

    async def delete_stripe_account(
        self, session: AsyncSession, account: Account
    ) -> None:
        """Delete Stripe account and clear related database fields.

        Raises:
            AccountServiceError: If the account has no Stripe ID, or Stripe
                reports that the Stripe account does not exist.
        """
        if not account.stripe_id:
            raise AccountServiceError("Account does not have a Stripe ID")

        # Verify the account exists on Stripe before deletion
        if not await stripe.account_exists(account.stripe_id):
            raise AccountServiceError(
                f"Stripe Account ID {account.stripe_id} doesn't exist"
            )

        # Delete the account on Stripe
        await stripe.delete_account(account.stripe_id)

        # Clear Stripe account data from database
        account.stripe_id = None
        account.is_details_submitted = False
        account.is_charges_enabled = False
        account.is_payouts_enabled = False
        session.add(account)

    async def create_account(
        self,
        session: AsyncSession,
        *,
        admin: User,
        account_create: AccountCreateForOrganization,
    ) -> Account:
        """Create a Stripe-backed account administered by ``admin``.

        After the account is created, the Loops integration is notified via
        ``user_created_account``.

        Raises:
            AccountServiceError: If Stripe rejects the account creation.
        """
        assert account_create.account_type == AccountType.stripe
        account = await self._create_stripe_account(session, admin, account_create)
        await loops_service.user_created_account(
            session, admin, accountType=account.account_type
        )
        return account

    async def get_or_create_account_for_organization(
        self,
        session: AsyncSession,
        organization: Organization,
        admin: User,
        account_create: AccountCreateForOrganization,
    ) -> Account:
        """Get existing account for organization or create a new one.

        If organization already has an account:
        - If account has no stripe_id (deleted), create new Stripe account
        - Otherwise return existing account

        If organization has no account, create new one and link it.

        Raises:
            AccountServiceError: If Stripe rejects the account creation.
        """
        # Check if organization already has an account
        if organization.account_id:
            repository = AccountRepository.from_session(session)
            account = await repository.get_by_id(
                organization.account_id,
                options=(
                    joinedload(Account.users),
                    joinedload(Account.organizations),
                ),
            )
            if account is not None:
                if account.stripe_id:
                    return account
                return await self._recreate_stripe_account(
                    session, account, admin, account_create
                )
            # The referenced account could not be loaded: fall through and
            # create a fresh one, exactly as if none was linked.

        # No account exists, create new one
        account = await self.create_account(
            session, admin=admin, account_create=account_create
        )

        # Link account to organization. Import happens here to avoid circular dependency
        from polar.organization.service import organization as organization_service

        await organization_service.set_account(
            session,
            auth_subject=AuthSubject(subject=admin, scopes=set(), session=None),
            organization=organization,
            account_id=account.id,
        )

        await session.refresh(account, {"users", "organizations"})

        return account

    async def _recreate_stripe_account(
        self,
        session: AsyncSession,
        account: Account,
        admin: User,
        account_create: AccountCreateForOrganization,
    ) -> Account:
        """Attach a newly created Stripe account to an existing ``account``.

        Used when the account row exists but has no ``stripe_id``. The Stripe
        details are copied onto the row, the row is added to the session, and
        the Loops integration is notified.

        Raises:
            AccountServiceError: If Stripe rejects the account creation.
        """
        assert account_create.account_type == AccountType.stripe
        stripe_account = await self._request_stripe_account(account_create)

        # Update account with new Stripe details
        account.stripe_id = stripe_account.id
        account.email = stripe_account.email
        if stripe_account.country is not None:
            account.country = stripe_account.country
        assert stripe_account.default_currency is not None
        account.currency = stripe_account.default_currency
        account.is_details_submitted = stripe_account.details_submitted or False
        account.is_charges_enabled = stripe_account.charges_enabled or False
        account.is_payouts_enabled = stripe_account.payouts_enabled or False
        account.business_type = stripe_account.business_type
        account.data = stripe_account.to_dict()

        session.add(account)

        await loops_service.user_created_account(
            session, admin, accountType=account.account_type
        )

        return account

    async def _request_stripe_account(
        self, account_create: AccountCreateForOrganization
    ) -> stripe_lib.Account:
        """Create the account on Stripe, translating Stripe failures.

        Raises:
            AccountServiceError: Wrapping any ``StripeError``. Stripe's
                ``user_message`` is used when present, a generic message
                otherwise.
        """
        try:
            return await stripe.create_account(account_create, name=None)  # TODO: name
        except stripe_lib.StripeError as e:
            raise AccountServiceError(
                e.user_message or "An unexpected Stripe error happened"
            ) from e

    async def _build_stripe_account_name(
        self, session: AsyncSession, account: Account
    ) -> str | None:
        """Build the display name sent to Stripe for ``account``.

        The name lists every associated user (``user/<email>``) followed by
        every associated organization (``org/<slug>``), joined with ``·``.
        It is an empty string when the account has no associations.
        """
        # The account name is visible for users and is used to differentiate accounts
        # from the same Platform ("Polar") in Stripe Express.
        await session.refresh(account, {"users", "organizations"})
        associations = [f"user/{user.email}" for user in account.users]
        associations += [
            f"org/{organization.slug}" for organization in account.organizations
        ]
        return "·".join(associations)

    async def _create_stripe_account(
        self,
        session: AsyncSession,
        admin: User,
        account_create: AccountCreateForOrganization,
    ) -> Account:
        """Create a Stripe account and persist the matching ``Account`` row.

        If ``campaign_service.get_eligible`` returns a campaign for ``admin``,
        its id and fee settings are copied onto the new account. The session
        is flushed before returning.

        Raises:
            AccountServiceError: If Stripe rejects the account creation.
        """
        stripe_account = await self._request_stripe_account(account_create)

        account = Account(
            status=Account.Status.ONBOARDING_STARTED,
            admin_id=admin.id,
            account_type=account_create.account_type,
            stripe_id=stripe_account.id,
            email=stripe_account.email,
            country=stripe_account.country,
            currency=stripe_account.default_currency,
            # Coerce missing capability flags to False, as the other code
            # paths that copy Stripe data onto an account already do.
            is_details_submitted=stripe_account.details_submitted or False,
            is_charges_enabled=stripe_account.charges_enabled or False,
            is_payouts_enabled=stripe_account.payouts_enabled or False,
            business_type=stripe_account.business_type,
            data=stripe_account.to_dict(),
            users=[],
            organizations=[],
        )

        campaign = await campaign_service.get_eligible(session, admin)
        if campaign:
            account.campaign_id = campaign.id
            account._platform_fee_percent = campaign.fee_percent
            account._platform_fee_fixed = campaign.fee_fixed

        session.add(account)
        await session.flush()
        return account

    async def update_account_from_stripe(
        self, session: AsyncSession, *, stripe_account: stripe_lib.Account
    ) -> Account:
        """Copy the state of ``stripe_account`` onto the matching account.

        The account is looked up by Stripe ID. Once updated, the organization
        service is asked to update organization status from the account.

        Raises:
            AccountExternalIdDoesNotExist: If no account has this Stripe ID.
        """
        repository = AccountRepository.from_session(session)
        account = await repository.get_by_stripe_id(stripe_account.id)
        if account is None:
            raise AccountExternalIdDoesNotExist(stripe_account.id)

        account.email = stripe_account.email
        assert stripe_account.default_currency is not None
        account.currency = stripe_account.default_currency
        account.is_details_submitted = stripe_account.details_submitted or False
        account.is_charges_enabled = stripe_account.charges_enabled or False
        account.is_payouts_enabled = stripe_account.payouts_enabled or False
        # Keep the stored country when Stripe does not report one.
        if stripe_account.country is not None:
            account.country = stripe_account.country
        account.data = stripe_account.to_dict()

        session.add(account)

        # Update organization status based on Stripe account capabilities
        # Import here to avoid circular imports
        from polar.organization.service import organization as organization_service

        await organization_service.update_status_from_stripe_account(session, account)

        return account

    async def onboarding_link(
        self, account: Account, return_path: str
    ) -> AccountLink | None:
        """Return a Stripe onboarding link, or ``None`` for non-Stripe accounts."""
        if account.account_type != AccountType.stripe:
            return None

        assert account.stripe_id is not None
        account_link = await stripe.create_account_link(
            account.stripe_id, return_path
        )
        return AccountLink(url=account_link.url)

    async def dashboard_link(self, account: Account) -> AccountLink | None:
        """Return a link to the account's upstream dashboard.

        Stripe accounts get a Stripe login link and Open Collective accounts
        get an Open Collective dashboard link. Any other account type yields
        ``None``.
        """
        if account.account_type == AccountType.stripe:
            assert account.stripe_id is not None
            account_link = await stripe.create_login_link(account.stripe_id)
            return AccountLink(url=account_link.url)

        if account.account_type == AccountType.open_collective:
            assert account.open_collective_slug is not None
            dashboard_link = open_collective.create_dashboard_link(
                account.open_collective_slug
            )
            return AccountLink(url=dashboard_link)

        return None

    async def sync_to_upstream(self, session: AsyncSession, account: Account) -> None:
        """Push the account's display name to Stripe.

        Does nothing for non-Stripe accounts or accounts without a Stripe ID.
        """
        if account.account_type != AccountType.stripe or not account.stripe_id:
            return

        name = await self._build_stripe_account_name(session, account)
        await stripe.update_account(account.stripe_id, name)

    async def change_admin(
        self,
        session: AsyncSession,
        account: Account,
        new_admin_id: uuid.UUID,
        organization_id: uuid.UUID,
    ) -> Account:
        """Make the user ``new_admin_id`` the admin of ``account``.

        Raises:
            CannotChangeAdminError: If the account still has a Stripe ID, the
                new admin's identity is not verified, or the new admin is
                already the current admin.
            UserNotOrganizationMemberError: If the user is not a member of
                ``organization_id`` or cannot be loaded.
        """
        if account.stripe_id:
            raise CannotChangeAdminError(
                "Stripe account must be deleted before changing admin"
            )

        user_repository = UserRepository.from_session(session)
        is_member = await user_repository.is_organization_member(
            new_admin_id, organization_id
        )
        if not is_member:
            raise UserNotOrganizationMemberError(new_admin_id, organization_id)

        new_admin_user = await user_repository.get_by_id(new_admin_id)
        if new_admin_user is None:
            raise UserNotOrganizationMemberError(new_admin_id, organization_id)

        verification_status = new_admin_user.identity_verification_status
        if verification_status != IdentityVerificationStatus.verified:
            raise CannotChangeAdminError(
                f"New admin must be verified in Stripe. Current status: {verification_status.get_display_name()}"
            )

        if account.admin_id == new_admin_id:
            raise CannotChangeAdminError("New admin is the same as current admin")

        repository = AccountRepository.from_session(session)
        return await repository.update(
            account, update_dict={"admin_id": new_admin_id}
        )

415 

416 

# Module-level service instance, created once at import time.
account: AccountService = AccountService()