Coverage for polar/models/user.py: 67%

116 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 17:15 +0000

1import time 1ab

2from datetime import datetime 1ab

3from enum import StrEnum 1ab

4from typing import TYPE_CHECKING, Any 1ab

5from uuid import UUID 1ab

6 

7from sqlalchemy import ( 1ab

8 TIMESTAMP, 

9 Boolean, 

10 Column, 

11 ColumnElement, 

12 ForeignKey, 

13 Integer, 

14 String, 

15 Text, 

16 Uuid, 

17 and_, 

18 func, 

19) 

20from sqlalchemy.dialects.postgresql import JSONB 1ab

21from sqlalchemy.ext.hybrid import hybrid_property 1ab

22from sqlalchemy.orm import Mapped, declared_attr, mapped_column, relationship 1ab

23from sqlalchemy.schema import Index, UniqueConstraint 1ab

24 

25from polar.kit.db.models import RecordModel 1ab

26from polar.kit.extensions.sqlalchemy.types import StringEnum 1ab

27from polar.kit.schemas import Schema 1ab

28 

29from .account import Account 1ab

30 

31if TYPE_CHECKING: 31 ↛ 32line 31 didn't jump to line 32 because the condition on line 31 was never true1ab

32 pass 

33 

34 

35class OAuthPlatform(StrEnum): 1ab

36 # maximum allowed length is 32 chars 

37 github = "github" 1ab

38 github_repository_benefit = "github_repository_benefit" 1ab

39 google = "google" 1ab

40 apple = "apple" 1ab

41 

42 

43class IdentityVerificationStatus(StrEnum): 1ab

44 unverified = "unverified" 1ab

45 pending = "pending" 1ab

46 verified = "verified" 1ab

47 failed = "failed" 1ab

48 

49 def get_display_name(self) -> str: 1ab

50 return { 

51 IdentityVerificationStatus.unverified: "Unverified", 

52 IdentityVerificationStatus.pending: "Pending", 

53 IdentityVerificationStatus.verified: "Verified", 

54 IdentityVerificationStatus.failed: "Failed", 

55 }[self] 

56 

57 

58class OAuthAccount(RecordModel): 1ab

59 __tablename__ = "oauth_accounts" 1ab

60 __table_args__ = ( 1ab

61 UniqueConstraint( 

62 "platform", 

63 "account_id", 

64 ), 

65 Index("idx_user_id_platform", "user_id", "platform"), 

66 ) 

67 

68 platform: Mapped[OAuthPlatform] = mapped_column(String(32), nullable=False) 1ab

69 access_token: Mapped[str] = mapped_column(String(1024), nullable=False) 1ab

70 expires_at: Mapped[int | None] = mapped_column(Integer, nullable=True, default=None) 1ab

71 refresh_token: Mapped[str | None] = mapped_column( 1ab

72 String(1024), nullable=True, default=None 

73 ) 

74 refresh_token_expires_at: Mapped[int | None] = mapped_column( 1ab

75 Integer, nullable=True, default=None 

76 ) 

77 account_id: Mapped[str] = mapped_column(String(320), nullable=False) 1ab

78 account_email: Mapped[str] = mapped_column(String(320), nullable=False) 1ab

79 account_username: Mapped[str | None] = mapped_column(String(320), nullable=True) 1ab

80 

81 user_id: Mapped[UUID] = mapped_column( 1ab

82 Uuid, 

83 ForeignKey("users.id", ondelete="cascade"), 

84 nullable=False, 

85 ) 

86 

87 user: Mapped["User"] = relationship("User", back_populates="oauth_accounts") 1ab

88 

89 def is_access_token_expired(self) -> bool: 1ab

90 if self.expires_at is None: 

91 return False 

92 return time.time() > self.expires_at 

93 

94 def should_refresh_access_token(self, unless_ttl_gt: int = 60 * 30) -> bool: 1ab

95 if ( 

96 self.expires_at 

97 and self.refresh_token 

98 and self.expires_at <= (time.time() + unless_ttl_gt) 

99 ): 

100 return True 

101 return False 

102 

103 

104class User(RecordModel): 1ab

105 __tablename__ = "users" 1ab

106 __table_args__ = ( 1ab

107 Index( 

108 "ix_users_email_case_insensitive", func.lower(Column("email")), unique=True 

109 ), 

110 ) 

111 

112 is_admin: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False) 1ab

113 

114 email: Mapped[str] = mapped_column(String(320), nullable=False) 1ab

115 email_verified: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False) 1ab

116 avatar_url: Mapped[str | None] = mapped_column(Text, nullable=True, default=None) 1ab

117 

118 account_id: Mapped[UUID | None] = mapped_column( 1ab

119 Uuid, 

120 ForeignKey("accounts.id", ondelete="set null"), 

121 nullable=True, 

122 ) 

123 

124 @declared_attr 1ab

125 def account(cls) -> Mapped[Account | None]: 1ab

126 return relationship( 1ab

127 Account, 

128 lazy="raise", 

129 back_populates="users", 

130 foreign_keys="[User.account_id]", 

131 ) 

132 

133 @declared_attr 1ab

134 def oauth_accounts(cls) -> Mapped[list[OAuthAccount]]: 1ab

135 return relationship(OAuthAccount, lazy="joined", back_populates="user") 1ab

136 

137 accepted_terms_of_service: Mapped[bool] = mapped_column( 1ab

138 Boolean, 

139 nullable=False, 

140 default=False, 

141 ) 

142 

143 stripe_customer_id: Mapped[str | None] = mapped_column( 1ab

144 String, nullable=True, default=None, unique=True 

145 ) 

146 

147 identity_verification_status: Mapped[IdentityVerificationStatus] = mapped_column( 1ab

148 StringEnum(IdentityVerificationStatus), 

149 nullable=False, 

150 default=IdentityVerificationStatus.unverified, 

151 ) 

152 identity_verification_id: Mapped[str | None] = mapped_column( 1ab

153 String, nullable=True, default=None, unique=True 

154 ) 

155 

156 @property 1ab

157 def identity_verified(self) -> bool: 1ab

158 return self.identity_verification_status == IdentityVerificationStatus.verified 

159 

160 # Time of blocking traffic/activity for given user 

161 blocked_at: Mapped[datetime | None] = mapped_column( 1ab

162 TIMESTAMP(timezone=True), 

163 nullable=True, 

164 default=None, 

165 ) 

166 

167 meta: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False, default=dict) 1ab

168 

169 @hybrid_property 1ab

170 def can_authenticate(self) -> bool: 1ab

171 return self.deleted_at is None and self.blocked_at is None 

172 

173 @can_authenticate.inplace.expression 1ab

174 @classmethod 1ab

175 def _can_authenticate_expression(cls) -> ColumnElement[bool]: 1ab

176 return and_(cls.deleted_at.is_(None), cls.blocked_at.is_(None)) 1c

177 

178 @property 1ab

179 def signup_attribution(self) -> dict[str, Any]: 1ab

180 return self.meta.get("signup", {}) 

181 

182 @signup_attribution.setter 1ab

183 def signup_attribution(self, value: dict[str, Any] | Schema | None) -> None: 1ab

184 if not value: 

185 return 

186 

187 meta = self.meta or {} 

188 if isinstance(value, Schema): 

189 value = value.model_dump(exclude_unset=True) 

190 

191 meta["signup"] = value 

192 self.meta = meta 

193 

194 @property 1ab

195 def had_creator_signup_intent(self) -> bool: 1ab

196 return self.signup_attribution.get("intent") == "creator" 

197 

198 @property 1ab

199 def campaign_code(self) -> str | None: 1ab

200 return self.signup_attribution.get("campaign") 

201 

202 def get_oauth_account(self, platform: OAuthPlatform) -> OAuthAccount | None: 1ab

203 return next( 

204 ( 

205 account 

206 for account in self.oauth_accounts 

207 if account.platform == platform 

208 ), 

209 None, 

210 ) 

211 

212 def get_github_account(self) -> OAuthAccount | None: 1ab

213 return self.get_oauth_account(OAuthPlatform.github) 

214 

215 @property 1ab

216 def posthog_distinct_id(self) -> str: 1ab

217 return f"user:{self.id}" 1c

218 

219 @property 1ab

220 def public_name(self) -> str: 1ab

221 github_oauth_account = self.get_github_account() 

222 if github_oauth_account is not None and github_oauth_account.account_username: 

223 return github_oauth_account.account_username 

224 return self.email[0] 

225 

226 @property 1ab

227 def github_username(self) -> str | None: 1ab

228 github_oauth_account = self.get_github_account() 

229 if github_oauth_account is not None: 

230 return github_oauth_account.account_username 

231 return None