Coverage for polar/integrations/github/service/secret_scanning.py: 51%
99 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 17:15 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 17:15 +0000
1import base64 1a
2import binascii 1a
3from typing import Annotated, Any, Literal, Protocol, TypedDict 1a
5from cryptography.exceptions import InvalidSignature as CryptographyInvalidSignature 1a
6from cryptography.hazmat.primitives import hashes 1a
7from cryptography.hazmat.primitives.asymmetric import ec 1a
8from cryptography.hazmat.primitives.serialization import load_pem_public_key 1a
9from fastapi.exceptions import RequestValidationError 1a
10from pydantic import BeforeValidator, TypeAdapter, ValidationError 1a
12from polar.auth.service import auth as auth_service 1a
13from polar.customer_session.service import customer_session as customer_session_service 1a
14from polar.enums import TokenType 1a
15from polar.exceptions import PolarError 1a
16from polar.kit.schemas import Schema 1a
17from polar.oauth2.service.oauth2_authorization_code import ( 1a
18 oauth2_authorization_code as oauth2_authorization_code_service,
19)
20from polar.oauth2.service.oauth2_client import oauth2_client as oauth2_client_service 1a
21from polar.oauth2.service.oauth2_token import oauth2_token as oauth2_token_service 1a
22from polar.organization_access_token.service import ( 1a
23 organization_access_token as organization_access_token_service,
24)
25from polar.personal_access_token.service import ( 1a
26 personal_access_token as personal_access_token_service,
27)
28from polar.postgres import AsyncSession 1a
30from ..client import GitHub 1a
33class GitHubSecretScanningPublicKey(TypedDict): 1a
34 key_identifier: str 1a
35 key: str 1a
36 is_current: bool 1a
39class GitHubSecretScanningPublicKeyList(TypedDict): 1a
40 public_keys: list[GitHubSecretScanningPublicKey] 1a
43def _normalize_token_type(value: Any | None) -> Any | None: 1a
44 if isinstance(value, str):
45 return value.lower()
46 return value
49class GitHubSecretScanningToken(Schema): 1a
50 token: str 1a
51 type: Annotated[TokenType, BeforeValidator(_normalize_token_type)] 1a
52 url: str | None = None 1a
53 source: str 1a
56GitHubSecretScanningTokenListAdapter = TypeAdapter(list[GitHubSecretScanningToken]) 1a
59class GitHubSecretScanningTokenResult(TypedDict): 1a
60 token_raw: str 1a
61 token_type: TokenType 1a
62 label: Literal["true_positive", "false_positive"] 1a
65class RevokedLeakedProtocol(Protocol): 1a
66 async def revoke_leaked( 66 ↛ exitline 66 didn't return from function 'revoke_leaked' because 1a
67 self,
68 session: AsyncSession,
69 token: str,
70 token_type: TokenType,
71 *,
72 notifier: str,
73 url: str | None,
74 ) -> bool: ...
77TOKEN_TYPE_SERVICE_MAP: dict[TokenType, RevokedLeakedProtocol] = { 1a
78 TokenType.client_secret: oauth2_client_service,
79 TokenType.client_registration_token: oauth2_client_service,
80 TokenType.authorization_code: oauth2_authorization_code_service,
81 TokenType.access_token: oauth2_token_service,
82 TokenType.refresh_token: oauth2_token_service,
83 TokenType.personal_access_token: personal_access_token_service,
84 TokenType.organization_access_token: organization_access_token_service,
85 TokenType.customer_session_token: customer_session_service,
86 TokenType.user_session_token: auth_service,
87}
90class GitHubSecretScanningError(PolarError): ... 1a
93class PublicKeyNotFound(GitHubSecretScanningError): 1a
94 def __init__(self, key_identifier: str) -> None: 1a
95 self.key_identifier = key_identifier
96 message = f"Public key with key_identifier {key_identifier} not found."
97 super().__init__(message, 400)
100class InvalidPublicKey(GitHubSecretScanningError): 1a
101 def __init__(self, key_identifier: str, public_key: str) -> None: 1a
102 self.key_identifier = key_identifier
103 self.public_key = public_key
104 message = f"Public key with key_identifier {key_identifier} is invalid."
105 super().__init__(message)
108class InvalidSignature(GitHubSecretScanningError): 1a
109 def __init__(self, payload: str, signature: str, key_identifier: str) -> None: 1a
110 self.payload = payload
111 self.signature = signature
112 self.key_identifier = key_identifier
113 message = "Invalid signature."
114 super().__init__(message, status_code=403)
117class GitHubSecretScanningService: 1a
118 async def verify_signature( 1a
119 self, payload: str, signature: str, key_identifier: str
120 ) -> bool:
121 raw_public_key = await self._get_public_key(key_identifier)
122 public_key = load_pem_public_key(raw_public_key.encode())
123 if not isinstance(public_key, ec.EllipticCurvePublicKey):
124 raise InvalidPublicKey(key_identifier, raw_public_key)
126 try:
127 signature_bytes = base64.b64decode(signature)
128 public_key.verify(
129 signature_bytes, payload.encode(), ec.ECDSA(hashes.SHA256())
130 )
131 return True
132 except (binascii.Error, CryptographyInvalidSignature) as e:
133 raise InvalidSignature(payload, signature, key_identifier) from e
135 def validate_payload(self, payload: str) -> list[GitHubSecretScanningToken]: 1a
136 try:
137 return GitHubSecretScanningTokenListAdapter.validate_json(payload)
138 except ValidationError as e:
139 raise RequestValidationError(e.errors(), body=payload)
141 async def handle_alert( 1a
142 self, session: AsyncSession, data: list[GitHubSecretScanningToken]
143 ) -> list[GitHubSecretScanningTokenResult]:
144 results = []
145 for match in data:
146 result = await self._check_token(session, match)
147 results.append(result)
148 return results
150 async def _check_token( 1a
151 self, session: AsyncSession, match: GitHubSecretScanningToken
152 ) -> GitHubSecretScanningTokenResult:
153 service = TOKEN_TYPE_SERVICE_MAP[match.type]
155 leaked = await service.revoke_leaked(
156 session, match.token, match.type, notifier="github", url=match.url
157 )
159 return {
160 "token_raw": match.token,
161 "token_type": match.type,
162 "label": "true_positive" if leaked else "false_positive",
163 }
165 async def _get_public_key(self, key_identifier: str) -> str: 1a
166 client = GitHub()
167 response = await client.arequest("GET", "/meta/public_keys/secret_scanning")
169 data: GitHubSecretScanningPublicKeyList = response.json()
170 for public_key in data["public_keys"]:
171 if public_key["key_identifier"] == key_identifier:
172 return public_key["key"]
174 raise PublicKeyNotFound(key_identifier)
177secret_scanning = GitHubSecretScanningService() 1a