Coverage for polar/benefit/strategies/github_repository/service.py: 16%
120 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 17:15 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 17:15 +0000
1import contextlib 1a
2from collections.abc import AsyncIterator 1a
3from typing import TYPE_CHECKING, Any, cast 1a
5import structlog 1a
6from githubkit.exception import ( 1a
7 RateLimitExceeded,
8 RequestError,
9 RequestFailed,
10 RequestTimeout,
11)
13from polar.auth.models import AuthSubject, is_organization, is_user 1a
14from polar.integrations.github import client as github 1a
15from polar.integrations.github_repository_benefit.service import ( 1a
16 github_repository_benefit_user_service,
17)
18from polar.logging import Logger 1a
19from polar.models import Benefit, Customer, Organization, User 1a
20from polar.models.customer import CustomerOAuthPlatform 1a
22from ..base.service import ( 1a
23 BenefitActionRequiredError,
24 BenefitPropertiesValidationError,
25 BenefitRetriableError,
26 BenefitServiceProtocol,
27)
28from .properties import ( 1a
29 BenefitGitHubRepositoryProperties,
30 BenefitGrantGitHubRepositoryProperties,
31)
33if TYPE_CHECKING: 33 ↛ 34line 33 didn't jump to line 34 because the condition on line 33 was never true1a
34 from githubkit import AppInstallationAuthStrategy, GitHub
35 from githubkit.versions.latest.models import RepositoryInvitation
# Module-level structlog logger; grant() and revoke() bind per-call context
# (benefit and customer identifiers) onto it before logging.
log: Logger = structlog.get_logger()
class BenefitGitHubRepositoryService(
    BenefitServiceProtocol[
        BenefitGitHubRepositoryProperties, BenefitGrantGitHubRepositoryProperties
    ]
):
    """Benefit strategy that manages collaborator access to a GitHub repository.

    Granting adds the customer's linked GitHub account as a collaborator on the
    repository configured in the benefit properties. Revoking deletes the
    pending repository invitation if there is one, otherwise removes the
    collaborator.

    GitHub failures are translated as follows in ``grant`` and ``revoke``:

    * rate limit exceeded -> ``BenefitRetriableError`` carrying the
      ``retry_after`` delay in seconds;
    * failed request with a client-error response -> re-raised unchanged;
    * any other failed request, timeout or request error ->
      ``BenefitRetriableError``.
    """

    async def grant(
        self,
        benefit: Benefit,
        customer: Customer,
        grant_properties: BenefitGrantGitHubRepositoryProperties,
        *,
        update: bool = False,
        attempt: int = 1,
    ) -> BenefitGrantGitHubRepositoryProperties:
        """Add the customer's GitHub account as a repository collaborator.

        Args:
            benefit: Benefit whose properties name the repository and permission.
            customer: Customer receiving the benefit.
            grant_properties: Properties stored for this grant. ``account_id``
                selects which of the customer's GitHub OAuth accounts to use.
            update: When true and ``grant_properties`` is non-empty, the
                previously stored repository, permission and account are
                compared with the current ones and the old grant is revoked
                first if any of them differ.
            attempt: Attempt counter, forwarded to ``revoke`` on update.

        Returns:
            ``grant_properties`` extended with the repository owner, repository
            name, permission and granted account ID that were just applied.

        Raises:
            BenefitActionRequiredError: No ``account_id`` is set, or the
                matching GitHub OAuth account is missing or has no username.
            BenefitRetriableError: GitHub rate limit, timeout, request error or
                non-client-error failure.
            RequestFailed: GitHub answered with a client error.
        """
        # Same logging keys as revoke(): the identifier is a customer ID.
        bound_logger = log.bind(
            benefit_id=str(benefit.id),
            customer_id=str(customer.id),
        )
        bound_logger.debug("Grant benefit")

        async with self._get_github_app_client(benefit) as client:
            properties = self._get_properties(benefit)
            repository_owner = properties["repository_owner"]
            repository_name = properties["repository_name"]
            permission = properties["permission"]

            # If we already granted this benefit, make sure we revoke the
            # previous config
            if update and grant_properties:
                bound_logger.debug("Grant benefit update")
                previous_repository_owner = grant_properties.get("repository_owner")
                previous_repository_name = grant_properties.get("repository_name")
                previous_permission = grant_properties.get("permission")
                granted_account_id = grant_properties.get("granted_account_id")
                # The repository, the permission or the account changed:
                # revoke first. Values that were never stored (None) are not
                # treated as a change.
                if (
                    (
                        previous_repository_owner is not None
                        and repository_owner != previous_repository_owner
                    )
                    or (
                        previous_repository_name is not None
                        and repository_name != previous_repository_name
                    )
                    or (
                        previous_permission is not None
                        and permission != previous_permission
                    )
                    or (
                        granted_account_id is not None
                        and grant_properties.get("account_id") != granted_account_id
                    )
                ):
                    bound_logger.debug(
                        "Revoke before granting because repository, permission or account changed"
                    )
                    await self.revoke(
                        benefit, customer, grant_properties, attempt=attempt
                    )

            if (account_id := grant_properties.get("account_id")) is None:
                raise BenefitActionRequiredError(
                    "The customer needs to connect their GitHub account"
                )

            oauth_account = customer.get_oauth_account(
                account_id, CustomerOAuthPlatform.github
            )

            if oauth_account is None or oauth_account.account_username is None:
                raise BenefitActionRequiredError(
                    "The customer needs to connect their GitHub account"
                )

            try:
                await client.rest.repos.async_add_collaborator(
                    owner=repository_owner,
                    repo=repository_name,
                    username=oauth_account.account_username,
                    data={"permission": permission},
                )
            except RateLimitExceeded as e:
                # Must be handled before RequestFailed so the retry delay
                # reported by GitHub is preserved.
                raise BenefitRetriableError(int(e.retry_after.total_seconds())) from e
            except RequestFailed as e:
                # Client errors will not succeed on retry: surface them as-is.
                if e.response.is_client_error:
                    raise
                raise BenefitRetriableError() from e
            except (RequestTimeout, RequestError) as e:
                raise BenefitRetriableError() from e

            bound_logger.debug("Benefit granted")

            # Store repository, permission and account ID to compare on update
            return {
                **grant_properties,
                "repository_owner": repository_owner,
                "repository_name": repository_name,
                "permission": permission,
                "granted_account_id": account_id,
            }

    async def cycle(
        self,
        benefit: Benefit,
        customer: Customer,
        grant_properties: BenefitGrantGitHubRepositoryProperties,
        *,
        attempt: int = 1,
    ) -> BenefitGrantGitHubRepositoryProperties:
        """Return ``grant_properties`` unchanged; cycling performs no action."""
        return grant_properties

    async def revoke(
        self,
        benefit: Benefit,
        customer: Customer,
        grant_properties: BenefitGrantGitHubRepositoryProperties,
        *,
        attempt: int = 1,
    ) -> BenefitGrantGitHubRepositoryProperties:
        """Remove the customer's access to the benefit's repository.

        If an invitation for the customer's GitHub account is still listed on
        the repository it is deleted; otherwise the account is removed from the
        repository collaborators.

        Args:
            benefit: Benefit whose properties name the repository.
            customer: Customer losing the benefit.
            grant_properties: Properties stored for this grant.
                ``granted_account_id`` identifies the GitHub OAuth account that
                was granted access.
            attempt: Attempt counter (not used by this method).

        Returns:
            A dict holding only ``account_id`` so the benefit can be granted
            again later.

        Raises:
            BenefitActionRequiredError: ``granted_account_id`` is not set, or
                the matching GitHub OAuth account is missing or has no username.
            BenefitRetriableError: GitHub rate limit, timeout, request error or
                non-client-error failure, including while listing invitations.
            RequestFailed: GitHub answered with a client error.
        """
        bound_logger = log.bind(
            benefit_id=str(benefit.id),
            customer_id=str(customer.id),
        )

        async with self._get_github_app_client(benefit) as client:
            # NOTE(review): the repository is read from the benefit's current
            # properties, not from the repository_owner/repository_name stored
            # in grant_properties. When grant() calls this because the
            # repository changed, the revoke therefore targets the new
            # repository - confirm this is intended.
            properties = self._get_properties(benefit)
            repository_owner = properties["repository_owner"]
            repository_name = properties["repository_name"]

            if (account_id := grant_properties.get("granted_account_id")) is None:
                raise BenefitActionRequiredError(
                    "The benefit was never granted to the customer"
                )

            oauth_account = customer.get_oauth_account(
                account_id, CustomerOAuthPlatform.github
            )

            if oauth_account is None or oauth_account.account_username is None:
                raise BenefitActionRequiredError(
                    "The customer needs to connect their GitHub account"
                )

            try:
                # Listing invitations issues GitHub requests too, so it lives
                # inside the try block and gets the same error translation as
                # the removal request itself.
                invitation = await self._get_invitation(
                    client,
                    repository_owner=repository_owner,
                    repository_name=repository_name,
                    user_id=int(oauth_account.account_id),
                )
                if invitation is not None:
                    bound_logger.debug("Invitation not yet accepted, removing it")
                    await client.rest.repos.async_delete_invitation(
                        repository_owner, repository_name, invitation.id
                    )
                else:
                    bound_logger.debug("Invitation not found, removing the user")
                    await client.rest.repos.async_remove_collaborator(
                        repository_owner,
                        repository_name,
                        oauth_account.account_username,
                    )
            except RateLimitExceeded as e:
                # Must be handled before RequestFailed so the retry delay
                # reported by GitHub is preserved.
                raise BenefitRetriableError(int(e.retry_after.total_seconds())) from e
            except RequestFailed as e:
                # Client errors will not succeed on retry: surface them as-is.
                if e.response.is_client_error:
                    raise
                raise BenefitRetriableError() from e
            except (RequestTimeout, RequestError) as e:
                raise BenefitRetriableError() from e

            bound_logger.debug("Benefit revoked")

            # Keep account_id in case we need to re-grant later
            return {
                "account_id": grant_properties.get("account_id"),
            }

    async def requires_update(
        self, benefit: Benefit, previous_properties: BenefitGitHubRepositoryProperties
    ) -> bool:
        """Tell whether the repository owner, name or permission changed.

        Args:
            benefit: Benefit carrying the new properties.
            previous_properties: Properties the benefit had before the change.

        Returns:
            True if any of ``repository_owner``, ``repository_name`` or
            ``permission`` differs between the new and previous properties.
        """
        new_properties = self._get_properties(benefit)
        return (
            new_properties["repository_owner"]
            != previous_properties["repository_owner"]
            or new_properties["repository_name"]
            != previous_properties["repository_name"]
            or new_properties["permission"] != previous_properties["permission"]
        )

    async def validate_properties(
        self, auth_subject: AuthSubject[User | Organization], properties: dict[str, Any]
    ) -> BenefitGitHubRepositoryProperties:
        """Validate benefit properties submitted by an authenticated subject.

        Checks run in order and the first failure is raised:

        1. the subject must be a user (organization tokens are rejected);
        2. the user must have an OAuth account for the repository benefit app;
        3. the user must have access to the repository;
        4. an app installation must exist for the repository;
        5. a billing plan must be found and must not be personal.

        Args:
            auth_subject: Subject creating or updating the benefit.
            properties: Raw properties; ``repository_owner`` and
                ``repository_name`` are read.

        Returns:
            A shallow copy of ``properties`` typed as
            ``BenefitGitHubRepositoryProperties``.

        Raises:
            BenefitPropertiesValidationError: One of the checks above failed.
        """
        if is_organization(auth_subject):
            # TODO: Support organization tokens
            raise BenefitPropertiesValidationError(
                [
                    {
                        "type": "invalid_auth_subject",
                        "msg": (
                            "We do not yet support creating this benefit "
                            "with an organization token."
                        ),
                        "loc": ("repository_owner",),
                        "input": properties["repository_owner"],
                    }
                ]
            )

        assert is_user(auth_subject)
        user = auth_subject.subject

        repository_owner = properties["repository_owner"]
        repository_name = properties["repository_name"]

        oauth = await github_repository_benefit_user_service.get_oauth_account(
            self.session, user
        )
        if not oauth:
            raise BenefitPropertiesValidationError(
                [
                    {
                        "type": "invalid_user_auth",
                        "msg": "You have not authenticated with the github_repository_benefit app",
                        "loc": ("repository_owner",),
                        "input": repository_owner,  # ?
                    }
                ]
            )

        # check that user has access to the app installed on this repository
        has_access = (
            await github_repository_benefit_user_service.user_has_access_to_repository(
                oauth, owner=repository_owner, name=repository_name
            )
        )

        if not has_access:
            raise BenefitPropertiesValidationError(
                [
                    {
                        "type": "no_repository_acccess",
                        "msg": "You don't have access to this repository.",
                        "loc": ("repository_name",),
                        "input": repository_name,
                    }
                ]
            )

        installation = (
            await github_repository_benefit_user_service.get_repository_installation(
                owner=repository_owner, name=repository_name
            )
        )
        if not installation:
            raise BenefitPropertiesValidationError(
                [
                    {
                        "type": "no_repository_installation_found",
                        "msg": "Could not find a installation for this repository.",
                        "loc": ("repository_name",),
                        "input": repository_name,
                    }
                ]
            )

        plan = await github_repository_benefit_user_service.get_billing_plan(
            self.redis, oauth, installation
        )
        if not plan or plan.is_personal:
            raise BenefitPropertiesValidationError(
                [
                    {
                        "type": "personal_organization_repository",
                        "msg": "For security reasons, "
                        "repositories on personal organizations are not supported.",
                        "loc": ("repository_name",),
                        "input": repository_name,
                    }
                ]
            )

        return cast(
            BenefitGitHubRepositoryProperties,
            {
                **properties,
            },
        )

    async def _get_invitation(
        self,
        client: "GitHub[Any]",
        *,
        repository_owner: str,
        repository_name: str,
        user_id: int,
    ) -> "RepositoryInvitation | None":
        """Find the repository invitation addressed to a given GitHub user.

        Args:
            client: GitHub client used to list the repository invitations.
            repository_owner: Owner of the repository.
            repository_name: Name of the repository.
            user_id: GitHub user ID of the invitee to look for.

        Returns:
            The first invitation whose invitee has ID ``user_id``, or None if
            the paginated listing contains no such invitation.
        """
        async for invitation in client.paginate(
            client.rest.repos.async_list_invitations,
            owner=repository_owner,
            repo=repository_name,
        ):
            if invitation.invitee and invitation.invitee.id == user_id:
                return invitation

        return None

    @contextlib.asynccontextmanager
    async def _get_github_app_client(
        self, benefit: Benefit
    ) -> AsyncIterator["GitHub[AppInstallationAuthStrategy]"]:
        """Yield a GitHub client for the installation covering the repository.

        The installation is looked up from the repository owner and name in the
        benefit properties, and the client is opened for that installation's ID.

        Args:
            benefit: Benefit whose properties name the repository.

        Yields:
            The client obtained from ``github.get_app_installation_client``.

        Raises:
            AssertionError: No installation was found for the repository.
        """
        properties = self._get_properties(benefit)
        repository_owner = properties["repository_owner"]
        repository_name = properties["repository_name"]
        installation = (
            await github_repository_benefit_user_service.get_repository_installation(
                owner=repository_owner, name=repository_name
            )
        )
        assert installation is not None
        async with github.get_app_installation_client(installation.id) as client:
            yield client