Coverage for polar/benefit/tasks.py: 28%
142 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 15:52 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 15:52 +0000
1import uuid 1a
2from typing import Literal, Unpack 1a
4import structlog 1a
5from dramatiq import Retry 1a
7from polar.benefit.repository import BenefitRepository 1a
8from polar.customer.repository import CustomerRepository 1a
9from polar.exceptions import PolarTaskError 1a
10from polar.logging import Logger 1a
11from polar.models.benefit_grant import BenefitGrantScopeArgs 1a
12from polar.product.repository import ProductRepository 1a
13from polar.worker import ( 1a
14 AsyncSessionMaker,
15 RedisMiddleware,
16 TaskPriority,
17 actor,
18 get_retries,
19)
21from .grant.scope import resolve_scope 1a
22from .grant.service import benefit_grant as benefit_grant_service 1a
23from .strategies import BenefitRetriableError 1a
25log: Logger = structlog.get_logger() 1a
28class BenefitTaskError(PolarTaskError): ... 1a
31class CustomerDoesNotExist(BenefitTaskError): 1a
32 def __init__(self, customer_id: uuid.UUID) -> None: 1a
33 self.customer_id = customer_id
34 message = f"The customer with id {customer_id} does not exist."
35 super().__init__(message)
38class ProductDoesNotExist(BenefitTaskError): 1a
39 def __init__(self, product_id: uuid.UUID) -> None: 1a
40 self.user_id = product_id
41 message = f"The product with id {product_id} does not exist."
42 super().__init__(message)
45class BenefitDoesNotExist(BenefitTaskError): 1a
46 def __init__(self, benefit_id: uuid.UUID) -> None: 1a
47 self.benefit_id = benefit_id
48 message = f"The benefit with id {benefit_id} does not exist."
49 super().__init__(message)
52class BenefitGrantDoesNotExist(BenefitTaskError): 1a
53 def __init__(self, benefit_grant_id: uuid.UUID) -> None: 1a
54 self.benefit_grant_id = benefit_grant_id
55 message = f"The benefit grant with id {benefit_grant_id} does not exist."
56 super().__init__(message)
59class OrganizationDoesNotExist(BenefitTaskError): 1a
60 def __init__(self, organization_id: uuid.UUID) -> None: 1a
61 self.organization_id = organization_id
62 message = f"The organization with id {organization_id} does not exist."
63 super().__init__(message)
66@actor(actor_name="benefit.enqueue_benefits_grants", priority=TaskPriority.MEDIUM) 1a
67async def enqueue_benefits_grants( 1a
68 task: Literal["grant", "revoke"],
69 customer_id: uuid.UUID,
70 product_id: uuid.UUID,
71 **scope: Unpack[BenefitGrantScopeArgs],
72) -> None:
73 async with AsyncSessionMaker() as session:
74 customer_repository = CustomerRepository.from_session(session)
75 customer = await customer_repository.get_by_id(
76 customer_id,
77 # Allow deleted customers to be processed for revocation tasks
78 include_deleted=task == "revoke",
79 )
80 if customer is None:
81 raise CustomerDoesNotExist(customer_id)
83 product_repository = ProductRepository.from_session(session)
84 product = await product_repository.get_by_id(product_id)
85 if product is None:
86 raise ProductDoesNotExist(product_id)
88 resolved_scope = await resolve_scope(session, scope)
90 await benefit_grant_service.enqueue_benefits_grants(
91 session, task, customer, product, **resolved_scope
92 )
95@actor(actor_name="benefit.grant", priority=TaskPriority.MEDIUM) 1a
96async def benefit_grant( 1a
97 customer_id: uuid.UUID,
98 benefit_id: uuid.UUID,
99 **scope: Unpack[BenefitGrantScopeArgs],
100) -> None:
101 async with AsyncSessionMaker() as session:
102 customer_repository = CustomerRepository.from_session(session)
103 customer = await customer_repository.get_by_id(customer_id)
104 if customer is None:
105 raise CustomerDoesNotExist(customer_id)
107 benefit_repository = BenefitRepository.from_session(session)
108 benefit = await benefit_repository.get_by_id(
109 benefit_id, options=benefit_repository.get_eager_options()
110 )
111 if benefit is None:
112 raise BenefitDoesNotExist(benefit_id)
114 resolved_scope = await resolve_scope(session, scope)
116 try:
117 await benefit_grant_service.grant_benefit(
118 session,
119 RedisMiddleware.get(),
120 customer,
121 benefit,
122 attempt=get_retries(),
123 **resolved_scope,
124 )
125 except BenefitRetriableError as e:
126 log.warning(
127 "Retriable error encountered while granting benefit",
128 error=str(e),
129 defer_seconds=e.defer_seconds,
130 benefit_id=str(benefit_id),
131 customer_id=str(customer_id),
132 )
133 raise Retry(delay=e.defer_milliseconds) from e
136@actor(actor_name="benefit.revoke", priority=TaskPriority.MEDIUM) 1a
137async def benefit_revoke( 1a
138 customer_id: uuid.UUID,
139 benefit_id: uuid.UUID,
140 **scope: Unpack[BenefitGrantScopeArgs],
141) -> None:
142 async with AsyncSessionMaker() as session:
143 customer_repository = CustomerRepository.from_session(session)
144 customer = await customer_repository.get_by_id(
145 customer_id,
146 # Allow deleted customers to be processed for revocation tasks
147 include_deleted=True,
148 )
149 if customer is None:
150 raise CustomerDoesNotExist(customer_id)
152 benefit_repository = BenefitRepository.from_session(session)
153 benefit = await benefit_repository.get_by_id(
154 benefit_id, options=benefit_repository.get_eager_options()
155 )
156 if benefit is None:
157 raise BenefitDoesNotExist(benefit_id)
159 resolved_scope = await resolve_scope(session, scope)
161 try:
162 await benefit_grant_service.revoke_benefit(
163 session,
164 RedisMiddleware.get(),
165 customer,
166 benefit,
167 attempt=get_retries(),
168 **resolved_scope,
169 )
170 except BenefitRetriableError as e:
171 log.warning(
172 "Retriable error encountered while revoking benefit",
173 error=str(e),
174 defer_seconds=e.defer_seconds,
175 benefit_id=str(benefit_id),
176 customer_id=str(customer_id),
177 )
178 raise Retry(delay=e.defer_milliseconds) from e
181@actor(actor_name="benefit.update", priority=TaskPriority.MEDIUM) 1a
182async def benefit_update(benefit_grant_id: uuid.UUID) -> None: 1a
183 async with AsyncSessionMaker() as session:
184 benefit_grant = await benefit_grant_service.get(
185 session, benefit_grant_id, loaded=True
186 )
187 if benefit_grant is None:
188 raise BenefitGrantDoesNotExist(benefit_grant_id)
190 try:
191 await benefit_grant_service.update_benefit_grant(
192 session, RedisMiddleware.get(), benefit_grant, attempt=get_retries()
193 )
194 except BenefitRetriableError as e:
195 log.warning(
196 "Retriable error encountered while updating benefit",
197 error=str(e),
198 defer_seconds=e.defer_seconds,
199 benefit_grant_id=str(benefit_grant_id),
200 )
201 raise Retry(delay=e.defer_milliseconds) from e
204@actor(actor_name="benefit.enqueue_benefit_grant_cycles", priority=TaskPriority.MEDIUM) 1a
205async def enqueue_benefit_grant_cycles(**scope: Unpack[BenefitGrantScopeArgs]) -> None: 1a
206 async with AsyncSessionMaker() as session:
207 resolved_scope = await resolve_scope(session, scope)
208 await benefit_grant_service.enqueue_benefit_grant_cycles(
209 session, RedisMiddleware.get(), **resolved_scope
210 )
213@actor(actor_name="benefit.cycle", priority=TaskPriority.MEDIUM) 1a
214async def benefit_cycle(benefit_grant_id: uuid.UUID) -> None: 1a
215 async with AsyncSessionMaker() as session:
216 benefit_grant = await benefit_grant_service.get(
217 session, benefit_grant_id, loaded=True
218 )
219 if benefit_grant is None:
220 raise BenefitGrantDoesNotExist(benefit_grant_id)
222 try:
223 await benefit_grant_service.cycle_benefit_grant(
224 session, RedisMiddleware.get(), benefit_grant, attempt=get_retries()
225 )
226 except BenefitRetriableError as e:
227 log.warning(
228 "Retriable error encountered while cycling benefit",
229 error=str(e),
230 defer_seconds=e.defer_seconds,
231 benefit_grant_id=str(benefit_grant_id),
232 )
233 raise Retry(delay=e.defer_milliseconds) from e
236@actor(actor_name="benefit.delete", priority=TaskPriority.MEDIUM) 1a
237async def benefit_delete(benefit_id: uuid.UUID) -> None: 1a
238 async with AsyncSessionMaker() as session:
239 benefit_repository = BenefitRepository.from_session(session)
240 benefit = await benefit_repository.get_by_id(
241 benefit_id,
242 options=benefit_repository.get_eager_options(),
243 include_deleted=True,
244 )
245 if benefit is None:
246 raise BenefitDoesNotExist(benefit_id)
248 await benefit_grant_service.enqueue_benefit_grant_deletions(session, benefit)
251@actor(actor_name="benefit.revoke_customer", priority=TaskPriority.MEDIUM) 1a
252async def benefit_revoke_customer(customer_id: uuid.UUID) -> None: 1a
253 async with AsyncSessionMaker() as session:
254 customer_repository = CustomerRepository.from_session(session)
255 customer = await customer_repository.get_by_id(
256 customer_id, include_deleted=True
257 )
258 if customer is None:
259 raise CustomerDoesNotExist(customer_id)
261 await benefit_grant_service.enqueue_customer_grant_deletions(session, customer)
264@actor(actor_name="benefit.delete_grant", priority=TaskPriority.MEDIUM) 1a
265async def benefit_delete_grant(benefit_grant_id: uuid.UUID) -> None: 1a
266 async with AsyncSessionMaker() as session:
267 benefit_grant = await benefit_grant_service.get(
268 session, benefit_grant_id, loaded=True
269 )
270 if benefit_grant is None:
271 raise BenefitGrantDoesNotExist(benefit_grant_id)
273 try:
274 await benefit_grant_service.delete_benefit_grant(
275 session, RedisMiddleware.get(), benefit_grant, attempt=get_retries()
276 )
277 except BenefitRetriableError as e:
278 log.warning(
279 "Retriable error encountered while deleting benefit grant",
280 error=str(e),
281 defer_seconds=e.defer_seconds,
282 benefit_grant_id=str(benefit_grant_id),
283 )
284 raise Retry(delay=e.defer_milliseconds) from e