Coverage for polar/integrations/github_repository_benefit/service.py: 21%

159 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 17:15 +0000

1from typing import TYPE_CHECKING 1a

2 

3import structlog 1a

4from githubkit.exception import GitHubException 1a

5from httpx_oauth.clients.github import GitHubOAuth2 1a

6from httpx_oauth.oauth2 import OAuth2Token, RefreshTokenError 1a

7from sqlalchemy.exc import IntegrityError 1a

8 

9import polar.integrations.github.client as github 1a

10from polar.config import settings 1a

11from polar.exceptions import PolarError, ResourceAlreadyExists 1a

12from polar.integrations.github.service.user import github_user as github_user_service 1a

13from polar.integrations.github_repository_benefit.schemas import ( 1a

14 GitHubInvitesBenefitOrganization, 

15 GitHubInvitesBenefitRepository, 

16) 

17from polar.logging import Logger 1a

18from polar.models import OAuthAccount, User 1a

19from polar.models.user import OAuthPlatform 1a

20from polar.postgres import AsyncSession 1a

21from polar.redis import Redis 1a

22 

23from .types import SimpleUser 1a

24 

25if TYPE_CHECKING: 25 ↛ 26line 25 didn't jump to line 26 because the condition on line 25 was never true1a

26 from . import types 

27 

# Module-level structured logger shared by everything in this file.
log: Logger = structlog.get_logger()

# OAuth2 client built from the repository-benefit GitHub app credentials.
# It is used to exchange a stored refresh token for a new access token.
github_oauth_client = GitHubOAuth2(
    client_id=settings.GITHUB_REPOSITORY_BENEFITS_CLIENT_ID,
    client_secret=settings.GITHUB_REPOSITORY_BENEFITS_CLIENT_SECRET,
)

34 

35 

class GitHubError(PolarError):
    """Common base class for the GitHub-related errors defined in this module."""

37 

38 

class GitHubRepositoryBenefitAccountNotConnected(GitHubError):
    """Raised when a user has no ``github_repository_benefit`` OAuth account.

    Attributes:
        user: The user for whom the account lookup failed.
    """

    def __init__(self, user: User) -> None:
        self.user = user
        super().__init__(
            "You don't have a GitHubRepositoryBenefit account connected."
        )

44 

45 

class GitHubRepositoryBenefitExpiredAccessToken(GitHubError):
    """Raised when the access token is expired and cannot be refreshed.

    Attributes:
        user: The user whose token is expired.
    """

    def __init__(self, user: User) -> None:
        self.user = user
        # Second positional argument is 401 (presumably the HTTP status code
        # reported to the caller -- confirm against PolarError).
        super().__init__(
            "The access token is expired and no refresh token is available.",
            401,
        )

51 

52 

class GitHubRepositoryRefreshTokenError(GitHubError):
    """Raised when exchanging the refresh token for a new access token fails."""

    def __init__(self) -> None:
        explanation = "An error occurred while refreshing the access token. "
        remedy = "Please reconnect your account."
        # Second positional argument is 401 (presumably the HTTP status code
        # reported to the caller -- confirm against PolarError).
        super().__init__(explanation + remedy, 401)

60 

61 

class GitHubRepositoryBenefitNoAccess(GitHubError):
    """Raised when a repository has no installation the caller can rely on.

    The message covers both possibilities: the user lacks access to the
    resource, or the GitHub app has not been installed on it.
    """

    def __init__(self) -> None:
        # Fixed typo in the user-facing message ("bee" -> "been").
        message = (
            "The user does not have access to this resource, "
            "or it's not been installed"
        )
        # Second positional argument is 401 (presumably the HTTP status code
        # reported to the caller -- confirm against PolarError).
        super().__init__(message, 401)

68 

69 

class GitHubRepositoryBenefitUserService:
    """Operations on a user's ``github_repository_benefit`` OAuth account.

    Covers creating, updating and refreshing the stored OAuth account, and
    querying GitHub for the installations, billing plans and repositories
    reachable through that account.
    """

    async def create_oauth_account(
        self, session: AsyncSession, user: User, oauth2_token_data: OAuth2Token
    ) -> OAuthAccount:
        """Create and persist a ``github_repository_benefit`` OAuth account.

        The GitHub identity (id, login, primary email) is fetched with the
        access token found in ``oauth2_token_data``.

        Args:
            session: Database session the new account is added to.
            user: Owner of the new account.
            oauth2_token_data: Token payload; ``access_token``, ``expires_at``
                and ``refresh_token`` are read with subscript access.

        Returns:
            The newly created account.

        Raises:
            ResourceAlreadyExists: If persisting the account raises an
                ``IntegrityError``.
            KeyError: If the token payload lacks one of the keys above.
        """
        access_token = oauth2_token_data["access_token"]

        client = github.get_client(access_token=access_token)
        user_data = await client.rest.users.async_get_authenticated()
        github.ensure_expected_response(user_data)
        profile = user_data.parsed_data

        # The second tuple element (email verification flag) is not needed.
        (
            account_email,
            _,
        ) = await github_user_service.fetch_authenticated_user_primary_email(
            client=client
        )

        # NOTE(review): ``expires_at`` / ``refresh_token`` are hard subscripts,
        # while ``get_oauth_account`` anticipates ``refresh_token is None``.
        # Confirm GitHub always returns both for this app, otherwise this
        # raises ``KeyError``.
        oauth_account = OAuthAccount(
            platform=OAuthPlatform.github_repository_benefit,
            access_token=access_token,
            expires_at=oauth2_token_data["expires_at"],
            refresh_token=oauth2_token_data["refresh_token"],
            account_id=str(profile.id),
            account_email=account_email,
            account_username=profile.login,
            user=user,
        )

        # Use a nested transaction so a constraint violation can be rolled
        # back on its own and reported as ``ResourceAlreadyExists``.
        nested = await session.begin_nested()
        try:
            session.add(oauth_account)
            await nested.commit()
            await session.flush()
        except IntegrityError as e:
            await nested.rollback()
            raise ResourceAlreadyExists() from e

        return oauth_account

    async def update_oauth_account(
        self, session: AsyncSession, user: User, oauth2_token_data: OAuth2Token
    ) -> OAuthAccount:
        """Overwrite the stored tokens and refresh the GitHub identity fields.

        Args:
            session: Database session the account is added to (no flush is
                issued here).
            user: User whose ``github_repository_benefit`` account is updated.
            oauth2_token_data: Token payload; ``access_token``, ``expires_at``
                and ``refresh_token`` are read with subscript access.

        Returns:
            The updated account.

        Raises:
            GitHubRepositoryBenefitAccountNotConnected: If the user has no
                such account.
            KeyError: If the token payload lacks one of the keys above.
        """
        account = user.get_oauth_account(OAuthPlatform.github_repository_benefit)
        if account is None:
            raise GitHubRepositoryBenefitAccountNotConnected(user)

        # NOTE(review): the tokens are assigned before the GitHub calls below,
        # so a failure there leaves the in-memory account already modified.
        account.access_token = oauth2_token_data["access_token"]
        account.expires_at = oauth2_token_data["expires_at"]
        account.refresh_token = oauth2_token_data["refresh_token"]

        client = github.get_client(access_token=account.access_token)
        user_data = await client.rest.users.async_get_authenticated()
        github.ensure_expected_response(user_data)

        (
            account_email,
            _,
        ) = await github_user_service.fetch_authenticated_user_primary_email(
            client=client
        )

        account.account_email = account_email
        account.account_username = user_data.parsed_data.login

        session.add(account)

        return account

    async def get_oauth_account(
        self, session: AsyncSession, user: User
    ) -> OAuthAccount:
        """Return the user's account, refreshing its access token if expired.

        Args:
            session: Database session used to persist refreshed tokens.
            user: User whose ``github_repository_benefit`` account is wanted.

        Returns:
            The account, carrying refreshed tokens when a refresh happened.

        Raises:
            GitHubRepositoryBenefitAccountNotConnected: If the user has no
                such account.
            GitHubRepositoryBenefitExpiredAccessToken: If the token is expired
                and no refresh token is stored.
            GitHubRepositoryRefreshTokenError: If the refresh request fails.
        """
        account = user.get_oauth_account(OAuthPlatform.github_repository_benefit)
        if account is None:
            raise GitHubRepositoryBenefitAccountNotConnected(user)

        # Nothing to do while the current token is still valid.
        if not account.is_access_token_expired():
            return account

        if account.refresh_token is None:
            raise GitHubRepositoryBenefitExpiredAccessToken(user)

        try:
            refreshed_token_data = await github_oauth_client.refresh_token(
                account.refresh_token
            )
        except RefreshTokenError as e:
            raise GitHubRepositoryRefreshTokenError() from e

        account.access_token = refreshed_token_data["access_token"]
        account.expires_at = refreshed_token_data["expires_at"]
        account.refresh_token = refreshed_token_data["refresh_token"]
        session.add(account)
        await session.flush()

        log.info(
            "github.auth.refresh.succeeded",
            user_id=account.user_id,
            platform=account.platform,
        )

        return account

    async def list_user_installations(
        self, oauth: OAuthAccount
    ) -> list["types.Installation"]:
        """List every app installation visible to the account's access token.

        Args:
            oauth: Account whose access token authenticates the request.

        Returns:
            All installations yielded by the paginated GitHub endpoint.
        """
        client = github.get_client(access_token=oauth.access_token)

        def installations_of_page(
            r: github.Response["types.UserInstallationsGetResponse200"],
        ) -> list["types.Installation"]:
            # Each page wraps the items in an ``installations`` attribute.
            return r.parsed_data.installations

        return [
            installation
            async for installation in client.paginate(
                client.rest.apps.async_list_installations_for_authenticated_user,
                map_func=installations_of_page,
            )
        ]

    async def list_orgs_with_billing_plans(
        self,
        redis: Redis,
        oauth: OAuthAccount,
        installations: list["types.Installation"],
    ) -> list[GitHubInvitesBenefitOrganization]:
        """Resolve the billing plan of each installation, one after another.

        Args:
            redis: Forwarded unchanged to ``get_billing_plan``.
            oauth: Account forwarded to ``get_billing_plan``.
            installations: Installations to inspect.

        Returns:
            One entry per installation for which ``get_billing_plan`` returned
            a truthy value, in input order.
        """
        organizations: list[GitHubInvitesBenefitOrganization] = []

        for installation in installations:
            organization = await self.get_billing_plan(redis, oauth, installation)
            if organization:
                organizations.append(organization)

        return organizations

    async def get_billing_plan(
        self, redis: Redis, oauth: OAuthAccount, installation: "types.Installation"
    ) -> GitHubInvitesBenefitOrganization | None:
        """Describe the installation's account together with its GitHub plan.

        For a ``"User"`` installation the plan is read from the authenticated
        user (errors propagate). For an ``"Organization"`` installation it is
        read through the app installation client, and any exception is logged
        and swallowed so the plan simply stays unknown.

        Args:
            redis: Not used by this method.
            oauth: Account whose access token is used for ``"User"`` targets.
            installation: Installation to describe.

        Returns:
            ``None`` when the installation account is missing or is not a
            ``SimpleUser``; otherwise the organization description, with an
            empty ``plan_name`` when no plan could be determined.
        """
        account = installation.account
        # ``isinstance`` is also False for ``None``, so this covers both the
        # missing-account and the unsupported-account-type cases.
        if not isinstance(account, SimpleUser):
            return None

        plan: (
            types.PublicUserPropPlan
            | types.PrivateUserPropPlan
            | types.OrganizationFullPropPlan
            | None
        ) = None

        if installation.target_type == "User":
            user_client = github.get_client(access_token=oauth.access_token)
            user_response = await user_client.rest.users.async_get_authenticated()
            if user_response.parsed_data and user_response.parsed_data.plan:
                plan = user_response.parsed_data.plan

        elif installation.target_type == "Organization":
            # Best effort: a failed lookup must not prevent listing the
            # organization, so errors are logged and the plan stays ``None``.
            try:
                org_client = github.get_app_installation_client(installation.id)
                org_response = await org_client.rest.orgs.async_get(account.login)
                if (
                    org_response
                    and org_response.parsed_data
                    and org_response.parsed_data.plan
                ):
                    plan = org_response.parsed_data.plan
            except GitHubException as e:
                log.error(
                    "failed to get github org plan",
                    installation_id=installation.id,
                    organization=account.login,
                    error_type=type(e).__name__,
                    error_message=str(e),
                    exc_info=True,
                )
            except Exception as e:
                log.error(
                    "unexpected error getting github org plan",
                    installation_id=installation.id,
                    organization=account.login,
                    error_type=type(e).__name__,
                    error_message=str(e),
                    exc_info=True,
                )

        plan_name = plan.name if plan else ""

        return GitHubInvitesBenefitOrganization(
            name=account.login,
            is_personal=installation.target_type == "User",
            plan_name=plan_name,
            is_free=plan_name.lower() == "free",
        )

    async def list_repositories(
        self,
        oauth: OAuthAccount,
        installations: list["types.Installation"],
    ) -> list[GitHubInvitesBenefitRepository]:
        """List the repositories the user's token can reach per installation.

        For every installation whose account is a ``SimpleUser``, the
        paginated "installation repositories for the authenticated user"
        endpoint is queried with the account's access token.

        Args:
            oauth: Account whose access token authenticates the requests.
            installations: Installations to enumerate.

        Returns:
            One entry per repository, owned by the installation account login.
        """
        client = github.get_client(access_token=oauth.access_token)

        def repositories_of_page(
            r: github.Response[
                "types.UserInstallationsInstallationIdRepositoriesGetResponse200"
            ],
        ) -> list["types.Repository"]:
            # Each page wraps the items in a ``repositories`` attribute.
            return r.parsed_data.repositories

        repositories: list[GitHubInvitesBenefitRepository] = []

        for install in installations:
            account = install.account
            # Skips installations without an account as well as accounts of
            # any other type (``isinstance`` is False for ``None``).
            if not isinstance(account, SimpleUser):
                continue

            async for repo in client.paginate(
                client.rest.apps.async_list_installation_repos_for_authenticated_user,
                map_func=repositories_of_page,
                installation_id=install.id,
            ):
                repositories.append(
                    GitHubInvitesBenefitRepository(
                        repository_owner=account.login,
                        repository_name=repo.name,
                    )
                )

        return repositories

    async def get_repository_installation(
        self, *, owner: str, name: str
    ) -> "types.Installation | None":
        """Return the app installation covering ``owner/name``, if any.

        Args:
            owner: Repository owner login.
            name: Repository name.

        Returns:
            The installation on a 200 response; ``None`` when GitHub answers
            404 or with any other non-200 success status.

        Raises:
            RequestFailed: For GitHub error responses other than 404.
        """
        # Imported here because only this method needs it.
        from githubkit.exception import RequestFailed

        # NOTE(review): a synchronous ``with`` wraps an awaited request here;
        # confirm whether ``async with`` is the intended form for this client.
        with github.get_app_client() as app_client:
            try:
                repo_install = await app_client.rest.apps.async_get_repo_installation(
                    owner, name
                )
            except RequestFailed as e:
                # githubkit raises on error statuses, so the "not installed"
                # answer (404) arrives as an exception rather than a response.
                if e.response.status_code == 404:
                    return None
                raise
            if repo_install.status_code == 200:
                return repo_install.parsed_data
            return None

    async def user_has_access_to_repository(
        self, oauth: OAuthAccount, *, owner: str, name: str
    ) -> bool:
        """Tell whether the user can see the installation of ``owner/name``.

        Args:
            oauth: Account whose installations are listed.
            owner: Repository owner login.
            name: Repository name.

        Returns:
            ``True`` when the repository's installation id is among the
            installations visible to the user, ``False`` otherwise.

        Raises:
            GitHubRepositoryBenefitNoAccess: If no installation is found for
                the repository.
        """
        installation = await self.get_repository_installation(owner=owner, name=name)
        if not installation:
            raise GitHubRepositoryBenefitNoAccess()

        user_installations = await self.list_user_installations(oauth)

        return any(
            candidate.id == installation.id for candidate in user_installations
        )

335 

336 

# Shared module-level instance of the service, created once at import time.
github_repository_benefit_user_service = GitHubRepositoryBenefitUserService()