Coverage for polar/integrations/github/endpoints.py: 37%

80 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 16:17 +0000

1from typing import Any 1a

2from uuid import UUID 1a

3 

4from fastapi import Depends, Header, Request 1a

5from fastapi.responses import JSONResponse, RedirectResponse 1a

6from httpx_oauth.clients.github import GitHubOAuth2 1a

7from httpx_oauth.integrations.fastapi import OAuth2AuthorizeCallback 1a

8from httpx_oauth.oauth2 import OAuth2Token 1a

9 

10from polar.auth.dependencies import WebUserOrAnonymous 1a

11from polar.auth.models import is_user 1a

12from polar.auth.service import auth as auth_service 1a

13from polar.config import settings 1a

14from polar.exceptions import NotPermitted, PolarRedirectionError 1a

15from polar.integrations.loops.service import loops as loops_service 1a

16from polar.kit import jwt 1a

17from polar.kit.http import ReturnTo 1a

18from polar.openapi import APITag 1a

19from polar.postgres import AsyncSession, get_db_session 1a

20from polar.posthog import posthog 1a

21from polar.routing import APIRouter 1a

22from polar.user.schemas import UserSignupAttribution, UserSignupAttributionQuery 1a

23 

24from .service.secret_scanning import secret_scanning as secret_scanning_service 1a

25from .service.user import GithubUserServiceError, github_user 1a

26 

27router = APIRouter( 1a

28 prefix="/integrations/github", tags=["integrations_github", APITag.private] 

29) 

30 

31 

32github_oauth_client = GitHubOAuth2( 1a

33 settings.GITHUB_CLIENT_ID, settings.GITHUB_CLIENT_SECRET 

34) 

35oauth2_authorize_callback = OAuth2AuthorizeCallback( 1a

36 github_oauth_client, route_name="integrations.github.callback" 

37) 

38 

39 

40class OAuthCallbackError(PolarRedirectionError): ... 1a

41 

42 

43class NotPermittedOrganizationBillingPlan(NotPermitted): 1a

44 def __init__(self) -> None: 1a

45 message = "Organization billing plan not accessible." 

46 super().__init__(message) 

47 

48 

49@router.get("/authorize", name="integrations.github.authorize") 1a

50async def github_authorize( 1a

51 request: Request, 

52 auth_subject: WebUserOrAnonymous, 

53 return_to: ReturnTo, 

54 signup_attribution: UserSignupAttributionQuery, 

55 payment_intent_id: str | None = None, 

56) -> RedirectResponse: 

57 state: dict[str, Any] = {} 

58 if payment_intent_id: 

59 state["payment_intent_id"] = payment_intent_id 

60 

61 state["return_to"] = return_to 

62 

63 if signup_attribution: 

64 state["signup_attribution"] = signup_attribution.model_dump(exclude_unset=True) 

65 

66 if is_user(auth_subject): 

67 state["user_id"] = str(auth_subject.subject.id) 

68 

69 encoded_state = jwt.encode(data=state, secret=settings.SECRET, type="github_oauth") 

70 authorization_url = await github_oauth_client.get_authorization_url( 

71 redirect_uri=str(request.url_for("integrations.github.callback")), 

72 state=encoded_state, 

73 scope=["user", "user:email"], 

74 ) 

75 return RedirectResponse(authorization_url, 303) 

76 

77 

78@router.get("/callback", name="integrations.github.callback") 1a

79async def github_callback( 1a

80 request: Request, 

81 auth_subject: WebUserOrAnonymous, 

82 session: AsyncSession = Depends(get_db_session), 

83 access_token_state: tuple[OAuth2Token, str | None] = Depends( 

84 oauth2_authorize_callback 

85 ), 

86) -> RedirectResponse: 

87 token_data, state = access_token_state 

88 error_description = token_data.get("error_description") 

89 if error_description: 

90 raise OAuthCallbackError(error_description) 

91 if not state: 

92 raise OAuthCallbackError("No state") 

93 

94 try: 

95 state_data = jwt.decode( 

96 token=state, secret=settings.SECRET, type="github_oauth" 

97 ) 

98 except jwt.DecodeError as e: 

99 raise OAuthCallbackError("Invalid state") from e 

100 

101 return_to = state_data.get("return_to", None) 

102 state_user_id = state_data.get("user_id") 

103 

104 state_signup_attribution = state_data.get("signup_attribution") 

105 if state_signup_attribution: 

106 state_signup_attribution = UserSignupAttribution.model_validate( 

107 state_signup_attribution 

108 ) 

109 

110 try: 

111 if ( 

112 is_user(auth_subject) 

113 and state_user_id is not None 

114 and auth_subject.subject.id == UUID(state_user_id) 

115 ): 

116 is_signup = False 

117 user = await github_user.link_user( 

118 session, user=auth_subject.subject, token=token_data 

119 ) 

120 else: 

121 user, is_signup = await github_user.get_updated_or_create( 

122 session, 

123 token=token_data, 

124 signup_attribution=state_signup_attribution, 

125 ) 

126 

127 except GithubUserServiceError as e: 

128 raise OAuthCallbackError(e.message, e.status_code, return_to=return_to) from e 

129 

130 # Event tracking last to ensure business critical data is stored first 

131 if is_signup: 

132 posthog.user_signup(user, "github") 

133 await loops_service.user_signup(user, githubLogin=True) 

134 else: 

135 posthog.user_login(user, "github") 

136 await loops_service.user_update(session, user, githubLogin=True) 

137 

138 return await auth_service.get_login_response( 

139 session, request, user, return_to=return_to 

140 ) 

141 

142 

143@router.post("/secret-scanning", include_in_schema=False) 1a

144async def secret_scanning( 1ab

145 request: Request, 

146 github_public_key_identifier: str = Header(), 

147 github_public_key_signature: str = Header(), 

148 session: AsyncSession = Depends(get_db_session), 

149) -> JSONResponse: 

150 payload = (await request.body()).decode() 

151 await secret_scanning_service.verify_signature( 

152 payload, github_public_key_signature, github_public_key_identifier 

153 ) 

154 

155 data = secret_scanning_service.validate_payload(payload) 

156 

157 response_data = await secret_scanning_service.handle_alert(session, data) 

158 return JSONResponse(content=response_data)