Coverage for polar/oauth2/grants/refresh_token.py: 50%

28 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 16:17 +0000

1import time 1a

2import typing 1a

3 

4from authlib.oauth2.rfc6749.grants import RefreshTokenGrant as _RefreshTokenGrant 1a

5from sqlalchemy import select 1a

6 

7from polar.config import settings 1a

8from polar.kit.crypto import get_token_hash 1a

9from polar.models import OAuth2Token 1a

10 

11from ..sub_type import SubTypeValue 1a

12 

13if typing.TYPE_CHECKING: 13 ↛ 14line 13 didn't jump to line 14 because the condition on line 13 was never true1a

14 from ..authorization_server import AuthorizationServer 

15 

16 

class RefreshTokenGrant(_RefreshTokenGrant):
    """Refresh-token grant that resolves tokens through the server's session.

    Presented refresh tokens are hashed with ``settings.SECRET`` and matched
    against ``OAuth2Token.refresh_token``; the lookup never compares the raw
    token value.
    """

    server: "AuthorizationServer"

    # Class-level settings; presumably read by the authlib base grant
    # (NOTE(review): confirm exact semantics against the authlib docs).
    INCLUDE_NEW_REFRESH_TOKEN = True
    TOKEN_ENDPOINT_AUTH_METHODS = ["client_secret_basic", "client_secret_post", "none"]

    def authenticate_refresh_token(self, refresh_token: str) -> OAuth2Token | None:
        """Return the non-revoked token matching ``refresh_token``, else ``None``.

        ``None`` is returned both when no row matches the hashed value and
        when the matching row reports itself as revoked.
        """
        hashed_value = get_token_hash(refresh_token, secret=settings.SECRET)
        query = select(OAuth2Token).where(OAuth2Token.refresh_token == hashed_value)
        candidate = self.server.session.execute(query).unique().scalar_one_or_none()

        if candidate is None:
            return None
        if typing.cast(bool, candidate.is_revoked()):
            return None
        return candidate

    def authenticate_user(self, refresh_token: OAuth2Token) -> SubTypeValue | None:
        """Return the sub-type value reported by the given token."""
        sub_type_value = refresh_token.get_sub_type_value()
        return sub_type_value

    def revoke_old_credential(self, refresh_token: OAuth2Token) -> None:
        """Stamp the token's refresh-token revocation time and flush it.

        The timestamp is the current Unix time truncated to whole seconds.
        """
        revoked_at = int(time.time())
        refresh_token.refresh_token_revoked_at = revoked_at  # pyright: ignore
        self.server.session.add(refresh_token)
        self.server.session.flush()