Coverage for polar/personal_access_token/service.py: 60%

56 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 17:15 +0000

1from collections.abc import Sequence 1a

2from datetime import datetime 1a

3from uuid import UUID 1a

4 

5import structlog 1a

6from sqlalchemy import Select, or_, select, update 1a

7from sqlalchemy.orm import contains_eager 1a

8 

9from polar.auth.models import AuthSubject 1a

10from polar.config import settings 1a

11from polar.email.react import render_email_template 1a

12from polar.email.schemas import ( 1a

13 PersonalAccessTokenLeakedEmail, 

14 PersonalAccessTokenLeakedProps, 

15) 

16from polar.email.sender import enqueue_email 1a

17from polar.enums import TokenType 1a

18from polar.kit.crypto import get_token_hash 1a

19from polar.kit.pagination import PaginationParams, paginate 1a

20from polar.kit.services import ResourceServiceReader 1a

21from polar.kit.utils import utc_now 1a

22from polar.logging import Logger 1a

23from polar.models import PersonalAccessToken, User 1a

24from polar.postgres import AsyncSession 1a

25 

26log: Logger = structlog.get_logger() 1a

27 

28TOKEN_PREFIX = "polar_pat_" 1a

29 

30 

31class PersonalAccessTokenService(ResourceServiceReader[PersonalAccessToken]): 1a

32 async def list( 1a

33 self, 

34 session: AsyncSession, 

35 auth_subject: AuthSubject[User], 

36 *, 

37 pagination: PaginationParams, 

38 ) -> tuple[Sequence[PersonalAccessToken], int]: 

39 statement = self._get_readable_order_statement(auth_subject) 

40 return await paginate(session, statement, pagination=pagination) 

41 

42 async def get_by_id( 1a

43 self, session: AsyncSession, auth_subject: AuthSubject[User], id: UUID 

44 ) -> PersonalAccessToken | None: 

45 statement = self._get_readable_order_statement(auth_subject).where( 

46 PersonalAccessToken.id == id, 

47 PersonalAccessToken.deleted_at.is_(None), 

48 ) 

49 result = await session.execute(statement) 

50 return result.scalar_one_or_none() 

51 

52 async def get_by_token( 1a

53 self, session: AsyncSession, token: str, *, expired: bool = False 

54 ) -> PersonalAccessToken | None: 

55 token_hash = get_token_hash(token, secret=settings.SECRET) 1b

56 statement = ( 1b

57 select(PersonalAccessToken) 

58 .join(PersonalAccessToken.user) 

59 .where( 

60 PersonalAccessToken.token == token_hash, 

61 PersonalAccessToken.deleted_at.is_(None), 

62 User.can_authenticate.is_(True), 

63 ) 

64 .options(contains_eager(PersonalAccessToken.user)) 

65 ) 

66 if not expired: 66 ↛ 74line 66 didn't jump to line 74 because the condition on line 66 was always true1b

67 statement = statement.where( 1b

68 or_( 

69 PersonalAccessToken.expires_at.is_(None), 

70 PersonalAccessToken.expires_at > utc_now(), 

71 ) 

72 ) 

73 

74 result = await session.execute(statement) 1b

75 return result.unique().scalar_one_or_none() 

76 

77 async def delete( 1a

78 self, session: AsyncSession, personal_access_token: PersonalAccessToken 

79 ) -> None: 

80 personal_access_token.set_deleted_at() 

81 session.add(personal_access_token) 

82 

83 async def record_usage( 1a

84 self, session: AsyncSession, id: UUID, last_used_at: datetime 

85 ) -> None: 

86 statement = ( 

87 update(PersonalAccessToken) 

88 .where(PersonalAccessToken.id == id) 

89 .values(last_used_at=last_used_at) 

90 ) 

91 await session.execute(statement) 

92 

93 async def revoke_leaked( 1a

94 self, 

95 session: AsyncSession, 

96 token: str, 

97 token_type: TokenType, 

98 *, 

99 notifier: str, 

100 url: str | None = None, 

101 ) -> bool: 

102 personal_access_token = await self.get_by_token(session, token) 

103 

104 if personal_access_token is None: 

105 return False 

106 

107 personal_access_token.set_deleted_at() 

108 session.add(personal_access_token) 

109 

110 email = personal_access_token.user.email 

111 

112 body = render_email_template( 

113 PersonalAccessTokenLeakedEmail( 

114 props=PersonalAccessTokenLeakedProps( 

115 email=email, 

116 personal_access_token=personal_access_token.comment, 

117 notifier=notifier, 

118 url=url or "", 

119 ) 

120 ) 

121 ) 

122 

123 enqueue_email( 

124 to_email_addr=email, 

125 subject="Security Notice - Your Polar Personal Access Token has been leaked", 

126 html_content=body, 

127 ) 

128 

129 log.info( 

130 "Revoke leaked personal access token", 

131 id=personal_access_token.id, 

132 notifier=notifier, 

133 url=url, 

134 ) 

135 

136 return True 

137 

138 def _get_readable_order_statement( 1a

139 self, auth_subject: AuthSubject[User] 

140 ) -> Select[tuple[PersonalAccessToken]]: 

141 return select(PersonalAccessToken).where( 

142 PersonalAccessToken.user_id == auth_subject.subject.id, 

143 PersonalAccessToken.deleted_at.is_(None), 

144 ) 

145 

146 

147personal_access_token = PersonalAccessTokenService(PersonalAccessToken) 1a