Coverage for polar/benefit/tasks.py: 28%

142 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 17:15 +0000

1import uuid 1a

2from typing import Literal, Unpack 1a

3 

4import structlog 1a

5from dramatiq import Retry 1a

6 

7from polar.benefit.repository import BenefitRepository 1a

8from polar.customer.repository import CustomerRepository 1a

9from polar.exceptions import PolarTaskError 1a

10from polar.logging import Logger 1a

11from polar.models.benefit_grant import BenefitGrantScopeArgs 1a

12from polar.product.repository import ProductRepository 1a

13from polar.worker import ( 1a

14 AsyncSessionMaker, 

15 RedisMiddleware, 

16 TaskPriority, 

17 actor, 

18 get_retries, 

19) 

20 

21from .grant.scope import resolve_scope 1a

22from .grant.service import benefit_grant as benefit_grant_service 1a

23from .strategies import BenefitRetriableError 1a

24 

25log: Logger = structlog.get_logger() 1a

26 

27 

class BenefitTaskError(PolarTaskError):
    """Common base class for the errors raised by the benefit tasks."""

29 

30 

class CustomerDoesNotExist(BenefitTaskError):
    """Error for a customer ID that does not resolve to a customer.

    Attributes:
        customer_id: The ID that was looked up.
    """

    def __init__(self, customer_id: uuid.UUID) -> None:
        self.customer_id = customer_id
        super().__init__(f"The customer with id {customer_id} does not exist.")

36 

37 

class ProductDoesNotExist(BenefitTaskError):
    """Error for a product ID that does not resolve to a product.

    Attributes:
        product_id: The ID that was looked up.
        user_id: Alias of ``product_id``. The ID used to be stored only under
            this misleading name; it is kept so existing readers keep working.
    """

    def __init__(self, product_id: uuid.UUID) -> None:
        # Store the ID under the name matching the parameter, like the
        # sibling exception classes in this module do.
        self.product_id = product_id
        # Backward-compatible alias for the historical (misnamed) attribute.
        self.user_id = product_id
        message = f"The product with id {product_id} does not exist."
        super().__init__(message)

43 

44 

class BenefitDoesNotExist(BenefitTaskError):
    """Error for a benefit ID that does not resolve to a benefit.

    Attributes:
        benefit_id: The ID that was looked up.
    """

    def __init__(self, benefit_id: uuid.UUID) -> None:
        self.benefit_id = benefit_id
        super().__init__(f"The benefit with id {benefit_id} does not exist.")

50 

51 

class BenefitGrantDoesNotExist(BenefitTaskError):
    """Error for a benefit grant ID that does not resolve to a grant.

    Attributes:
        benefit_grant_id: The ID that was looked up.
    """

    def __init__(self, benefit_grant_id: uuid.UUID) -> None:
        self.benefit_grant_id = benefit_grant_id
        super().__init__(
            f"The benefit grant with id {benefit_grant_id} does not exist."
        )

57 

58 

class OrganizationDoesNotExist(BenefitTaskError):
    """Error for an organization ID that does not resolve to an organization.

    Attributes:
        organization_id: The ID that was looked up.
    """

    def __init__(self, organization_id: uuid.UUID) -> None:
        self.organization_id = organization_id
        super().__init__(
            f"The organization with id {organization_id} does not exist."
        )

64 

65 

@actor(actor_name="benefit.enqueue_benefits_grants", priority=TaskPriority.MEDIUM)
async def enqueue_benefits_grants(
    task: Literal["grant", "revoke"],
    customer_id: uuid.UUID,
    product_id: uuid.UUID,
    **scope: Unpack[BenefitGrantScopeArgs],
) -> None:
    """Load the customer and product, then delegate to the grant service.

    The customer, the product and the resolved scope are forwarded to
    ``benefit_grant_service.enqueue_benefits_grants`` along with ``task``.

    Args:
        task: Either ``"grant"`` or ``"revoke"``.
        customer_id: ID of the customer to load.
        product_id: ID of the product to load.
        **scope: Scope arguments, resolved through ``resolve_scope``.

    Raises:
        CustomerDoesNotExist: No customer was found for ``customer_id``.
        ProductDoesNotExist: No product was found for ``product_id``.
    """
    # Allow deleted customers to be processed for revocation tasks
    allow_deleted_customer = task == "revoke"

    async with AsyncSessionMaker() as session:
        customers = CustomerRepository.from_session(session)
        customer = await customers.get_by_id(
            customer_id, include_deleted=allow_deleted_customer
        )
        if customer is None:
            raise CustomerDoesNotExist(customer_id)

        products = ProductRepository.from_session(session)
        product = await products.get_by_id(product_id)
        if product is None:
            raise ProductDoesNotExist(product_id)

        scope_kwargs = await resolve_scope(session, scope)
        await benefit_grant_service.enqueue_benefits_grants(
            session, task, customer, product, **scope_kwargs
        )

93 

94 

@actor(actor_name="benefit.grant", priority=TaskPriority.MEDIUM)
async def benefit_grant(
    customer_id: uuid.UUID,
    benefit_id: uuid.UUID,
    **scope: Unpack[BenefitGrantScopeArgs],
) -> None:
    """Load a customer and a benefit, then call the service's ``grant_benefit``.

    The current retry count (``get_retries()``) is passed to the service as
    ``attempt``.

    Args:
        customer_id: ID of the customer to load.
        benefit_id: ID of the benefit to load (with the repository's eager
            options).
        **scope: Scope arguments, resolved through ``resolve_scope``.

    Raises:
        CustomerDoesNotExist: No customer was found for ``customer_id``.
        BenefitDoesNotExist: No benefit was found for ``benefit_id``.
        Retry: The service raised ``BenefitRetriableError``; the delay is
            taken from the error's ``defer_milliseconds``.
    """
    async with AsyncSessionMaker() as session:
        customers = CustomerRepository.from_session(session)
        customer = await customers.get_by_id(customer_id)
        if customer is None:
            raise CustomerDoesNotExist(customer_id)

        benefits = BenefitRepository.from_session(session)
        eager_options = benefits.get_eager_options()
        benefit = await benefits.get_by_id(benefit_id, options=eager_options)
        if benefit is None:
            raise BenefitDoesNotExist(benefit_id)

        scope_kwargs = await resolve_scope(session, scope)

        try:
            await benefit_grant_service.grant_benefit(
                session,
                RedisMiddleware.get(),
                customer,
                benefit,
                attempt=get_retries(),
                **scope_kwargs,
            )
        except BenefitRetriableError as exc:
            # Log the deferral, then hand the retry over to the worker.
            log.warning(
                "Retriable error encountered while granting benefit",
                error=str(exc),
                defer_seconds=exc.defer_seconds,
                benefit_id=str(benefit_id),
                customer_id=str(customer_id),
            )
            raise Retry(delay=exc.defer_milliseconds) from exc

134 

135 

@actor(actor_name="benefit.revoke", priority=TaskPriority.MEDIUM)
async def benefit_revoke(
    customer_id: uuid.UUID,
    benefit_id: uuid.UUID,
    **scope: Unpack[BenefitGrantScopeArgs],
) -> None:
    """Load a customer and a benefit, then call the service's ``revoke_benefit``.

    The customer lookup includes deleted customers. The current retry count
    (``get_retries()``) is passed to the service as ``attempt``.

    Args:
        customer_id: ID of the customer to load.
        benefit_id: ID of the benefit to load (with the repository's eager
            options).
        **scope: Scope arguments, resolved through ``resolve_scope``.

    Raises:
        CustomerDoesNotExist: No customer was found for ``customer_id``.
        BenefitDoesNotExist: No benefit was found for ``benefit_id``.
        Retry: The service raised ``BenefitRetriableError``; the delay is
            taken from the error's ``defer_milliseconds``.
    """
    async with AsyncSessionMaker() as session:
        customers = CustomerRepository.from_session(session)
        # Allow deleted customers to be processed for revocation tasks
        customer = await customers.get_by_id(customer_id, include_deleted=True)
        if customer is None:
            raise CustomerDoesNotExist(customer_id)

        benefits = BenefitRepository.from_session(session)
        eager_options = benefits.get_eager_options()
        benefit = await benefits.get_by_id(benefit_id, options=eager_options)
        if benefit is None:
            raise BenefitDoesNotExist(benefit_id)

        scope_kwargs = await resolve_scope(session, scope)

        try:
            await benefit_grant_service.revoke_benefit(
                session,
                RedisMiddleware.get(),
                customer,
                benefit,
                attempt=get_retries(),
                **scope_kwargs,
            )
        except BenefitRetriableError as exc:
            # Log the deferral, then hand the retry over to the worker.
            log.warning(
                "Retriable error encountered while revoking benefit",
                error=str(exc),
                defer_seconds=exc.defer_seconds,
                benefit_id=str(benefit_id),
                customer_id=str(customer_id),
            )
            raise Retry(delay=exc.defer_milliseconds) from exc

179 

180 

@actor(actor_name="benefit.update", priority=TaskPriority.MEDIUM)
async def benefit_update(benefit_grant_id: uuid.UUID) -> None:
    """Load a benefit grant and call the service's ``update_benefit_grant``.

    The current retry count (``get_retries()``) is passed to the service as
    ``attempt``.

    Args:
        benefit_grant_id: ID of the benefit grant to load.

    Raises:
        BenefitGrantDoesNotExist: No grant was found for ``benefit_grant_id``.
        Retry: The service raised ``BenefitRetriableError``; the delay is
            taken from the error's ``defer_milliseconds``.
    """
    async with AsyncSessionMaker() as session:
        grant = await benefit_grant_service.get(
            session, benefit_grant_id, loaded=True
        )
        if grant is None:
            raise BenefitGrantDoesNotExist(benefit_grant_id)

        try:
            await benefit_grant_service.update_benefit_grant(
                session,
                RedisMiddleware.get(),
                grant,
                attempt=get_retries(),
            )
        except BenefitRetriableError as exc:
            # Log the deferral, then hand the retry over to the worker.
            log.warning(
                "Retriable error encountered while updating benefit",
                error=str(exc),
                defer_seconds=exc.defer_seconds,
                benefit_grant_id=str(benefit_grant_id),
            )
            raise Retry(delay=exc.defer_milliseconds) from exc

202 

203 

@actor(actor_name="benefit.enqueue_benefit_grant_cycles", priority=TaskPriority.MEDIUM)
async def enqueue_benefit_grant_cycles(**scope: Unpack[BenefitGrantScopeArgs]) -> None:
    """Resolve the scope and call the service's ``enqueue_benefit_grant_cycles``.

    Args:
        **scope: Scope arguments, resolved through ``resolve_scope`` and then
            forwarded to the service as keyword arguments.
    """
    async with AsyncSessionMaker() as session:
        scope_kwargs = await resolve_scope(session, scope)
        redis = RedisMiddleware.get()
        await benefit_grant_service.enqueue_benefit_grant_cycles(
            session, redis, **scope_kwargs
        )

211 

212 

@actor(actor_name="benefit.cycle", priority=TaskPriority.MEDIUM)
async def benefit_cycle(benefit_grant_id: uuid.UUID) -> None:
    """Load a benefit grant and call the service's ``cycle_benefit_grant``.

    The current retry count (``get_retries()``) is passed to the service as
    ``attempt``.

    Args:
        benefit_grant_id: ID of the benefit grant to load.

    Raises:
        BenefitGrantDoesNotExist: No grant was found for ``benefit_grant_id``.
        Retry: The service raised ``BenefitRetriableError``; the delay is
            taken from the error's ``defer_milliseconds``.
    """
    async with AsyncSessionMaker() as session:
        grant = await benefit_grant_service.get(
            session, benefit_grant_id, loaded=True
        )
        if grant is None:
            raise BenefitGrantDoesNotExist(benefit_grant_id)

        try:
            await benefit_grant_service.cycle_benefit_grant(
                session,
                RedisMiddleware.get(),
                grant,
                attempt=get_retries(),
            )
        except BenefitRetriableError as exc:
            # Log the deferral, then hand the retry over to the worker.
            log.warning(
                "Retriable error encountered while cycling benefit",
                error=str(exc),
                defer_seconds=exc.defer_seconds,
                benefit_grant_id=str(benefit_grant_id),
            )
            raise Retry(delay=exc.defer_milliseconds) from exc

234 

235 

@actor(actor_name="benefit.delete", priority=TaskPriority.MEDIUM)
async def benefit_delete(benefit_id: uuid.UUID) -> None:
    """Load a benefit and call ``enqueue_benefit_grant_deletions`` for it.

    The lookup includes deleted benefits and uses the repository's eager
    options.

    Args:
        benefit_id: ID of the benefit to load.

    Raises:
        BenefitDoesNotExist: No benefit was found for ``benefit_id``.
    """
    async with AsyncSessionMaker() as session:
        benefits = BenefitRepository.from_session(session)
        eager_options = benefits.get_eager_options()
        benefit = await benefits.get_by_id(
            benefit_id, options=eager_options, include_deleted=True
        )
        if benefit is None:
            raise BenefitDoesNotExist(benefit_id)

        await benefit_grant_service.enqueue_benefit_grant_deletions(session, benefit)

249 

250 

@actor(actor_name="benefit.revoke_customer", priority=TaskPriority.MEDIUM)
async def benefit_revoke_customer(customer_id: uuid.UUID) -> None:
    """Load a customer and call ``enqueue_customer_grant_deletions`` for it.

    The lookup includes deleted customers.

    Args:
        customer_id: ID of the customer to load.

    Raises:
        CustomerDoesNotExist: No customer was found for ``customer_id``.
    """
    async with AsyncSessionMaker() as session:
        customers = CustomerRepository.from_session(session)
        customer = await customers.get_by_id(customer_id, include_deleted=True)
        if customer is None:
            raise CustomerDoesNotExist(customer_id)

        await benefit_grant_service.enqueue_customer_grant_deletions(session, customer)

262 

263 

@actor(actor_name="benefit.delete_grant", priority=TaskPriority.MEDIUM)
async def benefit_delete_grant(benefit_grant_id: uuid.UUID) -> None:
    """Load a benefit grant and call the service's ``delete_benefit_grant``.

    The current retry count (``get_retries()``) is passed to the service as
    ``attempt``.

    Args:
        benefit_grant_id: ID of the benefit grant to load.

    Raises:
        BenefitGrantDoesNotExist: No grant was found for ``benefit_grant_id``.
        Retry: The service raised ``BenefitRetriableError``; the delay is
            taken from the error's ``defer_milliseconds``.
    """
    async with AsyncSessionMaker() as session:
        grant = await benefit_grant_service.get(
            session, benefit_grant_id, loaded=True
        )
        if grant is None:
            raise BenefitGrantDoesNotExist(benefit_grant_id)

        try:
            await benefit_grant_service.delete_benefit_grant(
                session,
                RedisMiddleware.get(),
                grant,
                attempt=get_retries(),
            )
        except BenefitRetriableError as exc:
            # Log the deferral, then hand the retry over to the worker.
            log.warning(
                "Retriable error encountered while deleting benefit grant",
                error=str(exc),
                defer_seconds=exc.defer_seconds,
                benefit_grant_id=str(benefit_grant_id),
            )
            raise Retry(delay=exc.defer_milliseconds) from exc

284 raise Retry(delay=e.defer_milliseconds) from e