Coverage for polar/auth/service.py: 39%
70 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 17:15 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 17:15 +0000
1from datetime import datetime, timedelta 1ab
2from typing import TypeVar 1ab
4import structlog 1ab
5from fastapi import Request, Response 1ab
6from fastapi.responses import RedirectResponse 1ab
7from sqlalchemy import delete, select 1ab
9from polar.auth.scope import Scope 1ab
10from polar.config import settings 1ab
11from polar.enums import TokenType 1ab
12from polar.kit.crypto import generate_token_hash_pair, get_token_hash 1ab
13from polar.kit.http import get_safe_return_url 1ab
14from polar.kit.utils import utc_now 1ab
15from polar.logging import Logger 1ab
16from polar.models import User, UserSession 1ab
17from polar.postgres import AsyncSession 1ab
# Module-level structlog logger used by the auth service below.
log: Logger = structlog.get_logger()

# Prefix handed to generate_token_hash_pair() when a user session token is
# created in AuthService._create_user_session.
USER_SESSION_TOKEN_PREFIX = "polar_us_"

# Response-bound type variable: lets AuthService._set_user_session_cookie
# return the same Response subtype it was given.
R = TypeVar("R", bound=Response)
class AuthService:
    """Create, look up, and remove cookie-backed user sessions."""

    async def get_login_response(
        self,
        session: AsyncSession,
        request: Request,
        user: User,
        *,
        return_to: str | None = None,
    ) -> RedirectResponse:
        """Create a user session and return a redirect carrying its cookie.

        A new session is created for ``user`` with the ``web_read`` and
        ``web_write`` scopes, recording the request's ``User-Agent`` header
        (empty string when absent). The response is a 303 redirect to
        whatever ``get_safe_return_url(return_to)`` yields, with the session
        cookie set to the raw token and expiring at the session's
        ``expires_at``.
        """
        raw_token, new_session = await self._create_user_session(
            session=session,
            user=user,
            user_agent=request.headers.get("User-Agent", ""),
            scopes=[Scope.web_read, Scope.web_write],
        )

        redirect = RedirectResponse(get_safe_return_url(return_to), 303)
        return self._set_user_session_cookie(
            request, redirect, raw_token, new_session.expires_at
        )

    async def get_logout_response(
        self, session: AsyncSession, request: Request, user_session: UserSession | None
    ) -> RedirectResponse:
        """Delete the given session (if any) and redirect to the frontend.

        The redirect targets ``settings.FRONTEND_BASE_URL`` and overwrites
        the session cookie with an empty value and ``expires=0``
        (presumably so the browser discards it).
        """
        if user_session is not None:
            await session.delete(user_session)

        redirect = RedirectResponse(settings.FRONTEND_BASE_URL)
        return self._set_user_session_cookie(request, redirect, "", 0)

    async def authenticate(
        self,
        session: AsyncSession,
        request: Request,
        cookie: str = settings.USER_SESSION_COOKIE_KEY,
    ) -> UserSession | None:
        """Resolve the request's session cookie to a usable ``UserSession``.

        Returns ``None`` when the cookie is missing, when its value is not
        pure ASCII, when no unexpired session matches the token, or when
        the session's user has a falsy ``can_authenticate``.
        """
        token = request.cookies.get(cookie)
        if token is None:
            return None
        if not token.isascii():
            return None

        found = await self._get_user_session_by_token(session, token)
        if found is None or not found.user.can_authenticate:
            return None
        return found

    async def delete_expired(self, session: AsyncSession) -> None:
        """Issue a DELETE for every session whose ``expires_at`` is in the past."""
        await session.execute(
            delete(UserSession).where(UserSession.expires_at < utc_now())
        )

    async def revoke_leaked(
        self,
        session: AsyncSession,
        token: str,
        token_type: TokenType,
        *,
        notifier: str,
        url: str | None,
    ) -> bool:
        """Delete the unexpired session matching a leaked token.

        Returns ``False`` when no unexpired session matches ``token``;
        otherwise deletes the session, logs the revocation together with
        ``notifier`` and ``url``, and returns ``True``. ``token_type`` is
        accepted but not read by this method.
        """
        leaked = await self._get_user_session_by_token(session, token)
        if leaked is None:
            return False

        await session.delete(leaked)
        log.info(
            "Revoke leaked user session token",
            id=leaked.id,
            notifier=notifier,
            url=url,
        )
        return True

    async def _get_user_session_by_token(
        self, session: AsyncSession, token: str, *, expired: bool = False
    ) -> UserSession | None:
        """Fetch the session whose stored token equals the hash of ``token``.

        Sessions are looked up by ``get_token_hash(token, settings.SECRET)``.
        Unless ``expired`` is true, only sessions whose ``expires_at`` is
        still in the future are considered.
        """
        hashed = get_token_hash(token, secret=settings.SECRET)
        query = select(UserSession).where(UserSession.token == hashed)
        if not expired:
            query = query.where(UserSession.expires_at > utc_now())

        rows = await session.execute(query)
        return rows.unique().scalar_one_or_none()

    async def _create_user_session(
        self,
        session: AsyncSession,
        user: User,
        *,
        user_agent: str,
        scopes: list[Scope],
        expire_in: timedelta = settings.USER_SESSION_TTL,
    ) -> tuple[str, UserSession]:
        """Add a new ``UserSession`` for ``user`` and flush it.

        Returns a ``(token, user_session)`` pair. Only the hash half of the
        generated pair is stored on the model; the token half is handed
        back to the caller. The session expires ``expire_in`` after the
        current UTC time.
        """
        raw_token, hashed = generate_token_hash_pair(
            secret=settings.SECRET, prefix=USER_SESSION_TOKEN_PREFIX
        )
        record = UserSession(
            token=hashed,
            user_agent=user_agent,
            user=user,
            scopes=scopes,
            expires_at=utc_now() + expire_in,
        )
        session.add(record)
        await session.flush()

        return raw_token, record

    def _set_user_session_cookie(
        self, request: Request, response: R, value: str, expires: int | datetime
    ) -> R:
        """Set the user session cookie on ``response`` and return it.

        The cookie is HTTP-only, ``SameSite=lax``, scoped to path ``/`` and
        ``settings.USER_SESSION_COOKIE_DOMAIN``. The ``Secure`` flag is
        dropped only when the request hostname is ``127.0.0.1`` or
        ``localhost``.
        """
        on_localhost = request.url.hostname in ("127.0.0.1", "localhost")
        response.set_cookie(
            settings.USER_SESSION_COOKIE_KEY,
            value=value,
            expires=expires,
            path="/",
            domain=settings.USER_SESSION_COOKIE_DOMAIN,
            secure=not on_localhost,
            httponly=True,
            samesite="lax",
        )
        return response
# Module-level AuthService instance created at import time.
auth = AuthService()