Coverage for polar/customer_portal/service/benefit_grant.py: 26%

77 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 17:15 +0000

1import uuid 1a

2from collections.abc import Sequence 1a

3from enum import StrEnum 1a

4from typing import Any, cast 1a

5 

6from sqlalchemy import Select, UnaryExpression, and_, asc, desc, or_, select 1a

7from sqlalchemy.orm import contains_eager, joinedload 1a

8 

9from polar.auth.models import AuthSubject 1a

10from polar.customer.repository import CustomerRepository 1a

11from polar.exceptions import NotPermitted, PolarRequestValidationError 1a

12from polar.kit.db.postgres import AsyncSession 1a

13from polar.kit.pagination import PaginationParams, paginate 1a

14from polar.kit.services import ResourceServiceReader 1a

15from polar.kit.sorting import Sorting 1a

16from polar.models import ( 1a

17 Benefit, 

18 BenefitGrant, 

19 Customer, 

20 Order, 

21 Organization, 

22 Product, 

23 ProductBenefit, 

24 Subscription, 

25) 

26from polar.models.benefit import BenefitType 1a

27from polar.worker import enqueue_job 1a

28 

29from ..schemas.benefit_grant import ( 1a

30 CustomerBenefitGrantDiscordUpdate, 

31 CustomerBenefitGrantGitHubRepositoryUpdate, 

32 CustomerBenefitGrantUpdate, 

33) 

34 

35 

36class CustomerBenefitGrantSortProperty(StrEnum): 1a

37 granted_at = "granted_at" 1a

38 type = "type" 1a

39 organization = "organization" 1a

40 product_benefit = "product_benefit" 1a

41 

42 

class CustomerBenefitGrantService(ResourceServiceReader[BenefitGrant]):
    """Read and update benefit grants on behalf of an authenticated customer.

    Every query starts from ``_get_readable_benefit_grant_statement``, which
    limits results to grants that are not deleted, not revoked, and owned by
    the customer in the auth subject.
    """

    async def list(
        self,
        session: AsyncSession,
        auth_subject: AuthSubject[Customer],
        *,
        type: Sequence[BenefitType] | None = None,
        benefit_id: Sequence[uuid.UUID] | None = None,
        checkout_id: Sequence[uuid.UUID] | None = None,
        order_id: Sequence[uuid.UUID] | None = None,
        subscription_id: Sequence[uuid.UUID] | None = None,
        pagination: PaginationParams,
        sorting: list[Sorting[CustomerBenefitGrantSortProperty]] | None = None,
    ) -> tuple[Sequence[BenefitGrant], int]:
        """Return one page of the customer's readable benefit grants.

        Each filter is applied only when it is not ``None``; an empty
        sequence is still applied as an ``IN`` filter.

        Args:
            session: Database session used to run the query.
            auth_subject: Authenticated customer whose grants are listed.
            type: Keep grants whose benefit type is one of these.
            benefit_id: Keep grants whose ``benefit_id`` is one of these.
            checkout_id: Keep grants whose subscription or order has one of
                these checkout IDs.
            order_id: Keep grants whose ``order_id`` is one of these.
            subscription_id: Keep grants whose ``subscription_id`` is one of
                these.
            pagination: Pagination parameters forwarded to ``paginate``.
            sorting: ``(criterion, is_desc)`` pairs applied in order. When
                omitted, sorts by product benefit order ascending, then by
                grant date descending.

        Returns:
            Whatever ``paginate`` returns for the statement: the grants of
            the requested page and an integer (presumably the total count).
        """
        # Build the default per call instead of sharing one mutable list
        # object across every invocation through the signature.
        if sorting is None:
            sorting = [
                (CustomerBenefitGrantSortProperty.product_benefit, False),
                (CustomerBenefitGrantSortProperty.granted_at, True),
            ]

        statement = self._get_readable_benefit_grant_statement(auth_subject).options(
            joinedload(BenefitGrant.customer)
        )

        if type is not None:
            statement = statement.where(Benefit.type.in_(type))

        if benefit_id is not None:
            statement = statement.where(BenefitGrant.benefit_id.in_(benefit_id))

        # All four joins are LEFT OUTER, so a grant is kept even when it has
        # no matching subscription, order, product or product-benefit row.
        statement = (
            statement.join(
                Subscription,
                onclause=Subscription.id == BenefitGrant.subscription_id,
                isouter=True,
            )
            .join(
                Order,
                onclause=Order.id == BenefitGrant.order_id,
                isouter=True,
            )
            .join(
                # The product is reached through either the subscription or
                # the order.
                Product,
                onclause=or_(
                    Product.id == Subscription.product_id,
                    Product.id == Order.product_id,
                ),
                isouter=True,
            )
            .join(
                # Exposes ``ProductBenefit.order`` for the ``product_benefit``
                # sort criterion.
                ProductBenefit,
                onclause=and_(
                    ProductBenefit.product_id == Product.id,
                    ProductBenefit.benefit_id == BenefitGrant.benefit_id,
                ),
                isouter=True,
            )
        )

        if checkout_id is not None:
            statement = statement.where(
                or_(
                    Subscription.checkout_id.in_(checkout_id),
                    Order.checkout_id.in_(checkout_id),
                )
            )

        if order_id is not None:
            statement = statement.where(BenefitGrant.order_id.in_(order_id))

        if subscription_id is not None:
            statement = statement.where(
                BenefitGrant.subscription_id.in_(subscription_id)
            )

        # Column backing each sort criterion; a criterion without an entry
        # contributes no ORDER BY clause.
        sort_columns: dict[CustomerBenefitGrantSortProperty, Any] = {
            CustomerBenefitGrantSortProperty.granted_at: BenefitGrant.granted_at,
            CustomerBenefitGrantSortProperty.type: Benefit.type,
            CustomerBenefitGrantSortProperty.organization: Organization.slug,
            CustomerBenefitGrantSortProperty.product_benefit: ProductBenefit.order,
        }
        order_by_clauses: list[UnaryExpression[Any]] = []
        for criterion, is_desc in sorting:
            column = sort_columns.get(criterion)
            if column is None:
                continue
            direction = desc if is_desc else asc
            order_by_clauses.append(direction(column))
        statement = statement.order_by(*order_by_clauses)

        return await paginate(session, statement, pagination=pagination)

    async def get_by_id(
        self,
        session: AsyncSession,
        auth_subject: AuthSubject[Customer],
        id: uuid.UUID,
    ) -> BenefitGrant | None:
        """Fetch one readable benefit grant by ID, with its customer loaded.

        Args:
            session: Database session used to run the query.
            auth_subject: Authenticated customer who must own the grant.
            id: ID of the benefit grant to fetch.

        Returns:
            The matching grant, or ``None`` when no readable grant has this
            ID (unknown ID, deleted, revoked, or owned by another customer).
        """
        statement = (
            self._get_readable_benefit_grant_statement(auth_subject)
            .where(BenefitGrant.id == id)
            .options(joinedload(BenefitGrant.customer))
        )

        result = await session.execute(statement)
        return result.scalar_one_or_none()

    async def update(
        self,
        session: AsyncSession,
        benefit_grant: BenefitGrant,
        benefit_grant_update: CustomerBenefitGrantUpdate,
    ) -> BenefitGrant:
        """Apply a customer-submitted update to a benefit grant.

        The grant is added to the session; this method neither flushes nor
        commits.

        Args:
            session: Database session the updated grant is added to.
            benefit_grant: Grant to modify in place.
            benefit_grant_update: Update payload; its ``benefit_type`` must
                match the type of the granted benefit.

        Returns:
            The same ``benefit_grant`` instance, after modification.

        Raises:
            NotPermitted: If the grant is revoked.
            PolarRequestValidationError: If the payload's benefit type does
                not match the grant's benefit type, or if a non-null
                ``account_id`` does not match an OAuth account of the
                grant's customer.
        """
        if benefit_grant.is_revoked:
            raise NotPermitted("Cannot update a revoked benefit grant.")

        if benefit_grant_update.benefit_type != benefit_grant.benefit.type:
            raise PolarRequestValidationError(
                [
                    {
                        "type": "value_error",
                        "loc": ("body", "benefit_type"),
                        "msg": "Benefit type must match the existing granted benefit type.",
                        "input": benefit_grant_update.benefit_type,
                    }
                ]
            )

        # Only the Discord and GitHub repository payloads are handled as
        # carrying ``properties`` with an OAuth ``account_id``.
        if isinstance(
            benefit_grant_update,
            (
                CustomerBenefitGrantDiscordUpdate,
                CustomerBenefitGrantGitHubRepositoryUpdate,
            ),
        ):
            account_id = benefit_grant_update.properties["account_id"]
            if account_id is not None:
                platform = benefit_grant_update.get_oauth_platform()

                customer_repository = CustomerRepository.from_session(session)
                customer = await customer_repository.get_by_id(
                    benefit_grant.customer_id
                )
                # Narrows the optional result; the grant's customer is
                # expected to exist.
                assert customer is not None

                # The selected account must belong to the grant's customer.
                oauth_account = customer.get_oauth_account(account_id, platform)
                if oauth_account is None:
                    raise PolarRequestValidationError(
                        [
                            {
                                "type": "value_error",
                                "loc": ("body", "properties", "account_id"),
                                "msg": "OAuth account does not exist.",
                                "input": account_id,
                            }
                        ]
                    )

            # Merge rather than replace: keys absent from the payload keep
            # their current value, keys present in it win.
            benefit_grant.properties = cast(
                Any,
                {
                    **benefit_grant.properties,
                    **benefit_grant_update.properties,
                },
            )

            # NOTE(review): the job is enqueued only on this branch, i.e.
            # when properties were merged; confirm that other update types
            # do not need it.
            enqueue_job("benefit.update", benefit_grant.id)

        # Copy any remaining explicitly-set fields; ``properties`` and
        # ``benefit_type`` are excluded from this generic copy.
        for attr, value in benefit_grant_update.model_dump(
            exclude_unset=True, exclude={"properties", "benefit_type"}
        ).items():
            setattr(benefit_grant, attr, value)

        session.add(benefit_grant)
        return benefit_grant

    def _get_readable_benefit_grant_statement(
        self, auth_subject: AuthSubject[Customer]
    ) -> Select[tuple[BenefitGrant]]:
        """Build the base SELECT for grants the customer is allowed to read.

        The statement joins each grant to its benefit and the benefit's
        organization, keeps only grants that are not deleted, not revoked
        and owned by ``auth_subject.subject``, and populates
        ``BenefitGrant.benefit`` and ``Benefit.organization`` from those
        joins.

        Args:
            auth_subject: Authenticated customer whose grants are selected.

        Returns:
            A ``Select`` that callers can refine with further filters,
            joins, ordering and loader options.
        """
        return (
            select(BenefitGrant)
            .join(Benefit, onclause=Benefit.id == BenefitGrant.benefit_id)
            .join(Organization, onclause=Benefit.organization_id == Organization.id)
            .where(
                BenefitGrant.deleted_at.is_(None),
                BenefitGrant.is_revoked.is_(False),
                BenefitGrant.customer_id == auth_subject.subject.id,
            )
            .options(
                # Reuse the explicit joins above to load the relationships
                # instead of issuing separate loads.
                contains_eager(BenefitGrant.benefit).options(
                    contains_eager(Benefit.organization)
                ),
            )
        )

230 

231 

# Module-level service instance, constructed with the BenefitGrant model.
232customer_benefit_grant = CustomerBenefitGrantService(BenefitGrant) 1a