Coverage for polar/user/service.py: 26%
121 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 15:52 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 15:52 +0000
1from typing import Any 1a
2from uuid import UUID 1a
4import stripe as stripe_lib 1a
5import structlog 1a
6from sqlalchemy import delete 1a
8from polar.exceptions import PolarError 1a
9from polar.integrations.stripe.service import stripe as stripe_service 1a
10from polar.kit.anonymization import anonymize_email_for_deletion 1a
11from polar.models import OAuthAccount, User 1a
12from polar.models.user import IdentityVerificationStatus 1a
13from polar.organization.repository import OrganizationRepository 1a
14from polar.postgres import AsyncSession 1a
15from polar.worker import enqueue_job 1a
17from .repository import UserRepository 1a
18from .schemas import ( 1a
19 BlockingOrganization,
20 UserDeletionBlockedReason,
21 UserDeletionResponse,
22 UserIdentityVerification,
23 UserSignupAttribution,
24)
26log = structlog.get_logger() 1a
29class UserError(PolarError): ... 1a
32class IdentityAlreadyVerified(UserError): 1a
33 def __init__(self, user_id: UUID) -> None: 1a
34 self.user_id = user_id
35 message = "Your identity is already verified."
36 super().__init__(message, 403)
39class IdentityVerificationProcessing(UserError): 1a
40 def __init__(self, user_id: UUID) -> None: 1a
41 self.user_id = user_id
42 message = "Your identity verification is still processing."
43 super().__init__(message, 403)
46class IdentityVerificationDoesNotExist(UserError): 1a
47 def __init__(self, identity_verification_id: str) -> None: 1a
48 self.identity_verification_id = identity_verification_id
49 message = (
50 f"Received identity verification {identity_verification_id} from Stripe, "
51 "but no associated User exists."
52 )
53 super().__init__(message)
56class InvalidAccount(UserError): 1a
57 def __init__(self, account_id: UUID) -> None: 1a
58 self.account_id = account_id
59 message = (
60 f"The account {account_id} does not exist or you don't have access to it."
61 )
62 super().__init__(message)
65class UserService: 1a
66 async def get_by_email_or_create( 1a
67 self,
68 session: AsyncSession,
69 email: str,
70 *,
71 signup_attribution: UserSignupAttribution | None = None,
72 ) -> tuple[User, bool]:
73 repository = UserRepository.from_session(session)
74 user = await repository.get_by_email(email)
75 created = False
76 if user is None:
77 user = await self.create_by_email(
78 session, email, signup_attribution=signup_attribution
79 )
80 created = True
82 return (user, created)
84 async def create_by_email( 1a
85 self,
86 session: AsyncSession,
87 email: str,
88 signup_attribution: UserSignupAttribution | None = None,
89 ) -> User:
90 repository = UserRepository.from_session(session)
91 user = await repository.create(
92 User(
93 email=email,
94 oauth_accounts=[],
95 signup_attribution=signup_attribution,
96 ),
97 flush=True,
98 )
99 enqueue_job("user.on_after_signup", user_id=user.id)
100 return user
102 async def create_identity_verification( 1a
103 self, session: AsyncSession, user: User
104 ) -> UserIdentityVerification:
105 if user.identity_verified:
106 raise IdentityAlreadyVerified(user.id)
108 if user.identity_verification_status == IdentityVerificationStatus.pending:
109 raise IdentityVerificationProcessing(user.id)
111 verification_session: stripe_lib.identity.VerificationSession | None = None
112 if user.identity_verification_id is not None:
113 verification_session = await stripe_service.get_verification_session(
114 user.identity_verification_id
115 )
117 if (
118 verification_session is None
119 or verification_session.status != "requires_input"
120 ):
121 verification_session = await stripe_service.create_verification_session(
122 user
123 )
125 repository = UserRepository.from_session(session)
126 await repository.update(
127 user, update_dict={"identity_verification_id": verification_session.id}
128 )
130 assert verification_session.client_secret is not None
131 return UserIdentityVerification(
132 id=verification_session.id, client_secret=verification_session.client_secret
133 )
135 async def identity_verification_verified( 1a
136 self,
137 session: AsyncSession,
138 verification_session: stripe_lib.identity.VerificationSession,
139 ) -> User:
140 repository = UserRepository.from_session(session)
141 user = await repository.get_by_identity_verification_id(verification_session.id)
142 if user is None:
143 raise IdentityVerificationDoesNotExist(verification_session.id)
145 assert verification_session.status == "verified"
146 return await repository.update(
147 user,
148 update_dict={
149 "identity_verification_status": IdentityVerificationStatus.verified
150 },
151 )
153 async def identity_verification_pending( 1a
154 self,
155 session: AsyncSession,
156 verification_session: stripe_lib.identity.VerificationSession,
157 ) -> User:
158 repository = UserRepository.from_session(session)
159 user = await repository.get_by_identity_verification_id(verification_session.id)
160 if user is None:
161 raise IdentityVerificationDoesNotExist(verification_session.id)
163 # If the user is already verified, we don't need to update their status.
164 # Might happen if the webhook was delayed
165 if user.identity_verified:
166 return user
168 assert verification_session.status == "processing"
169 return await repository.update(
170 user,
171 update_dict={
172 "identity_verification_status": IdentityVerificationStatus.pending
173 },
174 )
176 async def identity_verification_failed( 1a
177 self,
178 session: AsyncSession,
179 verification_session: stripe_lib.identity.VerificationSession,
180 ) -> User:
181 repository = UserRepository.from_session(session)
182 user = await repository.get_by_identity_verification_id(verification_session.id)
183 if user is None:
184 raise IdentityVerificationDoesNotExist(verification_session.id)
186 # TODO: should we send an email?
188 return await repository.update(
189 user,
190 update_dict={
191 "identity_verification_status": IdentityVerificationStatus.failed
192 },
193 )
195 async def check_can_delete( 1a
196 self,
197 session: AsyncSession,
198 user: User,
199 ) -> UserDeletionResponse:
200 """Check if a user can be deleted.
202 A user can be deleted if all organizations they are members of
203 are soft-deleted (deleted_at is not None).
204 """
205 blocked_reasons: list[UserDeletionBlockedReason] = []
206 blocking_organizations: list[BlockingOrganization] = []
208 # Get all organizations the user is a member of (excluding deleted orgs)
209 org_repository = OrganizationRepository.from_session(session)
210 organizations = await org_repository.get_all_by_user(user.id)
212 if organizations:
213 blocked_reasons.append(UserDeletionBlockedReason.HAS_ACTIVE_ORGANIZATIONS)
214 for org in organizations:
215 blocking_organizations.append(
216 BlockingOrganization(id=org.id, slug=org.slug, name=org.name)
217 )
219 return UserDeletionResponse(
220 deleted=False,
221 blocked_reasons=blocked_reasons,
222 blocking_organizations=blocking_organizations,
223 )
225 async def request_deletion( 1a
226 self,
227 session: AsyncSession,
228 user: User,
229 ) -> UserDeletionResponse:
230 """Request deletion of the user account.
232 Flow:
233 1. Check if user has any active organizations -> block if yes
234 2. Soft delete the user
236 Note: The user's Account (payout account) is not deleted here.
237 Accounts are tied to organizations and should be deleted when the
238 organization is deleted, not when the user account is deleted.
239 """
240 check_result = await self.check_can_delete(session, user)
242 if check_result.blocked_reasons:
243 return check_result
245 # Soft delete the user
246 await self.soft_delete_user(session, user)
248 return UserDeletionResponse(
249 deleted=True,
250 blocked_reasons=[],
251 blocking_organizations=[],
252 )
254 async def soft_delete_user( 1a
255 self,
256 session: AsyncSession,
257 user: User,
258 ) -> User:
259 """Soft-delete a user, anonymizing PII fields."""
260 repository = UserRepository.from_session(session)
262 update_dict: dict[str, Any] = {}
264 update_dict["email"] = anonymize_email_for_deletion(user.email)
266 if user.avatar_url:
267 update_dict["avatar_url"] = None
269 if user.meta:
270 update_dict["meta"] = {}
272 await self._delete_oauth_accounts(session, user)
274 user = await repository.update(user, update_dict=update_dict)
275 await repository.soft_delete(user)
277 log.info(
278 "user.deleted",
279 user_id=user.id,
280 )
282 return user
284 async def _delete_oauth_accounts( 1a
285 self,
286 session: AsyncSession,
287 user: User,
288 ) -> None:
289 """Delete all OAuth accounts for a user."""
290 stmt = delete(OAuthAccount).where(OAuthAccount.user_id == user.id)
291 await session.execute(stmt)
293 log.info(
294 "user.oauth_accounts_deleted",
295 user_id=user.id,
296 )
299user = UserService() 1a