Coverage for polar/user/service.py: 26%

121 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 15:52 +0000

1from typing import Any 1a

2from uuid import UUID 1a

3 

4import stripe as stripe_lib 1a

5import structlog 1a

6from sqlalchemy import delete 1a

7 

8from polar.exceptions import PolarError 1a

9from polar.integrations.stripe.service import stripe as stripe_service 1a

10from polar.kit.anonymization import anonymize_email_for_deletion 1a

11from polar.models import OAuthAccount, User 1a

12from polar.models.user import IdentityVerificationStatus 1a

13from polar.organization.repository import OrganizationRepository 1a

14from polar.postgres import AsyncSession 1a

15from polar.worker import enqueue_job 1a

16 

17from .repository import UserRepository 1a

18from .schemas import ( 1a

19 BlockingOrganization, 

20 UserDeletionBlockedReason, 

21 UserDeletionResponse, 

22 UserIdentityVerification, 

23 UserSignupAttribution, 

24) 

25 

# Module-level structlog logger; the service methods below emit their
# "user.deleted" / "user.oauth_accounts_deleted" events through it.
log = structlog.get_logger()

27 

28 

class UserError(PolarError):
    """Base class for the errors raised by the user service."""

30 

31 

class IdentityAlreadyVerified(UserError):
    """Raised when a user whose identity is already verified asks to verify again.

    The offending user's ID is kept on ``user_id``.
    """

    def __init__(self, user_id: UUID) -> None:
        self.user_id = user_id
        # Second positional argument is forwarded to PolarError as-is
        # (presumably the HTTP status code -- confirm against PolarError).
        super().__init__("Your identity is already verified.", 403)

37 

38 

class IdentityVerificationProcessing(UserError):
    """Raised when a new verification is requested while one is still pending.

    The offending user's ID is kept on ``user_id``.
    """

    def __init__(self, user_id: UUID) -> None:
        self.user_id = user_id
        # Second positional argument is forwarded to PolarError as-is
        # (presumably the HTTP status code -- confirm against PolarError).
        super().__init__("Your identity verification is still processing.", 403)

44 

45 

class IdentityVerificationDoesNotExist(UserError):
    """Raised when no user matches an identity verification ID.

    The unmatched verification ID is kept on ``identity_verification_id``.
    """

    def __init__(self, identity_verification_id: str) -> None:
        self.identity_verification_id = identity_verification_id
        super().__init__(
            f"Received identity verification {identity_verification_id} from Stripe, "
            "but no associated User exists."
        )

54 

55 

class InvalidAccount(UserError):
    """Raised for an account that does not exist or is not accessible.

    The account's ID is kept on ``account_id``.
    """

    def __init__(self, account_id: UUID) -> None:
        self.account_id = account_id
        super().__init__(
            f"The account {account_id} does not exist or you don't have access to it."
        )

63 

64 

class UserService:
    """User operations: creation, identity verification, and deletion."""

    async def get_by_email_or_create(
        self,
        session: AsyncSession,
        email: str,
        *,
        signup_attribution: UserSignupAttribution | None = None,
    ) -> tuple[User, bool]:
        """Fetch the user with the given email, creating one if none is found.

        Returns:
            A ``(user, created)`` tuple; ``created`` is ``True`` only when
            the user was created by this call.
        """
        existing = await UserRepository.from_session(session).get_by_email(email)
        if existing is not None:
            return (existing, False)

        new_user = await self.create_by_email(
            session, email, signup_attribution=signup_attribution
        )
        return (new_user, True)

    async def create_by_email(
        self,
        session: AsyncSession,
        email: str,
        signup_attribution: UserSignupAttribution | None = None,
    ) -> User:
        """Create a user for ``email`` and enqueue the after-signup job.

        The user is created through the repository with ``flush=True`` and
        starts with an empty list of OAuth accounts.
        """
        repository = UserRepository.from_session(session)
        new_user = User(
            email=email,
            oauth_accounts=[],
            signup_attribution=signup_attribution,
        )
        created_user = await repository.create(new_user, flush=True)
        enqueue_job("user.on_after_signup", user_id=created_user.id)
        return created_user

    async def create_identity_verification(
        self, session: AsyncSession, user: User
    ) -> UserIdentityVerification:
        """Return a Stripe verification session the user can complete.

        The session already referenced by the user is reused when its status
        is ``"requires_input"``; otherwise a new one is created. The session
        ID is then stored on the user.

        Raises:
            IdentityAlreadyVerified: the user's identity is already verified.
            IdentityVerificationProcessing: the user's verification status
                is pending.
        """
        if user.identity_verified:
            raise IdentityAlreadyVerified(user.id)

        if user.identity_verification_status == IdentityVerificationStatus.pending:
            raise IdentityVerificationProcessing(user.id)

        stripe_session: stripe_lib.identity.VerificationSession | None = None
        existing_id = user.identity_verification_id
        if existing_id is not None:
            stripe_session = await stripe_service.get_verification_session(
                existing_id
            )

        # No session yet, or the stored one can no longer take input:
        # start a fresh one.
        if stripe_session is None or stripe_session.status != "requires_input":
            stripe_session = await stripe_service.create_verification_session(user)

        await UserRepository.from_session(session).update(
            user, update_dict={"identity_verification_id": stripe_session.id}
        )

        client_secret = stripe_session.client_secret
        assert client_secret is not None
        return UserIdentityVerification(
            id=stripe_session.id, client_secret=client_secret
        )

    async def _get_user_for_verification(
        self,
        repository: UserRepository,
        verification_session: stripe_lib.identity.VerificationSession,
    ) -> User:
        """Look up the user referencing ``verification_session``.

        Raises:
            IdentityVerificationDoesNotExist: no user has this verification ID.
        """
        found = await repository.get_by_identity_verification_id(
            verification_session.id
        )
        if found is None:
            raise IdentityVerificationDoesNotExist(verification_session.id)
        return found

    async def identity_verification_verified(
        self,
        session: AsyncSession,
        verification_session: stripe_lib.identity.VerificationSession,
    ) -> User:
        """Set the matching user's verification status to ``verified``.

        Raises:
            IdentityVerificationDoesNotExist: no user has this verification ID.
        """
        repository = UserRepository.from_session(session)
        target = await self._get_user_for_verification(
            repository, verification_session
        )

        assert verification_session.status == "verified"
        new_status = IdentityVerificationStatus.verified
        return await repository.update(
            target, update_dict={"identity_verification_status": new_status}
        )

    async def identity_verification_pending(
        self,
        session: AsyncSession,
        verification_session: stripe_lib.identity.VerificationSession,
    ) -> User:
        """Set the matching user's verification status to ``pending``.

        A user whose identity is already verified is returned unchanged.

        Raises:
            IdentityVerificationDoesNotExist: no user has this verification ID.
        """
        repository = UserRepository.from_session(session)
        target = await self._get_user_for_verification(
            repository, verification_session
        )

        # Already verified: nothing to update. Might happen if the webhook
        # was delayed.
        if user_is_verified := target.identity_verified:
            return target

        assert verification_session.status == "processing"
        new_status = IdentityVerificationStatus.pending
        return await repository.update(
            target, update_dict={"identity_verification_status": new_status}
        )

    async def identity_verification_failed(
        self,
        session: AsyncSession,
        verification_session: stripe_lib.identity.VerificationSession,
    ) -> User:
        """Set the matching user's verification status to ``failed``.

        Raises:
            IdentityVerificationDoesNotExist: no user has this verification ID.
        """
        repository = UserRepository.from_session(session)
        target = await self._get_user_for_verification(
            repository, verification_session
        )

        # TODO: should we send an email?

        new_status = IdentityVerificationStatus.failed
        return await repository.update(
            target, update_dict={"identity_verification_status": new_status}
        )

    async def check_can_delete(
        self,
        session: AsyncSession,
        user: User,
    ) -> UserDeletionResponse:
        """Report whether anything blocks deleting ``user``.

        Deletion is blocked while the organization repository still returns
        organizations for the user; each one is listed in the response.
        The returned response always has ``deleted=False``.
        """
        # Get all organizations the user is a member of (excluding deleted orgs)
        org_repository = OrganizationRepository.from_session(session)
        organizations = await org_repository.get_all_by_user(user.id)

        if not organizations:
            return UserDeletionResponse(
                deleted=False,
                blocked_reasons=[],
                blocking_organizations=[],
            )

        return UserDeletionResponse(
            deleted=False,
            blocked_reasons=[UserDeletionBlockedReason.HAS_ACTIVE_ORGANIZATIONS],
            blocking_organizations=[
                BlockingOrganization(id=org.id, slug=org.slug, name=org.name)
                for org in organizations
            ],
        )

    async def request_deletion(
        self,
        session: AsyncSession,
        user: User,
    ) -> UserDeletionResponse:
        """Delete the user's account unless something blocks it.

        Steps:
        1. Run ``check_can_delete``; if it reports blocked reasons, return
           that response as-is.
        2. Otherwise soft-delete the user and return ``deleted=True``.

        Note: The user's Account (payout account) is not deleted here.
        Accounts are tied to organizations and should be deleted when the
        organization is deleted, not when the user account is deleted.
        """
        verdict = await self.check_can_delete(session, user)
        if verdict.blocked_reasons:
            return verdict

        await self.soft_delete_user(session, user)
        return UserDeletionResponse(
            deleted=True,
            blocked_reasons=[],
            blocking_organizations=[],
        )

    async def soft_delete_user(
        self,
        session: AsyncSession,
        user: User,
    ) -> User:
        """Anonymize the user's PII fields, then soft-delete the user.

        The email is always replaced with an anonymized value; the avatar URL
        and ``meta`` are cleared only when set. The user's OAuth accounts are
        deleted before the user row is updated.
        """
        repository = UserRepository.from_session(session)

        changes: dict[str, Any] = {
            "email": anonymize_email_for_deletion(user.email),
        }
        if user.avatar_url:
            changes["avatar_url"] = None
        if user.meta:
            changes["meta"] = {}

        await self._delete_oauth_accounts(session, user)

        anonymized = await repository.update(user, update_dict=changes)
        await repository.soft_delete(anonymized)

        log.info("user.deleted", user_id=anonymized.id)
        return anonymized

    async def _delete_oauth_accounts(
        self,
        session: AsyncSession,
        user: User,
    ) -> None:
        """Delete every OAuthAccount row whose ``user_id`` is this user's ID."""
        await session.execute(
            delete(OAuthAccount).where(OAuthAccount.user_id == user.id)
        )
        log.info("user.oauth_accounts_deleted", user_id=user.id)

297 

298 

# Module-level UserService instance, created once at import time.
user = UserService()