Coverage for polar/oauth2/schemas.py: 93%
130 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 16:17 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 16:17 +0000
1import ipaddress 1a
2import re 1a
3from typing import Annotated, Any, Literal 1a
5from fastapi.openapi.constants import REF_TEMPLATE 1a
6from pydantic import ( 1a
7 UUID4,
8 AfterValidator,
9 AnyUrl,
10 BeforeValidator,
11 Discriminator,
12 EmailStr,
13 Field,
14 HttpUrl,
15 TypeAdapter,
16)
18from polar.auth.scope import SCOPES_SUPPORTED, Scope, scope_to_list 1a
19from polar.kit.schemas import Schema, TimestampedSchema 1a
21from .sub_type import SubType 1a
23_LOCALHOST_HOST_PATTERN = re.compile(r"([^\.]+\.)?localhost(\d+)?", flags=re.IGNORECASE) 1a
26def _is_localhost(host: str) -> bool: 1a
27 try:
28 return ipaddress.IPv4Address(host).is_private
29 except ValueError:
30 return _LOCALHOST_HOST_PATTERN.match(host) is not None
def _is_https_or_localhost(value: HttpUrl) -> HttpUrl:
    """Reject plain-HTTP URLs unless their host is accepted by `_is_localhost`.

    Only the ``"http"`` scheme is restricted; a URL with any other scheme is
    returned unchanged.

    Raises:
        ValueError: the scheme is ``"http"`` and the host is missing or is
            not accepted by `_is_localhost`.
    """
    # NOTE(review): annotated as HttpUrl, but this module attaches the
    # validator to AnyUrl (see URIOrLocalhost) -- confirm the intended type.
    if value.scheme != "http":
        return value

    host = value.host
    if host is not None and _is_localhost(host):
        return value

    raise ValueError("An HTTPS URL is required.")
# Any URL, checked after parsing by `_is_https_or_localhost`.
URIOrLocalhost = Annotated[AnyUrl, AfterValidator(_is_https_or_localhost)]

# List of scopes; the raw input is passed through `scope_to_list` before
# validation.
Scopes = Annotated[list[Scope], BeforeValidator(scope_to_list)]
class OAuth2ClientConfiguration(Schema):
    # OAuth2 client configuration fields. Only `redirect_uris` and
    # `client_name` are required; every other field has a default.

    # Each entry is validated by `_is_https_or_localhost` via URIOrLocalhost.
    redirect_uris: list[URIOrLocalhost]

    token_endpoint_auth_method: Literal[
        "client_secret_basic",
        "client_secret_post",
        "none",
    ] = "client_secret_post"

    # Defaults to both supported grant types.
    grant_types: list[Literal["authorization_code", "refresh_token"]] = [
        "authorization_code",
        "refresh_token",
    ]

    response_types: list[Literal["code"]] = ["code"]

    # Defaults to every entry of SCOPES_SUPPORTED, space-separated.
    scope: str = " ".join(SCOPES_SUPPORTED)

    client_name: str

    # Unlike the other *_uri fields, this one is a plain string.
    client_uri: str | None = None
    logo_uri: HttpUrl | None = None
    tos_uri: HttpUrl | None = None
    policy_uri: HttpUrl | None = None

    default_sub_type: SubType = SubType.organization
class OAuth2ClientConfigurationUpdate(OAuth2ClientConfiguration):
    # Same fields as OAuth2ClientConfiguration, plus a required `client_id`.
    client_id: str
class OAuth2Client(TimestampedSchema, OAuth2ClientConfiguration):
    # OAuth2ClientConfiguration fields combined with TimestampedSchema, plus
    # the client's credentials. All four fields below are required.
    client_id: str
    client_secret: str

    # Integer timestamps.
    # NOTE(review): presumably Unix epoch seconds -- confirm.
    client_id_issued_at: int
    client_secret_expires_at: int
class OAuth2ClientPublic(TimestampedSchema):
    # Client view that carries no `client_secret`. The optional-looking fields
    # accept None but have no default, so they must always be supplied.
    client_id: str
    client_name: str | None

    # All URIs are plain strings here.
    client_uri: str | None
    logo_uri: str | None
    tos_uri: str | None
    policy_uri: str | None
class AuthorizeUser(Schema):
    # User shape used as `sub` in the authorize responses.
    id: UUID4
    email: EmailStr
    # Accepts None, but must be supplied (no default).
    avatar_url: str | None
class AuthorizeOrganization(Schema):
    # Organization shape used as `sub` and in `organizations` in the
    # authorize responses.
    id: UUID4
    slug: str
    # Accepts None, but must be supplied (no default).
    avatar_url: str | None
class AuthorizeResponseBase(Schema):
    # Fields shared by AuthorizeResponseUser and AuthorizeResponseOrganization,
    # which narrow `sub_type` and `sub`.
    client: OAuth2ClientPublic
    sub_type: SubType
    sub: AuthorizeUser | AuthorizeOrganization | None
    # Raw input goes through `scope_to_list` (see the Scopes alias).
    scopes: Scopes
class AuthorizeResponseUser(AuthorizeResponseBase):
    # Variant selected when `sub_type` is SubType.user.
    sub_type: Literal[SubType.user]
    sub: AuthorizeUser | None
class AuthorizeResponseOrganization(AuthorizeResponseBase):
    # Variant selected when `sub_type` is SubType.organization.
    sub_type: Literal[SubType.organization]
    sub: AuthorizeOrganization | None
    # Required list of organizations.
    # NOTE(review): presumably the organizations the user may choose
    # from -- confirm against the caller.
    organizations: list[AuthorizeOrganization]
# Union of the two response variants, discriminated on the `sub_type` field.
AuthorizeResponse = Annotated[
    AuthorizeResponseUser | AuthorizeResponseOrganization,
    Discriminator("sub_type"),
]

# Module-level adapter, built once, for the discriminated union above.
authorize_response_adapter: TypeAdapter[AuthorizeResponse] = TypeAdapter(AuthorizeResponse)
class TokenRequestBase(Schema):
    # Fields common to every token request; subclasses pin `grant_type` to a
    # single literal value.
    grant_type: Literal[
        "authorization_code",
        "refresh_token",
        "web",
    ]
    client_id: str
    client_secret: str
class AuthorizationCodeTokenRequest(TokenRequestBase):
    # Token request with grant_type "authorization_code".
    grant_type: Literal["authorization_code"]
    code: str
    # Typed as HttpUrl, not URIOrLocalhost, so `_is_https_or_localhost` is
    # not applied to this field.
    redirect_uri: HttpUrl
class RefreshTokenRequest(TokenRequestBase):
    # Token request with grant_type "refresh_token".
    grant_type: Literal["refresh_token"]

    refresh_token: str
class WebTokenRequest(TokenRequestBase):
    # Token request with grant_type "web", carrying a session token.
    grant_type: Literal["web"]
    session_token: str

    # Optional fields: `sub_type` defaults to "user", the others to None.
    sub_type: Literal["user", "organization"] = "user"
    sub: UUID4 | None = None
    scope: str | None = None
class TokenResponse(Schema):
    # Token response payload. Every field is required; only `refresh_token`
    # accepts None.
    access_token: str
    token_type: Literal["Bearer"]
    # NOTE(review): presumably a lifetime in seconds -- confirm.
    expires_in: int
    refresh_token: str | None
    scope: str
    id_token: str
class RevokeTokenRequest(Schema):
    # Token revocation request; `token_type_hint` is optional.
    token: str
    token_type_hint: Literal["access_token", "refresh_token"] | None = None

    # Client credentials.
    client_id: str
    client_secret: str
class RevokeTokenResponse(Schema):
    # Declares no fields of its own.
    pass
class IntrospectTokenRequest(Schema):
    # Token introspection request; `token_type_hint` is optional.
    token: str
    token_type_hint: Literal["access_token", "refresh_token"] | None = None

    # Client credentials.
    client_id: str
    client_secret: str
class IntrospectTokenResponse(Schema):
    # Token introspection result. Every field is required.
    active: bool
    client_id: str
    token_type: Literal["access_token", "refresh_token"]
    scope: str

    # Subject of the token.
    sub_type: SubType
    sub: str

    # Claims of the token.
    # NOTE(review): `exp` / `iat` are presumably Unix timestamps -- confirm.
    aud: str
    iss: str
    exp: int
    iat: int
class UserInfoUser(Schema):
    # User-info payload for a user; only `sub` is required.
    sub: str

    # Optional fields, each defaulting to None.
    name: str | None = None
    email: str | None = None
    email_verified: bool | None = None
class UserInfoOrganization(Schema):
    # User-info payload for an organization; only `sub` is required.
    sub: str
    name: str | None = None
# Either user-info payload; a plain union with no discriminator field.
UserInfo = UserInfoUser | UserInfoOrganization
def add_oauth2_form_schemas(openapi_schema: dict[str, Any]) -> dict[str, Any]:
    """Add the JSON schemas of the OAuth2 request models to *openapi_schema*.

    Each schema is stored under ``components.schemas`` with the model's class
    name as its key. The dictionary is modified in place and also returned.

    Raises:
        KeyError: *openapi_schema* has no ``components`` / ``schemas`` entry.
    """
    request_models = (
        AuthorizationCodeTokenRequest,
        RefreshTokenRequest,
        WebTokenRequest,
        RevokeTokenRequest,
        IntrospectTokenRequest,
    )
    component_schemas = openapi_schema["components"]["schemas"]
    for model in request_models:
        component_schemas[model.__name__] = model.model_json_schema(
            ref_template=REF_TEMPLATE
        )
    return openapi_schema