Coverage for polar/benefit/grant/service.py: 25%

210 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 16:17 +0000

1import builtins 1a

2from collections.abc import Sequence 1a

3from typing import Any, Literal, TypeVar, Unpack, overload 1a

4from uuid import UUID 1a

5 

6import structlog 1a

7from sqlalchemy import select 1a

8from sqlalchemy.orm import joinedload 1a

9 

10from polar.customer.repository import CustomerRepository 1a

11from polar.event.service import event as event_service 1a

12from polar.event.system import SystemEvent, build_system_event 1a

13from polar.eventstream.service import publish as eventstream_publish 1a

14from polar.exceptions import PolarError 1a

15from polar.kit.pagination import PaginationParams, paginate 1a

16from polar.kit.services import ResourceServiceReader 1a

17from polar.kit.sorting import Sorting 1a

18from polar.logging import Logger 1a

19from polar.models import Benefit, BenefitGrant, Customer, Product 1a

20from polar.models.benefit_grant import BenefitGrantScope 1a

21from polar.models.webhook_endpoint import WebhookEventType 1a

22from polar.postgres import AsyncSession, sql 1a

23from polar.redis import Redis 1a

24from polar.webhook.service import webhook as webhook_service 1a

25from polar.worker import enqueue_job 1a

26 

27from ..registry import get_benefit_strategy 1a

28from ..strategies import ( 1a

29 BenefitActionRequiredError, 

30 BenefitGrantProperties, 

31 BenefitProperties, 

32) 

33from .repository import BenefitGrantRepository 1a

34from .scope import scope_to_args 1a

35from .sorting import BenefitGrantSortProperty 1a

36 

37log: Logger = structlog.get_logger() 1a

38 

39BG = TypeVar("BG", bound=BenefitGrant) 1a

40 

41 

42class BenefitGrantError(PolarError): ... 1a

43 

44 

45class EmptyScopeError(BenefitGrantError): 1a

46 def __init__(self) -> None: 1a

47 message = "A scope must be provided to retrieve a benefit grant." 

48 super().__init__(message, 500) 

49 

50 

51class BenefitGrantService(ResourceServiceReader[BenefitGrant]): 1a

52 @overload 1a

53 async def get( 53 ↛ exitline 53 didn't return from function 'get' because 1a

54 self, 

55 session: AsyncSession, 

56 id: UUID, 

57 allow_deleted: bool = False, 

58 loaded: bool = False, 

59 *, 

60 class_: None = None, 

61 options: Sequence[sql.ExecutableOption] | None = None, 

62 ) -> BenefitGrant | None: ... 

63 

64 @overload 1a

65 async def get( 65 ↛ exitline 65 didn't return from function 'get' because 1a

66 self, 

67 session: AsyncSession, 

68 id: UUID, 

69 allow_deleted: bool = False, 

70 loaded: bool = False, 

71 *, 

72 class_: type[BG] | None = None, 

73 options: Sequence[sql.ExecutableOption] | None = None, 

74 ) -> BG | None: ... 

75 

76 async def get( 1a

77 self, 

78 session: AsyncSession, 

79 id: UUID, 

80 allow_deleted: bool = False, 

81 loaded: bool = False, 

82 *, 

83 class_: Any = None, 

84 options: Sequence[sql.ExecutableOption] | None = None, 

85 ) -> Any | None: 

86 if class_ is None: 

87 class_ = BenefitGrant 

88 

89 query = select(class_).where(class_.id == id) 

90 if not allow_deleted: 

91 query = query.where(class_.deleted_at.is_(None)) 

92 

93 if loaded: 

94 query = query.options( 

95 joinedload(BenefitGrant.customer), 

96 joinedload(BenefitGrant.benefit).joinedload(Benefit.organization), 

97 ) 

98 

99 if options is not None: 

100 query = query.options(*options) 

101 

102 res = await session.execute(query) 

103 return res.unique().scalar_one_or_none() 

104 

105 async def list( 1a

106 self, 

107 session: AsyncSession, 

108 benefit: Benefit, 

109 *, 

110 is_granted: bool | None = None, 

111 customer_id: Sequence[UUID] | None = None, 

112 pagination: PaginationParams, 

113 ) -> tuple[Sequence[BenefitGrant], int]: 

114 statement = ( 

115 select(BenefitGrant) 

116 .where( 

117 BenefitGrant.benefit_id == benefit.id, 

118 BenefitGrant.deleted_at.is_(None), 

119 ) 

120 .order_by(BenefitGrant.created_at.desc()) 

121 .options( 

122 joinedload(BenefitGrant.customer), 

123 joinedload(BenefitGrant.benefit), 

124 ) 

125 ) 

126 

127 if is_granted is not None: 

128 statement = statement.where(BenefitGrant.is_granted.is_(is_granted)) 

129 

130 if customer_id is not None: 

131 statement = statement.where(BenefitGrant.customer_id.in_(customer_id)) 

132 

133 return await paginate(session, statement, pagination=pagination) 

134 

135 async def list_by_organization( 1a

136 self, 

137 session: AsyncSession, 

138 organization_id: UUID, 

139 *, 

140 is_granted: bool | None = None, 

141 customer_id: Sequence[UUID] | None = None, 

142 pagination: PaginationParams, 

143 sorting: builtins.list[Sorting[BenefitGrantSortProperty]] = [ 

144 (BenefitGrantSortProperty.created_at, True) 

145 ], 

146 ) -> tuple[Sequence[BenefitGrant], int]: 

147 repository = BenefitGrantRepository.from_session(session) 1b

148 statement = ( 1b

149 select(BenefitGrant) 

150 .join(Benefit, BenefitGrant.benefit_id == Benefit.id) 

151 .where( 

152 Benefit.organization_id == organization_id, 

153 BenefitGrant.deleted_at.is_(None), 

154 ) 

155 .options( 

156 joinedload(BenefitGrant.customer), 

157 joinedload(BenefitGrant.benefit).joinedload(Benefit.organization), 

158 ) 

159 ) 

160 

161 if is_granted is not None: 161 ↛ 162line 161 didn't jump to line 162 because the condition on line 161 was never true1b

162 statement = statement.where(BenefitGrant.is_granted.is_(is_granted)) 

163 

164 if customer_id is not None: 164 ↛ 165line 164 didn't jump to line 165 because the condition on line 164 was never true1b

165 statement = statement.where(BenefitGrant.customer_id.in_(customer_id)) 

166 

167 statement = repository.apply_sorting(statement, sorting) 1b

168 

169 return await repository.paginate( 1b

170 statement, limit=pagination.limit, page=pagination.page 

171 ) 

172 

173 async def grant_benefit( 1a

174 self, 

175 session: AsyncSession, 

176 redis: Redis, 

177 customer: Customer, 

178 benefit: Benefit, 

179 *, 

180 attempt: int = 1, 

181 **scope: Unpack[BenefitGrantScope], 

182 ) -> BenefitGrant: 

183 log.info( 

184 "Granting benefit", benefit_id=str(benefit.id), customer_id=str(customer.id) 

185 ) 

186 

187 repository = BenefitGrantRepository.from_session(session) 

188 grant = await repository.get_by_benefit_and_scope(customer, benefit, **scope) 

189 

190 if grant is None: 

191 grant = BenefitGrant( 

192 customer=customer, benefit=benefit, properties={}, **scope 

193 ) 

194 session.add(grant) 

195 elif grant.is_granted: 

196 return grant 

197 

198 previous_properties = grant.properties 

199 benefit_strategy = get_benefit_strategy(benefit.type, session, redis) 

200 try: 

201 properties = await benefit_strategy.grant( 

202 benefit, 

203 customer, 

204 grant.properties, 

205 attempt=attempt, 

206 ) 

207 except BenefitActionRequiredError as e: 

208 grant.set_grant_failed(e) 

209 else: 

210 grant.properties = properties 

211 grant.set_granted() 

212 

213 session.add(grant) 

214 await session.flush() 

215 

216 await eventstream_publish( 

217 "benefit.granted", 

218 {"benefit_id": benefit.id, "benefit_type": benefit.type}, 

219 customer_id=customer.id, 

220 ) 

221 

222 await event_service.create_event( 

223 session, 

224 build_system_event( 

225 SystemEvent.benefit_granted, 

226 customer=customer, 

227 organization=benefit.organization, 

228 metadata={ 

229 "benefit_id": str(benefit.id), 

230 "benefit_grant_id": str(grant.id), 

231 "benefit_type": benefit.type, 

232 }, 

233 ), 

234 ) 

235 

236 log.info( 

237 "Benefit granted", 

238 benefit_id=str(benefit.id), 

239 customer_id=str(customer.id), 

240 grant_id=str(grant.id), 

241 ) 

242 

243 await self._send_webhook( 

244 session, 

245 benefit, 

246 grant, 

247 event_type=WebhookEventType.benefit_grant_created, 

248 previous_grant_properties=previous_properties, 

249 ) 

250 return grant 

251 

252 async def revoke_benefit( 1a

253 self, 

254 session: AsyncSession, 

255 redis: Redis, 

256 customer: Customer, 

257 benefit: Benefit, 

258 *, 

259 attempt: int = 1, 

260 **scope: Unpack[BenefitGrantScope], 

261 ) -> BenefitGrant: 

262 log.info( 

263 "Revoking benefit", benefit_id=str(benefit.id), customer_id=str(customer.id) 

264 ) 

265 

266 repository = BenefitGrantRepository.from_session(session) 

267 grant = await repository.get_by_benefit_and_scope(customer, benefit, **scope) 

268 

269 if grant is None: 

270 grant = BenefitGrant( 

271 customer=customer, benefit=benefit, properties={}, **scope 

272 ) 

273 session.add(grant) 

274 elif grant.is_revoked: 

275 return grant 

276 

277 previous_properties = grant.properties 

278 

279 benefit_strategy = get_benefit_strategy(benefit.type, session, redis) 

280 # Call the revoke logic in two cases: 

281 # * If the service requires grants to be revoked individually 

282 # * If there is only one grant remaining for this benefit, 

283 # so the benefit remains if other grants exist via other purchases 

284 other_grants = await repository.list_granted_by_benefit_and_customer( 

285 benefit, customer 

286 ) 

287 if benefit_strategy.should_revoke_individually or len(other_grants) < 2: 

288 try: 

289 properties = await benefit_strategy.revoke( 

290 benefit, 

291 customer, 

292 grant.properties, 

293 attempt=attempt, 

294 ) 

295 grant.properties = properties 

296 except BenefitActionRequiredError: 

297 pass 

298 

299 grant.set_revoked() 

300 

301 session.add(grant) 

302 await session.flush() 

303 

304 await eventstream_publish( 

305 "benefit.revoked", 

306 {"benefit_id": benefit.id, "benefit_type": benefit.type}, 

307 customer_id=customer.id, 

308 ) 

309 

310 await event_service.create_event( 

311 session, 

312 build_system_event( 

313 SystemEvent.benefit_revoked, 

314 customer=customer, 

315 organization=benefit.organization, 

316 metadata={ 

317 "benefit_id": str(benefit.id), 

318 "benefit_grant_id": str(grant.id), 

319 "benefit_type": benefit.type, 

320 }, 

321 ), 

322 ) 

323 

324 log.info( 

325 "Benefit revoked", 

326 benefit_id=str(benefit.id), 

327 customer_id=str(customer.id), 

328 grant_id=str(grant.id), 

329 ) 

330 

331 await self._send_webhook( 

332 session, 

333 benefit, 

334 grant, 

335 event_type=WebhookEventType.benefit_grant_revoked, 

336 previous_grant_properties=previous_properties, 

337 ) 

338 return grant 

339 

340 async def enqueue_benefits_grants( 1a

341 self, 

342 session: AsyncSession, 

343 task: Literal["grant", "revoke"], 

344 customer: Customer, 

345 product: Product, 

346 **scope: Unpack[BenefitGrantScope], 

347 ) -> None: 

348 # Get granted benefits that are not part of this product. 

349 # It happens if the subscription has been upgraded/downgraded. 

350 repository = BenefitGrantRepository.from_session(session) 

351 outdated_grants = await repository.list_outdated_grants(product, **scope) 

352 

353 for benefit in product.benefits: 

354 enqueue_job( 

355 f"benefit.{task}", 

356 customer_id=customer.id, 

357 benefit_id=benefit.id, 

358 **scope_to_args(scope), 

359 ) 

360 

361 for outdated_grant in outdated_grants: 

362 enqueue_job( 

363 "benefit.revoke", 

364 customer_id=customer.id, 

365 benefit_id=outdated_grant.benefit_id, 

366 **scope_to_args(scope), 

367 ) 

368 

369 async def enqueue_benefit_grant_updates( 1a

370 self, 

371 session: AsyncSession, 

372 redis: Redis, 

373 benefit: Benefit, 

374 previous_properties: BenefitProperties, 

375 ) -> None: 

376 benefit_strategy = get_benefit_strategy(benefit.type, session, redis) 

377 if not await benefit_strategy.requires_update(benefit, previous_properties): 

378 return 

379 

380 repository = BenefitGrantRepository.from_session(session) 

381 grants = await repository.list_granted_by_benefit(benefit) 

382 for grant in grants: 

383 enqueue_job("benefit.update", benefit_grant_id=grant.id) 

384 

385 async def update_benefit_grant( 1a

386 self, 

387 session: AsyncSession, 

388 redis: Redis, 

389 grant: BenefitGrant, 

390 *, 

391 attempt: int = 1, 

392 ) -> BenefitGrant: 

393 # Don't update revoked benefits 

394 if grant.is_revoked: 

395 return grant 

396 

397 benefit = grant.benefit 

398 

399 customer_repository = CustomerRepository.from_session(session) 

400 customer = await customer_repository.get_by_id(grant.customer_id) 

401 # Deleted customer, don't update the grant 

402 if customer is None: 

403 return grant 

404 

405 previous_properties = grant.properties 

406 benefit_strategy = get_benefit_strategy(benefit.type, session, redis) 

407 try: 

408 properties = await benefit_strategy.grant( 

409 benefit, 

410 customer, 

411 grant.properties, 

412 update=True, 

413 attempt=attempt, 

414 ) 

415 except BenefitActionRequiredError as e: 

416 grant.set_grant_failed(e) 

417 else: 

418 grant.properties = properties 

419 grant.set_granted() 

420 

421 session.add(grant) 

422 

423 await event_service.create_event( 

424 session, 

425 build_system_event( 

426 SystemEvent.benefit_updated, 

427 customer=customer, 

428 organization=benefit.organization, 

429 metadata={ 

430 "benefit_id": str(benefit.id), 

431 "benefit_grant_id": str(grant.id), 

432 "benefit_type": benefit.type, 

433 }, 

434 ), 

435 ) 

436 

437 await self._send_webhook( 

438 session, 

439 benefit, 

440 grant, 

441 event_type=WebhookEventType.benefit_grant_updated, 

442 previous_grant_properties=previous_properties, 

443 ) 

444 return grant 

445 

446 async def enqueue_benefit_grant_cycles( 1a

447 self, 

448 session: AsyncSession, 

449 redis: Redis, 

450 **scope: Unpack[BenefitGrantScope], 

451 ) -> None: 

452 repository = BenefitGrantRepository.from_session(session) 

453 grants = await repository.list_granted_by_scope(**scope) 

454 for grant in grants: 

455 enqueue_job("benefit.cycle", benefit_grant_id=grant.id) 

456 

457 async def cycle_benefit_grant( 1a

458 self, 

459 session: AsyncSession, 

460 redis: Redis, 

461 grant: BenefitGrant, 

462 *, 

463 attempt: int = 1, 

464 ) -> BenefitGrant: 

465 # Don't cycle revoked benefits 

466 if grant.is_revoked: 

467 return grant 

468 

469 benefit = grant.benefit 

470 

471 customer_repository = CustomerRepository.from_session(session) 

472 customer = await customer_repository.get_by_id(grant.customer_id) 

473 assert customer is not None 

474 

475 previous_properties = grant.properties 

476 benefit_strategy = get_benefit_strategy(benefit.type, session, redis) 

477 try: 

478 properties = await benefit_strategy.cycle( 

479 benefit, 

480 customer, 

481 grant.properties, 

482 attempt=attempt, 

483 ) 

484 except BenefitActionRequiredError as e: 

485 grant.set_grant_failed(e) 

486 else: 

487 grant.properties = properties 

488 

489 grant.set_modified_at() 

490 session.add(grant) 

491 

492 await event_service.create_event( 

493 session, 

494 build_system_event( 

495 SystemEvent.benefit_cycled, 

496 customer=customer, 

497 organization=benefit.organization, 

498 metadata={ 

499 "benefit_id": str(benefit.id), 

500 "benefit_grant_id": str(grant.id), 

501 "benefit_type": benefit.type, 

502 }, 

503 ), 

504 ) 

505 

506 await self._send_webhook( 

507 session, 

508 benefit, 

509 grant, 

510 event_type=WebhookEventType.benefit_grant_cycled, 

511 previous_grant_properties=previous_properties, 

512 ) 

513 return grant 

514 

515 async def enqueue_benefit_grant_deletions( 1a

516 self, session: AsyncSession, benefit: Benefit 

517 ) -> None: 

518 repository = BenefitGrantRepository.from_session(session) 

519 grants = await repository.list_granted_by_benefit(benefit) 

520 for grant in grants: 

521 enqueue_job("benefit.delete_grant", benefit_grant_id=grant.id) 

522 

523 async def enqueue_customer_grant_deletions( 1a

524 self, session: AsyncSession, customer: Customer 

525 ) -> None: 

526 repository = BenefitGrantRepository.from_session(session) 

527 grants = await repository.list_granted_by_customer(customer.id) 

528 for grant in grants: 

529 enqueue_job("benefit.delete_grant", benefit_grant_id=grant.id) 

530 

531 async def delete_benefit_grant( 1a

532 self, 

533 session: AsyncSession, 

534 redis: Redis, 

535 grant: BenefitGrant, 

536 *, 

537 attempt: int = 1, 

538 ) -> BenefitGrant: 

539 # Already revoked, nothing to do 

540 if grant.is_revoked: 

541 return grant 

542 

543 await session.refresh(grant, {"benefit"}) 

544 benefit = grant.benefit 

545 

546 customer_repository = CustomerRepository.from_session(session) 

547 customer = await customer_repository.get_by_id( 

548 grant.customer_id, include_deleted=True 

549 ) 

550 assert customer is not None 

551 

552 previous_properties = grant.properties 

553 benefit_strategy = get_benefit_strategy(benefit.type, session, redis) 

554 properties = await benefit_strategy.revoke( 

555 benefit, 

556 customer, 

557 grant.properties, 

558 attempt=attempt, 

559 ) 

560 

561 grant.properties = properties 

562 grant.set_revoked() 

563 

564 session.add(grant) 

565 await self._send_webhook( 

566 session, 

567 benefit, 

568 grant, 

569 event_type=WebhookEventType.benefit_grant_revoked, 

570 previous_grant_properties=previous_properties, 

571 ) 

572 return grant 

573 

574 async def _send_webhook( 1a

575 self, 

576 session: AsyncSession, 

577 benefit: Benefit, 

578 grant: BenefitGrant, 

579 event_type: Literal[ 

580 WebhookEventType.benefit_grant_created, 

581 WebhookEventType.benefit_grant_updated, 

582 WebhookEventType.benefit_grant_cycled, 

583 WebhookEventType.benefit_grant_revoked, 

584 ], 

585 previous_grant_properties: BenefitGrantProperties, 

586 ) -> None: 

587 loaded = await self.get(session, grant.id, loaded=True) 

588 assert loaded is not None 

589 loaded.previous_properties = previous_grant_properties 

590 await webhook_service.send(session, benefit.organization, event_type, loaded) 

591 enqueue_job( 

592 "customer.webhook", 

593 WebhookEventType.customer_state_changed, 

594 grant.customer_id, 

595 ) 

596 

597 

598benefit_grant = BenefitGrantService(BenefitGrant) 1a