Coverage for polar/integrations/google/service.py: 37%
79 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 16:17 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 16:17 +0000
1from typing import TypedDict 1a
3import httpx 1a
4import structlog 1a
5from httpx_oauth.clients.google import GoogleOAuth2 1a
6from httpx_oauth.oauth2 import OAuth2Token 1a
8from polar.config import settings 1a
9from polar.exceptions import PolarError 1a
10from polar.logging import Logger 1a
11from polar.models import OAuthAccount, User 1a
12from polar.models.user import OAuthPlatform 1a
13from polar.postgres import AsyncSession 1a
14from polar.user.oauth_service import oauth_account_service 1a
15from polar.user.repository import UserRepository 1a
16from polar.user.schemas import UserSignupAttribution 1a
17from polar.worker import enqueue_job 1a
19log: Logger = structlog.get_logger() 1a
21google_oauth_client = GoogleOAuth2( 1a
22 settings.GOOGLE_CLIENT_ID, settings.GOOGLE_CLIENT_SECRET
23)
26class GoogleUserProfile(TypedDict): 1a
27 id: str 1a
28 email: str 1a
29 email_verified: bool 1a
30 picture: str | None 1a
33class GoogleServiceError(PolarError): ... 1a
36class CannotLinkUnverifiedEmailError(GoogleServiceError): 1a
37 def __init__(self, email: str) -> None: 1a
38 message = (
39 f"An account already exists on Polar under the email {email}. "
40 "We cannot automatically link it to your Google account since "
41 "this email address is not verified on Google. "
42 "Either verify your email address on Google and try again "
43 "or sign in using your email."
44 )
45 super().__init__(message, 403)
48class AccountLinkedToAnotherUserError(GoogleServiceError): 1a
49 def __init__(self) -> None: 1a
50 message = (
51 "This Google account is already linked to another user on Polar. "
52 "You may have already created another account "
53 "with a different email address."
54 )
55 super().__init__(message, 403)
58class GoogleService: 1a
59 async def get_updated_or_create( 1a
60 self,
61 session: AsyncSession,
62 *,
63 token: OAuth2Token,
64 signup_attribution: UserSignupAttribution | None = None,
65 ) -> tuple[User, bool]:
66 google_profile = await self._get_profile(token["access_token"])
67 user_repository = UserRepository.from_session(session)
68 user = await user_repository.get_by_oauth_account(
69 OAuthPlatform.google, google_profile["id"]
70 )
72 if user is not None:
73 oauth_account = user.get_oauth_account(OAuthPlatform.google)
74 assert oauth_account is not None
75 oauth_account.access_token = token["access_token"]
76 oauth_account.expires_at = token["expires_at"]
77 oauth_account.account_username = google_profile["email"]
78 session.add(oauth_account)
79 return (user, False)
81 oauth_account = OAuthAccount(
82 platform=OAuthPlatform.google,
83 account_id=google_profile["id"],
84 account_email=google_profile["email"],
85 account_username=google_profile["email"],
86 access_token=token["access_token"],
87 expires_at=token["expires_at"],
88 )
90 user = await user_repository.get_by_email(google_profile["email"])
91 if user is not None:
92 if google_profile["email_verified"]:
93 user.oauth_accounts.append(oauth_account)
94 session.add(user)
95 return (user, False)
96 else:
97 raise CannotLinkUnverifiedEmailError(google_profile["email"])
99 user = User(
100 email=google_profile["email"],
101 email_verified=google_profile["email_verified"],
102 avatar_url=google_profile["picture"],
103 oauth_accounts=[oauth_account],
104 signup_attribution=signup_attribution,
105 )
107 session.add(user)
108 await session.flush()
110 enqueue_job("user.on_after_signup", user_id=user.id)
112 return (user, True)
114 async def link_user( 1a
115 self,
116 session: AsyncSession,
117 *,
118 user: User,
119 token: OAuth2Token,
120 ) -> User:
121 google_profile = await self._get_profile(token["access_token"])
123 oauth_account = await oauth_account_service.get_by_platform_and_account_id(
124 session, OAuthPlatform.google, google_profile["id"]
125 )
126 if oauth_account is not None:
127 if oauth_account.user_id != user.id:
128 raise AccountLinkedToAnotherUserError()
129 else:
130 oauth_account = OAuthAccount(
131 platform=OAuthPlatform.google,
132 account_id=google_profile["id"],
133 account_email=google_profile["email"],
134 )
135 user.oauth_accounts.append(oauth_account)
136 log.info(
137 "oauth_account.connect",
138 user_id=user.id,
139 platform="google",
140 account_email=google_profile["email"],
141 )
143 oauth_account.access_token = token["access_token"]
144 oauth_account.expires_at = token["expires_at"]
145 oauth_account.account_username = google_profile["email"]
146 session.add(user)
148 await session.flush()
150 return user
152 async def _get_profile(self, token: str) -> GoogleUserProfile: 1a
153 async with httpx.AsyncClient() as client:
154 response = await client.get(
155 "https://openidconnect.googleapis.com/v1/userinfo",
156 headers={"Authorization": f"Bearer {token}"},
157 )
158 response.raise_for_status()
160 data = response.json()
161 return {
162 "id": data["sub"],
163 "email": data["email"],
164 "email_verified": data["email_verified"],
165 "picture": data.get("picture"),
166 }
169google = GoogleService() 1a