Coverage for polar/customer_portal/service/benefit_grant.py: 26%
77 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 15:52 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 15:52 +0000
1import uuid 1a
2from collections.abc import Sequence 1a
3from enum import StrEnum 1a
4from typing import Any, cast 1a
6from sqlalchemy import Select, UnaryExpression, and_, asc, desc, or_, select 1a
7from sqlalchemy.orm import contains_eager, joinedload 1a
9from polar.auth.models import AuthSubject 1a
10from polar.customer.repository import CustomerRepository 1a
11from polar.exceptions import NotPermitted, PolarRequestValidationError 1a
12from polar.kit.db.postgres import AsyncSession 1a
13from polar.kit.pagination import PaginationParams, paginate 1a
14from polar.kit.services import ResourceServiceReader 1a
15from polar.kit.sorting import Sorting 1a
16from polar.models import ( 1a
17 Benefit,
18 BenefitGrant,
19 Customer,
20 Order,
21 Organization,
22 Product,
23 ProductBenefit,
24 Subscription,
25)
26from polar.models.benefit import BenefitType 1a
27from polar.worker import enqueue_job 1a
29from ..schemas.benefit_grant import ( 1a
30 CustomerBenefitGrantDiscordUpdate,
31 CustomerBenefitGrantGitHubRepositoryUpdate,
32 CustomerBenefitGrantUpdate,
33)
36class CustomerBenefitGrantSortProperty(StrEnum): 1a
37 granted_at = "granted_at" 1a
38 type = "type" 1a
39 organization = "organization" 1a
40 product_benefit = "product_benefit" 1a
class CustomerBenefitGrantService(ResourceServiceReader[BenefitGrant]):
    """Read and update benefit grants on behalf of an authenticated customer.

    All read paths go through ``_get_readable_benefit_grant_statement``, which
    restricts results to non-deleted, non-revoked grants owned by the
    authenticated customer.

    NOTE: the ``list`` method shadows the ``list`` builtin inside the class
    body. Signatures of methods defined after it must therefore not use
    ``list[...]`` in their annotations.
    """

    async def list(
        self,
        session: AsyncSession,
        auth_subject: AuthSubject[Customer],
        *,
        type: Sequence[BenefitType] | None = None,
        benefit_id: Sequence[uuid.UUID] | None = None,
        checkout_id: Sequence[uuid.UUID] | None = None,
        order_id: Sequence[uuid.UUID] | None = None,
        subscription_id: Sequence[uuid.UUID] | None = None,
        pagination: PaginationParams,
        sorting: list[Sorting[CustomerBenefitGrantSortProperty]] | None = None,
    ) -> tuple[Sequence[BenefitGrant], int]:
        """Return one page of the customer's readable benefit grants.

        Args:
            session: Database session used to run the query.
            auth_subject: Authenticated customer whose grants are listed.
            type: Keep only grants whose benefit type is in this sequence.
            benefit_id: Keep only grants for these benefit IDs.
            checkout_id: Keep only grants whose subscription or order has
                one of these checkout IDs.
            order_id: Keep only grants attached to these order IDs.
            subscription_id: Keep only grants attached to these
                subscription IDs.
            pagination: Pagination parameters forwarded to ``paginate``.
            sorting: ``(property, is_descending)`` pairs applied in order.
                When ``None``, sorts by ``product_benefit`` ascending and
                then ``granted_at`` descending.

        Returns:
            Whatever ``paginate`` returns for the built statement, annotated
            as a ``(grants, int)`` tuple.
        """
        # Build the default per call instead of sharing one list object
        # through the signature (mutable default argument).
        if sorting is None:
            sorting = [
                (CustomerBenefitGrantSortProperty.product_benefit, False),
                (CustomerBenefitGrantSortProperty.granted_at, True),
            ]

        statement = self._get_readable_benefit_grant_statement(auth_subject).options(
            joinedload(BenefitGrant.customer)
        )

        if type is not None:
            statement = statement.where(Benefit.type.in_(type))

        if benefit_id is not None:
            statement = statement.where(BenefitGrant.benefit_id.in_(benefit_id))

        # These are all outer joins, so a grant row is kept whether or not it
        # has a matching subscription, order, product or product-benefit row.
        # They are needed for the checkout filter and the ``product_benefit``
        # sort below.
        statement = (
            statement.join(
                Subscription,
                onclause=Subscription.id == BenefitGrant.subscription_id,
                isouter=True,
            )
            .join(
                Order,
                onclause=Order.id == BenefitGrant.order_id,
                isouter=True,
            )
            .join(
                Product,
                onclause=or_(
                    Product.id == Subscription.product_id,
                    Product.id == Order.product_id,
                ),
                isouter=True,
            )
            .join(
                ProductBenefit,
                onclause=and_(
                    ProductBenefit.product_id == Product.id,
                    ProductBenefit.benefit_id == BenefitGrant.benefit_id,
                ),
                isouter=True,
            )
        )

        if checkout_id is not None:
            statement = statement.where(
                or_(
                    Subscription.checkout_id.in_(checkout_id),
                    Order.checkout_id.in_(checkout_id),
                )
            )

        if order_id is not None:
            statement = statement.where(BenefitGrant.order_id.in_(order_id))

        if subscription_id is not None:
            statement = statement.where(
                BenefitGrant.subscription_id.in_(subscription_id)
            )

        # Sort property -> column it orders by.
        sort_columns: dict[CustomerBenefitGrantSortProperty, Any] = {
            CustomerBenefitGrantSortProperty.granted_at: BenefitGrant.granted_at,
            CustomerBenefitGrantSortProperty.type: Benefit.type,
            CustomerBenefitGrantSortProperty.organization: Organization.slug,
            CustomerBenefitGrantSortProperty.product_benefit: ProductBenefit.order,
        }
        order_by_clauses: list[UnaryExpression[Any]] = []
        for criterion, is_desc in sorting:
            column = sort_columns.get(criterion)
            if column is None:
                # Unrecognised criteria are skipped rather than rejected.
                continue
            order_by_clauses.append(desc(column) if is_desc else asc(column))
        statement = statement.order_by(*order_by_clauses)

        return await paginate(session, statement, pagination=pagination)

    async def get_by_id(
        self,
        session: AsyncSession,
        auth_subject: AuthSubject[Customer],
        id: uuid.UUID,
    ) -> BenefitGrant | None:
        """Fetch one readable benefit grant by ID, with its customer loaded.

        Args:
            session: Database session used to run the query.
            auth_subject: Authenticated customer; only their grants match.
            id: ID of the benefit grant to fetch.

        Returns:
            The matching grant, or ``None`` when no readable grant has
            this ID.
        """
        statement = (
            self._get_readable_benefit_grant_statement(auth_subject)
            .where(BenefitGrant.id == id)
            .options(joinedload(BenefitGrant.customer))
        )

        result = await session.execute(statement)
        return result.scalar_one_or_none()

    async def update(
        self,
        session: AsyncSession,
        benefit_grant: BenefitGrant,
        benefit_grant_update: CustomerBenefitGrantUpdate,
    ) -> BenefitGrant:
        """Apply a customer-submitted update to a benefit grant.

        For Discord and GitHub repository updates, the submitted properties
        are merged over the grant's existing properties and a
        ``benefit.update`` job is enqueued. Any other explicitly set fields
        of the update are copied onto the grant. The grant is added to the
        session; this method neither flushes nor commits.

        Args:
            session: Database session the grant is added to.
            benefit_grant: Grant to modify in place.
            benefit_grant_update: Update payload.

        Returns:
            The same ``benefit_grant`` instance, modified.

        Raises:
            NotPermitted: If the grant is revoked.
            PolarRequestValidationError: If the update's benefit type differs
                from the granted benefit's type, or if the referenced OAuth
                account is not found on the grant's customer.
        """
        if benefit_grant.is_revoked:
            raise NotPermitted("Cannot update a revoked benefit grant.")

        if benefit_grant_update.benefit_type != benefit_grant.benefit.type:
            raise PolarRequestValidationError(
                [
                    {
                        "type": "value_error",
                        "loc": ("body", "benefit_type"),
                        "msg": "Benefit type must match the existing granted benefit type.",
                        "input": benefit_grant_update.benefit_type,
                    }
                ]
            )

        if isinstance(
            benefit_grant_update,
            (
                CustomerBenefitGrantDiscordUpdate,
                CustomerBenefitGrantGitHubRepositoryUpdate,
            ),
        ):
            account_id = benefit_grant_update.properties["account_id"]
            if account_id is not None:
                platform = benefit_grant_update.get_oauth_platform()

                customer_repository = CustomerRepository.from_session(session)
                customer = await customer_repository.get_by_id(
                    benefit_grant.customer_id
                )
                # Invariant: the customer referenced by the grant is expected
                # to exist.
                assert customer is not None

                oauth_account = customer.get_oauth_account(account_id, platform)
                if oauth_account is None:
                    raise PolarRequestValidationError(
                        [
                            {
                                "type": "value_error",
                                "loc": ("body", "properties", "account_id"),
                                "msg": "OAuth account does not exist.",
                                "input": account_id,
                            }
                        ]
                    )

            # Keys in the update override existing keys; other keys are kept.
            # ``cast`` only affects static typing.
            benefit_grant.properties = cast(
                Any,
                {
                    **benefit_grant.properties,
                    **benefit_grant_update.properties,
                },
            )

            enqueue_job("benefit.update", benefit_grant.id)

        # ``properties`` and ``benefit_type`` were handled above; copy only
        # the remaining fields the caller explicitly set.
        for attr, value in benefit_grant_update.model_dump(
            exclude_unset=True, exclude={"properties", "benefit_type"}
        ).items():
            setattr(benefit_grant, attr, value)

        session.add(benefit_grant)
        return benefit_grant

    def _get_readable_benefit_grant_statement(
        self, auth_subject: AuthSubject[Customer]
    ) -> Select[tuple[BenefitGrant]]:
        """Build the base query for grants the customer is allowed to read.

        Selects ``BenefitGrant`` joined to its ``Benefit`` and the benefit's
        ``Organization``, limited to grants that are not deleted, not
        revoked, and owned by the authenticated customer.

        Args:
            auth_subject: Authenticated customer whose ID scopes the query.

        Returns:
            A ``Select`` that callers can refine further.
        """
        return (
            select(BenefitGrant)
            .join(Benefit, onclause=Benefit.id == BenefitGrant.benefit_id)
            .join(Organization, onclause=Benefit.organization_id == Organization.id)
            .where(
                BenefitGrant.deleted_at.is_(None),
                BenefitGrant.is_revoked.is_(False),
                BenefitGrant.customer_id == auth_subject.subject.id,
            )
            # Populate the relationships from the joins above instead of
            # issuing separate loads.
            .options(
                contains_eager(BenefitGrant.benefit).options(
                    contains_eager(Benefit.organization)
                ),
            )
        )
# Module-level instance of the service, constructed with the BenefitGrant model.
customer_benefit_grant = CustomerBenefitGrantService(BenefitGrant)