Coverage for polar/oauth2/schemas.py: 93%

130 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 15:52 +0000

1import ipaddress 1a

2import re 1a

3from typing import Annotated, Any, Literal 1a

4 

5from fastapi.openapi.constants import REF_TEMPLATE 1a

6from pydantic import ( 1a

7 UUID4, 

8 AfterValidator, 

9 AnyUrl, 

10 BeforeValidator, 

11 Discriminator, 

12 EmailStr, 

13 Field, 

14 HttpUrl, 

15 TypeAdapter, 

16) 

17 

18from polar.auth.scope import SCOPES_SUPPORTED, Scope, scope_to_list 1a

19from polar.kit.schemas import Schema, TimestampedSchema 1a

20 

21from .sub_type import SubType 1a

22 

23_LOCALHOST_HOST_PATTERN = re.compile(r"([^\.]+\.)?localhost(\d+)?", flags=re.IGNORECASE) 1a

24 

25 

26def _is_localhost(host: str) -> bool: 1a

27 try: 

28 return ipaddress.IPv4Address(host).is_private 

29 except ValueError: 

30 return _LOCALHOST_HOST_PATTERN.match(host) is not None 

31 

32 

def _is_https_or_localhost(value: AnyUrl) -> AnyUrl:
    """Reject plain-HTTP URLs unless they point at a localhost host.

    Only the ``http`` scheme is restricted; a URL with any other scheme is
    returned unchanged. The function is used as a pydantic ``AfterValidator``
    on ``AnyUrl`` values, hence the ``AnyUrl`` annotation.

    Raises:
        ValueError: if the scheme is ``http`` and the host is missing or is
            not recognised by ``_is_localhost``.
    """
    if value.scheme != "http":
        return value

    host = value.host
    if host is not None and _is_localhost(host):
        return value

    raise ValueError("An HTTPS URL is required.")

37 

38 

# Any URL, post-validated so that plain "http" is only accepted for hosts
# recognised by _is_localhost.
URIOrLocalhost = Annotated[
    AnyUrl,
    AfterValidator(_is_https_or_localhost),
]

# List of scopes; the raw input is first passed through scope_to_list.
Scopes = Annotated[
    list[Scope],
    BeforeValidator(scope_to_list),
]

41 

42 

# Configurable settings of an OAuth2 client.
# NOTE: documented with comments rather than a docstring so the generated
# JSON schema is not given a new "description".
class OAuth2ClientConfiguration(Schema):
    # Each entry is validated by URIOrLocalhost.
    redirect_uris: list[URIOrLocalhost]
    token_endpoint_auth_method: Literal[
        "client_secret_basic",
        "client_secret_post",
        "none",
    ] = "client_secret_post"
    grant_types: list[Literal["authorization_code", "refresh_token"]] = [
        "authorization_code",
        "refresh_token",
    ]
    response_types: list[Literal["code"]] = ["code"]
    # Defaults to every supported scope, space-separated.
    scope: str = " ".join(SCOPES_SUPPORTED)
    client_name: str
    # Unlike the three URIs below, this one is a free-form string.
    client_uri: str | None = None
    logo_uri: HttpUrl | None = None
    tos_uri: HttpUrl | None = None
    policy_uri: HttpUrl | None = None
    default_sub_type: SubType = SubType.organization

60 

61 

# Client configuration plus the identifier of the client it applies to.
class OAuth2ClientConfigurationUpdate(OAuth2ClientConfiguration):
    client_id: str

64 

65 

# Full client representation: the configuration fields, the timestamp fields
# from TimestampedSchema, and the client credentials.
class OAuth2Client(TimestampedSchema, OAuth2ClientConfiguration):
    client_id: str
    client_secret: str
    # Integer timestamps — presumably Unix epoch seconds; confirm with the
    # code that populates them.
    client_id_issued_at: int
    client_secret_expires_at: int

71 

72 

# Client view that carries no secret: identifier, display name and links.
# All fields except client_id are required but may be null.
class OAuth2ClientPublic(TimestampedSchema):
    client_id: str
    client_name: str | None
    client_uri: str | None
    logo_uri: str | None
    tos_uri: str | None
    policy_uri: str | None

80 

81 

# User shown as the subject of an authorization response.
class AuthorizeUser(Schema):
    id: UUID4
    email: EmailStr
    # Required, but may be null.
    avatar_url: str | None

86 

87 

# Organization shown as the subject of an authorization response.
class AuthorizeOrganization(Schema):
    id: UUID4
    slug: str
    # Required, but may be null.
    avatar_url: str | None

92 

93 

# Fields shared by both authorization response variants; the subclasses
# narrow `sub_type` and `sub`.
class AuthorizeResponseBase(Schema):
    client: OAuth2ClientPublic
    sub_type: SubType
    sub: AuthorizeUser | AuthorizeOrganization | None
    scopes: Scopes

99 

100 

# Authorization response variant selected when sub_type is SubType.user.
class AuthorizeResponseUser(AuthorizeResponseBase):
    sub_type: Literal[SubType.user]
    sub: AuthorizeUser | None

104 

105 

# Authorization response variant selected when sub_type is
# SubType.organization; additionally carries a list of organizations.
class AuthorizeResponseOrganization(AuthorizeResponseBase):
    sub_type: Literal[SubType.organization]
    sub: AuthorizeOrganization | None
    organizations: list[AuthorizeOrganization]

110 

111 

# Tagged union of the two response variants, discriminated on `sub_type`.
AuthorizeResponse = Annotated[
    AuthorizeResponseUser | AuthorizeResponseOrganization,
    Discriminator(discriminator="sub_type"),
]

# Module-level adapter for the union, built once at import time.
authorize_response_adapter: TypeAdapter[AuthorizeResponse] = TypeAdapter(AuthorizeResponse)

120 

121 

# Fields common to every token request; subclasses pin `grant_type` to a
# single literal value.
class TokenRequestBase(Schema):
    grant_type: Literal[
        "authorization_code",
        "refresh_token",
        "web",
    ]
    client_id: str
    client_secret: str

126 

127 

# Token request for the "authorization_code" grant.
class AuthorizationCodeTokenRequest(TokenRequestBase):
    grant_type: Literal["authorization_code"]
    code: str
    redirect_uri: HttpUrl

132 

133 

# Token request for the "refresh_token" grant.
class RefreshTokenRequest(TokenRequestBase):
    grant_type: Literal["refresh_token"]
    refresh_token: str

138 

139 

# Token request for the "web" grant, which is driven by a session token.
class WebTokenRequest(TokenRequestBase):
    grant_type: Literal["web"]
    session_token: str
    # Subject kind; "user" when omitted.
    sub_type: Literal["user", "organization"] = "user"
    # Optional subject identifier.
    sub: UUID4 | None = None
    # Optional scope string.
    scope: str | None = None

146 

147 

# Payload of a token response.
class TokenResponse(Schema):
    access_token: str
    # Only the literal "Bearer" is allowed.
    token_type: Literal["Bearer"]
    expires_in: int
    # Required, but may be null.
    refresh_token: str | None
    scope: str
    id_token: str

155 

156 

# Request body for revoking a token.
class RevokeTokenRequest(Schema):
    token: str
    # Optional hint about which kind of token `token` is.
    token_type_hint: Literal["access_token", "refresh_token"] | None = None
    client_id: str
    client_secret: str

162 

163 

# Revocation response: a schema with no fields of its own.
class RevokeTokenResponse(Schema):
    pass

165 

166 

# Request body for introspecting a token.
class IntrospectTokenRequest(Schema):
    token: str
    # Optional hint about which kind of token `token` is.
    token_type_hint: Literal["access_token", "refresh_token"] | None = None
    client_id: str
    client_secret: str

172 

173 

# Result of a token introspection. Every field is required.
class IntrospectTokenResponse(Schema):
    active: bool
    client_id: str
    token_type: Literal["access_token", "refresh_token"]
    scope: str
    sub_type: SubType
    sub: str
    aud: str
    iss: str
    # Integer claims — presumably Unix epoch seconds; confirm with the
    # introspection endpoint.
    exp: int
    iat: int

185 

186 

# User-info payload for a user subject; only `sub` is mandatory.
class UserInfoUser(Schema):
    sub: str
    name: str | None = None
    email: str | None = None
    email_verified: bool | None = None

192 

193 

# User-info payload for an organization subject; only `sub` is mandatory.
class UserInfoOrganization(Schema):
    sub: str
    name: str | None = None

197 

198 

# Either user-info shape (plain union, no discriminator).
UserInfo = UserInfoUser | UserInfoOrganization

200 

201 

def add_oauth2_form_schemas(openapi_schema: dict[str, Any]) -> dict[str, Any]:
    """Register the OAuth2 request models in an OpenAPI document.

    For each request model, its JSON schema (generated with FastAPI's
    ``REF_TEMPLATE``) is stored under ``components.schemas`` keyed by the
    model's class name. The document is mutated in place and also returned.

    Raises:
        KeyError: if ``openapi_schema`` has no ``components``/``schemas``
            mapping.
    """
    component_schemas = openapi_schema["components"]["schemas"]
    request_models = (
        AuthorizationCodeTokenRequest,
        RefreshTokenRequest,
        WebTokenRequest,
        RevokeTokenRequest,
        IntrospectTokenRequest,
    )
    for model in request_models:
        component_schemas[model.__name__] = model.model_json_schema(
            ref_template=REF_TEMPLATE
        )
    return openapi_schema