Coverage for polar/order/tasks.py: 31%

126 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 17:15 +0000

1import uuid 1a

2 

3import stripe as stripe_lib 1a

4import structlog 1a

5from dramatiq import Retry 1a

6from sqlalchemy.orm import joinedload 1a

7 

8from polar.exceptions import PolarTaskError 1a

9from polar.logging import Logger 1a

10from polar.models import Customer, Order 1a

11from polar.models.order import OrderBillingReasonInternal 1a

12from polar.payment_method.repository import PaymentMethodRepository 1a

13from polar.product.repository import ProductRepository 1a

14from polar.subscription.repository import SubscriptionRepository 1a

15from polar.transaction.service.balance import PaymentTransactionForChargeDoesNotExist 1a

16from polar.worker import ( 1a

17 AsyncSessionMaker, 

18 CronTrigger, 

19 TaskPriority, 

20 actor, 

21 can_retry, 

22 enqueue_job, 

23) 

24 

25from .repository import OrderRepository 1a

26from .service import CardPaymentFailed, NoPendingBillingEntries 1a

27from .service import order as order_service 1a

28 

# Module-level structured logger used by the tasks in this file.
log: Logger = structlog.get_logger()

# NOTE(review): not referenced by any task visible in this module;
# presumably consumed elsewhere -- confirm before changing or removing.
MAX_RETRIES = 10

32 

33 

class OrderTaskError(PolarTaskError):
    """Base class for the errors raised by the order tasks in this module."""

35 

36 

class SubscriptionDoesNotExist(OrderTaskError):
    """Raised when a task cannot find the subscription it was asked to process."""

    def __init__(self, subscription_id: uuid.UUID) -> None:
        # Keep the id on the exception so handlers can inspect it.
        self.subscription_id = subscription_id
        super().__init__(
            f"The subscription with id {subscription_id} does not exist."
        )

42 

43 

class ProductDoesNotExist(OrderTaskError):
    """Raised when a task cannot find the product it was asked to process."""

    def __init__(self, product_id: uuid.UUID) -> None:
        # Keep the id on the exception so handlers can inspect it.
        self.product_id = product_id
        super().__init__(f"The product with id {product_id} does not exist.")

49 

50 

class OrderDoesNotExist(OrderTaskError):
    """Raised when a task cannot find the order it was asked to process."""

    def __init__(self, order_id: uuid.UUID) -> None:
        # Keep the id on the exception so handlers can inspect it.
        self.order_id = order_id
        super().__init__(f"The order with id {order_id} does not exist.")

56 

57 

class PaymentMethodDoesNotExist(OrderTaskError):
    """Error describing a payment method id that does not resolve to a record."""

    def __init__(self, payment_method_id: uuid.UUID) -> None:
        # Keep the id on the exception so handlers can inspect it.
        self.payment_method_id = payment_method_id
        super().__init__(
            f"The payment method with id {payment_method_id} does not exist."
        )

63 

64 

@actor(actor_name="order.create_subscription_order", priority=TaskPriority.LOW)
async def create_subscription_order(
    subscription_id: uuid.UUID, order_reason: OrderBillingReasonInternal
) -> None:
    """Create an order for the subscription identified by ``subscription_id``.

    The subscription is loaded with the repository's eager options and handed
    to the order service together with ``order_reason``.

    Raises:
        SubscriptionDoesNotExist: if no subscription matches ``subscription_id``.
    """
    async with AsyncSessionMaker() as session:
        subscription_repository = SubscriptionRepository.from_session(session)
        eager_options = subscription_repository.get_eager_options()
        subscription = await subscription_repository.get_by_id(
            subscription_id, options=eager_options
        )
        if subscription is None:
            raise SubscriptionDoesNotExist(subscription_id)

        try:
            await order_service.create_subscription_order(
                session, subscription, order_reason
            )
        except NoPendingBillingEntries:
            # Nothing to bill, so no order is created. This usually means the
            # subscription is now canceled and no usage-based billing is pending.
            return

85 

86 

@actor(actor_name="order.trigger_payment", priority=TaskPriority.LOW)
async def trigger_payment(order_id: uuid.UUID, payment_method_id: uuid.UUID) -> None:
    """Trigger a payment for an order using a specific payment method.

    If the payment method cannot be found for the order's customer, the order
    service's payment-failure handling is invoked and the task ends. Card
    failures are logged and not retried. Stripe connection, API and rate-limit
    errors are retried when ``can_retry()`` allows it and re-raised otherwise.

    Raises:
        OrderDoesNotExist: if no order matches ``order_id``.
    """
    async with AsyncSessionMaker() as session:
        order_repository = OrderRepository.from_session(session)
        eager_options = order_repository.get_eager_options()
        order = await order_repository.get_by_id(order_id, options=eager_options)
        if order is None:
            raise OrderDoesNotExist(order_id)

        # The lookup is scoped to the order's customer.
        payment_methods = PaymentMethodRepository.from_session(session)
        payment_method = await payment_methods.get_by_id_and_customer(
            payment_method_id, order.customer_id
        )
        if payment_method is None:
            log.info(
                "Payment method not found, triggering dunning process",
                order_id=order_id,
                payment_method_id=payment_method_id,
            )
            await order_service.handle_payment_failure(session, order)
            return

        try:
            await order_service.trigger_payment(session, order, payment_method)
        except CardPaymentFailed:
            # Card errors are left to the dunning process: log and stop here
            # instead of retrying the task.
            log.info(
                "Card payment failed, not retrying - will be handled by dunning",
                order_id=order_id,
            )
            return
        except (
            stripe_lib.APIConnectionError,
            stripe_lib.APIError,
            stripe_lib.RateLimitError,
        ) as exc:
            # Network/availability problems on Stripe's side are worth retrying.
            log.error(
                "Stripe service error during payment trigger, retrying",
                order_id=order_id,
                error_type=type(exc).__name__,
                error_message=str(exc),
            )
            if not can_retry():
                # No retry allowed: let the original Stripe error propagate.
                raise
            raise Retry() from exc

136 

137 

@actor(actor_name="order.balance", priority=TaskPriority.LOW)
async def create_order_balance(order_id: uuid.UUID, charge_id: str) -> None:
    """Create the balance for an order from the charge ``charge_id``.

    Raises:
        OrderDoesNotExist: if no order matches ``order_id``.
        PaymentTransactionForChargeDoesNotExist: if the payment transaction
            for the charge is missing and ``can_retry()`` does not allow
            another attempt.
    """
    async with AsyncSessionMaker() as session:
        order_repository = OrderRepository.from_session(session)
        # Load the order's customer and that customer's organization in the
        # same query as the order.
        customer_with_organization = joinedload(Order.customer).joinedload(
            Customer.organization
        )
        order = await order_repository.get_by_id(
            order_id, options=(customer_with_organization,)
        )
        if order is None:
            raise OrderDoesNotExist(order_id)

        try:
            await order_service.create_order_balance(session, order, charge_id)
        except PaymentTransactionForChargeDoesNotExist as exc:
            # Stripe does not guarantee webhook ordering, so subscription.created
            # or charge.succeeded may not have been handled yet; try again later.
            if not can_retry():
                # No retry allowed: re-raise so the failure gets noticed.
                raise
            raise Retry() from exc

160 

161 

@actor(actor_name="order.update_product_benefits_grants", priority=TaskPriority.MEDIUM)
async def update_product_benefits_grants(product_id: uuid.UUID) -> None:
    """Ask the order service to update benefit grants for a product.

    Raises:
        ProductDoesNotExist: if no product matches ``product_id``.
    """
    async with AsyncSessionMaker() as session:
        products = ProductRepository.from_session(session)
        product = await products.get_by_id(product_id)
        if product is None:
            raise ProductDoesNotExist(product_id)

        await order_service.update_product_benefits_grants(session, product)

171 

172 

@actor(actor_name="order.confirmation_email", priority=TaskPriority.LOW)
async def order_confirmation_email(order_id: uuid.UUID) -> None:
    """Send the confirmation email for an order via the order service.

    Raises:
        OrderDoesNotExist: if no order matches ``order_id``.
    """
    async with AsyncSessionMaker() as session:
        order_repository = OrderRepository.from_session(session)
        eager_options = order_repository.get_eager_options()
        order = await order_repository.get_by_id(order_id, options=eager_options)
        if order is None:
            raise OrderDoesNotExist(order_id)

        await order_service.send_confirmation_email(session, order)

184 

185 

@actor(actor_name="order.invoice", priority=TaskPriority.LOW)
async def order_invoice(order_id: uuid.UUID) -> None:
    """Generate the invoice for an order via the order service.

    Raises:
        OrderDoesNotExist: if no order matches ``order_id``.
    """
    async with AsyncSessionMaker() as session:
        order_repository = OrderRepository.from_session(session)
        eager_options = order_repository.get_eager_options()
        order = await order_repository.get_by_id(order_id, options=eager_options)
        if order is None:
            raise OrderDoesNotExist(order_id)

        await order_service.generate_invoice(session, order)

197 

198 

@actor(
    actor_name="order.process_dunning",
    cron_trigger=CronTrigger.from_crontab("0 * * * *"),
    priority=TaskPriority.MEDIUM,
)
async def process_dunning() -> None:
    """Process all orders that are due for dunning (payment retry).

    Scheduled with the crontab expression ``0 * * * *``. Each due order is
    handed off to its own ``order.process_dunning_order`` job rather than
    being processed here.
    """
    async with AsyncSessionMaker() as session:
        orders = OrderRepository.from_session(session)

        # Fan out: one job per due order, identified by the order's id.
        for due_order in await orders.get_due_dunning_orders():
            enqueue_job("order.process_dunning_order", due_order.id)

212 

213 

@actor(actor_name="order.process_dunning_order", priority=TaskPriority.MEDIUM)
async def process_dunning_order(order_id: uuid.UUID) -> None:
    """Process a single order due for dunning (payment retry).

    Raises:
        OrderDoesNotExist: if no order matches ``order_id``.
    """
    async with AsyncSessionMaker() as session:
        orders = OrderRepository.from_session(session)
        eager_options = orders.get_eager_options()
        order = await orders.get_by_id(order_id, options=eager_options)
        if order is None:
            raise OrderDoesNotExist(order_id)

        await order_service.process_dunning_order(session, order)