Coverage for polar/organization/service.py: 19%
415 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 17:15 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 17:15 +0000
1import uuid 1a
2from collections.abc import Sequence 1a
3from datetime import UTC, datetime 1a
4from enum import StrEnum 1a
5from typing import TYPE_CHECKING, Any 1a
6from uuid import UUID 1a
8import structlog 1a
9from pydantic import BaseModel, Field 1a
10from sqlalchemy import update as sqlalchemy_update 1a
11from sqlalchemy.exc import IntegrityError 1a
12from sqlalchemy.orm import joinedload 1a
14from polar.account.repository import AccountRepository 1a
15from polar.account.service import account as account_service 1a
16from polar.auth.models import AuthSubject 1a
17from polar.checkout_link.repository import CheckoutLinkRepository 1a
18from polar.config import Environment, settings 1a
19from polar.customer.repository import CustomerRepository 1a
20from polar.enums import InvoiceNumbering 1a
21from polar.exceptions import NotPermitted, PolarError, PolarRequestValidationError 1a
22from polar.integrations.loops.service import loops as loops_service 1a
23from polar.integrations.plain.service import plain as plain_service 1a
24from polar.kit.anonymization import anonymize_email_for_deletion, anonymize_for_deletion 1a
25from polar.kit.pagination import PaginationParams 1a
26from polar.kit.repository import Options 1a
27from polar.kit.sorting import Sorting 1a
28from polar.models import ( 1a
29 Account,
30 Customer,
31 Order,
32 Organization,
33 Subscription,
34 User,
35 UserOrganization,
36)
37from polar.models.organization import OrganizationStatus 1a
38from polar.models.organization_review import OrganizationReview 1a
39from polar.models.subscription import SubscriptionStatus 1a
40from polar.models.transaction import TransactionType 1a
41from polar.models.user import IdentityVerificationStatus 1a
42from polar.models.webhook_endpoint import WebhookEventType 1a
43from polar.organization.ai_validation import validator as organization_validator 1a
44from polar.organization_access_token.repository import OrganizationAccessTokenRepository 1a
45from polar.postgres import AsyncReadSession, AsyncSession, sql 1a
46from polar.posthog import posthog 1a
47from polar.product.repository import ProductRepository 1a
48from polar.transaction.service.transaction import transaction as transaction_service 1a
49from polar.webhook.service import webhook as webhook_service 1a
50from polar.worker import enqueue_job 1a
52from .repository import OrganizationRepository, OrganizationReviewRepository 1a
53from .schemas import ( 1a
54 OrganizationCreate,
55 OrganizationDeletionBlockedReason,
56 OrganizationUpdate,
57)
58from .sorting import OrganizationSortProperty 1a
60if TYPE_CHECKING: 60 ↛ 61line 60 didn't jump to line 61 because the condition on line 60 was never true1a
61 pass
63log = structlog.get_logger() 1a
66class PaymentStepID(StrEnum): 1a
67 """Enum for payment onboarding step identifiers."""
69 CREATE_PRODUCT = "create_product" 1a
70 INTEGRATE_CHECKOUT = "integrate_checkout" 1a
71 SETUP_ACCOUNT = "setup_account" 1a
74class PaymentStep(BaseModel): 1a
75 """Service-level model for payment onboarding steps."""
77 id: str = Field(description="Step identifier") 1a
78 title: str = Field(description="Step title") 1a
79 description: str = Field(description="Step description") 1a
80 completed: bool = Field(description="Whether the step is completed") 1a
83class PaymentStatusResponse(BaseModel): 1a
84 """Service-level response for payment status."""
86 payment_ready: bool = Field( 1a
87 description="Whether the organization is ready to accept payments"
88 )
89 steps: list[PaymentStep] = Field(description="List of onboarding steps") 1a
90 organization_status: OrganizationStatus = Field( 1a
91 description="Current organization status"
92 )
95class OrganizationDeletionCheckResult(BaseModel): 1a
96 """Result of checking if an organization can be deleted."""
98 can_delete_immediately: bool = Field( 1a
99 description="Whether the organization can be deleted immediately"
100 )
101 blocked_reasons: list[OrganizationDeletionBlockedReason] = Field( 1a
102 default_factory=list,
103 description="Reasons why immediate deletion is blocked",
104 )
107class OrganizationError(PolarError): ... 1a
110class InvalidAccount(OrganizationError): 1a
111 def __init__(self, account_id: UUID) -> None: 1a
112 self.account_id = account_id
113 message = (
114 f"The account {account_id} does not exist or you don't have access to it."
115 )
116 super().__init__(message)
119class AccountAlreadySet(OrganizationError): 1a
120 def __init__(self, organization_slug: str) -> None: 1a
121 self.organization_slug = organization_slug
122 message = f"The account for organization '{organization_slug}' has already been set up by the owner. Contact support to change the owner of the account."
123 super().__init__(message, 403)
126class OrganizationService: 1a
127 async def list( 1a
128 self,
129 session: AsyncReadSession,
130 auth_subject: AuthSubject[User | Organization],
131 *,
132 slug: str | None = None,
133 pagination: PaginationParams,
134 sorting: list[Sorting[OrganizationSortProperty]] = [
135 (OrganizationSortProperty.created_at, False)
136 ],
137 ) -> tuple[Sequence[Organization], int]:
138 repository = OrganizationRepository.from_session(session)
139 statement = repository.get_readable_statement(auth_subject)
141 if slug is not None:
142 statement = statement.where(Organization.slug == slug)
144 statement = repository.apply_sorting(statement, sorting)
146 return await repository.paginate(
147 statement, limit=pagination.limit, page=pagination.page
148 )
150 async def get( 1a
151 self,
152 session: AsyncReadSession,
153 auth_subject: AuthSubject[User | Organization],
154 id: uuid.UUID,
155 *,
156 options: Options = (),
157 ) -> Organization | None:
158 repository = OrganizationRepository.from_session(session) 1b
159 statement = ( 1b
160 repository.get_readable_statement(auth_subject)
161 .where(Organization.id == id)
162 .options(*options)
163 )
164 return await repository.get_one_or_none(statement) 1b
166 async def get_anonymous( 1a
167 self,
168 session: AsyncReadSession,
169 id: uuid.UUID,
170 *,
171 options: Options = (),
172 ) -> Organization | None:
173 """Use it with precaution! Get organization by ID for anonymous users."""
174 repository = OrganizationRepository.from_session(session)
175 statement = (
176 repository.get_base_statement()
177 .where(Organization.blocked_at.is_(None))
178 .where(Organization.id == id)
179 .options(*options)
180 )
182 return await repository.get_one_or_none(statement)
184 async def create( 1a
185 self,
186 session: AsyncSession,
187 create_schema: OrganizationCreate,
188 auth_subject: AuthSubject[User],
189 ) -> Organization:
190 repository = OrganizationRepository.from_session(session)
191 if await repository.slug_exists(create_schema.slug):
192 raise PolarRequestValidationError(
193 [
194 {
195 "loc": ("body", "slug"),
196 "msg": "An organization with this slug already exists.",
197 "type": "value_error",
198 "input": create_schema.slug,
199 }
200 ]
201 )
203 organization = await repository.create(
204 Organization(
205 **create_schema.model_dump(exclude_unset=True, exclude_none=True),
206 customer_invoice_prefix=create_schema.slug.upper(),
207 )
208 )
209 await self.add_user(session, organization, auth_subject.subject)
211 enqueue_job("organization.created", organization_id=organization.id)
213 posthog.auth_subject_event(
214 auth_subject,
215 "organizations",
216 "create",
217 "done",
218 {
219 "id": organization.id,
220 "name": organization.name,
221 "slug": organization.slug,
222 },
223 )
224 return organization
226 async def update( 1a
227 self,
228 session: AsyncSession,
229 organization: Organization,
230 update_schema: OrganizationUpdate,
231 ) -> Organization:
232 repository = OrganizationRepository.from_session(session)
234 if organization.onboarded_at is None:
235 organization.onboarded_at = datetime.now(UTC)
237 if update_schema.feature_settings is not None:
238 organization.feature_settings = {
239 **organization.feature_settings,
240 **update_schema.feature_settings.model_dump(
241 mode="json", exclude_unset=True, exclude_none=True
242 ),
243 }
245 if update_schema.subscription_settings is not None:
246 organization.subscription_settings = update_schema.subscription_settings
248 if update_schema.notification_settings is not None:
249 organization.notification_settings = update_schema.notification_settings
251 previous_details = organization.details
252 update_dict = update_schema.model_dump(
253 by_alias=True,
254 exclude_unset=True,
255 exclude={
256 "profile_settings",
257 "feature_settings",
258 "subscription_settings",
259 "details",
260 },
261 )
263 # Only store details once to avoid API overrides later w/o review
264 if not previous_details and update_schema.details:
265 organization.details = update_schema.details.model_dump()
266 organization.details_submitted_at = datetime.now(UTC)
268 organization = await repository.update(organization, update_dict=update_dict)
270 await self._after_update(session, organization)
271 return organization
273 async def delete( 1a
274 self,
275 session: AsyncSession,
276 organization: Organization,
277 ) -> Organization:
278 """Anonymizes fields on the Organization that can contain PII and then
279 soft-deletes the Organization.
281 DOES NOT:
282 - Delete or anonymize Users related Organization
283 - Delete or anonymize Account of the Organization
284 - Delete or anonymize Customers, Products, Discounts, Benefits, Checkouts of the Organization
285 - Revoke Benefits granted
286 - Remove API tokens (organization or personal)
287 """
288 repository = OrganizationRepository.from_session(session)
290 update_dict: dict[str, Any] = {}
292 pii_fields = ["name", "slug", "website", "customer_invoice_prefix"]
293 github_fields = ["bio", "company", "blog", "location", "twitter_username"]
294 for pii_field in pii_fields + github_fields:
295 value = getattr(organization, pii_field)
296 if value:
297 update_dict[pii_field] = anonymize_for_deletion(value)
299 if organization.email:
300 update_dict["email"] = anonymize_email_for_deletion(organization.email)
302 if organization.avatar_url:
303 # Anonymize by setting to Polar logo
304 update_dict["avatar_url"] = (
305 "https://avatars.githubusercontent.com/u/105373340?s=48&v=4"
306 )
307 if organization.details:
308 update_dict["details"] = {}
310 if organization.socials:
311 update_dict["socials"] = []
313 organization = await repository.update(organization, update_dict=update_dict)
314 await repository.soft_delete(organization)
316 return organization
318 async def check_can_delete( 1a
319 self,
320 session: AsyncReadSession,
321 organization: Organization,
322 ) -> OrganizationDeletionCheckResult:
323 """Check if an organization can be deleted immediately.
325 An organization can be deleted immediately if it has:
326 - No orders
327 - No active subscriptions
329 If it has an account but no orders/subscriptions, we'll attempt to
330 delete the Stripe account first.
331 """
332 blocked_reasons: list[OrganizationDeletionBlockedReason] = []
334 # Check for orders
335 order_count = await self._count_orders_by_organization(session, organization.id)
336 if order_count > 0:
337 blocked_reasons.append(OrganizationDeletionBlockedReason.HAS_ORDERS)
339 # Check for active subscriptions
340 active_subscription_count = (
341 await self._count_active_subscriptions_by_organization(
342 session, organization.id
343 )
344 )
345 if active_subscription_count > 0:
346 blocked_reasons.append(
347 OrganizationDeletionBlockedReason.HAS_ACTIVE_SUBSCRIPTIONS
348 )
350 return OrganizationDeletionCheckResult(
351 can_delete_immediately=len(blocked_reasons) == 0,
352 blocked_reasons=blocked_reasons,
353 )
355 async def request_deletion( 1a
356 self,
357 session: AsyncSession,
358 auth_subject: AuthSubject[User],
359 organization: Organization,
360 ) -> OrganizationDeletionCheckResult:
361 """Request deletion of an organization.
363 Authorization:
364 - If the organization has an account, only the account admin can delete
365 - If there is no account, any organization member can delete
367 Flow:
368 1. Check authorization
369 2. Check for orders/subscriptions -> if blocked, create support ticket
370 3. If has account -> try to delete Stripe account
371 4. If Stripe deletion fails -> create support ticket
372 5. Soft delete organization
373 """
374 # Authorization check: only account admin can delete if account exists
375 if organization.account_id is not None:
376 is_admin = await account_service.is_user_admin(
377 session, organization.account_id, auth_subject.subject
378 )
379 if not is_admin:
380 raise NotPermitted(
381 "Only the account admin can delete an organization with an account"
382 )
384 check_result = await self.check_can_delete(session, organization)
386 if not check_result.can_delete_immediately:
387 # Organization has orders or active subscriptions
388 enqueue_job(
389 "organization.deletion_requested",
390 organization_id=organization.id,
391 user_id=auth_subject.subject.id,
392 blocked_reasons=[r.value for r in check_result.blocked_reasons],
393 )
394 return check_result
396 # Organization is eligible for deletion
397 # If it has an account, try to delete it first
398 if organization.account_id is not None:
399 try:
400 await self._delete_account(session, organization)
401 except Exception as e:
402 log.error(
403 "organization.deletion.stripe_account_deletion_failed",
404 organization_id=organization.id,
405 error=str(e),
406 )
407 # Stripe deletion failed, create support ticket
408 check_result = OrganizationDeletionCheckResult(
409 can_delete_immediately=False,
410 blocked_reasons=[
411 OrganizationDeletionBlockedReason.STRIPE_ACCOUNT_DELETION_FAILED
412 ],
413 )
414 enqueue_job(
415 "organization.deletion_requested",
416 organization_id=organization.id,
417 user_id=auth_subject.subject.id,
418 blocked_reasons=[r.value for r in check_result.blocked_reasons],
419 )
420 return check_result
422 # Soft delete the organization
423 await self.soft_delete_organization(session, organization)
425 return OrganizationDeletionCheckResult(
426 can_delete_immediately=True,
427 blocked_reasons=[],
428 )
430 async def soft_delete_organization( 1a
431 self,
432 session: AsyncSession,
433 organization: Organization,
434 ) -> Organization:
435 """Soft-delete an organization, preserving the slug for backoffice links.
437 Anonymizes PII fields (except slug) and sets deleted_at timestamp.
438 """
439 repository = OrganizationRepository.from_session(session)
441 update_dict: dict[str, Any] = {}
443 # Anonymize PII fields but NOT slug (to keep backoffice links working)
444 pii_fields = ["name", "website", "customer_invoice_prefix"]
445 github_fields = ["bio", "company", "blog", "location", "twitter_username"]
446 for pii_field in pii_fields + github_fields:
447 value = getattr(organization, pii_field)
448 if value:
449 update_dict[pii_field] = anonymize_for_deletion(value)
451 if organization.email:
452 update_dict["email"] = anonymize_email_for_deletion(organization.email)
454 if organization.avatar_url:
455 # Anonymize by setting to Polar logo
456 update_dict["avatar_url"] = (
457 "https://avatars.githubusercontent.com/u/105373340?s=48&v=4"
458 )
460 if organization.details:
461 update_dict["details"] = {}
463 if organization.socials:
464 update_dict["socials"] = []
466 organization = await repository.update(organization, update_dict=update_dict)
467 await repository.soft_delete(organization)
469 log.info(
470 "organization.deleted",
471 organization_id=organization.id,
472 slug=organization.slug,
473 )
475 return organization
477 async def _delete_account( 1a
478 self,
479 session: AsyncSession,
480 organization: Organization,
481 ) -> None:
482 """Delete the Stripe account linked to an organization."""
483 if organization.account_id is None:
484 return
486 account_repository = AccountRepository.from_session(session)
487 account = await account_repository.get_by_id(organization.account_id)
489 if account is None:
490 return
492 if account.stripe_id:
493 await account_service.delete_stripe_account(session, account)
495 organization.account_id = None
496 session.add(organization)
498 await account_service.delete(session, account)
500 log.info(
501 "organization.account_deleted",
502 organization_id=organization.id,
503 account_id=account.id,
504 )
506 async def _count_orders_by_organization( 1a
507 self,
508 session: AsyncReadSession,
509 organization_id: UUID,
510 ) -> int:
511 """Count orders for all customers of this organization."""
512 statement = (
513 sql.select(sql.func.count(Order.id))
514 .join(Customer, Order.customer_id == Customer.id)
515 .where(
516 Customer.organization_id == organization_id,
517 Customer.deleted_at.is_(None),
518 )
519 )
520 result = await session.execute(statement)
521 return result.scalar() or 0
523 async def _count_active_subscriptions_by_organization( 1a
524 self,
525 session: AsyncReadSession,
526 organization_id: UUID,
527 ) -> int:
528 """Count active subscriptions for all customers of this organization."""
529 statement = (
530 sql.select(sql.func.count(Subscription.id))
531 .join(Customer, Subscription.customer_id == Customer.id)
532 .where(
533 Customer.organization_id == organization_id,
534 Customer.deleted_at.is_(None),
535 Subscription.status.in_(SubscriptionStatus.active_statuses()),
536 )
537 )
538 result = await session.execute(statement)
539 return result.scalar() or 0
541 async def add_user( 1a
542 self,
543 session: AsyncSession,
544 organization: Organization,
545 user: User,
546 ) -> None:
547 nested = await session.begin_nested()
548 try:
549 relation = UserOrganization(
550 user_id=user.id, organization_id=organization.id
551 )
552 session.add(relation)
553 await session.flush()
554 log.info(
555 "organization.add_user.created",
556 user_id=user.id,
557 organization_id=organization.id,
558 )
559 except IntegrityError:
560 # TODO: Currently, we treat this as success since the connection
561 # exists. However, once we use status to distinguish active/inactive
562 # installations we need to change this.
563 log.info(
564 "organization.add_user.already_exists",
565 organization_id=organization.id,
566 user_id=user.id,
567 )
568 await nested.rollback()
569 # Update
570 stmt = (
571 sql.Update(UserOrganization)
572 .where(
573 UserOrganization.user_id == user.id,
574 UserOrganization.organization_id == organization.id,
575 )
576 .values(
577 deleted_at=None, # un-delete user if exists
578 )
579 )
580 await session.execute(stmt)
581 await session.flush()
582 finally:
583 await loops_service.user_organization_added(session, user)
585 async def set_account( 1a
586 self,
587 session: AsyncSession,
588 auth_subject: AuthSubject[User | Organization],
589 organization: Organization,
590 account_id: UUID,
591 ) -> Organization:
592 if organization.account_id is not None:
593 raise AccountAlreadySet(organization.slug)
595 account = await account_service.get(session, auth_subject, account_id)
596 if account is None:
597 raise InvalidAccount(account_id)
599 repository = OrganizationRepository.from_session(session)
600 organization = await repository.update(
601 organization, update_dict={"account": account}
602 )
604 enqueue_job("organization.account_set", organization.id)
606 await self._after_update(session, organization)
608 return organization
610 async def get_next_invoice_number( 1a
611 self,
612 session: AsyncSession,
613 organization: Organization,
614 customer: "Customer",
615 ) -> str:
616 match organization.invoice_numbering:
617 case InvoiceNumbering.customer:
618 invoice_number = f"{organization.customer_invoice_prefix}-{customer.short_id_str}-{customer.invoice_next_number:04d}"
619 customer_repository = CustomerRepository.from_session(session)
620 customer = await customer_repository.update(
621 customer,
622 update_dict={
623 "invoice_next_number": customer.invoice_next_number + 1
624 },
625 )
626 return invoice_number
628 case InvoiceNumbering.organization:
629 invoice_number = f"{organization.customer_invoice_prefix}-{organization.customer_invoice_next_number:04d}"
630 repository = OrganizationRepository.from_session(session)
631 organization = await repository.update(
632 organization,
633 update_dict={
634 "customer_invoice_next_number": organization.customer_invoice_next_number
635 + 1
636 },
637 )
638 return invoice_number
640 async def _after_update( 1a
641 self,
642 session: AsyncSession,
643 organization: Organization,
644 ) -> None:
645 await webhook_service.send(
646 session, organization, WebhookEventType.organization_updated, organization
647 )
649 async def check_review_threshold( 1a
650 self, session: AsyncSession, organization: Organization
651 ) -> Organization:
652 if organization.is_under_review:
653 return organization
655 transfers_sum = await transaction_service.get_transactions_sum(
656 session, organization.account_id, type=TransactionType.balance
657 )
658 if (
659 organization.next_review_threshold >= 0
660 and transfers_sum >= organization.next_review_threshold
661 ):
662 organization.status = (
663 OrganizationStatus.ONGOING_REVIEW
664 if organization.initially_reviewed_at is not None
665 else OrganizationStatus.INITIAL_REVIEW
666 )
667 organization.status_updated_at = datetime.now(UTC)
668 await self._sync_account_status(session, organization)
669 session.add(organization)
671 enqueue_job("organization.under_review", organization_id=organization.id)
673 return organization
675 async def confirm_organization_reviewed( 1a
676 self,
677 session: AsyncSession,
678 organization: Organization,
679 next_review_threshold: int,
680 ) -> Organization:
681 organization.status = OrganizationStatus.ACTIVE
682 organization.status_updated_at = datetime.now(UTC)
683 organization.next_review_threshold = next_review_threshold
685 initial_review = False
686 if organization.initially_reviewed_at is None:
687 organization.initially_reviewed_at = datetime.now(UTC)
688 initial_review = True
690 await self._sync_account_status(session, organization)
691 session.add(organization)
693 # If there's a pending appeal, mark it as approved
694 review_repository = OrganizationReviewRepository.from_session(session)
695 review = await review_repository.get_by_organization(organization.id)
696 if review and review.appeal_submitted_at and review.appeal_decision is None:
697 review.appeal_decision = OrganizationReview.AppealDecision.APPROVED
698 review.appeal_reviewed_at = datetime.now(UTC)
699 session.add(review)
701 enqueue_job(
702 "organization.reviewed",
703 organization_id=organization.id,
704 initial_review=initial_review,
705 )
706 return organization
708 async def deny_organization( 1a
709 self, session: AsyncSession, organization: Organization
710 ) -> Organization:
711 organization.status = OrganizationStatus.DENIED
712 organization.status_updated_at = datetime.now(UTC)
713 await self._sync_account_status(session, organization)
714 session.add(organization)
716 # If there's a pending appeal, mark it as rejected
717 review_repository = OrganizationReviewRepository.from_session(session)
718 review = await review_repository.get_by_organization(organization.id)
719 if review and review.appeal_submitted_at and review.appeal_decision is None:
720 review.appeal_decision = OrganizationReview.AppealDecision.REJECTED
721 review.appeal_reviewed_at = datetime.now(UTC)
722 session.add(review)
724 return organization
726 async def set_organization_under_review( 1a
727 self, session: AsyncSession, organization: Organization
728 ) -> Organization:
729 organization.status = OrganizationStatus.ONGOING_REVIEW
730 organization.status_updated_at = datetime.now(UTC)
731 await self._sync_account_status(session, organization)
732 session.add(organization)
733 enqueue_job("organization.under_review", organization_id=organization.id)
734 return organization
736 async def update_status_from_stripe_account( 1a
737 self, session: AsyncSession, account: Account
738 ) -> None:
739 """Update organization status based on Stripe account capabilities."""
740 repository = OrganizationRepository.from_session(session)
741 organizations = await repository.get_all_by_account(account.id)
743 for organization in organizations:
744 # Don't override organizations that are denied
745 if organization.status == OrganizationStatus.DENIED:
746 continue
748 # If account is fully set up, set organization to ACTIVE
749 if all(
750 (
751 not organization.is_under_review,
752 not organization.is_active(),
753 account.currency is not None,
754 account.is_details_submitted,
755 account.is_charges_enabled,
756 account.is_payouts_enabled,
757 )
758 ):
759 organization.status = OrganizationStatus.ACTIVE
760 organization.status_updated_at = datetime.now(UTC)
762 # If Stripe disables some capabilities, reset to ONBOARDING_STARTED
763 if any(
764 (
765 not account.is_details_submitted,
766 not account.is_charges_enabled,
767 not account.is_payouts_enabled,
768 )
769 ):
770 organization.status = OrganizationStatus.ONBOARDING_STARTED
771 organization.status_updated_at = datetime.now(UTC)
773 await self._sync_account_status(session, organization)
774 session.add(organization)
776 async def _sync_account_status( 1a
777 self, session: AsyncSession, organization: Organization
778 ) -> None:
779 """Sync organization status to the related account."""
780 if not organization.account_id:
781 return
783 # Map organization status to account status
784 status_mapping = {
785 OrganizationStatus.ONBOARDING_STARTED: Account.Status.ONBOARDING_STARTED,
786 OrganizationStatus.ACTIVE: Account.Status.ACTIVE,
787 OrganizationStatus.INITIAL_REVIEW: Account.Status.UNDER_REVIEW,
788 OrganizationStatus.ONGOING_REVIEW: Account.Status.UNDER_REVIEW,
789 OrganizationStatus.DENIED: Account.Status.DENIED,
790 }
792 if organization.status in status_mapping:
793 account_status = status_mapping[organization.status]
794 await session.execute(
795 sqlalchemy_update(Account)
796 .where(Account.id == organization.account_id)
797 .values(status=account_status)
798 )
800 async def get_payment_status( 1a
801 self,
802 session: AsyncReadSession,
803 organization: Organization,
804 account_verification_only: bool = False,
805 ) -> PaymentStatusResponse:
806 """Get payment status and onboarding steps for an organization."""
807 steps = []
809 if not account_verification_only:
810 # Step 1: Create a product
811 product_repository = ProductRepository.from_session(session)
812 product_count = await product_repository.count_by_organization_id(
813 organization.id, is_archived=False
814 )
815 steps.append(
816 PaymentStep(
817 id=PaymentStepID.CREATE_PRODUCT,
818 title="Create a product",
819 description="Create your first product to start accepting payments",
820 completed=product_count > 0,
821 )
822 )
824 # Step 2: Integrate Checkout (API key OR checkout link)
825 token_repository = OrganizationAccessTokenRepository.from_session(session)
826 api_key_count = await token_repository.count_by_organization_id(
827 organization.id
828 )
830 checkout_link_repository = CheckoutLinkRepository.from_session(session)
831 checkout_link_count = (
832 await checkout_link_repository.count_by_organization_id(organization.id)
833 )
835 # Step is completed if user has either an API key OR a checkout link
836 integration_completed = api_key_count > 0 or checkout_link_count > 0
837 steps.append(
838 PaymentStep(
839 id=PaymentStepID.INTEGRATE_CHECKOUT,
840 title="Integrate Checkout",
841 description="Set up your integration to start accepting payments",
842 completed=integration_completed,
843 )
844 )
846 # Step 3: Finish account setup
847 account_setup_complete = self._is_account_setup_complete(organization)
848 steps.append(
849 PaymentStep(
850 id=PaymentStepID.SETUP_ACCOUNT,
851 title="Finish account setup",
852 description="Complete your account details and verify your identity",
853 completed=account_setup_complete,
854 )
855 )
857 return PaymentStatusResponse(
858 payment_ready=await self.is_organization_ready_for_payment(
859 session, organization
860 ),
861 steps=steps,
862 organization_status=organization.status,
863 )
865 def _is_account_setup_complete(self, organization: Organization) -> bool: 1a
866 """Check if the organization's account setup is complete."""
867 if not organization.account_id:
868 return False
870 account = organization.account
871 if not account:
872 return False
874 admin = account.admin
875 return (
876 organization.details_submitted_at is not None
877 and account.is_details_submitted
878 and (admin.identity_verification_status in ["verified", "pending"])
879 )
881 async def is_organization_ready_for_payment( 1a
882 self, session: AsyncReadSession, organization: Organization
883 ) -> bool:
884 """
885 Check if an organization is ready to accept payments.
886 This method loads the account and admin data as needed, avoiding the need
887 for eager loading in other services like checkout.
888 """
889 # In sandbox environment, always allow payments regardless of account setup
890 if settings.ENV == Environment.sandbox:
891 return True
893 # First check basic conditions that don't require account data
894 if (
895 organization.is_blocked()
896 or organization.status == OrganizationStatus.DENIED
897 ):
898 return False
900 # Check grandfathering - if grandfathered, they're ready
901 cutoff_date = datetime(2025, 8, 4, 9, 0, tzinfo=UTC)
902 if organization.created_at <= cutoff_date:
903 return True
905 # For new organizations, check basic conditions first
906 if organization.status not in OrganizationStatus.payment_ready_statuses():
907 return False
909 # Details must be submitted (check for empty dict as well)
910 if not organization.details_submitted_at or not organization.details:
911 return False
913 # Must have an active payout account
914 if organization.account_id is None:
915 return False
917 account_repository = AccountRepository.from_session(session)
918 account = await account_repository.get_by_id(
919 organization.account_id, options=(joinedload(Account.admin),)
920 )
921 if not account:
922 return False
924 # Check admin identity verification status
925 admin = account.admin
926 if not admin or admin.identity_verification_status not in [
927 IdentityVerificationStatus.verified,
928 IdentityVerificationStatus.pending,
929 ]:
930 return False
932 return True
934 async def validate_with_ai( 1a
935 self, session: AsyncSession, organization: Organization
936 ) -> OrganizationReview:
937 """Validate organization details using AI and store the result."""
938 repository = OrganizationReviewRepository.from_session(session)
939 previous_validation = await repository.get_by_organization(organization.id)
941 if previous_validation is not None:
942 return previous_validation
944 result = await organization_validator.validate_organization_details(
945 organization
946 )
948 ai_validation = OrganizationReview(
949 organization_id=organization.id,
950 verdict=result.verdict.verdict,
951 risk_score=result.verdict.risk_score,
952 violated_sections=result.verdict.violated_sections,
953 reason=result.verdict.reason,
954 timed_out=result.timed_out,
955 organization_details_snapshot={
956 "name": organization.name,
957 "website": organization.website,
958 "details": organization.details,
959 "socials": organization.socials,
960 },
961 model_used=organization_validator.model.model_name,
962 )
964 if result.verdict.verdict in ["FAIL", "UNCERTAIN"]:
965 await self.deny_organization(session, organization)
967 session.add(ai_validation)
969 return ai_validation
971 async def submit_appeal( 1a
972 self, session: AsyncSession, organization: Organization, appeal_reason: str
973 ) -> OrganizationReview:
974 """Submit an appeal for organization review and create a Plain ticket."""
976 repository = OrganizationReviewRepository.from_session(session)
977 review = await repository.get_by_organization(organization.id)
979 if review is None:
980 raise ValueError("Organization must have a review before submitting appeal")
982 if review.verdict == OrganizationReview.Verdict.PASS:
983 raise ValueError("Cannot submit appeal for a passed review")
985 if review.appeal_submitted_at is not None:
986 raise ValueError("Appeal has already been submitted for this organization")
988 review.appeal_submitted_at = datetime.now(UTC)
989 review.appeal_reason = appeal_reason
991 session.add(review)
993 await plain_service.create_appeal_review_thread(session, organization, review)
995 return review
997 async def approve_appeal( 1a
998 self, session: AsyncSession, organization: Organization
999 ) -> OrganizationReview:
1000 """Approve an organization's appeal and restore payment access."""
1002 repository = OrganizationReviewRepository.from_session(session)
1003 review = await repository.get_by_organization(organization.id)
1005 if review is None:
1006 raise ValueError("Organization must have a review before approving appeal")
1008 if review.appeal_submitted_at is None:
1009 raise ValueError("No appeal has been submitted for this organization")
1011 if review.appeal_decision is not None:
1012 raise ValueError("Appeal has already been reviewed")
1014 organization.status = OrganizationStatus.ACTIVE
1015 organization.status_updated_at = datetime.now(UTC)
1016 review.appeal_decision = OrganizationReview.AppealDecision.APPROVED
1017 review.appeal_reviewed_at = datetime.now(UTC)
1019 await self._sync_account_status(session, organization)
1021 session.add(organization)
1022 session.add(review)
1024 return review
1026 async def deny_appeal( 1a
1027 self, session: AsyncSession, organization: Organization
1028 ) -> OrganizationReview:
1029 """Deny an organization's appeal and keep payment access blocked."""
1031 repository = OrganizationReviewRepository.from_session(session)
1032 review = await repository.get_by_organization(organization.id)
1034 if review is None:
1035 raise ValueError("Organization must have a review before denying appeal")
1037 if review.appeal_submitted_at is None:
1038 raise ValueError("No appeal has been submitted for this organization")
1040 if review.appeal_decision is not None:
1041 raise ValueError("Appeal has already been reviewed")
1043 review.appeal_decision = OrganizationReview.AppealDecision.REJECTED
1044 review.appeal_reviewed_at = datetime.now(UTC)
1046 session.add(review)
1048 return review
1051organization = OrganizationService() 1a