Coverage for polar/benefit/grant/service.py: 25%
210 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 17:15 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 17:15 +0000
1import builtins 1a
2from collections.abc import Sequence 1a
3from typing import Any, Literal, TypeVar, Unpack, overload 1a
4from uuid import UUID 1a
6import structlog 1a
7from sqlalchemy import select 1a
8from sqlalchemy.orm import joinedload 1a
10from polar.customer.repository import CustomerRepository 1a
11from polar.event.service import event as event_service 1a
12from polar.event.system import SystemEvent, build_system_event 1a
13from polar.eventstream.service import publish as eventstream_publish 1a
14from polar.exceptions import PolarError 1a
15from polar.kit.pagination import PaginationParams, paginate 1a
16from polar.kit.services import ResourceServiceReader 1a
17from polar.kit.sorting import Sorting 1a
18from polar.logging import Logger 1a
19from polar.models import Benefit, BenefitGrant, Customer, Product 1a
20from polar.models.benefit_grant import BenefitGrantScope 1a
21from polar.models.webhook_endpoint import WebhookEventType 1a
22from polar.postgres import AsyncSession, sql 1a
23from polar.redis import Redis 1a
24from polar.webhook.service import webhook as webhook_service 1a
25from polar.worker import enqueue_job 1a
27from ..registry import get_benefit_strategy 1a
28from ..strategies import ( 1a
29 BenefitActionRequiredError,
30 BenefitGrantProperties,
31 BenefitProperties,
32)
33from .repository import BenefitGrantRepository 1a
34from .scope import scope_to_args 1a
35from .sorting import BenefitGrantSortProperty 1a
# Module-level structured logger used by the service methods below.
log: Logger = structlog.get_logger()

# Type variable bounded to BenefitGrant, used by the `get` overloads so that
# passing `class_` yields a result typed as that class.
BG = TypeVar("BG", bound=BenefitGrant)
class BenefitGrantError(PolarError):
    """Base class for the errors declared in this benefit grant module."""
class EmptyScopeError(BenefitGrantError):
    """Error signalling that no scope was given when retrieving a grant."""

    def __init__(self) -> None:
        # The second positional argument is forwarded as-is to PolarError;
        # presumably an HTTP status code -- confirm against PolarError.
        super().__init__(
            "A scope must be provided to retrieve a benefit grant.", 500
        )
class BenefitGrantService(ResourceServiceReader[BenefitGrant]):
    """Service layer for reading, granting, updating, cycling and revoking
    benefit grants, and for enqueueing the related background jobs.
    """

    @overload
    async def get(
        self,
        session: AsyncSession,
        id: UUID,
        allow_deleted: bool = False,
        loaded: bool = False,
        *,
        class_: None = None,
        options: Sequence[sql.ExecutableOption] | None = None,
    ) -> BenefitGrant | None: ...

    @overload
    async def get(
        self,
        session: AsyncSession,
        id: UUID,
        allow_deleted: bool = False,
        loaded: bool = False,
        *,
        class_: type[BG] | None = None,
        options: Sequence[sql.ExecutableOption] | None = None,
    ) -> BG | None: ...

    async def get(
        self,
        session: AsyncSession,
        id: UUID,
        allow_deleted: bool = False,
        loaded: bool = False,
        *,
        class_: Any = None,
        options: Sequence[sql.ExecutableOption] | None = None,
    ) -> Any | None:
        """Fetch a single grant by its ID.

        Args:
            session: Database session used to run the query.
            id: ID of the grant to fetch.
            allow_deleted: When false, rows whose ``deleted_at`` is set are
                filtered out.
            loaded: When true, eagerly join the grant's customer and its
                benefit together with the benefit's organization.
            class_: Model class to select; defaults to ``BenefitGrant``.
            options: Extra loader options appended to the query.

        Returns:
            The matching grant, or ``None`` when no row matches.
        """
        if class_ is None:
            class_ = BenefitGrant

        query = select(class_).where(class_.id == id)
        if not allow_deleted:
            query = query.where(class_.deleted_at.is_(None))

        if loaded:
            query = query.options(
                joinedload(BenefitGrant.customer),
                joinedload(BenefitGrant.benefit).joinedload(Benefit.organization),
            )

        if options is not None:
            query = query.options(*options)

        res = await session.execute(query)
        return res.unique().scalar_one_or_none()

    async def list(
        self,
        session: AsyncSession,
        benefit: Benefit,
        *,
        is_granted: bool | None = None,
        customer_id: Sequence[UUID] | None = None,
        pagination: PaginationParams,
    ) -> tuple[Sequence[BenefitGrant], int]:
        """List the non-deleted grants of a benefit, newest first.

        Args:
            session: Database session used to run the query.
            benefit: Benefit whose grants are listed.
            is_granted: Optional filter on the grant's ``is_granted`` flag.
            customer_id: Optional collection of customer IDs to restrict to.
            pagination: Pagination parameters forwarded to ``paginate``.

        Returns:
            Whatever ``paginate`` returns for the statement; annotated as a
            page of grants plus a count.
        """
        statement = (
            select(BenefitGrant)
            .where(
                BenefitGrant.benefit_id == benefit.id,
                BenefitGrant.deleted_at.is_(None),
            )
            .order_by(BenefitGrant.created_at.desc())
            .options(
                joinedload(BenefitGrant.customer),
                joinedload(BenefitGrant.benefit),
            )
        )

        if is_granted is not None:
            statement = statement.where(BenefitGrant.is_granted.is_(is_granted))

        if customer_id is not None:
            statement = statement.where(BenefitGrant.customer_id.in_(customer_id))

        return await paginate(session, statement, pagination=pagination)

    async def list_by_organization(
        self,
        session: AsyncSession,
        organization_id: UUID,
        *,
        is_granted: bool | None = None,
        customer_id: Sequence[UUID] | None = None,
        pagination: PaginationParams,
        sorting: builtins.list[Sorting[BenefitGrantSortProperty]] | None = None,
    ) -> tuple[Sequence[BenefitGrant], int]:
        """List the non-deleted grants of every benefit of an organization.

        Args:
            session: Database session used to build the repository.
            organization_id: ID of the organization owning the benefits.
            is_granted: Optional filter on the grant's ``is_granted`` flag.
            customer_id: Optional collection of customer IDs to restrict to.
            pagination: Supplies the ``limit`` and ``page`` passed to the
                repository paginator.
            sorting: Sort criteria handed to the repository's
                ``apply_sorting``. When omitted, defaults to
                ``[(BenefitGrantSortProperty.created_at, True)]``.

        Returns:
            Whatever the repository's ``paginate`` returns; annotated as a
            page of grants plus a count.
        """
        # Build the default per call instead of sharing one mutable list
        # object across every invocation through the signature.
        if sorting is None:
            sorting = [(BenefitGrantSortProperty.created_at, True)]

        repository = BenefitGrantRepository.from_session(session)
        statement = (
            select(BenefitGrant)
            .join(Benefit, BenefitGrant.benefit_id == Benefit.id)
            .where(
                Benefit.organization_id == organization_id,
                BenefitGrant.deleted_at.is_(None),
            )
            .options(
                joinedload(BenefitGrant.customer),
                joinedload(BenefitGrant.benefit).joinedload(Benefit.organization),
            )
        )

        if is_granted is not None:
            statement = statement.where(BenefitGrant.is_granted.is_(is_granted))

        if customer_id is not None:
            statement = statement.where(BenefitGrant.customer_id.in_(customer_id))

        statement = repository.apply_sorting(statement, sorting)

        return await repository.paginate(
            statement, limit=pagination.limit, page=pagination.page
        )

    async def grant_benefit(
        self,
        session: AsyncSession,
        redis: Redis,
        customer: Customer,
        benefit: Benefit,
        *,
        attempt: int = 1,
        **scope: Unpack[BenefitGrantScope],
    ) -> BenefitGrant:
        """Grant a benefit to a customer within the given scope.

        Looks up the grant for this customer, benefit and scope, creating it
        when missing. If the grant is already granted it is returned as-is.
        Otherwise the benefit strategy's ``grant`` is invoked; on
        ``BenefitActionRequiredError`` the failure is recorded on the grant
        instead of propagating.

        Args:
            session: Database session.
            redis: Redis client handed to the benefit strategy.
            customer: Customer receiving the benefit.
            benefit: Benefit to grant.
            attempt: Attempt counter forwarded to the strategy.
            **scope: Scope fields identifying the grant.

        Returns:
            The created or updated grant.
        """
        log.info(
            "Granting benefit", benefit_id=str(benefit.id), customer_id=str(customer.id)
        )

        repository = BenefitGrantRepository.from_session(session)
        grant = await repository.get_by_benefit_and_scope(customer, benefit, **scope)

        if grant is None:
            grant = BenefitGrant(
                customer=customer, benefit=benefit, properties={}, **scope
            )
            session.add(grant)
        elif grant.is_granted:
            # Already granted: nothing to do, and no events are emitted.
            return grant

        previous_properties = grant.properties
        benefit_strategy = get_benefit_strategy(benefit.type, session, redis)
        try:
            properties = await benefit_strategy.grant(
                benefit,
                customer,
                grant.properties,
                attempt=attempt,
            )
        except BenefitActionRequiredError as e:
            grant.set_grant_failed(e)
        else:
            grant.properties = properties
            grant.set_granted()

        session.add(grant)
        # Flush before the grant's ID is used in the event metadata below.
        await session.flush()

        # NOTE(review): everything below also runs when the strategy raised
        # BenefitActionRequiredError above, so "granted" events are emitted
        # for failed grants too -- confirm this is intended.
        await eventstream_publish(
            "benefit.granted",
            {"benefit_id": benefit.id, "benefit_type": benefit.type},
            customer_id=customer.id,
        )

        await event_service.create_event(
            session,
            build_system_event(
                SystemEvent.benefit_granted,
                customer=customer,
                organization=benefit.organization,
                metadata={
                    "benefit_id": str(benefit.id),
                    "benefit_grant_id": str(grant.id),
                    "benefit_type": benefit.type,
                },
            ),
        )

        log.info(
            "Benefit granted",
            benefit_id=str(benefit.id),
            customer_id=str(customer.id),
            grant_id=str(grant.id),
        )

        await self._send_webhook(
            session,
            benefit,
            grant,
            event_type=WebhookEventType.benefit_grant_created,
            previous_grant_properties=previous_properties,
        )
        return grant

    async def revoke_benefit(
        self,
        session: AsyncSession,
        redis: Redis,
        customer: Customer,
        benefit: Benefit,
        *,
        attempt: int = 1,
        **scope: Unpack[BenefitGrantScope],
    ) -> BenefitGrant:
        """Revoke a benefit from a customer within the given scope.

        Looks up the grant for this customer, benefit and scope, creating it
        when missing. If the grant is already revoked it is returned as-is.
        The strategy's ``revoke`` is only invoked when the strategy revokes
        individually or when fewer than two granted grants exist for this
        benefit and customer; the grant itself is marked revoked either way.

        Args:
            session: Database session.
            redis: Redis client handed to the benefit strategy.
            customer: Customer losing the benefit.
            benefit: Benefit to revoke.
            attempt: Attempt counter forwarded to the strategy.
            **scope: Scope fields identifying the grant.

        Returns:
            The revoked grant.
        """
        log.info(
            "Revoking benefit", benefit_id=str(benefit.id), customer_id=str(customer.id)
        )

        repository = BenefitGrantRepository.from_session(session)
        grant = await repository.get_by_benefit_and_scope(customer, benefit, **scope)

        if grant is None:
            grant = BenefitGrant(
                customer=customer, benefit=benefit, properties={}, **scope
            )
            session.add(grant)
        elif grant.is_revoked:
            # Already revoked: nothing to do, and no events are emitted.
            return grant

        previous_properties = grant.properties

        benefit_strategy = get_benefit_strategy(benefit.type, session, redis)
        # Call the revoke logic in two cases:
        # * If the service requires grants to be revoked individually
        # * If there is only one grant remaining for this benefit,
        #   so the benefit remains if other grants exist via other purchases
        other_grants = await repository.list_granted_by_benefit_and_customer(
            benefit, customer
        )
        if benefit_strategy.should_revoke_individually or len(other_grants) < 2:
            try:
                properties = await benefit_strategy.revoke(
                    benefit,
                    customer,
                    grant.properties,
                    attempt=attempt,
                )
                grant.properties = properties
            except BenefitActionRequiredError:
                # Deliberately ignored: the grant is still marked revoked
                # below and keeps its previous properties.
                pass

        grant.set_revoked()

        session.add(grant)
        # Flush before the grant's ID is used in the event metadata below.
        await session.flush()

        await eventstream_publish(
            "benefit.revoked",
            {"benefit_id": benefit.id, "benefit_type": benefit.type},
            customer_id=customer.id,
        )

        await event_service.create_event(
            session,
            build_system_event(
                SystemEvent.benefit_revoked,
                customer=customer,
                organization=benefit.organization,
                metadata={
                    "benefit_id": str(benefit.id),
                    "benefit_grant_id": str(grant.id),
                    "benefit_type": benefit.type,
                },
            ),
        )

        log.info(
            "Benefit revoked",
            benefit_id=str(benefit.id),
            customer_id=str(customer.id),
            grant_id=str(grant.id),
        )

        await self._send_webhook(
            session,
            benefit,
            grant,
            event_type=WebhookEventType.benefit_grant_revoked,
            previous_grant_properties=previous_properties,
        )
        return grant

    async def enqueue_benefits_grants(
        self,
        session: AsyncSession,
        task: Literal["grant", "revoke"],
        customer: Customer,
        product: Product,
        **scope: Unpack[BenefitGrantScope],
    ) -> None:
        """Enqueue one ``benefit.<task>`` job per benefit of a product, plus
        a ``benefit.revoke`` job for each outdated grant in the scope.

        Args:
            session: Database session used to build the repository.
            task: Which job to enqueue for the product's benefits.
            customer: Customer the jobs apply to.
            product: Product whose benefits are processed.
            **scope: Scope fields, converted to job arguments with
                ``scope_to_args``.
        """
        # Get granted benefits that are not part of this product.
        # It happens if the subscription has been upgraded/downgraded.
        repository = BenefitGrantRepository.from_session(session)
        outdated_grants = await repository.list_outdated_grants(product, **scope)

        for benefit in product.benefits:
            enqueue_job(
                f"benefit.{task}",
                customer_id=customer.id,
                benefit_id=benefit.id,
                **scope_to_args(scope),
            )

        # Outdated grants are always revoked, whatever `task` is.
        for outdated_grant in outdated_grants:
            enqueue_job(
                "benefit.revoke",
                customer_id=customer.id,
                benefit_id=outdated_grant.benefit_id,
                **scope_to_args(scope),
            )

    async def enqueue_benefit_grant_updates(
        self,
        session: AsyncSession,
        redis: Redis,
        benefit: Benefit,
        previous_properties: BenefitProperties,
    ) -> None:
        """Enqueue a ``benefit.update`` job for each granted grant of a
        benefit, if the strategy reports that an update is required.

        Args:
            session: Database session.
            redis: Redis client handed to the benefit strategy.
            benefit: Benefit whose grants may need updating.
            previous_properties: Properties passed to the strategy's
                ``requires_update`` check alongside the benefit.
        """
        benefit_strategy = get_benefit_strategy(benefit.type, session, redis)
        if not await benefit_strategy.requires_update(benefit, previous_properties):
            return

        repository = BenefitGrantRepository.from_session(session)
        grants = await repository.list_granted_by_benefit(benefit)
        for grant in grants:
            enqueue_job("benefit.update", benefit_grant_id=grant.id)

    async def update_benefit_grant(
        self,
        session: AsyncSession,
        redis: Redis,
        grant: BenefitGrant,
        *,
        attempt: int = 1,
    ) -> BenefitGrant:
        """Re-run the strategy's ``grant`` with ``update=True`` for a grant.

        Revoked grants, and grants whose customer cannot be found, are
        returned unchanged. On ``BenefitActionRequiredError`` the failure is
        recorded on the grant instead of propagating.

        Args:
            session: Database session.
            redis: Redis client handed to the benefit strategy.
            grant: Grant to update.
            attempt: Attempt counter forwarded to the strategy.

        Returns:
            The (possibly unchanged) grant.
        """
        # Don't update revoked benefits
        if grant.is_revoked:
            return grant

        benefit = grant.benefit

        customer_repository = CustomerRepository.from_session(session)
        customer = await customer_repository.get_by_id(grant.customer_id)
        # Deleted customer, don't update the grant
        if customer is None:
            return grant

        previous_properties = grant.properties
        benefit_strategy = get_benefit_strategy(benefit.type, session, redis)
        try:
            properties = await benefit_strategy.grant(
                benefit,
                customer,
                grant.properties,
                update=True,
                attempt=attempt,
            )
        except BenefitActionRequiredError as e:
            grant.set_grant_failed(e)
        else:
            grant.properties = properties
            grant.set_granted()

        session.add(grant)

        # NOTE(review): the event and webhook below are also emitted when
        # the strategy raised BenefitActionRequiredError -- confirm intent.
        await event_service.create_event(
            session,
            build_system_event(
                SystemEvent.benefit_updated,
                customer=customer,
                organization=benefit.organization,
                metadata={
                    "benefit_id": str(benefit.id),
                    "benefit_grant_id": str(grant.id),
                    "benefit_type": benefit.type,
                },
            ),
        )

        await self._send_webhook(
            session,
            benefit,
            grant,
            event_type=WebhookEventType.benefit_grant_updated,
            previous_grant_properties=previous_properties,
        )
        return grant

    async def enqueue_benefit_grant_cycles(
        self,
        session: AsyncSession,
        redis: Redis,
        **scope: Unpack[BenefitGrantScope],
    ) -> None:
        """Enqueue a ``benefit.cycle`` job for each granted grant in a scope.

        Args:
            session: Database session used to build the repository.
            redis: Not used by this method; kept in the signature.
            **scope: Scope fields selecting the grants.
        """
        repository = BenefitGrantRepository.from_session(session)
        grants = await repository.list_granted_by_scope(**scope)
        for grant in grants:
            enqueue_job("benefit.cycle", benefit_grant_id=grant.id)

    async def cycle_benefit_grant(
        self,
        session: AsyncSession,
        redis: Redis,
        grant: BenefitGrant,
        *,
        attempt: int = 1,
    ) -> BenefitGrant:
        """Run the strategy's ``cycle`` for a grant.

        Revoked grants, and grants whose customer cannot be found, are
        returned unchanged. On ``BenefitActionRequiredError`` the failure is
        recorded on the grant instead of propagating.

        Args:
            session: Database session.
            redis: Redis client handed to the benefit strategy.
            grant: Grant to cycle.
            attempt: Attempt counter forwarded to the strategy.

        Returns:
            The (possibly unchanged) grant.
        """
        # Don't cycle revoked benefits
        if grant.is_revoked:
            return grant

        benefit = grant.benefit

        customer_repository = CustomerRepository.from_session(session)
        customer = await customer_repository.get_by_id(grant.customer_id)
        # Deleted customer, don't cycle the grant. Same handling as
        # update_benefit_grant, rather than an assert that would crash the
        # job (or be stripped under -O).
        if customer is None:
            return grant

        previous_properties = grant.properties
        benefit_strategy = get_benefit_strategy(benefit.type, session, redis)
        try:
            properties = await benefit_strategy.cycle(
                benefit,
                customer,
                grant.properties,
                attempt=attempt,
            )
        except BenefitActionRequiredError as e:
            grant.set_grant_failed(e)
        else:
            grant.properties = properties

        grant.set_modified_at()
        session.add(grant)

        await event_service.create_event(
            session,
            build_system_event(
                SystemEvent.benefit_cycled,
                customer=customer,
                organization=benefit.organization,
                metadata={
                    "benefit_id": str(benefit.id),
                    "benefit_grant_id": str(grant.id),
                    "benefit_type": benefit.type,
                },
            ),
        )

        await self._send_webhook(
            session,
            benefit,
            grant,
            event_type=WebhookEventType.benefit_grant_cycled,
            previous_grant_properties=previous_properties,
        )
        return grant

    async def enqueue_benefit_grant_deletions(
        self, session: AsyncSession, benefit: Benefit
    ) -> None:
        """Enqueue a ``benefit.delete_grant`` job for each granted grant of
        a benefit.
        """
        repository = BenefitGrantRepository.from_session(session)
        grants = await repository.list_granted_by_benefit(benefit)
        for grant in grants:
            enqueue_job("benefit.delete_grant", benefit_grant_id=grant.id)

    async def enqueue_customer_grant_deletions(
        self, session: AsyncSession, customer: Customer
    ) -> None:
        """Enqueue a ``benefit.delete_grant`` job for each granted grant of
        a customer.
        """
        repository = BenefitGrantRepository.from_session(session)
        grants = await repository.list_granted_by_customer(customer.id)
        for grant in grants:
            enqueue_job("benefit.delete_grant", benefit_grant_id=grant.id)

    async def delete_benefit_grant(
        self,
        session: AsyncSession,
        redis: Redis,
        grant: BenefitGrant,
        *,
        attempt: int = 1,
    ) -> BenefitGrant:
        """Revoke a grant through its strategy and mark it revoked.

        Unlike ``revoke_benefit``, the strategy's ``revoke`` is always
        called and ``BenefitActionRequiredError`` is not caught here.

        Args:
            session: Database session.
            redis: Redis client handed to the benefit strategy.
            grant: Grant to revoke.
            attempt: Attempt counter forwarded to the strategy.

        Returns:
            The revoked (or already revoked) grant.
        """
        # Already revoked, nothing to do
        if grant.is_revoked:
            return grant

        await session.refresh(grant, {"benefit"})
        benefit = grant.benefit

        customer_repository = CustomerRepository.from_session(session)
        # Deleted customers are included in the lookup here.
        customer = await customer_repository.get_by_id(
            grant.customer_id, include_deleted=True
        )
        assert customer is not None

        previous_properties = grant.properties
        benefit_strategy = get_benefit_strategy(benefit.type, session, redis)
        properties = await benefit_strategy.revoke(
            benefit,
            customer,
            grant.properties,
            attempt=attempt,
        )

        grant.properties = properties
        grant.set_revoked()

        session.add(grant)
        await self._send_webhook(
            session,
            benefit,
            grant,
            event_type=WebhookEventType.benefit_grant_revoked,
            previous_grant_properties=previous_properties,
        )
        return grant

    async def _send_webhook(
        self,
        session: AsyncSession,
        benefit: Benefit,
        grant: BenefitGrant,
        event_type: Literal[
            WebhookEventType.benefit_grant_created,
            WebhookEventType.benefit_grant_updated,
            WebhookEventType.benefit_grant_cycled,
            WebhookEventType.benefit_grant_revoked,
        ],
        previous_grant_properties: BenefitGrantProperties,
    ) -> None:
        """Send the grant webhook to the benefit's organization and enqueue
        a customer state webhook job.

        Args:
            session: Database session.
            benefit: Benefit whose organization receives the webhook.
            grant: Grant the webhook is about; it is re-fetched by ID with
                its relationships loaded before sending.
            event_type: Which grant webhook event to send.
            previous_grant_properties: Set as ``previous_properties`` on the
                re-fetched grant before it is sent.
        """
        loaded = await self.get(session, grant.id, loaded=True)
        assert loaded is not None
        loaded.previous_properties = previous_grant_properties
        await webhook_service.send(session, benefit.organization, event_type, loaded)
        enqueue_job(
            "customer.webhook",
            WebhookEventType.customer_state_changed,
            grant.customer_id,
        )
# Module-level service instance built for the BenefitGrant model.
benefit_grant = BenefitGrantService(BenefitGrant)