Coverage for polar/integrations/google/service.py: 37%

79 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 17:15 +0000

1from typing import TypedDict 1a

2 

3import httpx 1a

4import structlog 1a

5from httpx_oauth.clients.google import GoogleOAuth2 1a

6from httpx_oauth.oauth2 import OAuth2Token 1a

7 

8from polar.config import settings 1a

9from polar.exceptions import PolarError 1a

10from polar.logging import Logger 1a

11from polar.models import OAuthAccount, User 1a

12from polar.models.user import OAuthPlatform 1a

13from polar.postgres import AsyncSession 1a

14from polar.user.oauth_service import oauth_account_service 1a

15from polar.user.repository import UserRepository 1a

16from polar.user.schemas import UserSignupAttribution 1a

17from polar.worker import enqueue_job 1a

18 

# Module-level structured logger used by the service below.
log: Logger = structlog.get_logger()

# OAuth2 client configured with the Google credentials taken from the
# application settings.
google_oauth_client = GoogleOAuth2(
    settings.GOOGLE_CLIENT_ID,
    settings.GOOGLE_CLIENT_SECRET,
)

24 

25 

26class GoogleUserProfile(TypedDict): 1a

27 id: str 1a

28 email: str 1a

29 email_verified: bool 1a

30 picture: str | None 1a

31 

32 

class GoogleServiceError(PolarError):
    """Base class for the errors raised by the Google integration service."""

34 

35 

class CannotLinkUnverifiedEmailError(GoogleServiceError):
    """Raised when an existing account shares an email Google has not verified.

    The user-facing message is forwarded to the parent class together with
    the value 403 (presumably the HTTP status code -- confirm against
    ``PolarError``).
    """

    def __init__(self, email: str) -> None:
        """Build the error for the conflicting address.

        Args:
            email: Address that already exists on Polar; it is interpolated
                into the message shown to the user.
        """
        super().__init__(
            f"An account already exists on Polar under the email {email}. "
            "We cannot automatically link it to your Google account since "
            "this email address is not verified on Google. "
            "Either verify your email address on Google and try again "
            "or sign in using your email.",
            403,
        )

46 

47 

class AccountLinkedToAnotherUserError(GoogleServiceError):
    """Raised when a Google account is already attached to a different user.

    The user-facing message is forwarded to the parent class together with
    the value 403 (presumably the HTTP status code -- confirm against
    ``PolarError``).
    """

    def __init__(self) -> None:
        """Build the error with its fixed explanatory message."""
        super().__init__(
            "This Google account is already linked to another user on Polar. "
            "You may have already created another account "
            "with a different email address.",
            403,
        )

56 

57 

class GoogleService:
    """Sign-in and account-linking operations driven by a Google OAuth token."""

    async def get_updated_or_create(
        self,
        session: AsyncSession,
        *,
        token: OAuth2Token,
        signup_attribution: UserSignupAttribution | None = None,
    ) -> tuple[User, bool]:
        """Resolve the user behind a Google token, creating one if needed.

        Three outcomes are possible, tried in this order:

        1. A user already linked to the Google account id exists: its OAuth
           account is refreshed with the new token and the user is returned.
        2. A user with the same email exists: the Google account is attached
           to it, provided Google reports the email as verified.
        3. Otherwise a new user is created, flushed, and the
           ``user.on_after_signup`` job is enqueued.

        Args:
            session: Database session the changes are added to.
            token: OAuth token; ``access_token`` and ``expires_at`` are read.
            signup_attribution: Optional attribution stored on a newly
                created user only.

        Returns:
            ``(user, created)`` where ``created`` is True only for outcome 3.

        Raises:
            CannotLinkUnverifiedEmailError: A user with the same email exists
                but Google does not report the email as verified.
        """
        profile = await self._get_profile(token["access_token"])
        users = UserRepository.from_session(session)

        linked_user = await users.get_by_oauth_account(
            OAuthPlatform.google, profile["id"]
        )
        if linked_user is not None:
            # Known Google account: only refresh what is stored about it.
            existing_account = linked_user.get_oauth_account(OAuthPlatform.google)
            assert existing_account is not None
            self._store_credentials(existing_account, token, profile)
            session.add(existing_account)
            return linked_user, False

        new_account = OAuthAccount(
            platform=OAuthPlatform.google,
            account_id=profile["id"],
            account_email=profile["email"],
            account_username=profile["email"],
            access_token=token["access_token"],
            expires_at=token["expires_at"],
        )

        email_owner = await users.get_by_email(profile["email"])
        if email_owner is not None:
            # Attaching to an existing account is only allowed when Google
            # vouches for the email address.
            if not profile["email_verified"]:
                raise CannotLinkUnverifiedEmailError(profile["email"])
            email_owner.oauth_accounts.append(new_account)
            session.add(email_owner)
            return email_owner, False

        created_user = User(
            email=profile["email"],
            email_verified=profile["email_verified"],
            avatar_url=profile["picture"],
            oauth_accounts=[new_account],
            signup_attribution=signup_attribution,
        )
        session.add(created_user)
        # Flush before enqueueing: the job is given the user's id.
        await session.flush()

        enqueue_job("user.on_after_signup", user_id=created_user.id)

        return created_user, True

    async def link_user(
        self,
        session: AsyncSession,
        *,
        user: User,
        token: OAuth2Token,
    ) -> User:
        """Attach the Google account behind ``token`` to an existing user.

        When no OAuth account exists yet for the Google account id, one is
        created, appended to the user and the connection is logged. In both
        cases the stored token data is refreshed and the session is flushed.

        Args:
            session: Database session the changes are added to.
            user: User the Google account is linked to.
            token: OAuth token; ``access_token`` and ``expires_at`` are read.

        Returns:
            The same ``user`` object that was passed in.

        Raises:
            AccountLinkedToAnotherUserError: The Google account is already
                stored with a different ``user_id``.
        """
        profile = await self._get_profile(token["access_token"])

        account = await oauth_account_service.get_by_platform_and_account_id(
            session, OAuthPlatform.google, profile["id"]
        )
        if account is None:
            account = OAuthAccount(
                platform=OAuthPlatform.google,
                account_id=profile["id"],
                account_email=profile["email"],
            )
            user.oauth_accounts.append(account)
            log.info(
                "oauth_account.connect",
                user_id=user.id,
                platform="google",
                account_email=profile["email"],
            )
        elif account.user_id != user.id:
            raise AccountLinkedToAnotherUserError()

        self._store_credentials(account, token, profile)
        session.add(user)

        await session.flush()

        return user

    @staticmethod
    def _store_credentials(
        oauth_account: OAuthAccount,
        token: OAuth2Token,
        profile: GoogleUserProfile,
    ) -> None:
        """Copy the token data and the profile email onto an OAuth account."""
        oauth_account.access_token = token["access_token"]
        oauth_account.expires_at = token["expires_at"]
        oauth_account.account_username = profile["email"]

    async def _get_profile(self, token: str) -> GoogleUserProfile:
        """Fetch the Google userinfo payload for an access token.

        Args:
            token: Access token sent as a ``Bearer`` authorization header.

        Returns:
            The ``sub``, ``email``, ``email_verified`` and optional
            ``picture`` fields of the response, with ``sub`` exposed as
            ``id``.

        Raises:
            Whatever ``response.raise_for_status()`` raises for an error
            response, and ``KeyError`` when one of the three mandatory
            fields is absent from the payload.
        """
        userinfo_url = "https://openidconnect.googleapis.com/v1/userinfo"
        async with httpx.AsyncClient() as client:
            response = await client.get(
                userinfo_url,
                headers={"Authorization": f"Bearer {token}"},
            )
            response.raise_for_status()

            payload = response.json()
            return GoogleUserProfile(
                id=payload["sub"],
                email=payload["email"],
                email_verified=payload["email_verified"],
                picture=payload.get("picture"),
            )

167 

168 

# Module-level service instance.
google = GoogleService()