Coverage for polar/organization/service.py: 19%

415 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 16:17 +0000

1import uuid 1a

2from collections.abc import Sequence 1a

3from datetime import UTC, datetime 1a

4from enum import StrEnum 1a

5from typing import TYPE_CHECKING, Any 1a

6from uuid import UUID 1a

7 

8import structlog 1a

9from pydantic import BaseModel, Field 1a

10from sqlalchemy import update as sqlalchemy_update 1a

11from sqlalchemy.exc import IntegrityError 1a

12from sqlalchemy.orm import joinedload 1a

13 

14from polar.account.repository import AccountRepository 1a

15from polar.account.service import account as account_service 1a

16from polar.auth.models import AuthSubject 1a

17from polar.checkout_link.repository import CheckoutLinkRepository 1a

18from polar.config import Environment, settings 1a

19from polar.customer.repository import CustomerRepository 1a

20from polar.enums import InvoiceNumbering 1a

21from polar.exceptions import NotPermitted, PolarError, PolarRequestValidationError 1a

22from polar.integrations.loops.service import loops as loops_service 1a

23from polar.integrations.plain.service import plain as plain_service 1a

24from polar.kit.anonymization import anonymize_email_for_deletion, anonymize_for_deletion 1a

25from polar.kit.pagination import PaginationParams 1a

26from polar.kit.repository import Options 1a

27from polar.kit.sorting import Sorting 1a

28from polar.models import ( 1a

29 Account, 

30 Customer, 

31 Order, 

32 Organization, 

33 Subscription, 

34 User, 

35 UserOrganization, 

36) 

37from polar.models.organization import OrganizationStatus 1a

38from polar.models.organization_review import OrganizationReview 1a

39from polar.models.subscription import SubscriptionStatus 1a

40from polar.models.transaction import TransactionType 1a

41from polar.models.user import IdentityVerificationStatus 1a

42from polar.models.webhook_endpoint import WebhookEventType 1a

43from polar.organization.ai_validation import validator as organization_validator 1a

44from polar.organization_access_token.repository import OrganizationAccessTokenRepository 1a

45from polar.postgres import AsyncReadSession, AsyncSession, sql 1a

46from polar.posthog import posthog 1a

47from polar.product.repository import ProductRepository 1a

48from polar.transaction.service.transaction import transaction as transaction_service 1a

49from polar.webhook.service import webhook as webhook_service 1a

50from polar.worker import enqueue_job 1a

51 

52from .repository import OrganizationRepository, OrganizationReviewRepository 1a

53from .schemas import ( 1a

54 OrganizationCreate, 

55 OrganizationDeletionBlockedReason, 

56 OrganizationUpdate, 

57) 

58from .sorting import OrganizationSortProperty 1a

59 

60if TYPE_CHECKING: 60 ↛ 61line 60 didn't jump to line 61 because the condition on line 60 was never true1a

61 pass 

62 

63log = structlog.get_logger() 1a

64 

65 

66class PaymentStepID(StrEnum): 1a

67 """Enum for payment onboarding step identifiers.""" 

68 

69 CREATE_PRODUCT = "create_product" 1a

70 INTEGRATE_CHECKOUT = "integrate_checkout" 1a

71 SETUP_ACCOUNT = "setup_account" 1a

72 

73 

class PaymentStep(BaseModel):
    """Service-level model for payment onboarding steps."""

    # Machine-readable identifier of the step.
    id: str = Field(description="Step identifier")
    # Short human-readable label.
    title: str = Field(description="Step title")
    # Longer explanation of what the step requires.
    description: str = Field(description="Step description")
    # True once the step's requirement is met.
    completed: bool = Field(description="Whether the step is completed")

81 

82 

class PaymentStatusResponse(BaseModel):
    """Service-level response for payment status."""

    # Overall verdict: may the organization accept payments right now?
    payment_ready: bool = Field(
        description="Whether the organization is ready to accept payments"
    )
    # Onboarding steps, each with its own completion flag.
    steps: list[PaymentStep] = Field(description="List of onboarding steps")
    # Status of the organization at the time the response was built.
    organization_status: OrganizationStatus = Field(
        description="Current organization status"
    )

93 

94 

class OrganizationDeletionCheckResult(BaseModel):
    """Result of checking if an organization can be deleted."""

    # True only when nothing blocks an immediate deletion.
    can_delete_immediately: bool = Field(
        description="Whether the organization can be deleted immediately"
    )
    # Empty by default; every instance gets its own list.
    blocked_reasons: list[OrganizationDeletionBlockedReason] = Field(
        default_factory=list,
        description="Reasons why immediate deletion is blocked",
    )

105 

106 

class OrganizationError(PolarError):
    """Base class for the organization-specific errors defined in this module."""

108 

109 

class InvalidAccount(OrganizationError):
    """Error for an account ID that cannot be resolved for the caller."""

    def __init__(self, account_id: UUID) -> None:
        # Keep the offending ID on the exception so handlers can inspect it.
        self.account_id = account_id
        super().__init__(
            f"The account {account_id} does not exist or you don't have access to it."
        )

117 

118 

class AccountAlreadySet(OrganizationError):
    """Error for attempting to attach an account to an organization that has one."""

    def __init__(self, organization_slug: str) -> None:
        self.organization_slug = organization_slug
        # The second positional argument is 403 — presumably the HTTP status
        # code understood by PolarError; confirm against its constructor.
        super().__init__(
            f"The account for organization '{organization_slug}' "
            "has already been set up by the owner. "
            "Contact support to change the owner of the account.",
            403,
        )

124 

125 

126class OrganizationService: 1a

async def list(
    self,
    session: AsyncReadSession,
    auth_subject: AuthSubject[User | Organization],
    *,
    slug: str | None = None,
    pagination: PaginationParams,
    sorting: list[Sorting[OrganizationSortProperty]] | None = None,
) -> tuple[Sequence[Organization], int]:
    """List the organizations readable by the authenticated subject.

    Args:
        session: Read-only database session.
        auth_subject: Subject whose readable organizations are listed.
        slug: When given, only organizations with exactly this slug match.
        pagination: Supplies the ``limit`` and ``page`` passed to the repository.
        sorting: Sort criteria. When omitted, sorts on ``created_at`` with the
            flag ``False`` (presumably ascending — confirm against
            ``apply_sorting``).

    Returns:
        The two-item tuple produced by ``repository.paginate``: the sequence of
        organizations for the requested page and an integer count.
    """
    # Build the default per call rather than sharing one mutable list object
    # between every invocation through the function's default argument.
    if sorting is None:
        sorting = [(OrganizationSortProperty.created_at, False)]

    repository = OrganizationRepository.from_session(session)
    statement = repository.get_readable_statement(auth_subject)

    if slug is not None:
        statement = statement.where(Organization.slug == slug)

    statement = repository.apply_sorting(statement, sorting)

    return await repository.paginate(
        statement, limit=pagination.limit, page=pagination.page
    )

149 

async def get(
    self,
    session: AsyncReadSession,
    auth_subject: AuthSubject[User | Organization],
    id: uuid.UUID,
    *,
    options: Options = (),
) -> Organization | None:
    """Fetch one organization by ID among those the subject may read.

    Args:
        session: Read-only database session.
        auth_subject: Subject used to build the readable statement.
        id: ID of the organization to fetch.
        options: Extra loader options applied to the statement.

    Returns:
        The matching organization, or ``None`` when the query yields no row.
    """
    organization_repository = OrganizationRepository.from_session(session)
    readable = organization_repository.get_readable_statement(auth_subject)
    query = readable.where(Organization.id == id).options(*options)
    return await organization_repository.get_one_or_none(query)

165 

async def get_anonymous(
    self,
    session: AsyncReadSession,
    id: uuid.UUID,
    *,
    options: Options = (),
) -> Organization | None:
    """Fetch a non-blocked organization by ID without any auth subject.

    Use with care: no readability filter is applied here, only the
    ``blocked_at IS NULL`` condition on top of the repository's base statement.

    Args:
        session: Read-only database session.
        id: ID of the organization to fetch.
        options: Extra loader options applied to the statement.

    Returns:
        The matching organization, or ``None`` when the query yields no row.
    """
    organization_repository = OrganizationRepository.from_session(session)
    not_blocked = organization_repository.get_base_statement().where(
        Organization.blocked_at.is_(None)
    )
    query = not_blocked.where(Organization.id == id).options(*options)
    return await organization_repository.get_one_or_none(query)

183 

async def create(
    self,
    session: AsyncSession,
    create_schema: OrganizationCreate,
    auth_subject: AuthSubject[User],
) -> Organization:
    """Create an organization and attach the authenticated user to it.

    After the row is created, the user is added through ``add_user``, an
    ``organization.created`` job is enqueued and a PostHog event is emitted.

    Args:
        session: Database session.
        create_schema: Validated creation payload.
        auth_subject: The user creating the organization.

    Returns:
        The newly created organization.

    Raises:
        PolarRequestValidationError: If the requested slug already exists.
    """
    repository = OrganizationRepository.from_session(session)

    slug_taken = await repository.slug_exists(create_schema.slug)
    if slug_taken:
        slug_error = {
            "loc": ("body", "slug"),
            "msg": "An organization with this slug already exists.",
            "type": "value_error",
            "input": create_schema.slug,
        }
        raise PolarRequestValidationError([slug_error])

    # Unset and None fields are left out of the constructor call.
    fields = create_schema.model_dump(exclude_unset=True, exclude_none=True)
    new_organization = Organization(
        **fields,
        # The invoice prefix is the upper-cased slug.
        customer_invoice_prefix=create_schema.slug.upper(),
    )
    organization = await repository.create(new_organization)
    await self.add_user(session, organization, auth_subject.subject)

    enqueue_job("organization.created", organization_id=organization.id)

    event_properties = {
        "id": organization.id,
        "name": organization.name,
        "slug": organization.slug,
    }
    posthog.auth_subject_event(
        auth_subject, "organizations", "create", "done", event_properties
    )
    return organization

225 

async def update(
    self,
    session: AsyncSession,
    organization: Organization,
    update_schema: OrganizationUpdate,
) -> Organization:
    """Apply an update payload to an organization and notify via webhook.

    Args:
        session: Database session.
        organization: Organization to modify.
        update_schema: Update payload; only explicitly set fields are applied.

    Returns:
        The organization as returned by the repository update.
    """
    repository = OrganizationRepository.from_session(session)

    # Any update stamps the onboarding timestamp if it is still empty.
    if organization.onboarded_at is None:
        organization.onboarded_at = datetime.now(UTC)

    new_feature_settings = update_schema.feature_settings
    if new_feature_settings is not None:
        # Merge into the stored settings; keys absent from the payload stay.
        overrides = new_feature_settings.model_dump(
            mode="json", exclude_unset=True, exclude_none=True
        )
        organization.feature_settings = {
            **organization.feature_settings,
            **overrides,
        }

    if update_schema.subscription_settings is not None:
        organization.subscription_settings = update_schema.subscription_settings

    # NOTE(review): notification_settings is assigned here and is not in the
    # exclude set below, so it is also part of update_dict when set — confirm
    # this is intended.
    if update_schema.notification_settings is not None:
        organization.notification_settings = update_schema.notification_settings

    had_details = bool(organization.details)
    excluded_fields = {
        "profile_settings",
        "feature_settings",
        "subscription_settings",
        "details",
    }
    update_dict = update_schema.model_dump(
        by_alias=True, exclude_unset=True, exclude=excluded_fields
    )

    # Only store details once to avoid API overrides later w/o review
    if not had_details and update_schema.details:
        organization.details = update_schema.details.model_dump()
        organization.details_submitted_at = datetime.now(UTC)

    organization = await repository.update(organization, update_dict=update_dict)

    await self._after_update(session, organization)
    return organization

272 

async def delete(
    self,
    session: AsyncSession,
    organization: Organization,
) -> Organization:
    """Anonymize the organization's PII-bearing fields, then soft-delete it.

    Unlike ``soft_delete_organization``, the slug is anonymized here as well.

    This method does NOT:
    - Delete or anonymize the Users related to the Organization
    - Delete or anonymize the Account of the Organization
    - Delete or anonymize Customers, Products, Discounts, Benefits or
      Checkouts of the Organization
    - Revoke granted Benefits
    - Remove API tokens (organization or personal)

    Returns:
        The organization as returned by the repository update.
    """
    repository = OrganizationRepository.from_session(session)

    anonymized_fields = (
        # PII fields
        "name",
        "slug",
        "website",
        "customer_invoice_prefix",
        # GitHub-sourced fields
        "bio",
        "company",
        "blog",
        "location",
        "twitter_username",
    )
    # Only fields holding a truthy value are rewritten.
    update_dict: dict[str, Any] = {
        field: anonymize_for_deletion(current)
        for field in anonymized_fields
        if (current := getattr(organization, field))
    }

    if organization.email:
        update_dict["email"] = anonymize_email_for_deletion(organization.email)

    if organization.avatar_url:
        # Anonymize by setting to Polar logo
        update_dict["avatar_url"] = (
            "https://avatars.githubusercontent.com/u/105373340?s=48&v=4"
        )

    if organization.details:
        update_dict["details"] = {}

    if organization.socials:
        update_dict["socials"] = []

    organization = await repository.update(organization, update_dict=update_dict)
    await repository.soft_delete(organization)

    return organization

317 

async def check_can_delete(
    self,
    session: AsyncReadSession,
    organization: Organization,
) -> OrganizationDeletionCheckResult:
    """Report whether an organization can be deleted right away.

    Immediate deletion is allowed only when the organization has:
    - no orders, and
    - no active subscriptions.

    Returns:
        A result whose ``blocked_reasons`` lists every blocker found and whose
        ``can_delete_immediately`` is true when that list is empty.
    """
    organization_id = organization.id
    reasons: list[OrganizationDeletionBlockedReason] = []

    # Blocker 1: existing orders.
    if await self._count_orders_by_organization(session, organization_id) > 0:
        reasons.append(OrganizationDeletionBlockedReason.HAS_ORDERS)

    # Blocker 2: subscriptions that are still active.
    active_subscriptions = await self._count_active_subscriptions_by_organization(
        session, organization_id
    )
    if active_subscriptions > 0:
        reasons.append(OrganizationDeletionBlockedReason.HAS_ACTIVE_SUBSCRIPTIONS)

    return OrganizationDeletionCheckResult(
        can_delete_immediately=not reasons,
        blocked_reasons=reasons,
    )

354 

async def request_deletion(
    self,
    session: AsyncSession,
    auth_subject: AuthSubject[User],
    organization: Organization,
) -> OrganizationDeletionCheckResult:
    """Handle a user's request to delete an organization.

    Authorization:
    - With an account attached, only the account admin may delete.
    - Without an account, no admin check is performed here.

    Flow:
    1. Check authorization.
    2. Check orders/subscriptions; if blocked, enqueue
       ``organization.deletion_requested`` and return the blocked result.
    3. If an account is attached, try to delete it.
    4. If that fails, log the error, enqueue
       ``organization.deletion_requested`` and return a blocked result.
    5. Otherwise soft-delete the organization.

    Raises:
        NotPermitted: If an account exists and the user is not its admin.
    """
    user = auth_subject.subject

    def hand_over(
        result: OrganizationDeletionCheckResult,
    ) -> OrganizationDeletionCheckResult:
        # Deletion cannot complete automatically: enqueue the follow-up job.
        enqueue_job(
            "organization.deletion_requested",
            organization_id=organization.id,
            user_id=user.id,
            blocked_reasons=[reason.value for reason in result.blocked_reasons],
        )
        return result

    # Authorization check: only account admin can delete if account exists
    if organization.account_id is not None and not await account_service.is_user_admin(
        session, organization.account_id, user
    ):
        raise NotPermitted(
            "Only the account admin can delete an organization with an account"
        )

    check_result = await self.check_can_delete(session, organization)
    if not check_result.can_delete_immediately:
        # Organization has orders or active subscriptions
        return hand_over(check_result)

    # Eligible for deletion; an attached account has to go first.
    if organization.account_id is not None:
        try:
            await self._delete_account(session, organization)
        except Exception as e:
            log.error(
                "organization.deletion.stripe_account_deletion_failed",
                organization_id=organization.id,
                error=str(e),
            )
            # Any failure here is reported as a Stripe deletion failure.
            return hand_over(
                OrganizationDeletionCheckResult(
                    can_delete_immediately=False,
                    blocked_reasons=[
                        OrganizationDeletionBlockedReason.STRIPE_ACCOUNT_DELETION_FAILED
                    ],
                )
            )

    await self.soft_delete_organization(session, organization)

    return OrganizationDeletionCheckResult(
        can_delete_immediately=True,
        blocked_reasons=[],
    )

429 

async def soft_delete_organization(
    self,
    session: AsyncSession,
    organization: Organization,
) -> Organization:
    """Anonymize an organization (keeping its slug) and soft-delete it.

    The slug is deliberately left untouched so backoffice links keep working.
    All other PII-bearing fields handled here are anonymized or cleared before
    the repository soft-deletes the row.

    Returns:
        The organization as returned by the repository update.
    """
    repository = OrganizationRepository.from_session(session)

    # Anonymize PII fields but NOT slug (to keep backoffice links working)
    anonymized_fields = (
        "name",
        "website",
        "customer_invoice_prefix",
        # GitHub-sourced fields
        "bio",
        "company",
        "blog",
        "location",
        "twitter_username",
    )
    # Only fields holding a truthy value are rewritten.
    update_dict: dict[str, Any] = {
        field: anonymize_for_deletion(current)
        for field in anonymized_fields
        if (current := getattr(organization, field))
    }

    if organization.email:
        update_dict["email"] = anonymize_email_for_deletion(organization.email)

    if organization.avatar_url:
        # Anonymize by setting to Polar logo
        update_dict["avatar_url"] = (
            "https://avatars.githubusercontent.com/u/105373340?s=48&v=4"
        )

    if organization.details:
        update_dict["details"] = {}

    if organization.socials:
        update_dict["socials"] = []

    organization = await repository.update(organization, update_dict=update_dict)
    await repository.soft_delete(organization)

    log.info(
        "organization.deleted",
        organization_id=organization.id,
        slug=organization.slug,
    )

    return organization

476 

async def _delete_account(
    self,
    session: AsyncSession,
    organization: Organization,
) -> None:
    """Delete the account linked to an organization, including its Stripe side.

    Does nothing when the organization has no ``account_id`` or when the
    account row cannot be loaded. Otherwise the Stripe account is deleted
    first (if the account has a ``stripe_id``), the organization is unlinked,
    and the account itself is deleted through the account service.
    """
    linked_account_id = organization.account_id
    if linked_account_id is None:
        return

    accounts = AccountRepository.from_session(session)
    account = await accounts.get_by_id(linked_account_id)
    if account is None:
        return

    if account.stripe_id:
        await account_service.delete_stripe_account(session, account)

    # Unlink before deleting the account row.
    organization.account_id = None
    session.add(organization)

    await account_service.delete(session, account)

    log.info(
        "organization.account_deleted",
        organization_id=organization.id,
        account_id=account.id,
    )

505 

async def _count_orders_by_organization(
    self,
    session: AsyncReadSession,
    organization_id: UUID,
) -> int:
    """Count orders placed by the organization's non-deleted customers."""
    belongs_to_live_customer = (
        Customer.organization_id == organization_id,
        Customer.deleted_at.is_(None),
    )
    statement = (
        sql.select(sql.func.count(Order.id))
        .join(Customer, Order.customer_id == Customer.id)
        .where(*belongs_to_live_customer)
    )
    result = await session.execute(statement)
    # Fall back to 0 if the scalar comes back falsy.
    return result.scalar() or 0

522 

async def _count_active_subscriptions_by_organization(
    self,
    session: AsyncReadSession,
    organization_id: UUID,
) -> int:
    """Count active-status subscriptions of the organization's non-deleted customers."""
    conditions = (
        Customer.organization_id == organization_id,
        Customer.deleted_at.is_(None),
        # "Active" is whatever SubscriptionStatus.active_statuses() returns.
        Subscription.status.in_(SubscriptionStatus.active_statuses()),
    )
    statement = (
        sql.select(sql.func.count(Subscription.id))
        .join(Customer, Subscription.customer_id == Customer.id)
        .where(*conditions)
    )
    result = await session.execute(statement)
    # Fall back to 0 if the scalar comes back falsy.
    return result.scalar() or 0

540 

async def add_user(
    self,
    session: AsyncSession,
    organization: Organization,
    user: User,
) -> None:
    """Link a user to an organization, reviving the link if it already exists.

    The insert runs inside a nested transaction. On ``IntegrityError`` the
    nested transaction is rolled back and the existing link row has its
    ``deleted_at`` reset to ``None``. In every case, including unexpected
    errors, ``loops_service.user_organization_added`` is called last.
    """
    savepoint = await session.begin_nested()
    try:
        membership = UserOrganization(
            user_id=user.id, organization_id=organization.id
        )
        session.add(membership)
        await session.flush()
        log.info(
            "organization.add_user.created",
            user_id=user.id,
            organization_id=organization.id,
        )
    except IntegrityError:
        # TODO: Currently, we treat this as success since the connection
        # exists. However, once we use status to distinguish active/inactive
        # installations we need to change this.
        log.info(
            "organization.add_user.already_exists",
            organization_id=organization.id,
            user_id=user.id,
        )
        await savepoint.rollback()

        # Un-delete the existing link, if it was soft-deleted.
        same_link = (
            UserOrganization.user_id == user.id,
            UserOrganization.organization_id == organization.id,
        )
        undelete = sql.Update(UserOrganization).where(*same_link).values(
            deleted_at=None,
        )
        await session.execute(undelete)
        await session.flush()
    finally:
        await loops_service.user_organization_added(session, user)

584 

async def set_account(
    self,
    session: AsyncSession,
    auth_subject: AuthSubject[User | Organization],
    organization: Organization,
    account_id: UUID,
) -> Organization:
    """Attach an account to an organization that does not have one yet.

    On success an ``organization.account_set`` job is enqueued and the
    post-update hook runs.

    Raises:
        AccountAlreadySet: If the organization already has an account.
        InvalidAccount: If the account service returns no account for
            ``account_id`` and this subject.
    """
    if organization.account_id is not None:
        raise AccountAlreadySet(organization.slug)

    account = await account_service.get(session, auth_subject, account_id)
    if account is None:
        raise InvalidAccount(account_id)

    organizations = OrganizationRepository.from_session(session)
    changes = {"account": account}
    organization = await organizations.update(organization, update_dict=changes)

    enqueue_job("organization.account_set", organization.id)
    await self._after_update(session, organization)

    return organization

609 

610 async def get_next_invoice_number( 1a

611 self, 

612 session: AsyncSession, 

613 organization: Organization, 

614 customer: "Customer", 

615 ) -> str: 

616 match organization.invoice_numbering: 

617 case InvoiceNumbering.customer: 

618 invoice_number = f"{organization.customer_invoice_prefix}-{customer.short_id_str}-{customer.invoice_next_number:04d}" 

619 customer_repository = CustomerRepository.from_session(session) 

620 customer = await customer_repository.update( 

621 customer, 

622 update_dict={ 

623 "invoice_next_number": customer.invoice_next_number + 1 

624 }, 

625 ) 

626 return invoice_number 

627 

628 case InvoiceNumbering.organization: 

629 invoice_number = f"{organization.customer_invoice_prefix}-{organization.customer_invoice_next_number:04d}" 

630 repository = OrganizationRepository.from_session(session) 

631 organization = await repository.update( 

632 organization, 

633 update_dict={ 

634 "customer_invoice_next_number": organization.customer_invoice_next_number 

635 + 1 

636 }, 

637 ) 

638 return invoice_number 

639 

async def _after_update(
    self,
    session: AsyncSession,
    organization: Organization,
) -> None:
    """Send the ``organization_updated`` webhook for the given organization."""
    event = WebhookEventType.organization_updated
    # The organization is passed twice: as the target and as the payload.
    await webhook_service.send(session, organization, event, organization)

648 

649 async def check_review_threshold( 1a

650 self, session: AsyncSession, organization: Organization 

651 ) -> Organization: 

652 if organization.is_under_review: 

653 return organization 

654 

655 transfers_sum = await transaction_service.get_transactions_sum( 

656 session, organization.account_id, type=TransactionType.balance 

657 ) 

658 if ( 

659 organization.next_review_threshold >= 0 

660 and transfers_sum >= organization.next_review_threshold 

661 ): 

662 organization.status = ( 

663 OrganizationStatus.ONGOING_REVIEW 

664 if organization.initially_reviewed_at is not None 

665 else OrganizationStatus.INITIAL_REVIEW 

666 ) 

667 organization.status_updated_at = datetime.now(UTC) 

668 await self._sync_account_status(session, organization) 

669 session.add(organization) 

670 

671 enqueue_job("organization.under_review", organization_id=organization.id) 

672 

673 return organization 

674 

async def confirm_organization_reviewed(
    self,
    session: AsyncSession,
    organization: Organization,
    next_review_threshold: int,
) -> Organization:
    """Mark an organization as reviewed and active.

    Sets the status to ``ACTIVE``, stores the next review threshold, stamps
    ``initially_reviewed_at`` on the first review, approves a pending appeal
    if there is one, and enqueues the ``organization.reviewed`` job.

    Returns:
        The updated organization.
    """
    organization.status = OrganizationStatus.ACTIVE
    organization.status_updated_at = datetime.now(UTC)
    organization.next_review_threshold = next_review_threshold

    initial_review = organization.initially_reviewed_at is None
    if initial_review:
        organization.initially_reviewed_at = datetime.now(UTC)

    await self._sync_account_status(session, organization)
    session.add(organization)

    # If there's a pending appeal, mark it as approved
    reviews = OrganizationReviewRepository.from_session(session)
    review = await reviews.get_by_organization(organization.id)
    if review and review.appeal_submitted_at and review.appeal_decision is None:
        review.appeal_decision = OrganizationReview.AppealDecision.APPROVED
        review.appeal_reviewed_at = datetime.now(UTC)
        session.add(review)

    enqueue_job(
        "organization.reviewed",
        organization_id=organization.id,
        initial_review=initial_review,
    )
    return organization

707 

async def deny_organization(
    self, session: AsyncSession, organization: Organization
) -> Organization:
    """Set an organization to ``DENIED`` and reject its pending appeal, if any.

    Returns:
        The updated organization.
    """
    organization.status = OrganizationStatus.DENIED
    organization.status_updated_at = datetime.now(UTC)
    await self._sync_account_status(session, organization)
    session.add(organization)

    # If there's a pending appeal, mark it as rejected
    reviews = OrganizationReviewRepository.from_session(session)
    review = await reviews.get_by_organization(organization.id)
    if review and review.appeal_submitted_at and review.appeal_decision is None:
        review.appeal_decision = OrganizationReview.AppealDecision.REJECTED
        review.appeal_reviewed_at = datetime.now(UTC)
        session.add(review)

    return organization

725 

async def set_organization_under_review(
    self, session: AsyncSession, organization: Organization
) -> Organization:
    """Force an organization into ``ONGOING_REVIEW`` and enqueue the review job.

    Returns:
        The updated organization.
    """
    organization.status = OrganizationStatus.ONGOING_REVIEW
    organization.status_updated_at = datetime.now(UTC)

    # Mirror the new status onto the linked account before staging the change.
    await self._sync_account_status(session, organization)
    session.add(organization)

    enqueue_job("organization.under_review", organization_id=organization.id)
    return organization

735 

async def update_status_from_stripe_account(
    self, session: AsyncSession, account: Account
) -> None:
    """Align the status of an account's organizations with its Stripe state.

    For every organization linked to the account, except denied ones:
    - it becomes ``ACTIVE`` when it is neither under review nor already
      active, the account has a currency, and details, charges and payouts
      are all in place;
    - it is reset to ``ONBOARDING_STARTED`` when any of details, charges or
      payouts is missing.
    The organization status is then synced back to the account.
    """
    repository = OrganizationRepository.from_session(session)

    for organization in await repository.get_all_by_account(account.id):
        # Don't override organizations that are denied
        if organization.status == OrganizationStatus.DENIED:
            continue

        capabilities = (
            account.is_details_submitted,
            account.is_charges_enabled,
            account.is_payouts_enabled,
        )
        activation_preconditions = (
            not organization.is_under_review,
            not organization.is_active(),
            account.currency is not None,
        )

        # If account is fully set up, set organization to ACTIVE
        if all(activation_preconditions + capabilities):
            organization.status = OrganizationStatus.ACTIVE
            organization.status_updated_at = datetime.now(UTC)

        # If Stripe disables some capabilities, reset to ONBOARDING_STARTED
        if not all(capabilities):
            organization.status = OrganizationStatus.ONBOARDING_STARTED
            organization.status_updated_at = datetime.now(UTC)

        await self._sync_account_status(session, organization)
        session.add(organization)

775 

async def _sync_account_status(
    self, session: AsyncSession, organization: Organization
) -> None:
    """Copy the organization's status onto its linked account.

    Does nothing when the organization has no account or when its status has
    no counterpart in the mapping below.
    """
    if not organization.account_id:
        return

    # Organization status -> account status. Both review states collapse
    # into the single UNDER_REVIEW account status.
    account_status_for = {
        OrganizationStatus.ONBOARDING_STARTED: Account.Status.ONBOARDING_STARTED,
        OrganizationStatus.ACTIVE: Account.Status.ACTIVE,
        OrganizationStatus.INITIAL_REVIEW: Account.Status.UNDER_REVIEW,
        OrganizationStatus.ONGOING_REVIEW: Account.Status.UNDER_REVIEW,
        OrganizationStatus.DENIED: Account.Status.DENIED,
    }

    if organization.status not in account_status_for:
        return

    statement = (
        sqlalchemy_update(Account)
        .where(Account.id == organization.account_id)
        .values(status=account_status_for[organization.status])
    )
    await session.execute(statement)

799 

async def get_payment_status(
    self,
    session: AsyncReadSession,
    organization: Organization,
    account_verification_only: bool = False,
) -> PaymentStatusResponse:
    """Build the payment readiness summary and onboarding steps.

    Args:
        session: Read-only database session.
        organization: Organization to inspect.
        account_verification_only: When true, the product and checkout
            integration steps are skipped and only the account setup step is
            reported.

    Returns:
        The readiness flag, the list of steps and the organization status.
    """
    organization_id = organization.id
    steps = []

    if not account_verification_only:
        # Step 1: Create a product
        products = ProductRepository.from_session(session)
        product_count = await products.count_by_organization_id(
            organization_id, is_archived=False
        )
        create_product_step = PaymentStep(
            id=PaymentStepID.CREATE_PRODUCT,
            title="Create a product",
            description="Create your first product to start accepting payments",
            completed=product_count > 0,
        )
        steps.append(create_product_step)

        # Step 2: Integrate Checkout (API key OR checkout link)
        tokens = OrganizationAccessTokenRepository.from_session(session)
        api_key_count = await tokens.count_by_organization_id(organization_id)

        checkout_links = CheckoutLinkRepository.from_session(session)
        checkout_link_count = await checkout_links.count_by_organization_id(
            organization_id
        )

        # Either an API key or a checkout link completes the step.
        integrate_checkout_step = PaymentStep(
            id=PaymentStepID.INTEGRATE_CHECKOUT,
            title="Integrate Checkout",
            description="Set up your integration to start accepting payments",
            completed=api_key_count > 0 or checkout_link_count > 0,
        )
        steps.append(integrate_checkout_step)

    # Step 3: Finish account setup
    setup_account_step = PaymentStep(
        id=PaymentStepID.SETUP_ACCOUNT,
        title="Finish account setup",
        description="Complete your account details and verify your identity",
        completed=self._is_account_setup_complete(organization),
    )
    steps.append(setup_account_step)

    payment_ready = await self.is_organization_ready_for_payment(
        session, organization
    )
    return PaymentStatusResponse(
        payment_ready=payment_ready,
        steps=steps,
        organization_status=organization.status,
    )

864 

865 def _is_account_setup_complete(self, organization: Organization) -> bool: 1a

866 """Check if the organization's account setup is complete.""" 

867 if not organization.account_id: 

868 return False 

869 

870 account = organization.account 

871 if not account: 

872 return False 

873 

874 admin = account.admin 

875 return ( 

876 organization.details_submitted_at is not None 

877 and account.is_details_submitted 

878 and (admin.identity_verification_status in ["verified", "pending"]) 

879 ) 

880 

async def is_organization_ready_for_payment(
    self, session: AsyncReadSession, organization: Organization
) -> bool:
    """Tell whether an organization may accept payments.

    The account and its admin are loaded here on demand, so callers such as
    checkout do not need to eager-load them.

    Decision order:
    1. Sandbox environment: always ready.
    2. Blocked or denied organization: never ready.
    3. Created at or before the cutoff date: ready.
    4. Otherwise the status must be payment-ready, details must be submitted
       and non-empty, an account must exist, and its admin's identity
       verification must be verified or pending.
    """
    # In sandbox environment, always allow payments regardless of account setup
    if settings.ENV == Environment.sandbox:
        return True

    # Hard stops that need no account data.
    is_denied = (
        organization.is_blocked()
        or organization.status == OrganizationStatus.DENIED
    )
    if is_denied:
        return False

    # Organizations created up to the cutoff are treated as ready.
    grandfathering_cutoff = datetime(2025, 8, 4, 9, 0, tzinfo=UTC)
    if organization.created_at <= grandfathering_cutoff:
        return True

    # Newer organizations: cheap checks first, in the same order as before.
    if (
        organization.status not in OrganizationStatus.payment_ready_statuses()
        # Details must be submitted (an empty dict counts as missing)
        or not organization.details_submitted_at
        or not organization.details
        # Must have a payout account
        or organization.account_id is None
    ):
        return False

    accounts = AccountRepository.from_session(session)
    account = await accounts.get_by_id(
        organization.account_id, options=(joinedload(Account.admin),)
    )
    if not account:
        return False

    # Check admin identity verification status
    admin = account.admin
    if not admin:
        return False
    return admin.identity_verification_status in (
        IdentityVerificationStatus.verified,
        IdentityVerificationStatus.pending,
    )

933 

async def validate_with_ai(
    self, session: AsyncSession, organization: Organization
) -> OrganizationReview:
    """Run the AI validator on the organization and record its review.

    If a review already exists for the organization it is returned as-is
    and validation is not run again. A "FAIL" or "UNCERTAIN" verdict denies
    the organization before the new review is added to the session.
    """
    reviews = OrganizationReviewRepository.from_session(session)
    existing_review = await reviews.get_by_organization(organization.id)
    if existing_review is not None:
        return existing_review

    outcome = await organization_validator.validate_organization_details(
        organization
    )
    verdict = outcome.verdict

    # Store the organization's current name, website, details and socials
    # alongside the verdict.
    details_snapshot = {
        "name": organization.name,
        "website": organization.website,
        "details": organization.details,
        "socials": organization.socials,
    }
    new_review = OrganizationReview(
        organization_id=organization.id,
        verdict=verdict.verdict,
        risk_score=verdict.risk_score,
        violated_sections=verdict.violated_sections,
        reason=verdict.reason,
        timed_out=outcome.timed_out,
        organization_details_snapshot=details_snapshot,
        model_used=organization_validator.model.model_name,
    )

    if verdict.verdict in ("FAIL", "UNCERTAIN"):
        await self.deny_organization(session, organization)

    session.add(new_review)
    return new_review

970 

async def submit_appeal(
    self, session: AsyncSession, organization: Organization, appeal_reason: str
) -> OrganizationReview:
    """Record an appeal on the organization's review and open a Plain thread.

    Raises:
        ValueError: If the organization has no review, the review verdict
            is PASS, or an appeal was already submitted.
    """
    review_repository = OrganizationReviewRepository.from_session(session)
    org_review = await review_repository.get_by_organization(organization.id)

    # Preconditions, checked in order; each failure aborts the appeal.
    if org_review is None:
        raise ValueError("Organization must have a review before submitting appeal")
    elif org_review.verdict == OrganizationReview.Verdict.PASS:
        raise ValueError("Cannot submit appeal for a passed review")
    elif org_review.appeal_submitted_at is not None:
        raise ValueError("Appeal has already been submitted for this organization")

    org_review.appeal_submitted_at = datetime.now(UTC)
    org_review.appeal_reason = appeal_reason
    session.add(org_review)

    await plain_service.create_appeal_review_thread(
        session, organization, org_review
    )
    return org_review

996 

async def approve_appeal(
    self, session: AsyncSession, organization: Organization
) -> OrganizationReview:
    """Approve a pending appeal and set the organization back to ACTIVE.

    Raises:
        ValueError: If the organization has no review, no appeal was
            submitted, or the appeal already has a decision.
    """
    review_repository = OrganizationReviewRepository.from_session(session)
    org_review = await review_repository.get_by_organization(organization.id)

    # Preconditions, checked in order; each failure aborts the approval.
    if org_review is None:
        raise ValueError("Organization must have a review before approving appeal")
    elif org_review.appeal_submitted_at is None:
        raise ValueError("No appeal has been submitted for this organization")
    elif org_review.appeal_decision is not None:
        raise ValueError("Appeal has already been reviewed")

    # Update the organization first, then stamp the decision on the review.
    organization.status = OrganizationStatus.ACTIVE
    organization.status_updated_at = datetime.now(UTC)
    org_review.appeal_decision = OrganizationReview.AppealDecision.APPROVED
    org_review.appeal_reviewed_at = datetime.now(UTC)

    # Runs after the status change above so it sees the new status.
    await self._sync_account_status(session, organization)

    session.add(organization)
    session.add(org_review)
    return org_review

1025 

async def deny_appeal(
    self, session: AsyncSession, organization: Organization
) -> OrganizationReview:
    """Reject a pending appeal; the organization's status is left untouched.

    Raises:
        ValueError: If the organization has no review, no appeal was
            submitted, or the appeal already has a decision.
    """
    review_repository = OrganizationReviewRepository.from_session(session)
    org_review = await review_repository.get_by_organization(organization.id)

    # Preconditions, checked in order; each failure aborts the denial.
    if org_review is None:
        raise ValueError("Organization must have a review before denying appeal")
    elif org_review.appeal_submitted_at is None:
        raise ValueError("No appeal has been submitted for this organization")
    elif org_review.appeal_decision is not None:
        raise ValueError("Appeal has already been reviewed")

    org_review.appeal_decision = OrganizationReview.AppealDecision.REJECTED
    org_review.appeal_reviewed_at = datetime.now(UTC)
    session.add(org_review)
    return org_review

1049 

1050 

# Module-level OrganizationService instance, created once at import time.
1051organization = OrganizationService() 1a