Coverage for polar/auth/dependencies.py: 91%
68 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 17:15 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 17:15 +0000
1from collections.abc import Awaitable, Callable 1a
2from inspect import Parameter, Signature 1a
3from typing import Annotated, Any 1a
5from fastapi import Depends, Request, Security 1a
6from fastapi.security import HTTPBearer, OpenIdConnect 1a
7from makefun import with_signature 1a
9from polar.auth.scope import RESERVED_SCOPES, Scope 1a
10from polar.exceptions import Unauthorized 1a
11from polar.oauth2.exceptions import InsufficientScopeError 1a
13from .models import ( 1a
14 Anonymous,
15 AuthSubject,
16 Customer,
17 Organization,
18 Subject,
19 SubjectType,
20 User,
21 is_anonymous,
22)
# Security scheme declarations used as FastAPI dependencies below.
# Every scheme is created with auto_error=False; per FastAPI's documentation
# this makes absent credentials resolve to None rather than raising an error.

# OpenID Connect scheme, pointing at the discovery document.
oidc_scheme = OpenIdConnect(
    scheme_name="oidc",
    openIdConnectUrl="/.well-known/openid-configuration",
    auto_error=False,
)

# Bearer scheme documented as carrying an Organization Access Token.
oat_scheme = HTTPBearer(
    scheme_name="oat",
    auto_error=False,
    description=(
        "You can generate an **Organization Access Token** "
        "from your organization's settings."
    ),
)

# Bearer scheme documented as carrying a Personal Access Token.
pat_scheme = HTTPBearer(
    scheme_name="pat",
    auto_error=False,
    description=(
        "You can generate a **Personal Access Token** "
        "from your [settings](https://polar.sh/settings)."
    ),
)

# Bearer scheme documented as carrying a customer session token.
customer_session_scheme = HTTPBearer(
    scheme_name="customer_session",
    auto_error=False,
    description=(
        "Customer session tokens are specific tokens that are used to "
        "authenticate customers on your organization. "
        "You can create those sessions programmatically using the "
        "[Create Customer Session endpoint]"
        "(/api-reference/customer-portal/sessions/create)."
    ),
)
# Memo of generated dependency callables, keyed by the frozen set of allowed
# subject types, so each distinct set of subjects yields one shared callable.
_auth_subject_factory_cache: dict[
    frozenset[SubjectType],
    Callable[..., Awaitable[AuthSubject[Subject]]],
] = dict()
def _get_auth_subject_factory(
    allowed_subjects: frozenset[SubjectType],
) -> Callable[..., Awaitable[AuthSubject[Subject]]]:
    """
    Return the dependency callable exposing the request's auth subject.

    The callable is built once per distinct `allowed_subjects` value and then
    served from `_auth_subject_factory_cache`.

    Its signature is generated dynamically: besides `request`, it declares one
    keyword-only parameter per security scheme relevant to the allowed
    subjects, each defaulting to `Depends(<scheme>)`. The callable body never
    reads those parameters; presumably they exist so that FastAPI registers
    the matching security schemes for the endpoint (confirm against the
    OpenAPI output).

    Args:
        allowed_subjects: Subject types the consuming authenticator accepts.

    Returns:
        An async callable that returns `request.state.auth_subject`.
    """
    cached_factory = _auth_subject_factory_cache.get(allowed_subjects)
    if cached_factory is not None:
        return cached_factory

    def credentials_parameter(name: str, scheme: Any) -> Parameter:
        # Keyword-only parameter whose default pulls in the given scheme.
        return Parameter(
            name=name,
            kind=Parameter.KEYWORD_ONLY,
            default=Depends(scheme),
        )

    accepts_user = User in allowed_subjects
    accepts_organization = Organization in allowed_subjects

    parameters: list[Parameter] = [
        Parameter(
            name="request",
            kind=Parameter.POSITIONAL_OR_KEYWORD,
            annotation=Request,
        )
    ]
    # The OIDC scheme is declared for both User and Organization subjects.
    if accepts_user or accepts_organization:
        parameters.append(credentials_parameter("oauth2_credentials", oidc_scheme))
    if accepts_user:
        parameters.append(
            credentials_parameter("personal_access_token_credentials", pat_scheme)
        )
    if accepts_organization:
        parameters.append(
            credentials_parameter("organization_access_token_credentials", oat_scheme)
        )
    if Customer in allowed_subjects:
        parameters.append(
            credentials_parameter(
                "customer_session_credentials", customer_session_scheme
            )
        )

    @with_signature(Signature(parameters))
    async def get_auth_subject(request: Request, **kwargs: Any) -> AuthSubject[Subject]:
        try:
            return request.state.auth_subject
        except AttributeError as exc:
            # The subject is expected to have been stored on the request state
            # before this dependency runs.
            raise RuntimeError(
                "AuthSubject is not present in the request state. "
                "Did you forget to add AuthSubjectMiddleware?"
            ) from exc

    _auth_subject_factory_cache[allowed_subjects] = get_auth_subject
    return get_auth_subject
class _Authenticator:
    """
    Callable that validates an auth subject against allowed subject types
    and required scopes.
    """

    def __init__(
        self,
        *,
        allowed_subjects: frozenset[SubjectType],
        required_scopes: set[Scope] | None = None,
    ) -> None:
        """
        Args:
            allowed_subjects: Subject types accepted by this authenticator.
            required_scopes: Scopes of which the subject must hold at least
                one. `None` or an empty set disables the scope check.
        """
        self.allowed_subjects = allowed_subjects
        self.required_scopes = required_scopes

    async def __call__(
        self, auth_subject: AuthSubject[Subject]
    ) -> AuthSubject[Subject]:
        """
        Validate `auth_subject` and return the subject the endpoint should see.

        Returns:
            The given auth subject, or an anonymous one when the given
            subject's type is not allowed but `Anonymous` is.

        Raises:
            Unauthorized: The effective subject is anonymous and `Anonymous`
                is not among the allowed subjects.
            InsufficientScopeError: Scopes are required and the subject holds
                none of them.
        """
        # Exact-type membership is intended here: `allowed_subjects` holds
        # classes, so a subject whose type is not listed is downgraded to
        # Anonymous rather than rejected outright.
        if type(auth_subject.subject) not in self.allowed_subjects:
            auth_subject = AuthSubject(Anonymous(), set(), None)

        # Anonymous subjects are accepted only when explicitly allowed; no
        # scope check applies to them.
        if is_anonymous(auth_subject):
            if Anonymous not in self.allowed_subjects:
                raise Unauthorized()
            return auth_subject

        # No required scopes: any allowed subject passes.
        if not self.required_scopes:
            return auth_subject

        # Holding at least one of the required scopes is enough.
        if auth_subject.scopes & self.required_scopes:
            return auth_subject

        # Pass a copy so the error does not share this instance's scope set.
        raise InsufficientScopeError(set(self.required_scopes))
def Authenticator(
    allowed_subjects: set[SubjectType],
    required_scopes: set[Scope] | None = None,
) -> _Authenticator:
    """
    Here comes some blood magic.

    Generate a version of `_Authenticator` with an overridden `__call__`
    signature.

    By doing so, we can dynamically inject the required scopes into the
    FastAPI dependency, so they are properly detected by the OpenAPI
    generator.

    Args:
        allowed_subjects: Subject types the authenticator accepts.
        required_scopes: Scopes of which the subject must hold at least one.
            Scopes listed in `RESERVED_SCOPES` are still enforced but are left
            out of the scopes declared on the dependency.

    Returns:
        An instance of a `_Authenticator` subclass whose `__call__` declares
        `auth_subject` with a `Security(...)` default.
    """
    frozen_subjects = frozenset(allowed_subjects)

    subject_dependency = _get_auth_subject_factory(frozen_subjects)
    # Sorted so the declared scope list is deterministic.
    declared_scopes = sorted(
        scope.value
        for scope in (required_scopes or ())
        if scope not in RESERVED_SCOPES
    )

    call_signature = Signature(
        [
            Parameter(name="self", kind=Parameter.POSITIONAL_OR_KEYWORD),
            Parameter(
                name="auth_subject",
                kind=Parameter.POSITIONAL_OR_KEYWORD,
                default=Security(subject_dependency, scopes=declared_scopes),
            ),
        ]
    )

    class _AuthenticatorSignature(_Authenticator):
        # Same behaviour as the parent; only the advertised signature differs.
        @with_signature(call_signature)
        async def __call__(
            self, auth_subject: AuthSubject[Subject]
        ) -> AuthSubject[Subject]:
            return await super().__call__(auth_subject)

    return _AuthenticatorSignature(
        allowed_subjects=frozen_subjects,
        required_scopes=required_scopes,
    )
# Accepts a User holding the `web_write` scope, or an Anonymous subject.
_WebUserOrAnonymous = Authenticator(
    allowed_subjects={Anonymous, User},
    required_scopes={Scope.web_write},
)
WebUserOrAnonymous = Annotated[
    AuthSubject[Anonymous | User],
    Depends(_WebUserOrAnonymous),
]

# Accepts a User holding at least one of `web_read` or `web_write`.
_WebUserRead = Authenticator(
    allowed_subjects={User},
    required_scopes={Scope.web_read, Scope.web_write},
)
WebUserRead = Annotated[
    AuthSubject[User],
    Depends(_WebUserRead),
]

# Accepts a User holding the `web_write` scope.
_WebUserWrite = Authenticator(
    allowed_subjects={User},
    required_scopes={Scope.web_write},
)
WebUserWrite = Annotated[
    AuthSubject[User],
    Depends(_WebUserWrite),
]