Coverage for polar/integrations/github/service/secret_scanning.py: 51%

99 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 16:17 +0000

1import base64 1a

2import binascii 1a

3from typing import Annotated, Any, Literal, Protocol, TypedDict 1a

4 

5from cryptography.exceptions import InvalidSignature as CryptographyInvalidSignature 1a

6from cryptography.hazmat.primitives import hashes 1a

7from cryptography.hazmat.primitives.asymmetric import ec 1a

8from cryptography.hazmat.primitives.serialization import load_pem_public_key 1a

9from fastapi.exceptions import RequestValidationError 1a

10from pydantic import BeforeValidator, TypeAdapter, ValidationError 1a

11 

12from polar.auth.service import auth as auth_service 1a

13from polar.customer_session.service import customer_session as customer_session_service 1a

14from polar.enums import TokenType 1a

15from polar.exceptions import PolarError 1a

16from polar.kit.schemas import Schema 1a

17from polar.oauth2.service.oauth2_authorization_code import ( 1a

18 oauth2_authorization_code as oauth2_authorization_code_service, 

19) 

20from polar.oauth2.service.oauth2_client import oauth2_client as oauth2_client_service 1a

21from polar.oauth2.service.oauth2_token import oauth2_token as oauth2_token_service 1a

22from polar.organization_access_token.service import ( 1a

23 organization_access_token as organization_access_token_service, 

24) 

25from polar.personal_access_token.service import ( 1a

26 personal_access_token as personal_access_token_service, 

27) 

28from polar.postgres import AsyncSession 1a

29 

30from ..client import GitHub 1a

31 

32 

33class GitHubSecretScanningPublicKey(TypedDict): 1a

34 key_identifier: str 1a

35 key: str 1a

36 is_current: bool 1a

37 

38 

39class GitHubSecretScanningPublicKeyList(TypedDict): 1a

40 public_keys: list[GitHubSecretScanningPublicKey] 1a

41 

42 

43def _normalize_token_type(value: Any | None) -> Any | None: 1a

44 if isinstance(value, str): 

45 return value.lower() 

46 return value 

47 

48 

49class GitHubSecretScanningToken(Schema): 1a

50 token: str 1a

51 type: Annotated[TokenType, BeforeValidator(_normalize_token_type)] 1a

52 url: str | None = None 1a

53 source: str 1a

54 

55 

56GitHubSecretScanningTokenListAdapter = TypeAdapter(list[GitHubSecretScanningToken]) 1a

57 

58 

59class GitHubSecretScanningTokenResult(TypedDict): 1a

60 token_raw: str 1a

61 token_type: TokenType 1a

62 label: Literal["true_positive", "false_positive"] 1a

63 

64 

65class RevokedLeakedProtocol(Protocol): 1a

66 async def revoke_leaked( 66 ↛ exitline 66 didn't return from function 'revoke_leaked' because 1a

67 self, 

68 session: AsyncSession, 

69 token: str, 

70 token_type: TokenType, 

71 *, 

72 notifier: str, 

73 url: str | None, 

74 ) -> bool: ... 

75 

76 

77TOKEN_TYPE_SERVICE_MAP: dict[TokenType, RevokedLeakedProtocol] = { 1a

78 TokenType.client_secret: oauth2_client_service, 

79 TokenType.client_registration_token: oauth2_client_service, 

80 TokenType.authorization_code: oauth2_authorization_code_service, 

81 TokenType.access_token: oauth2_token_service, 

82 TokenType.refresh_token: oauth2_token_service, 

83 TokenType.personal_access_token: personal_access_token_service, 

84 TokenType.organization_access_token: organization_access_token_service, 

85 TokenType.customer_session_token: customer_session_service, 

86 TokenType.user_session_token: auth_service, 

87} 

88 

89 

90class GitHubSecretScanningError(PolarError): ... 1a

91 

92 

93class PublicKeyNotFound(GitHubSecretScanningError): 1a

94 def __init__(self, key_identifier: str) -> None: 1a

95 self.key_identifier = key_identifier 

96 message = f"Public key with key_identifier {key_identifier} not found." 

97 super().__init__(message, 400) 

98 

99 

100class InvalidPublicKey(GitHubSecretScanningError): 1a

101 def __init__(self, key_identifier: str, public_key: str) -> None: 1a

102 self.key_identifier = key_identifier 

103 self.public_key = public_key 

104 message = f"Public key with key_identifier {key_identifier} is invalid." 

105 super().__init__(message) 

106 

107 

108class InvalidSignature(GitHubSecretScanningError): 1a

109 def __init__(self, payload: str, signature: str, key_identifier: str) -> None: 1a

110 self.payload = payload 

111 self.signature = signature 

112 self.key_identifier = key_identifier 

113 message = "Invalid signature." 

114 super().__init__(message, status_code=403) 

115 

116 

117class GitHubSecretScanningService: 1a

118 async def verify_signature( 1a

119 self, payload: str, signature: str, key_identifier: str 

120 ) -> bool: 

121 raw_public_key = await self._get_public_key(key_identifier) 

122 public_key = load_pem_public_key(raw_public_key.encode()) 

123 if not isinstance(public_key, ec.EllipticCurvePublicKey): 

124 raise InvalidPublicKey(key_identifier, raw_public_key) 

125 

126 try: 

127 signature_bytes = base64.b64decode(signature) 

128 public_key.verify( 

129 signature_bytes, payload.encode(), ec.ECDSA(hashes.SHA256()) 

130 ) 

131 return True 

132 except (binascii.Error, CryptographyInvalidSignature) as e: 

133 raise InvalidSignature(payload, signature, key_identifier) from e 

134 

135 def validate_payload(self, payload: str) -> list[GitHubSecretScanningToken]: 1a

136 try: 

137 return GitHubSecretScanningTokenListAdapter.validate_json(payload) 

138 except ValidationError as e: 

139 raise RequestValidationError(e.errors(), body=payload) 

140 

141 async def handle_alert( 1a

142 self, session: AsyncSession, data: list[GitHubSecretScanningToken] 

143 ) -> list[GitHubSecretScanningTokenResult]: 

144 results = [] 

145 for match in data: 

146 result = await self._check_token(session, match) 

147 results.append(result) 

148 return results 

149 

150 async def _check_token( 1a

151 self, session: AsyncSession, match: GitHubSecretScanningToken 

152 ) -> GitHubSecretScanningTokenResult: 

153 service = TOKEN_TYPE_SERVICE_MAP[match.type] 

154 

155 leaked = await service.revoke_leaked( 

156 session, match.token, match.type, notifier="github", url=match.url 

157 ) 

158 

159 return { 

160 "token_raw": match.token, 

161 "token_type": match.type, 

162 "label": "true_positive" if leaked else "false_positive", 

163 } 

164 

165 async def _get_public_key(self, key_identifier: str) -> str: 1a

166 client = GitHub() 

167 response = await client.arequest("GET", "/meta/public_keys/secret_scanning") 

168 

169 data: GitHubSecretScanningPublicKeyList = response.json() 

170 for public_key in data["public_keys"]: 

171 if public_key["key_identifier"] == key_identifier: 

172 return public_key["key"] 

173 

174 raise PublicKeyNotFound(key_identifier) 

175 

176 

177secret_scanning = GitHubSecretScanningService() 1a