Coverage for polar/integrations/apple/service.py: 36%

82 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 17:15 +0000

1from typing import TypedDict 1a

2 

3import jwt 1a

4import structlog 1a

5from httpx_oauth.clients.openid import OpenID 1a

6from httpx_oauth.oauth2 import OAuth2Token 1a

7 

8from polar.config import settings 1a

9from polar.exceptions import PolarError 1a

10from polar.kit.utils import utc_now 1a

11from polar.logging import Logger 1a

12from polar.models import OAuthAccount, User 1a

13from polar.models.user import OAuthPlatform 1a

14from polar.postgres import AsyncSession 1a

15from polar.user.oauth_service import oauth_account_service 1a

16from polar.user.repository import UserRepository 1a

17from polar.user.schemas import UserSignupAttribution 1a

18from polar.worker import enqueue_job 1a

19 

# Module-level structlog logger used by the service methods in this module.
log: Logger = structlog.get_logger()

21 

22 

def get_apple_oauth_client(secret: bool = False) -> OpenID:
    """Build the OpenID client used to talk to Sign in with Apple.

    Args:
        secret: When true, a client secret is generated as an ES256-signed
            JWT built from the Apple settings (team ID, client ID, key ID
            and key value), valid for 3600 seconds from now. When false,
            an empty string is used as the client secret.

    Returns:
        An ``OpenID`` client configured with Apple's OpenID discovery
        document and the ``openid`` and ``email`` base scopes.
    """
    client_secret = ""
    if secret:
        issued_at = int(utc_now().timestamp())
        claims = {
            "iss": settings.APPLE_TEAM_ID,
            "aud": "https://appleid.apple.com",
            "sub": settings.APPLE_CLIENT_ID,
            "iat": issued_at,
            # The generated secret expires one hour after it is issued.
            "exp": issued_at + 3600,
        }
        client_secret = jwt.encode(
            claims,
            settings.APPLE_KEY_VALUE,
            algorithm="ES256",
            headers={"kid": settings.APPLE_KEY_ID},
        )

    return OpenID(
        settings.APPLE_CLIENT_ID,
        client_secret,
        "https://appleid.apple.com/.well-known/openid-configuration",
        base_scopes=["openid", "email"],
    )

48 

49 

# Shared JWKS client pointed at Apple's public key endpoint; it is used to
# resolve the signing key of the identity tokens decoded in this module.
jwks_client = jwt.PyJWKClient("https://appleid.apple.com/auth/keys")

51 

52 

class AppleUserProfile(TypedDict):
    """Subset of Apple identity-token claims used by this module."""

    # Taken from the token's "sub" claim.
    id: str
    # Taken from the token's "email" claim.
    email: str
    # Taken from the token's "email_verified" claim.
    email_verified: bool

57 

58 

class AppleServiceError(PolarError):
    """Base class for the Apple integration errors defined in this module."""

60 

61 

class CannotLinkUnverifiedEmailError(AppleServiceError):
    """Raised when an existing user matches an Apple email that is unverified.

    Args:
        email: The email address reported by Apple; it is interpolated into
            the error message.
    """

    def __init__(self, email: str) -> None:
        # The second positional argument (403) is forwarded to the base
        # error -- presumably an HTTP status code; confirm against PolarError.
        super().__init__(
            (
                f"An account already exists on Polar under the email {email}. "
                "We cannot automatically link it to your Apple account since "
                "this email address is not verified on Apple. "
                "Either verify your email address on Apple and try again "
                "or sign in using your email."
            ),
            403,
        )

72 

73 

class AccountLinkedToAnotherUserError(AppleServiceError):
    """Raised when an Apple account is already attached to a different user."""

    def __init__(self) -> None:
        # The second positional argument (403) is forwarded to the base
        # error -- presumably an HTTP status code; confirm against PolarError.
        super().__init__(
            (
                "This Apple account is already linked to another user on Polar. "
                "You may have already created another account "
                "with a different email address."
            ),
            403,
        )

82 

83 

class AppleService:
    """Resolve, create and link users from Sign in with Apple tokens."""

    async def get_updated_or_create(
        self,
        session: AsyncSession,
        *,
        token: OAuth2Token,
        signup_attribution: UserSignupAttribution | None = None,
    ) -> tuple[User, bool]:
        """Return the user matching an Apple token, creating one if needed.

        Resolution order:

        1. A user already linked to this Apple account ID: its OAuth account
           is refreshed with the new token data.
        2. A user with the same email: the Apple account is attached to it,
           but only when Apple reports the email as verified.
        3. Otherwise a new user is created, flushed, and the
           ``user.on_after_signup`` job is enqueued.

        Args:
            session: Database session used for lookups and persistence.
            token: OAuth2 token; its ``id_token``, ``access_token`` and
                ``expires_at`` entries are read.
            signup_attribution: Optional attribution stored on a newly
                created user.

        Returns:
            A ``(user, created)`` tuple; ``created`` is true only when a new
            user was inserted.

        Raises:
            CannotLinkUnverifiedEmailError: A user with this email exists but
                the Apple email is not verified.
        """
        profile = await self._decode_profile(token["id_token"])
        user_repository = UserRepository.from_session(session)
        user = await user_repository.get_by_oauth_account(
            OAuthPlatform.apple, profile["id"]
        )

        if user is not None:
            # Known Apple account: refresh the stored token data.
            oauth_account = user.get_oauth_account(OAuthPlatform.apple)
            assert oauth_account is not None
            oauth_account.access_token = token["access_token"]
            oauth_account.expires_at = token["expires_at"]
            oauth_account.account_username = profile["email"]
            session.add(oauth_account)
            return (user, False)

        oauth_account = OAuthAccount(
            platform=OAuthPlatform.apple,
            account_id=profile["id"],
            account_email=profile["email"],
            account_username=profile["email"],
            access_token=token["access_token"],
            expires_at=token["expires_at"],
        )

        user = await user_repository.get_by_email(profile["email"])
        if user is not None:
            # Only auto-link to an existing user when Apple vouches for the
            # email; otherwise anyone could claim someone else's address.
            if not profile["email_verified"]:
                raise CannotLinkUnverifiedEmailError(profile["email"])
            user.oauth_accounts.append(oauth_account)
            session.add(user)
            return (user, False)

        user = User(
            email=profile["email"],
            email_verified=profile["email_verified"],
            avatar_url=None,
            oauth_accounts=[oauth_account],
            signup_attribution=signup_attribution,
        )

        session.add(user)
        # Flush before enqueueing the job, which is given user.id.
        await session.flush()

        enqueue_job("user.on_after_signup", user_id=user.id)

        return (user, True)

    async def link_user(
        self,
        session: AsyncSession,
        *,
        user: User,
        token: OAuth2Token,
    ) -> User:
        """Attach the Apple account described by ``token`` to ``user``.

        If the Apple account is not known yet, a new OAuth account is created
        and appended to the user. In both cases the OAuth account's access
        token, expiry and username are updated from the token and profile.

        Args:
            session: Database session used for lookups and persistence.
            user: The user to link the Apple account to.
            token: OAuth2 token; its ``id_token``, ``access_token`` and
                ``expires_at`` entries are read.

        Returns:
            The same ``user`` object, after the session has been flushed.

        Raises:
            AccountLinkedToAnotherUserError: The Apple account already
                belongs to a different user.
        """
        profile = await self._decode_profile(token["id_token"])

        oauth_account = await oauth_account_service.get_by_platform_and_account_id(
            session, OAuthPlatform.apple, profile["id"]
        )
        if oauth_account is not None:
            if oauth_account.user_id != user.id:
                raise AccountLinkedToAnotherUserError()
        else:
            oauth_account = OAuthAccount(
                platform=OAuthPlatform.apple,
                account_id=profile["id"],
                account_email=profile["email"],
            )
            user.oauth_accounts.append(oauth_account)
            log.info(
                "oauth_account.connect",
                user_id=user.id,
                platform="apple",
                account_email=profile["email"],
            )

        oauth_account.access_token = token["access_token"]
        oauth_account.expires_at = token["expires_at"]
        oauth_account.account_username = profile["email"]
        session.add(user)

        await session.flush()

        return user

    @staticmethod
    def _parse_email_verified(value: object) -> bool:
        """Normalize the ``email_verified`` claim to a real ``bool``.

        Apple documents this claim as either a Boolean or a string
        (``"true"`` / ``"false"``). A non-empty string such as ``"false"`` is
        truthy, so it must be parsed rather than used directly.
        """
        if isinstance(value, str):
            return value.strip().lower() == "true"
        return bool(value)

    async def _decode_profile(self, id_token: str) -> AppleUserProfile:
        """Verify an Apple identity token and extract the profile claims.

        The token is decoded with RS256, checking the audience against
        ``settings.APPLE_CLIENT_ID`` and the issuer against
        ``https://appleid.apple.com``.

        Args:
            id_token: The encoded identity token (JWT) issued by Apple.

        Returns:
            The ``sub``, ``email`` and ``email_verified`` claims, with
            ``email_verified`` normalized to a ``bool``.
        """
        # NOTE(review): resolving the signing key looks like a synchronous
        # call that may fetch Apple's JWKS over the network -- confirm this
        # is acceptable inside an async method.
        id_token_data = jwt.decode(
            id_token,
            key=jwks_client.get_signing_key_from_jwt(id_token),
            algorithms=["RS256"],
            audience=settings.APPLE_CLIENT_ID,
            issuer="https://appleid.apple.com",
        )
        return {
            "id": id_token_data["sub"],
            "email": id_token_data["email"],
            "email_verified": self._parse_email_verified(
                id_token_data["email_verified"]
            ),
        }

191 

192 

# Module-level AppleService instance exposed for use by importers.
apple = AppleService()