Coverage for polar/auth/dependencies.py: 91%

68 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 17:15 +0000

1from collections.abc import Awaitable, Callable 1a

2from inspect import Parameter, Signature 1a

3from typing import Annotated, Any 1a

4 

5from fastapi import Depends, Request, Security 1a

6from fastapi.security import HTTPBearer, OpenIdConnect 1a

7from makefun import with_signature 1a

8 

9from polar.auth.scope import RESERVED_SCOPES, Scope 1a

10from polar.exceptions import Unauthorized 1a

11from polar.oauth2.exceptions import InsufficientScopeError 1a

12 

13from .models import ( 1a

14 Anonymous, 

15 AuthSubject, 

16 Customer, 

17 Organization, 

18 Subject, 

19 SubjectType, 

20 User, 

21 is_anonymous, 

22) 

23 

24oidc_scheme = OpenIdConnect( 1a

25 scheme_name="oidc", 

26 openIdConnectUrl="/.well-known/openid-configuration", 

27 auto_error=False, 

28) 

29oat_scheme = HTTPBearer( 1a

30 scheme_name="oat", 

31 auto_error=False, 

32 description="You can generate an **Organization Access Token** from your organization's settings.", 

33) 

34pat_scheme = HTTPBearer( 1a

35 scheme_name="pat", 

36 auto_error=False, 

37 description="You can generate a **Personal Access Token** from your [settings](https://polar.sh/settings).", 

38) 

39customer_session_scheme = HTTPBearer( 1a

40 scheme_name="customer_session", 

41 auto_error=False, 

42 description=( 

43 "Customer session tokens are specific tokens " 

44 "that are used to authenticate customers on your organization. " 

45 "You can create those sessions programmatically using the " 

46 "[Create Customer Session endpoint](/api-reference/customer-portal/sessions/create)." 

47 ), 

48) 

49 

50 

51_auth_subject_factory_cache: dict[ 1a

52 frozenset[SubjectType], Callable[..., Awaitable[AuthSubject[Subject]]] 

53] = {} 

54 

55 

56def _get_auth_subject_factory( 1a

57 allowed_subjects: frozenset[SubjectType], 

58) -> Callable[..., Awaitable[AuthSubject[Subject]]]: 

59 if allowed_subjects in _auth_subject_factory_cache: 1a

60 return _auth_subject_factory_cache[allowed_subjects] 1a

61 

62 parameters: list[Parameter] = [ 1a

63 Parameter( 

64 name="request", 

65 kind=Parameter.POSITIONAL_OR_KEYWORD, 

66 annotation=Request, 

67 ) 

68 ] 

69 if User in allowed_subjects or Organization in allowed_subjects: 1a

70 parameters += [ 1a

71 Parameter( 

72 name="oauth2_credentials", 

73 kind=Parameter.KEYWORD_ONLY, 

74 default=Depends(oidc_scheme), 

75 ) 

76 ] 

77 if User in allowed_subjects: 1a

78 parameters += [ 1a

79 Parameter( 

80 name="personal_access_token_credentials", 

81 kind=Parameter.KEYWORD_ONLY, 

82 default=Depends(pat_scheme), 

83 ), 

84 ] 

85 if Organization in allowed_subjects: 1a

86 parameters += [ 1a

87 Parameter( 

88 name="organization_access_token_credentials", 

89 kind=Parameter.KEYWORD_ONLY, 

90 default=Depends(oat_scheme), 

91 ) 

92 ] 

93 if Customer in allowed_subjects: 1a

94 parameters.append( 1a

95 Parameter( 

96 name="customer_session_credentials", 

97 kind=Parameter.KEYWORD_ONLY, 

98 default=Depends(customer_session_scheme), 

99 ) 

100 ) 

101 

102 signature = Signature(parameters) 1a

103 

104 @with_signature(signature) 1a

105 async def get_auth_subject(request: Request, **kwargs: Any) -> AuthSubject[Subject]: 1a

106 try: 1b

107 return request.state.auth_subject 1b

108 except AttributeError as e: 

109 raise RuntimeError( 

110 "AuthSubject is not present in the request state. " 

111 "Did you forget to add AuthSubjectMiddleware?" 

112 ) from e 

113 

114 _auth_subject_factory_cache[allowed_subjects] = get_auth_subject 1a

115 

116 return get_auth_subject 1a

117 

118 

119class _Authenticator: 1a

120 def __init__( 1a

121 self, 

122 *, 

123 allowed_subjects: frozenset[SubjectType], 

124 required_scopes: set[Scope] | None = None, 

125 ) -> None: 

126 self.allowed_subjects = allowed_subjects 1a

127 self.required_scopes = required_scopes 1a

128 

129 async def __call__( 1a

130 self, auth_subject: AuthSubject[Subject] 

131 ) -> AuthSubject[Subject]: 

132 # Not allowed subject, fallback to Anonymous 

133 subject_type = type(auth_subject.subject) 1b

134 if subject_type not in self.allowed_subjects: 1b

135 auth_subject = AuthSubject(Anonymous(), set(), None) 1b

136 

137 # Anonymous 

138 if is_anonymous(auth_subject): 1b

139 if Anonymous in self.allowed_subjects: 139 ↛ 140line 139 didn't jump to line 140 because the condition on line 139 was never true1b

140 return auth_subject 

141 else: 

142 raise Unauthorized() 1b

143 

144 # No required scopes 

145 if not self.required_scopes: 145 ↛ 146line 145 didn't jump to line 146 because the condition on line 145 was never true1b

146 return auth_subject 

147 

148 # Have at least one of the required scopes. Allow this request. 

149 if auth_subject.scopes & self.required_scopes: 149 ↛ 152line 149 didn't jump to line 152 because the condition on line 149 was always true1b

150 return auth_subject 1b

151 

152 raise InsufficientScopeError({s for s in self.required_scopes}) 

153 

154 

155def Authenticator( 1a

156 allowed_subjects: set[SubjectType], 

157 required_scopes: set[Scope] | None = None, 

158) -> _Authenticator: 

159 """ 

160 Here comes some blood magic 🧙‍♂️ 

161 

162 Generate a version of `_Authenticator` with an overridden `__call__` signature. 

163 

164 By doing so, we can dynamically inject the required scopes into FastAPI 

165 dependency, so they are properly detected by the OpenAPI generator. 

166 """ 

167 allowed_subjects_frozen = frozenset(allowed_subjects) 1a

168 

169 parameters: list[Parameter] = [ 1a

170 Parameter(name="self", kind=Parameter.POSITIONAL_OR_KEYWORD), 

171 Parameter( 

172 name="auth_subject", 

173 kind=Parameter.POSITIONAL_OR_KEYWORD, 

174 default=Security( 

175 _get_auth_subject_factory(allowed_subjects_frozen), 

176 scopes=sorted( 

177 [ 

178 s.value 

179 for s in (required_scopes or {}) 

180 if s not in RESERVED_SCOPES 

181 ] 

182 ), 

183 ), 

184 ), 

185 ] 

186 signature = Signature(parameters) 1a

187 

188 class _AuthenticatorSignature(_Authenticator): 1a

189 @with_signature(signature) 1a

190 async def __call__( 1a

191 self, auth_subject: AuthSubject[Subject] 

192 ) -> AuthSubject[Subject]: 

193 return await super().__call__(auth_subject) 1b

194 

195 return _AuthenticatorSignature( 1a

196 allowed_subjects=allowed_subjects_frozen, required_scopes=required_scopes 

197 ) 

198 

199 

200_WebUserOrAnonymous = Authenticator( 1a

201 allowed_subjects={Anonymous, User}, 

202 required_scopes={Scope.web_write}, 

203) 

204WebUserOrAnonymous = Annotated[ 1a

205 AuthSubject[Anonymous | User], Depends(_WebUserOrAnonymous) 

206] 

207 

208_WebUserRead = Authenticator( 1a

209 allowed_subjects={User}, required_scopes={Scope.web_read, Scope.web_write} 

210) 

211WebUserRead = Annotated[AuthSubject[User], Depends(_WebUserRead)] 1a

212 

213_WebUserWrite = Authenticator( 1a

214 allowed_subjects={User}, required_scopes={Scope.web_write} 

215) 

216WebUserWrite = Annotated[AuthSubject[User], Depends(_WebUserWrite)] 1a