Coverage for polar/personal_access_token/service.py: 60%
56 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 16:17 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 16:17 +0000
1from collections.abc import Sequence 1a
2from datetime import datetime 1a
3from uuid import UUID 1a
5import structlog 1a
6from sqlalchemy import Select, or_, select, update 1a
7from sqlalchemy.orm import contains_eager 1a
9from polar.auth.models import AuthSubject 1a
10from polar.config import settings 1a
11from polar.email.react import render_email_template 1a
12from polar.email.schemas import ( 1a
13 PersonalAccessTokenLeakedEmail,
14 PersonalAccessTokenLeakedProps,
15)
16from polar.email.sender import enqueue_email 1a
17from polar.enums import TokenType 1a
18from polar.kit.crypto import get_token_hash 1a
19from polar.kit.pagination import PaginationParams, paginate 1a
20from polar.kit.services import ResourceServiceReader 1a
21from polar.kit.utils import utc_now 1a
22from polar.logging import Logger 1a
23from polar.models import PersonalAccessToken, User 1a
24from polar.postgres import AsyncSession 1a
# Module-level structlog logger for this service module.
log: Logger = structlog.get_logger()

# NOTE(review): presumably the prefix carried by personal access token
# strings; it is not referenced in the visible part of this file, so confirm
# against the code that generates or parses tokens.
TOKEN_PREFIX = "polar_pat_"
class PersonalAccessTokenService(ResourceServiceReader[PersonalAccessToken]):
    """Query, soft-delete and revoke users' personal access tokens.

    None of the methods here commit or flush the session; they only add
    objects to it or execute statements on it.
    """

    async def list(
        self,
        session: AsyncSession,
        auth_subject: AuthSubject[User],
        *,
        pagination: PaginationParams,
    ) -> tuple[Sequence[PersonalAccessToken], int]:
        """Return one page of the subject's non-deleted tokens.

        Args:
            session: Database session the query is run on.
            auth_subject: Authenticated user whose tokens are listed.
            pagination: Pagination parameters forwarded to ``paginate``.

        Returns:
            The result of ``paginate`` for the subject's readable tokens,
            annotated as a sequence of tokens and an integer count.
        """
        # NOTE(review): no ORDER BY is applied to this statement, so page
        # contents rely on whatever ordering ``paginate`` or the database
        # provides — confirm that this is intended.
        statement = self._get_readable_order_statement(auth_subject)
        return await paginate(session, statement, pagination=pagination)

    async def get_by_id(
        self, session: AsyncSession, auth_subject: AuthSubject[User], id: UUID
    ) -> PersonalAccessToken | None:
        """Fetch one of the subject's non-deleted tokens by primary key.

        Args:
            session: Database session the query is run on.
            auth_subject: Authenticated user who must own the token.
            id: Identifier of the token to look up.

        Returns:
            The matching token, or ``None`` if the subject has no such
            non-deleted token.
        """
        # Ownership and the ``deleted_at IS NULL`` filter already come from
        # the base statement, so only the id criterion is added here.
        statement = self._get_readable_order_statement(auth_subject).where(
            PersonalAccessToken.id == id
        )
        result = await session.execute(statement)
        return result.scalar_one_or_none()

    async def get_by_token(
        self, session: AsyncSession, token: str, *, expired: bool = False
    ) -> PersonalAccessToken | None:
        """Look up a non-deleted token from its raw string value.

        The raw value is hashed with ``settings.SECRET`` and compared against
        the stored ``token`` column. Only tokens whose user has
        ``can_authenticate`` set to true are returned, and the joined user is
        loaded onto the returned token.

        Args:
            session: Database session the query is run on.
            token: Raw token string.
            expired: When ``False`` (the default), only tokens with no expiry
                or with an expiry later than the current UTC time match.
                When ``True``, no expiry filter is applied.

        Returns:
            The matching token, or ``None`` if nothing matches.
        """
        token_hash = get_token_hash(token, secret=settings.SECRET)
        statement = (
            select(PersonalAccessToken)
            .join(PersonalAccessToken.user)
            .where(
                PersonalAccessToken.token == token_hash,
                PersonalAccessToken.deleted_at.is_(None),
                User.can_authenticate.is_(True),
            )
            # Populate ``PersonalAccessToken.user`` from the join above.
            .options(contains_eager(PersonalAccessToken.user))
        )
        if not expired:
            statement = statement.where(
                or_(
                    PersonalAccessToken.expires_at.is_(None),
                    PersonalAccessToken.expires_at > utc_now(),
                )
            )

        result = await session.execute(statement)
        return result.unique().scalar_one_or_none()

    async def delete(
        self, session: AsyncSession, personal_access_token: PersonalAccessToken
    ) -> None:
        """Soft-delete a token by setting its deletion timestamp.

        Args:
            session: Database session the modified token is added to.
            personal_access_token: Token to mark as deleted.
        """
        personal_access_token.set_deleted_at()
        session.add(personal_access_token)

    async def record_usage(
        self, session: AsyncSession, id: UUID, last_used_at: datetime
    ) -> None:
        """Set ``last_used_at`` on the token with the given id.

        Issues a single UPDATE statement filtered on the id only.

        Args:
            session: Database session the statement is executed on.
            id: Identifier of the token that was used.
            last_used_at: Timestamp to store as the last usage time.
        """
        statement = (
            update(PersonalAccessToken)
            .where(PersonalAccessToken.id == id)
            .values(last_used_at=last_used_at)
        )
        await session.execute(statement)

    async def revoke_leaked(
        self,
        session: AsyncSession,
        token: str,
        token_type: TokenType,
        *,
        notifier: str,
        url: str | None = None,
    ) -> bool:
        """Revoke a leaked token and email its owner about it.

        The token is resolved with ``get_by_token`` using its default
        arguments, so deleted tokens and tokens past their expiry do not
        match.

        Args:
            session: Database session used for the lookup and soft-delete.
            token: Raw value of the leaked token.
            token_type: Accepted as part of the signature; not used by this
                method.
            notifier: Name of the party that reported the leak; included in
                the email and the log entry.
            url: Optional location of the leak. An empty string is passed to
                the email template when it is ``None``.

        Returns:
            ``True`` if a matching token was found and revoked, ``False`` if
            no token matched.
        """
        leaked_token = await self.get_by_token(session, token)
        if leaked_token is None:
            return False

        # Reuse the service's own soft-delete instead of duplicating it.
        await self.delete(session, leaked_token)

        email = leaked_token.user.email
        body = render_email_template(
            PersonalAccessTokenLeakedEmail(
                props=PersonalAccessTokenLeakedProps(
                    email=email,
                    # The template receives the token's comment, not its value.
                    personal_access_token=leaked_token.comment,
                    notifier=notifier,
                    url=url or "",
                )
            )
        )
        enqueue_email(
            to_email_addr=email,
            subject="Security Notice - Your Polar Personal Access Token has been leaked",
            html_content=body,
        )

        log.info(
            "Revoke leaked personal access token",
            id=leaked_token.id,
            notifier=notifier,
            url=url,
        )

        return True

    def _get_readable_order_statement(
        self, auth_subject: AuthSubject[User]
    ) -> Select[tuple[PersonalAccessToken]]:
        """Build the base SELECT for tokens the subject may read.

        Args:
            auth_subject: Authenticated user whose tokens are selected.

        Returns:
            A statement selecting ``PersonalAccessToken`` rows owned by the
            subject that have no deletion timestamp.
        """
        return select(PersonalAccessToken).where(
            PersonalAccessToken.user_id == auth_subject.subject.id,
            PersonalAccessToken.deleted_at.is_(None),
        )
# Module-level service instance, constructed with the PersonalAccessToken model.
personal_access_token = PersonalAccessTokenService(PersonalAccessToken)