Coverage for polar/models/user.py: 67%
116 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 17:15 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 17:15 +0000
1import time 1ab
2from datetime import datetime 1ab
3from enum import StrEnum 1ab
4from typing import TYPE_CHECKING, Any 1ab
5from uuid import UUID 1ab
7from sqlalchemy import ( 1ab
8 TIMESTAMP,
9 Boolean,
10 Column,
11 ColumnElement,
12 ForeignKey,
13 Integer,
14 String,
15 Text,
16 Uuid,
17 and_,
18 func,
19)
20from sqlalchemy.dialects.postgresql import JSONB 1ab
21from sqlalchemy.ext.hybrid import hybrid_property 1ab
22from sqlalchemy.orm import Mapped, declared_attr, mapped_column, relationship 1ab
23from sqlalchemy.schema import Index, UniqueConstraint 1ab
25from polar.kit.db.models import RecordModel 1ab
26from polar.kit.extensions.sqlalchemy.types import StringEnum 1ab
27from polar.kit.schemas import Schema 1ab
29from .account import Account 1ab
31if TYPE_CHECKING: 31 ↛ 32line 31 didn't jump to line 32 because the condition on line 31 was never true1ab
32 pass
35class OAuthPlatform(StrEnum): 1ab
36 # maximum allowed length is 32 chars
37 github = "github" 1ab
38 github_repository_benefit = "github_repository_benefit" 1ab
39 google = "google" 1ab
40 apple = "apple" 1ab
43class IdentityVerificationStatus(StrEnum): 1ab
44 unverified = "unverified" 1ab
45 pending = "pending" 1ab
46 verified = "verified" 1ab
47 failed = "failed" 1ab
49 def get_display_name(self) -> str: 1ab
50 return {
51 IdentityVerificationStatus.unverified: "Unverified",
52 IdentityVerificationStatus.pending: "Pending",
53 IdentityVerificationStatus.verified: "Verified",
54 IdentityVerificationStatus.failed: "Failed",
55 }[self]
class OAuthAccount(RecordModel):
    """OAuth credentials linking a ``User`` to an account on an external platform.

    A given ``(platform, account_id)`` pair may appear only once, and rows are
    indexed by ``(user_id, platform)``.
    """

    __tablename__ = "oauth_accounts"
    __table_args__ = (
        UniqueConstraint("platform", "account_id"),
        Index("idx_user_id_platform", "user_id", "platform"),
    )

    platform: Mapped[OAuthPlatform] = mapped_column(String(32), nullable=False)
    access_token: Mapped[str] = mapped_column(String(1024), nullable=False)
    # Compared against ``time.time()`` below, i.e. seconds since the epoch.
    expires_at: Mapped[int | None] = mapped_column(Integer, nullable=True, default=None)
    refresh_token: Mapped[str | None] = mapped_column(
        String(1024), nullable=True, default=None
    )
    refresh_token_expires_at: Mapped[int | None] = mapped_column(
        Integer, nullable=True, default=None
    )
    account_id: Mapped[str] = mapped_column(String(320), nullable=False)
    account_email: Mapped[str] = mapped_column(String(320), nullable=False)
    account_username: Mapped[str | None] = mapped_column(String(320), nullable=True)

    user_id: Mapped[UUID] = mapped_column(
        Uuid,
        ForeignKey("users.id", ondelete="cascade"),
        nullable=False,
    )

    user: Mapped["User"] = relationship("User", back_populates="oauth_accounts")

    def is_access_token_expired(self) -> bool:
        """Return whether the access token's expiry time has already passed.

        A token with no recorded ``expires_at`` is never reported as expired.
        """
        expiry = self.expires_at
        return expiry is not None and time.time() > expiry

    def should_refresh_access_token(self, unless_ttl_gt: int = 60 * 30) -> bool:
        """Return whether the access token is due for a refresh.

        Args:
            unless_ttl_gt: Remaining lifetime, in seconds, above which no
                refresh is needed. Defaults to 30 minutes.

        Returns:
            ``True`` only when both an expiry and a refresh token are set and
            the token expires within ``unless_ttl_gt`` seconds from now.
        """
        # Without an expiry or a refresh token there is nothing to refresh.
        if not self.expires_at or not self.refresh_token:
            return False
        return self.expires_at <= time.time() + unless_ttl_gt
class User(RecordModel):
    """A user, with linked OAuth accounts and free-form metadata.

    Emails are unique case-insensitively (unique index on ``lower(email)``).
    """

    __tablename__ = "users"
    __table_args__ = (
        Index(
            "ix_users_email_case_insensitive", func.lower(Column("email")), unique=True
        ),
    )

    is_admin: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)

    email: Mapped[str] = mapped_column(String(320), nullable=False)
    email_verified: Mapped[bool] = mapped_column(Boolean, nullable=False, default=False)
    avatar_url: Mapped[str | None] = mapped_column(Text, nullable=True, default=None)

    account_id: Mapped[UUID | None] = mapped_column(
        Uuid,
        ForeignKey("accounts.id", ondelete="set null"),
        nullable=True,
    )

    @declared_attr
    def account(cls) -> Mapped[Account | None]:
        """Relationship to the linked ``Account``.

        Declared with ``lazy="raise"``, so it must be loaded explicitly.
        """
        return relationship(
            Account,
            lazy="raise",
            back_populates="users",
            foreign_keys="[User.account_id]",
        )

    @declared_attr
    def oauth_accounts(cls) -> Mapped[list[OAuthAccount]]:
        """Relationship to the user's OAuth accounts, loaded with ``lazy="joined"``."""
        return relationship(OAuthAccount, lazy="joined", back_populates="user")

    accepted_terms_of_service: Mapped[bool] = mapped_column(
        Boolean,
        nullable=False,
        default=False,
    )

    stripe_customer_id: Mapped[str | None] = mapped_column(
        String, nullable=True, default=None, unique=True
    )

    identity_verification_status: Mapped[IdentityVerificationStatus] = mapped_column(
        StringEnum(IdentityVerificationStatus),
        nullable=False,
        default=IdentityVerificationStatus.unverified,
    )
    identity_verification_id: Mapped[str | None] = mapped_column(
        String, nullable=True, default=None, unique=True
    )

    @property
    def identity_verified(self) -> bool:
        """Whether the identity verification status is ``verified``."""
        return self.identity_verification_status == IdentityVerificationStatus.verified

    # Time of blocking traffic/activity for given user
    blocked_at: Mapped[datetime | None] = mapped_column(
        TIMESTAMP(timezone=True),
        nullable=True,
        default=None,
    )

    meta: Mapped[dict[str, Any]] = mapped_column(JSONB, nullable=False, default=dict)

    @hybrid_property
    def can_authenticate(self) -> bool:
        """Whether the user is neither deleted nor blocked."""
        return self.deleted_at is None and self.blocked_at is None

    @can_authenticate.inplace.expression
    @classmethod
    def _can_authenticate_expression(cls) -> ColumnElement[bool]:
        """SQL-side equivalent of ``can_authenticate`` for use in queries."""
        return and_(cls.deleted_at.is_(None), cls.blocked_at.is_(None))

    @property
    def signup_attribution(self) -> dict[str, Any]:
        """Signup attribution stored under ``meta["signup"]`` (empty if absent)."""
        return self.meta.get("signup", {})

    @signup_attribution.setter
    def signup_attribution(self, value: dict[str, Any] | Schema | None) -> None:
        """Store signup attribution under ``meta["signup"]``.

        Falsy values (``None`` or an empty dict) are ignored and leave ``meta``
        untouched. A ``Schema`` is converted with
        ``model_dump(exclude_unset=True)`` before being stored.
        """
        if not value:
            return

        if isinstance(value, Schema):
            value = value.model_dump(exclude_unset=True)

        # Assign a *new* dict instead of mutating the current one in place.
        # ``meta`` is a plain JSONB column without mutation tracking, so
        # re-assigning the same (mutated) object compares equal to the
        # previously loaded value and the change would not be flushed.
        self.meta = {**(self.meta or {}), "signup": value}

    @property
    def had_creator_signup_intent(self) -> bool:
        """Whether the signup attribution's ``intent`` is ``"creator"``."""
        return self.signup_attribution.get("intent") == "creator"

    @property
    def campaign_code(self) -> str | None:
        """The ``campaign`` entry of the signup attribution, if any."""
        return self.signup_attribution.get("campaign")

    def get_oauth_account(self, platform: OAuthPlatform) -> OAuthAccount | None:
        """Return the first linked OAuth account for ``platform``, or ``None``."""
        return next(
            (
                account
                for account in self.oauth_accounts
                if account.platform == platform
            ),
            None,
        )

    def get_github_account(self) -> OAuthAccount | None:
        """Return the linked GitHub OAuth account, or ``None``."""
        return self.get_oauth_account(OAuthPlatform.github)

    @property
    def posthog_distinct_id(self) -> str:
        """Identifier of the form ``user:<id>``."""
        return f"user:{self.id}"

    @property
    def public_name(self) -> str:
        """Name safe to display publicly.

        The GitHub username when a GitHub account with a username is linked;
        otherwise only the first character of the email address.
        """
        github_oauth_account = self.get_github_account()
        if github_oauth_account is not None and github_oauth_account.account_username:
            return github_oauth_account.account_username
        return self.email[0]

    @property
    def github_username(self) -> str | None:
        """Username of the linked GitHub account, or ``None`` if not linked."""
        github_oauth_account = self.get_github_account()
        if github_oauth_account is not None:
            return github_oauth_account.account_username
        return None