Coverage for polar/auth/models.py: 77%

60 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 16:17 +0000

1from functools import cached_property 1a

2from typing import Generic, TypeGuard, TypeVar 1a

3 

4from polar.enums import RateLimitGroup 1a

5from polar.models import ( 1a

6 Customer, 

7 CustomerSession, 

8 OAuth2Token, 

9 Organization, 

10 OrganizationAccessToken, 

11 PersonalAccessToken, 

12 User, 

13 UserSession, 

14) 

15 

16from .scope import Scope 1a

17 

18 

19class Anonymous: ... 1a

20 

21 

22Subject = User | Organization | Customer | Anonymous 1a

23SubjectType = type[User] | type[Organization] | type[Customer] | type[Anonymous] 1a

24Session = ( 1a

25 UserSession 

26 | OrganizationAccessToken 

27 | OAuth2Token 

28 | PersonalAccessToken 

29 | CustomerSession 

30) 

31 

32 

33S = TypeVar("S", bound=Subject, covariant=True) 1a

34 

35 

36class AuthSubject(Generic[S]): # noqa: UP046 # Don't use the new syntax as it allows us to force covariant typing 1a

37 subject: S 

38 scopes: set[Scope] 

39 session: Session | None 

40 

41 def __init__(self, subject: S, scopes: set[Scope], session: Session | None) -> None: 1a

42 self.subject = subject 1bc

43 self.scopes = scopes 1bc

44 self.session = session 1bc

45 

46 @cached_property 1a

47 def rate_limit_key(self) -> tuple[str, RateLimitGroup]: 1a

48 return self.rate_limit_user, self.rate_limit_group 

49 

50 @cached_property 1a

51 def rate_limit_user(self) -> str: 1a

52 if isinstance(self.session, OAuth2Token): 52 ↛ 53line 52 didn't jump to line 53 because the condition on line 52 was never true1bc

53 return f"oauth2_client:{self.session.client_id}" 

54 

55 match self.subject: 1bc

56 case User(): 1bc

57 return f"user:{self.subject.id}" 1b

58 case Organization(): 58 ↛ 59line 58 didn't jump to line 59 because the pattern on line 58 never matched1bc

59 return f"organization:{self.subject.id}" 

60 case Customer(): 60 ↛ 61line 60 didn't jump to line 61 because the pattern on line 60 never matched1bc

61 return f"customer:{self.subject.id}" 

62 case Anonymous(): 62 ↛ exitline 62 didn't return from function 'rate_limit_user' because the pattern on line 62 always matched1bc

63 return "anonymous" 1bc

64 

65 @cached_property 1a

66 def rate_limit_group(self) -> RateLimitGroup: 1a

67 if isinstance(self.session, UserSession): 67 ↛ 68line 67 didn't jump to line 68 because the condition on line 67 was never true1bc

68 return RateLimitGroup.web 

69 

70 if isinstance(self.subject, Organization): 70 ↛ 71line 70 didn't jump to line 71 because the condition on line 70 was never true1bc

71 return self.subject.rate_limit_group 

72 

73 if isinstance(self.session, OAuth2Token): 73 ↛ 74line 73 didn't jump to line 74 because the condition on line 73 was never true1bc

74 return self.session.client.rate_limit_group 

75 

76 return RateLimitGroup.default 1bc

77 

78 @cached_property 1a

79 def log_context(self) -> dict[str, str]: 1a

80 baggage: dict[str, str] = { 1bc

81 "subject_type": self.subject.__class__.__name__, 

82 "rate_limit_group": self.rate_limit_group.value, 

83 "rate_limit_user": self.rate_limit_user, 

84 } 

85 if isinstance(self.subject, User | Organization | Customer): 1bc

86 baggage["subject_id"] = str(self.subject.id) 1b

87 

88 if self.session: 1bc

89 baggage["session_type"] = self.session.__class__.__name__ 1b

90 

91 return baggage 1bc

92 

93 

94def is_anonymous[S: Subject]( 1a

95 auth_subject: AuthSubject[S], 

96) -> TypeGuard[AuthSubject[Anonymous]]: 

97 return isinstance(auth_subject.subject, Anonymous) 1b

98 

99 

100def is_user[S: Subject](auth_subject: AuthSubject[S]) -> TypeGuard[AuthSubject[User]]: 1a

101 return isinstance(auth_subject.subject, User) 1bc

102 

103 

104def is_organization[S: Subject]( 1a

105 auth_subject: AuthSubject[S], 

106) -> TypeGuard[AuthSubject[Organization]]: 

107 return isinstance(auth_subject.subject, Organization) 

108 

109 

110def is_customer[S: Subject]( 1a

111 auth_subject: AuthSubject[S], 

112) -> TypeGuard[AuthSubject[Customer]]: 

113 return isinstance(auth_subject.subject, Customer) 

114 

115 

116__all__ = [ 1a

117 "Subject", 

118 "SubjectType", 

119 "AuthSubject", 

120 "is_anonymous", 

121 "is_user", 

122 "is_organization", 

123 # Re-export subject types for convenience 

124 "Anonymous", 

125 "User", 

126 "Organization", 

127 "Customer", 

128]