Coverage for polar/auth/service.py: 39%

70 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 17:15 +0000

1from datetime import datetime, timedelta 1ab

2from typing import TypeVar 1ab

3 

4import structlog 1ab

5from fastapi import Request, Response 1ab

6from fastapi.responses import RedirectResponse 1ab

7from sqlalchemy import delete, select 1ab

8 

9from polar.auth.scope import Scope 1ab

10from polar.config import settings 1ab

11from polar.enums import TokenType 1ab

12from polar.kit.crypto import generate_token_hash_pair, get_token_hash 1ab

13from polar.kit.http import get_safe_return_url 1ab

14from polar.kit.utils import utc_now 1ab

15from polar.logging import Logger 1ab

16from polar.models import User, UserSession 1ab

17from polar.postgres import AsyncSession 1ab

18 

# Module-level structlog logger.
log: Logger = structlog.get_logger()

# Prefix passed to generate_token_hash_pair when issuing user session tokens.
USER_SESSION_TOKEN_PREFIX = "polar_us_"

# Type variable bound to Response; lets _set_user_session_cookie return the
# same response type it was given.
R = TypeVar("R", bound=Response)

24 

25 

class AuthService:
    """Create, look up, and revoke cookie-backed user sessions."""

    async def get_login_response(
        self,
        session: AsyncSession,
        request: Request,
        user: User,
        *,
        return_to: str | None = None,
    ) -> RedirectResponse:
        """Open a new session for ``user`` and redirect with the cookie set.

        The session is created with the ``web_read`` and ``web_write`` scopes
        and records the request's ``User-Agent`` header (empty string when
        the header is absent).

        Args:
            session: Database session used to persist the new user session.
            request: Incoming request; supplies the user agent and hostname.
            user: The user being logged in.
            return_to: Candidate redirect target, passed through
                ``get_safe_return_url`` before use.

        Returns:
            A 303 redirect carrying the session cookie.
        """
        user_agent = request.headers.get("User-Agent", "")
        token, user_session = await self._create_user_session(
            session=session,
            user=user,
            user_agent=user_agent,
            scopes=[Scope.web_read, Scope.web_write],
        )

        redirect = RedirectResponse(get_safe_return_url(return_to), 303)
        return self._set_user_session_cookie(
            request, redirect, token, user_session.expires_at
        )

    async def get_logout_response(
        self, session: AsyncSession, request: Request, user_session: UserSession | None
    ) -> RedirectResponse:
        """Delete ``user_session`` (if any) and redirect to the frontend.

        Args:
            session: Database session used to delete the user session.
            request: Incoming request; its hostname decides the cookie's
                ``secure`` flag.
            user_session: The session to remove, or ``None`` when the caller
                has no active session.

        Returns:
            A redirect to ``settings.FRONTEND_BASE_URL`` whose session cookie
            is overwritten with an empty value and a zero expiry.
        """
        if user_session is not None:
            await session.delete(user_session)

        redirect = RedirectResponse(settings.FRONTEND_BASE_URL)
        return self._set_user_session_cookie(request, redirect, "", 0)

    async def authenticate(
        self,
        session: AsyncSession,
        request: Request,
        cookie: str = settings.USER_SESSION_COOKIE_KEY,
    ) -> UserSession | None:
        """Resolve the request's session cookie to a usable user session.

        Args:
            session: Database session used for the lookup.
            request: Incoming request whose cookies are inspected.
            cookie: Name of the cookie holding the session token.

        Returns:
            The matching unexpired ``UserSession``, or ``None`` when the
            cookie is missing, is not pure ASCII, matches no unexpired
            session, or belongs to a user whose ``can_authenticate`` is falsy.
        """
        raw_token = request.cookies.get(cookie)
        if raw_token is None:
            return None
        if not raw_token.isascii():
            return None

        found = await self._get_user_session_by_token(session, raw_token)
        if found is not None and found.user.can_authenticate:
            return found
        return None

    async def delete_expired(self, session: AsyncSession) -> None:
        """Delete every user session whose ``expires_at`` is before now."""
        now = utc_now()
        await session.execute(delete(UserSession).where(UserSession.expires_at < now))

    async def revoke_leaked(
        self,
        session: AsyncSession,
        token: str,
        token_type: TokenType,
        *,
        notifier: str,
        url: str | None,
    ) -> bool:
        """Delete the unexpired session matching a leaked ``token``.

        Args:
            session: Database session used for lookup and deletion.
            token: The raw leaked token.
            token_type: Accepted for the caller's interface; not read here.
            notifier: Who reported the leak; only written to the log.
            url: Where the leak was found, if known; only written to the log.

        Returns:
            ``True`` when a session was found and deleted, ``False`` otherwise.
        """
        leaked = await self._get_user_session_by_token(session, token)
        if leaked is None:
            return False

        await session.delete(leaked)
        log.info(
            "Revoke leaked user session token",
            id=leaked.id,
            notifier=notifier,
            url=url,
        )
        return True

    async def _get_user_session_by_token(
        self, session: AsyncSession, token: str, *, expired: bool = False
    ) -> UserSession | None:
        """Fetch the user session whose stored hash matches ``token``.

        Args:
            session: Database session used for the query.
            token: Raw token; hashed with ``settings.SECRET`` before matching.
            expired: When ``False`` (the default) only sessions expiring in
                the future are considered; when ``True`` the expiry filter
                is skipped.

        Returns:
            The matching ``UserSession`` or ``None``.
        """
        hashed = get_token_hash(token, secret=settings.SECRET)
        conditions = [UserSession.token == hashed]
        if not expired:
            conditions.append(UserSession.expires_at > utc_now())

        result = await session.execute(select(UserSession).where(*conditions))
        return result.unique().scalar_one_or_none()

    async def _create_user_session(
        self,
        session: AsyncSession,
        user: User,
        *,
        user_agent: str,
        scopes: list[Scope],
        expire_in: timedelta = settings.USER_SESSION_TTL,
    ) -> tuple[str, UserSession]:
        """Persist a new user session and return it with its raw token.

        Only the token hash is stored on the ``UserSession``; the raw token
        is handed back to the caller.

        Args:
            session: Database session the new row is added to and flushed on.
            user: Owner of the session.
            user_agent: User agent string recorded on the session.
            scopes: Scopes granted to the session.
            expire_in: Lifetime of the session, counted from now.

        Returns:
            A ``(raw_token, user_session)`` pair.
        """
        raw_token, hashed = generate_token_hash_pair(
            secret=settings.SECRET, prefix=USER_SESSION_TOKEN_PREFIX
        )
        expires_at = utc_now() + expire_in
        new_session = UserSession(
            token=hashed,
            user_agent=user_agent,
            user=user,
            scopes=scopes,
            expires_at=expires_at,
        )
        session.add(new_session)
        await session.flush()

        return raw_token, new_session

    def _set_user_session_cookie(
        self, request: Request, response: R, value: str, expires: int | datetime
    ) -> R:
        """Set the user session cookie on ``response`` and return it.

        Args:
            request: Incoming request; its URL hostname decides ``secure``.
            response: Response the cookie is written to.
            value: Cookie value (the raw session token, or empty to clear).
            expires: Cookie expiry, forwarded to ``set_cookie`` unchanged.

        Returns:
            The same ``response`` object.
        """
        # Local development hosts get a non-secure cookie; everything else
        # gets the Secure flag.
        secure = request.url.hostname not in ("127.0.0.1", "localhost")
        response.set_cookie(
            settings.USER_SESSION_COOKIE_KEY,
            value=value,
            expires=expires,
            path="/",
            domain=settings.USER_SESSION_COOKIE_DOMAIN,
            secure=secure,
            httponly=True,
            samesite="lax",
        )
        return response

157 

158 

# Module-level AuthService instance.
auth = AuthService()