Coverage for polar/oauth2/grants/refresh_token.py: 50%
28 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 17:15 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 17:15 +0000
1import time 1a
2import typing 1a
4from authlib.oauth2.rfc6749.grants import RefreshTokenGrant as _RefreshTokenGrant 1a
5from sqlalchemy import select 1a
7from polar.config import settings 1a
8from polar.kit.crypto import get_token_hash 1a
9from polar.models import OAuth2Token 1a
11from ..sub_type import SubTypeValue 1a
13if typing.TYPE_CHECKING: 13 ↛ 14line 13 didn't jump to line 14 because the condition on line 13 was never true1a
14 from ..authorization_server import AuthorizationServer
class RefreshTokenGrant(_RefreshTokenGrant):
    """Refresh-token grant that resolves tokens through their stored hash.

    Refresh tokens are looked up by the hash produced by ``get_token_hash``
    rather than by their raw value, and revoking a token stamps it with the
    current Unix time.
    """

    server: "AuthorizationServer"

    # Grant configuration read by the authlib base class.
    INCLUDE_NEW_REFRESH_TOKEN = True
    TOKEN_ENDPOINT_AUTH_METHODS = [
        "client_secret_basic",
        "client_secret_post",
        "none",
    ]

    def authenticate_refresh_token(self, refresh_token: str) -> OAuth2Token | None:
        """Return the non-revoked token matching ``refresh_token``, if any.

        Args:
            refresh_token: Raw refresh token value presented by the client.

        Returns:
            The ``OAuth2Token`` whose ``refresh_token`` column equals the hash
            of the given value, or ``None`` when no row matches or the matching
            token reports itself as revoked.
        """
        # The lookup compares against the hash of the presented value.
        hashed = get_token_hash(refresh_token, secret=settings.SECRET)
        query = select(OAuth2Token).where(OAuth2Token.refresh_token == hashed)
        candidate = self.server.session.execute(query).unique().scalar_one_or_none()

        if candidate is None:
            return None
        if typing.cast(bool, candidate.is_revoked()):
            return None
        return candidate

    def authenticate_user(self, refresh_token: OAuth2Token) -> SubTypeValue | None:
        """Return the sub-type value carried by ``refresh_token``."""
        subject = refresh_token.get_sub_type_value()
        return subject

    def revoke_old_credential(self, refresh_token: OAuth2Token) -> None:
        """Mark ``refresh_token`` as revoked now and flush the change.

        Args:
            refresh_token: Token whose ``refresh_token_revoked_at`` is set to
                the current Unix time, truncated to whole seconds.
        """
        revoked_at = int(time.time())
        refresh_token.refresh_token_revoked_at = revoked_at  # pyright: ignore
        self.server.session.add(refresh_token)
        self.server.session.flush()