Coverage for polar/subscription/service.py: 11%
1039 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 16:17 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 16:17 +0000
1import contextlib 1a
2import uuid 1a
3from collections.abc import AsyncGenerator, Sequence 1a
4from datetime import UTC, datetime, timedelta 1a
5from decimal import Decimal 1a
6from typing import Any, Literal, cast, overload 1a
7from urllib.parse import urlencode 1a
9import stripe as stripe_lib 1a
10import structlog 1a
11from sqlalchemy import select 1a
12from sqlalchemy.orm import contains_eager, selectinload 1a
14from polar.auth.models import AuthSubject 1a
15from polar.billing_entry.repository import BillingEntryRepository 1a
16from polar.billing_entry.service import MeteredLineItem 1a
17from polar.billing_entry.service import billing_entry as billing_entry_service 1a
18from polar.checkout.eventstream import CheckoutEvent, publish_checkout_event 1a
19from polar.checkout.guard import has_product_checkout 1a
20from polar.config import settings 1a
21from polar.customer.repository import CustomerRepository 1a
22from polar.customer_meter.service import customer_meter as customer_meter_service 1a
23from polar.customer_seat.service import seat_service 1a
24from polar.customer_session.service import customer_session as customer_session_service 1a
25from polar.discount.repository import DiscountRedemptionRepository 1a
26from polar.discount.service import discount as discount_service 1a
27from polar.email.react import render_email_template 1a
28from polar.email.schemas import EmailAdapter 1a
29from polar.email.sender import enqueue_email 1a
30from polar.enums import SubscriptionProrationBehavior, SubscriptionRecurringInterval 1a
31from polar.event.service import event as event_service 1a
32from polar.event.system import SystemEvent, build_system_event 1a
33from polar.exceptions import ( 1a
34 BadRequest,
35 PolarError,
36 PolarRequestValidationError,
37 ResourceUnavailable,
38 ValidationError,
39)
40from polar.integrations.stripe.schemas import ProductType 1a
41from polar.integrations.stripe.service import stripe as stripe_service 1a
42from polar.integrations.stripe.utils import get_expandable_id 1a
43from polar.kit.db.postgres import AsyncReadSession, AsyncSession 1a
44from polar.kit.metadata import MetadataQuery, apply_metadata_clause 1a
45from polar.kit.pagination import PaginationParams 1a
46from polar.kit.sorting import Sorting 1a
47from polar.kit.tax import calculate_tax 1a
48from polar.kit.utils import utc_now 1a
49from polar.locker import Locker 1a
50from polar.logging import Logger 1a
51from polar.models import ( 1a
52 Benefit,
53 BenefitGrant,
54 BillingEntry,
55 Checkout,
56 Customer,
57 Discount,
58 Event,
59 Organization,
60 Payment,
61 PaymentMethod,
62 Product,
63 ProductBenefit,
64 Subscription,
65 SubscriptionMeter,
66 SubscriptionProductPrice,
67 User,
68)
69from polar.models.billing_entry import BillingEntryDirection, BillingEntryType 1a
70from polar.models.order import OrderBillingReasonInternal 1a
71from polar.models.product_price import ProductPriceSeatUnit 1a
72from polar.models.subscription import CustomerCancellationReason, SubscriptionStatus 1a
73from polar.models.webhook_endpoint import WebhookEventType 1a
74from polar.notifications.notification import ( 1a
75 MaintainerNewPaidSubscriptionNotificationPayload,
76 NotificationType,
77)
78from polar.notifications.service import PartialNotification 1a
79from polar.notifications.service import notifications as notifications_service 1a
80from polar.organization.repository import OrganizationRepository 1a
81from polar.payment_method.repository import PaymentMethodRepository 1a
82from polar.payment_method.service import payment_method as payment_method_service 1a
83from polar.product.guard import ( 1a
84 is_custom_price,
85 is_fixed_price,
86 is_free_price,
87 is_static_price,
88)
89from polar.product.repository import ProductRepository 1a
90from polar.product.service import product as product_service 1a
91from polar.webhook.service import webhook as webhook_service 1a
92from polar.worker import enqueue_job 1a
94from .repository import SubscriptionRepository 1a
95from .schemas import ( 1a
96 SubscriptionCancel,
97 SubscriptionChargePreview,
98 SubscriptionCreate,
99 SubscriptionCreateCustomer,
100 SubscriptionRevoke,
101 SubscriptionUpdate,
102 SubscriptionUpdateBillingPeriod,
103 SubscriptionUpdateDiscount,
104 SubscriptionUpdateProduct,
105 SubscriptionUpdateSeats,
106 SubscriptionUpdateTrial,
107)
108from .sorting import SubscriptionSortProperty 1a
110log: Logger = structlog.get_logger() 1a
113class SubscriptionError(PolarError): ... 1a
116class NotARecurringProduct(SubscriptionError): 1a
117 def __init__(self, checkout: Checkout, product: Product) -> None: 1a
118 self.checkout = checkout
119 self.product = product
120 message = (
121 f"Checkout {checkout.id} is for product {product.id}, "
122 "which is not a recurring product."
123 )
124 super().__init__(message)
127class MissingCheckoutCustomer(SubscriptionError): 1a
128 def __init__(self, checkout: Checkout) -> None: 1a
129 self.checkout = checkout
130 message = f"Checkout {checkout.id} is missing a customer."
131 super().__init__(message)
134class InactiveSubscription(SubscriptionError): 1a
135 def __init__(self, subscription: Subscription) -> None: 1a
136 self.subscription = subscription
137 message = f"Subscription {subscription.id} is not active."
138 super().__init__(message)
141class SubscriptionDoesNotExist(SubscriptionError): 1a
142 def __init__(self, stripe_subscription_id: str) -> None: 1a
143 self.stripe_subscription_id = stripe_subscription_id
144 message = (
145 f"Received a subscription update from Stripe for {stripe_subscription_id}, "
146 f"but no associated Subscription exists."
147 )
148 super().__init__(message)
151class AlreadyCanceledSubscription(SubscriptionError): 1a
152 def __init__(self, subscription: Subscription) -> None: 1a
153 self.subscription = subscription
154 message = (
155 "This subscription is already canceled or will be at the end of the period."
156 )
157 super().__init__(message, 403)
160class TrialingSubscription(SubscriptionError): 1a
161 def __init__(self, subscription: Subscription) -> None: 1a
162 self.subscription = subscription
163 message = (
164 "This subscription is currently in a trial period and cannot be updated."
165 )
166 super().__init__(message, 403)
169class SubscriptionNotActiveOnStripe(SubscriptionError): 1a
170 def __init__(self, subscription: Subscription) -> None: 1a
171 self.subscription = subscription
172 message = "This subscription is not active on Stripe."
173 super().__init__(message, 400)
176class SubscriptionLocked(SubscriptionError): 1a
177 def __init__(self, subscription: Subscription) -> None: 1a
178 self.subscription = subscription
179 message = "This subscription is pending an update."
180 super().__init__(message, 409)
183class MissingStripeCustomerID(SubscriptionError): 1a
184 def __init__(self, checkout: Checkout, customer: Customer) -> None: 1a
185 self.checkout = checkout
186 self.customer = customer
187 message = (
188 f"Checkout {checkout.id}'s customer {customer.id} "
189 "is missing a Stripe customer ID."
190 )
191 super().__init__(message)
194class SubscriptionManagedByStripe(SubscriptionError): 1a
195 def __init__(self, subscription: Subscription) -> None: 1a
196 self.subscription = subscription
197 message = "This feature is not available for this subscription."
198 super().__init__(message, 403)
201class SubscriptionNotReadyForMigration(SubscriptionError): 1a
202 def __init__(self, subscription: Subscription) -> None: 1a
203 self.subscription = subscription
204 message = "This subscription is not ready for migration."
205 super().__init__(message)
208class NotASeatBasedSubscription(SubscriptionError): 1a
209 def __init__(self, subscription: Subscription) -> None: 1a
210 self.subscription = subscription
211 message = "This subscription does not support seat-based pricing."
212 super().__init__(message, 400)
215class SeatsAlreadyAssigned(SubscriptionError): 1a
216 def __init__( 1a
217 self, subscription: Subscription, assigned_count: int, requested_seats: int
218 ) -> None:
219 self.subscription = subscription
220 self.assigned_count = assigned_count
221 self.requested_seats = requested_seats
222 message = (
223 f"Cannot decrease seats to {requested_seats}. "
224 f"Currently {assigned_count} seats are assigned. "
225 f"Revoke seats first."
226 )
227 super().__init__(message, 400)
230class BelowMinimumSeats(SubscriptionError): 1a
231 def __init__( 1a
232 self, subscription: Subscription, minimum_seats: int, requested_seats: int
233 ) -> None:
234 self.subscription = subscription
235 self.minimum_seats = minimum_seats
236 self.requested_seats = requested_seats
237 message = f"Minimum seat count is {minimum_seats} based on pricing tiers."
238 super().__init__(message, 400)
241class OneTimeOrderNotSupported(SubscriptionError): 1a
242 def __init__( 1a
243 self, message: str = "This operation is not supported for one-time orders"
244 ) -> None:
245 super().__init__(message, 403)
248@overload 1a
249def _from_timestamp(t: int) -> datetime: ... 249 ↛ exitline 249 didn't return from function '_from_timestamp' because 1a
252@overload 1a
253def _from_timestamp(t: None) -> None: ... 253 ↛ exitline 253 didn't return from function '_from_timestamp' because 1a
256def _from_timestamp(t: int | None) -> datetime | None: 1a
257 if t is None:
258 return None
259 return datetime.fromtimestamp(t, UTC)
262class SubscriptionService: 1a
263 def _get_seat_based_price( 1a
264 self, subscription: Subscription
265 ) -> ProductPriceSeatUnit | None:
266 """Get the seat-based price from subscription, if any."""
267 for spp in subscription.subscription_product_prices:
268 if isinstance(spp.product_price, ProductPriceSeatUnit):
269 return spp.product_price
270 return None
272 def _get_minimum_seats_from_tiers(self, seat_price: ProductPriceSeatUnit) -> int: 1a
273 """Get the absolute minimum seats from the first tier."""
274 if seat_price.seat_tiers is None:
275 return 1
276 tiers = seat_price.seat_tiers["tiers"]
277 if not tiers:
278 return 1
279 sorted_tiers = sorted(tiers, key=lambda t: t["min_seats"])
280 return sorted_tiers[0]["min_seats"]
282 @staticmethod 1a
283 def _calculate_time_proration( 1a
284 period_start: datetime, period_end: datetime, now: datetime
285 ) -> Decimal | None:
286 """
287 Calculate proration factor for a time period.
289 Returns:
290 Decimal between 0 and 1 representing percentage of time remaining,
291 or None if no time is remaining.
292 """
293 period_total = (period_end - period_start).total_seconds()
294 time_remaining = (period_end - now).total_seconds()
296 if time_remaining <= 0:
297 return None
299 return Decimal(time_remaining) / Decimal(period_total)
301 def _calculate_proration_factor( 1a
302 self, subscription: Subscription, *, now: datetime | None = None
303 ) -> Decimal | None:
304 """
305 Calculate proration factor for subscription's current billing period.
307 Returns:
308 Decimal between 0 and 1 representing percentage of time remaining,
309 or None if period has ended or no period_end exists.
310 """
311 if now is None:
312 now = datetime.now(UTC)
314 period_end = subscription.current_period_end
315 if period_end is None:
316 return None
318 period_start = subscription.current_period_start
319 return self._calculate_time_proration(period_start, period_end, now)
321 async def list( 1a
322 self,
323 session: AsyncReadSession,
324 auth_subject: AuthSubject[User | Organization],
325 *,
326 organization_id: Sequence[uuid.UUID] | None = None,
327 product_id: Sequence[uuid.UUID] | None = None,
328 customer_id: Sequence[uuid.UUID] | None = None,
329 external_customer_id: Sequence[str] | None = None,
330 discount_id: Sequence[uuid.UUID] | None = None,
331 active: bool | None = None,
332 cancel_at_period_end: bool | None = None,
333 metadata: MetadataQuery | None = None,
334 pagination: PaginationParams,
335 sorting: list[Sorting[SubscriptionSortProperty]] = [
336 (SubscriptionSortProperty.started_at, True)
337 ],
338 ) -> tuple[Sequence[Subscription], int]:
339 repository = SubscriptionRepository.from_session(session)
340 statement = (
341 repository.get_readable_statement(auth_subject)
342 .where(Subscription.started_at.is_not(None))
343 .join(Subscription.customer)
344 .join(Subscription.discount, isouter=True)
345 )
347 if organization_id is not None:
348 statement = statement.where(Product.organization_id.in_(organization_id))
350 if product_id is not None:
351 statement = statement.where(Product.id.in_(product_id))
353 if customer_id is not None:
354 statement = statement.where(Subscription.customer_id.in_(customer_id))
356 if external_customer_id is not None:
357 statement = statement.where(Customer.external_id.in_(external_customer_id))
359 if discount_id is not None:
360 statement = statement.where(Subscription.discount_id.in_(discount_id))
362 if active is not None:
363 if active:
364 statement = statement.where(Subscription.active.is_(True))
365 else:
366 statement = statement.where(Subscription.revoked.is_(True))
368 if cancel_at_period_end is not None:
369 statement = statement.where(
370 Subscription.cancel_at_period_end.is_(cancel_at_period_end)
371 )
373 if metadata is not None:
374 statement = apply_metadata_clause(Subscription, statement, metadata)
376 statement = repository.apply_sorting(statement, sorting)
378 statement = statement.options(
379 contains_eager(Subscription.product).options(
380 selectinload(Product.product_medias),
381 selectinload(Product.attached_custom_fields),
382 ),
383 contains_eager(Subscription.discount),
384 contains_eager(Subscription.customer),
385 selectinload(Subscription.meters).joinedload(SubscriptionMeter.meter),
386 )
388 return await repository.paginate(
389 statement, limit=pagination.limit, page=pagination.page
390 )
392 async def get( 1a
393 self,
394 session: AsyncReadSession,
395 auth_subject: AuthSubject[User | Organization],
396 id: uuid.UUID,
397 ) -> Subscription | None:
398 repository = SubscriptionRepository.from_session(session)
399 statement = (
400 repository.get_readable_statement(auth_subject)
401 .where(
402 Subscription.id == id,
403 Subscription.started_at.is_not(None),
404 )
405 .options(
406 *repository.get_eager_options(
407 product_load=contains_eager(Subscription.product)
408 )
409 )
410 )
411 return await repository.get_one_or_none(statement)
413 async def create( 1a
414 self,
415 session: AsyncSession,
416 subscription_create: SubscriptionCreate,
417 auth_subject: AuthSubject[User | Organization],
418 ) -> Subscription:
419 errors: list[ValidationError] = []
421 product = await product_service.get(
422 session, auth_subject, subscription_create.product_id
423 )
424 if product is None:
425 errors.append(
426 {
427 "type": "value_error",
428 "loc": ("body", "product_id"),
429 "msg": "Product does not exist.",
430 "input": subscription_create.product_id,
431 }
432 )
433 elif not product.is_recurring:
434 errors.append(
435 {
436 "type": "value_error",
437 "loc": ("body", "product_id"),
438 "msg": "Product is not a recurring product.",
439 "input": subscription_create.product_id,
440 }
441 )
442 elif product.is_legacy_recurring_price:
443 errors.append(
444 {
445 "type": "value_error",
446 "loc": ("body", "product_id"),
447 "msg": "Legacy recurring products are not supported.",
448 "input": subscription_create.product_id,
449 }
450 )
451 elif (static_price := product.get_static_price()) and not is_free_price(
452 static_price
453 ):
454 errors.append(
455 {
456 "type": "value_error",
457 "loc": ("body", "product_id"),
458 "msg": (
459 "Product is not free. "
460 "The customer should go through a checkout to create a paid subscription."
461 ),
462 "input": subscription_create.product_id,
463 }
464 )
466 customer: Customer | None = None
467 customer_repository = CustomerRepository.from_session(session)
468 error_loc: str
469 input_value: uuid.UUID | str
470 if isinstance(subscription_create, SubscriptionCreateCustomer):
471 error_loc = "customer_id"
472 input_value = subscription_create.customer_id
473 customer = await customer_repository.get_readable_by_id(
474 auth_subject, input_value
475 )
476 else:
477 error_loc = "external_customer_id"
478 input_value = subscription_create.external_customer_id
479 customer = await customer_repository.get_readable_by_external_id(
480 auth_subject, input_value
481 )
483 if customer is None:
484 errors.append(
485 {
486 "type": "value_error",
487 "loc": ("body", error_loc),
488 "msg": "Customer does not exist.",
489 "input": input_value,
490 }
491 )
493 if len(errors) > 0:
494 raise PolarRequestValidationError(errors)
496 assert product is not None
497 assert customer is not None
499 prices = product.prices
500 assert product.recurring_interval is not None
501 assert product.recurring_interval_count is not None
502 recurring_interval = product.recurring_interval
503 recurring_interval_count = product.recurring_interval_count
505 subscription_product_prices: list[SubscriptionProductPrice] = []
506 for price in prices:
507 subscription_product_prices.append(
508 SubscriptionProductPrice.from_price(price)
509 )
511 status = SubscriptionStatus.active
512 current_period_start = utc_now()
513 current_period_end = recurring_interval.get_next_period(
514 current_period_start, recurring_interval_count
515 )
517 subscription = Subscription(
518 status=SubscriptionStatus.active,
519 started_at=current_period_start,
520 current_period_start=current_period_start,
521 current_period_end=current_period_end,
522 cancel_at_period_end=False,
523 recurring_interval=recurring_interval,
524 recurring_interval_count=recurring_interval_count,
525 product=product,
526 customer=customer,
527 subscription_product_prices=subscription_product_prices,
528 user_metadata=subscription_create.metadata,
529 )
531 repository = SubscriptionRepository.from_session(session)
532 subscription = await repository.create(subscription, flush=True)
534 await self._after_subscription_created(session, subscription)
535 # ⚠️ Some users are relying on `subscription.updated` for everything
536 # It was working before with Stripe since it always triggered an update
537 # after creation.
538 # But that's not the case with our new engine.
539 # So we manually trigger it here to keep the same behavior.
540 await self._on_subscription_updated(session, subscription)
542 # Reset the subscription meters to start fresh
543 await self.reset_meters(session, subscription)
545 # Enqueue the benefits grants for the subscription
546 await self.enqueue_benefits_grants(session, subscription)
548 return subscription
550 async def create_or_update_from_checkout( 1a
551 self,
552 session: AsyncSession,
553 checkout: Checkout,
554 payment_method: PaymentMethod | None = None,
555 ) -> tuple[Subscription, bool]:
556 assert has_product_checkout(checkout)
558 product = checkout.product
559 if not product.is_recurring:
560 raise NotARecurringProduct(checkout, product)
562 customer = checkout.customer
563 if customer is None:
564 raise MissingCheckoutCustomer(checkout)
566 prices = checkout.prices[product.id]
567 recurring_interval: SubscriptionRecurringInterval
568 recurring_interval_count: int
569 if product.is_legacy_recurring_price:
570 prices = [checkout.product_price]
571 recurring_interval = prices[0].recurring_interval
572 recurring_interval_count = 1
573 else:
574 assert product.recurring_interval is not None
575 assert product.recurring_interval_count is not None
576 recurring_interval = product.recurring_interval
577 recurring_interval_count = product.recurring_interval_count
579 subscription_product_prices: list[SubscriptionProductPrice] = []
580 for price in prices:
581 subscription_product_prices.append(
582 SubscriptionProductPrice.from_price(
583 price, checkout.amount, checkout.seats
584 )
585 )
587 subscription = checkout.subscription
588 created = False
589 previous_is_canceled = subscription.canceled if subscription else False
590 previous_status = subscription.status if subscription else None
592 status = SubscriptionStatus.active
593 current_period_start = utc_now()
594 trial_start: datetime | None = None
595 trial_end = checkout.trial_end
596 if trial_end is not None:
597 status = SubscriptionStatus.trialing
598 trial_start = current_period_start
599 current_period_end = trial_end
600 else:
601 current_period_end = recurring_interval.get_next_period(
602 current_period_start, recurring_interval_count
603 )
605 # New subscription
606 if subscription is None:
607 subscription = Subscription(
608 started_at=current_period_start,
609 cancel_at_period_end=False,
610 customer=customer,
611 )
612 created = True
614 # Even when updating from a free subscription, we change the current period:
615 # we start a billing cycle from the checkout date.
616 subscription.current_period_start = current_period_start
617 subscription.current_period_end = current_period_end
618 subscription.trial_start = trial_start
619 subscription.trial_end = trial_end
621 subscription.recurring_interval = recurring_interval
622 subscription.recurring_interval_count = recurring_interval_count
623 subscription.status = status
624 subscription.payment_method = payment_method
625 subscription.product = product
626 subscription.subscription_product_prices = subscription_product_prices
627 subscription.discount = checkout.discount
628 subscription.checkout = checkout
629 subscription.user_metadata = checkout.user_metadata
630 subscription.custom_field_data = checkout.custom_field_data
631 subscription.seats = checkout.seats
633 repository = SubscriptionRepository.from_session(session)
634 if created:
635 subscription = await repository.create(subscription, flush=True)
636 await self._after_subscription_created(session, subscription)
637 # ⚠️ Some users are relying on `subscription.updated` for everything
638 # It was working before with Stripe since it always triggered an update
639 # after creation.
640 # But that's not the case with our new engine.
641 # So we manually trigger it here to keep the same behavior.
642 await self._on_subscription_updated(session, subscription)
644 else:
645 subscription = await repository.update(subscription, flush=True)
646 assert previous_status is not None
647 await self._after_subscription_updated(
648 session,
649 subscription,
650 previous_status=previous_status,
651 previous_is_canceled=previous_is_canceled,
652 )
654 # Link potential discount redemption to the subscription
655 if subscription.discount is not None:
656 discount_redemption_repository = DiscountRedemptionRepository.from_session(
657 session
658 )
659 await discount_redemption_repository.set_subscription_by_checkout(
660 checkout.id, subscription.id
661 )
663 # Reset the subscription meters to start fresh
664 await self.reset_meters(session, subscription)
666 # Enqueue the benefits grants for the subscription
667 await self.enqueue_benefits_grants(session, subscription)
669 # Notify checkout channel that a subscription has been created from it
670 await publish_checkout_event(
671 checkout.client_secret, CheckoutEvent.subscription_created
672 )
674 return subscription, created
676 async def create_or_update_from_checkout_stripe( 1a
677 self,
678 session: AsyncSession,
679 checkout: Checkout,
680 payment: Payment | None = None,
681 payment_method: PaymentMethod | None = None,
682 ) -> tuple[Subscription, bool]:
683 assert has_product_checkout(checkout)
685 idempotency_key = f"subscription_{checkout.id}{'' if payment is None else f'_{payment.processor_id}'}"
686 product = checkout.product
687 if not product.is_recurring:
688 raise NotARecurringProduct(checkout, product)
690 customer = checkout.customer
691 if customer is None:
692 raise MissingCheckoutCustomer(checkout)
694 stripe_customer_id = customer.stripe_customer_id
695 if stripe_customer_id is None:
696 raise MissingStripeCustomerID(checkout, customer)
698 metadata = {
699 "type": ProductType.product,
700 "product_id": str(checkout.product_id),
701 "checkout_id": str(checkout.id),
702 }
703 invoice_metadata = {
704 "checkout_id": str(checkout.id),
705 }
706 if payment is not None:
707 invoice_metadata["charge_id"] = payment.processor_id
709 stripe_price_ids: list[str] = []
710 subscription_product_prices: list[SubscriptionProductPrice] = []
712 prices = checkout.prices[product.id]
713 if product.is_legacy_recurring_price:
714 prices = [checkout.product_price]
716 free_pricing = True
717 for price in prices:
718 # For pay-what-you-want prices, we need to generate a dedicated price in Stripe
719 if is_custom_price(price):
720 ad_hoc_price = await stripe_service.create_ad_hoc_custom_price(
721 product,
722 price,
723 amount=checkout.amount,
724 currency=checkout.currency,
725 idempotency_key=f"{idempotency_key}_{price.id}",
726 )
727 stripe_price_ids.append(ad_hoc_price.id)
728 subscription_product_prices.append(
729 SubscriptionProductPrice.from_price(
730 price, checkout.amount, checkout.seats
731 )
732 )
733 free_pricing = False
734 else:
735 if is_static_price(price):
736 stripe_price_ids.append(price.stripe_price_id)
737 if not is_free_price(price):
738 free_pricing = False
739 subscription_product_prices.append(
740 SubscriptionProductPrice.from_price(price, seats=checkout.seats)
741 )
743 # We always need at least one price to create a subscription on Stripe
744 # It happens if we only have metered prices on the product
745 if len(stripe_price_ids) == 0:
746 placeholder_price = await stripe_service.create_placeholder_price(
747 product,
748 checkout.currency,
749 idempotency_key=f"{idempotency_key}_placeholder",
750 )
751 stripe_price_ids.append(placeholder_price.id)
753 subscription = checkout.subscription
754 new_subscription = False
755 previous_is_canceled = subscription.canceled if subscription else False
756 previous_status = subscription.status if subscription else None
758 # Disable automatic tax for free pricing, since we don't collect customer address in that case
759 automatic_tax = product.is_tax_applicable and not free_pricing
761 # New subscription
762 if subscription is None:
763 assert product.stripe_product_id is not None
764 (
765 stripe_subscription,
766 stripe_invoice,
767 ) = await stripe_service.create_out_of_band_subscription(
768 customer=stripe_customer_id,
769 currency=checkout.currency,
770 prices=stripe_price_ids,
771 coupon=(
772 checkout.discount.stripe_coupon_id if checkout.discount else None
773 ),
774 automatic_tax=automatic_tax,
775 metadata=metadata,
776 invoice_metadata=invoice_metadata,
777 idempotency_key=f"{idempotency_key}_create",
778 )
779 subscription = Subscription()
780 new_subscription = True
781 # Subscription upgrade
782 else:
783 assert subscription.stripe_subscription_id is not None
784 (
785 stripe_subscription,
786 stripe_invoice,
787 ) = await stripe_service.update_out_of_band_subscription(
788 subscription_id=subscription.stripe_subscription_id,
789 new_prices=stripe_price_ids,
790 coupon=(
791 checkout.discount.stripe_coupon_id if checkout.discount else None
792 ),
793 automatic_tax=automatic_tax,
794 metadata=metadata,
795 invoice_metadata=invoice_metadata,
796 idempotency_key=f"{idempotency_key}_update",
797 )
798 await stripe_service.set_automatically_charged_subscription(
799 stripe_subscription.id,
800 payment_method.processor_id if payment_method else None,
801 idempotency_key=f"{idempotency_key}_payment_method",
802 )
804 subscription.stripe_subscription_id = stripe_subscription.id
805 subscription.status = SubscriptionStatus(stripe_subscription.status)
806 subscription.current_period_start = _from_timestamp(
807 stripe_subscription.current_period_start
808 )
809 subscription.current_period_end = _from_timestamp(
810 stripe_subscription.current_period_end
811 )
812 subscription.discount = checkout.discount
813 subscription.customer = customer
814 subscription.payment_method = payment_method
815 subscription.product = product
816 subscription.subscription_product_prices = subscription_product_prices
817 subscription.checkout = checkout
818 subscription.user_metadata = checkout.user_metadata
819 subscription.custom_field_data = checkout.custom_field_data
820 subscription.seats = checkout.seats
821 subscription.set_started_at()
822 self.update_cancellation_from_stripe(subscription, stripe_subscription)
824 if product.is_legacy_recurring_price:
825 subscription.recurring_interval = prices[0].recurring_interval
826 subscription.recurring_interval_count = 1
827 else:
828 assert product.recurring_interval is not None
829 assert product.recurring_interval_count is not None
830 subscription.recurring_interval = product.recurring_interval
831 subscription.recurring_interval_count = product.recurring_interval_count
833 repository = SubscriptionRepository.from_session(session)
834 if new_subscription:
835 subscription = await repository.create(subscription, flush=True)
836 await self._after_subscription_created(session, subscription)
837 else:
838 subscription = await repository.update(subscription, flush=True)
839 assert previous_status is not None
840 await self._after_subscription_updated(
841 session,
842 subscription,
843 previous_status=previous_status,
844 previous_is_canceled=previous_is_canceled,
845 )
847 # Link potential discount redemption to the subscription
848 if subscription.discount is not None:
849 discount_redemption_repository = DiscountRedemptionRepository.from_session(
850 session
851 )
852 await discount_redemption_repository.set_subscription_by_checkout(
853 checkout.id, subscription.id
854 )
856 # Reset the subscription meters to start fresh
857 await self.reset_meters(session, subscription)
859 # Notify checkout channel that a subscription has been created from it
860 await publish_checkout_event(
861 checkout.client_secret, CheckoutEvent.subscription_created
862 )
864 return subscription, new_subscription
866 async def cycle( 1a
867 self,
868 session: AsyncSession,
869 subscription: Subscription,
870 update_cycle_dates: bool = True,
871 ) -> Subscription:
872 if not subscription.active:
873 raise InactiveSubscription(subscription)
875 revoke = subscription.cancel_at_period_end
876 previous_status = subscription.status
877 previous_canceled = subscription.canceled
879 # Subscription is due to cancel, revoke it
880 if revoke:
881 subscription.ended_at = subscription.ends_at
882 subscription.status = SubscriptionStatus.canceled
884 event = await event_service.create_event(
885 session,
886 build_system_event(
887 SystemEvent.subscription_revoked,
888 customer=subscription.customer,
889 organization=subscription.organization,
890 metadata={"subscription_id": str(subscription.id)},
891 ),
892 )
893 await self.enqueue_benefits_grants(session, subscription)
894 # Normal cycle
895 else:
896 if update_cycle_dates:
897 current_period_end = subscription.current_period_end
898 assert current_period_end is not None
899 subscription.current_period_start = current_period_end
900 subscription.current_period_end = (
901 subscription.recurring_interval.get_next_period(
902 current_period_end, subscription.recurring_interval_count
903 )
904 )
906 # Check if discount is still applicable
907 if subscription.discount is not None:
908 assert subscription.started_at is not None
909 if subscription.discount.is_repetition_expired(
910 subscription.started_at,
911 subscription.current_period_start,
912 previous_status == SubscriptionStatus.trialing,
913 ):
914 subscription.discount = None
916 event = event = await event_service.create_event(
917 session,
918 build_system_event(
919 SystemEvent.subscription_cycled,
920 customer=subscription.customer,
921 organization=subscription.organization,
922 metadata={"subscription_id": str(subscription.id)},
923 ),
924 )
925 # Add a billing entry for a new period
926 billing_entry_repository = BillingEntryRepository.from_session(session)
927 for subscription_product_price in subscription.subscription_product_prices:
928 product_price = subscription_product_price.product_price
929 if is_static_price(product_price):
930 discount_amount = 0
931 if subscription.discount:
932 discount_amount = subscription.discount.get_discount_amount(
933 subscription_product_price.amount
934 )
936 await billing_entry_repository.create(
937 BillingEntry(
938 start_timestamp=subscription.current_period_start,
939 end_timestamp=subscription.current_period_end,
940 type=BillingEntryType.cycle,
941 direction=BillingEntryDirection.debit,
942 amount=subscription_product_price.amount,
943 currency=subscription.currency,
944 customer=subscription.customer,
945 product_price=product_price,
946 discount=subscription.discount,
947 discount_amount=discount_amount,
948 subscription=subscription,
949 event=event,
950 ),
951 )
953 if previous_status == SubscriptionStatus.trialing:
954 subscription.status = SubscriptionStatus.active
956 repository = SubscriptionRepository.from_session(session)
957 subscription = await repository.update(
958 subscription, update_dict={"scheduler_locked_at": None}
959 )
961 billing_reason = (
962 OrderBillingReasonInternal.subscription_cycle_after_trial
963 if previous_status == SubscriptionStatus.trialing
964 else OrderBillingReasonInternal.subscription_cycle
965 )
966 enqueue_job(
967 "order.create_subscription_order",
968 subscription.id,
969 billing_reason,
970 )
972 await self._after_subscription_updated(
973 session,
974 subscription,
975 previous_status=previous_status,
976 previous_is_canceled=previous_canceled,
977 )
979 return subscription
981 async def reset_meters( 1a
982 self, session: AsyncSession, subscription: Subscription
983 ) -> None:
984 """
985 Resets all the subscription meters to start fresh, optionally reporting
986 rollover units if applicable.
988 This should be called when creating a new subscription or cycling an
989 existing one.
990 """
991 customer = subscription.customer
992 for subscription_meter in subscription.meters:
993 rollover_units = await customer_meter_service.get_rollover_units(
994 session, customer, subscription_meter.meter
995 )
996 await event_service.create_event(
997 session,
998 build_system_event(
999 SystemEvent.meter_reset,
1000 customer=customer,
1001 organization=subscription.organization,
1002 metadata={"meter_id": str(subscription_meter.meter_id)},
1003 ),
1004 )
1005 if rollover_units > 0:
1006 await event_service.create_event(
1007 session,
1008 build_system_event(
1009 SystemEvent.meter_credited,
1010 customer=customer,
1011 organization=subscription.organization,
1012 metadata={
1013 "meter_id": str(subscription_meter.meter_id),
1014 "units": rollover_units,
1015 "rollover": True,
1016 },
1017 ),
1018 )
1020 async def _after_subscription_created( 1a
1021 self, session: AsyncSession, subscription: Subscription
1022 ) -> None:
1023 await self._send_webhook(
1024 session, subscription, WebhookEventType.subscription_created
1025 )
1026 # ⚠️ In some cases, the subscription is immediately active
1027 # Make sure then to perform all the operations required!
1028 if subscription.active:
1029 await self._on_subscription_activated(session, subscription, False)
1031 enqueue_job(
1032 "customer.webhook",
1033 WebhookEventType.customer_state_changed,
1034 subscription.customer_id,
1035 )
1037 @contextlib.asynccontextmanager 1a
1038 async def lock( 1a
1039 self, locker: Locker, subscription: Subscription
1040 ) -> AsyncGenerator[Subscription]:
1041 lock_name = f"subscription:{subscription.id}"
1042 if await locker.is_locked(lock_name):
1043 raise SubscriptionLocked(subscription)
1044 async with locker.lock(
1045 lock_name,
1046 timeout=10.0, # Quite long, but we've experienced slow responses from Stripe in test mode
1047 blocking_timeout=1,
1048 ):
1049 yield subscription
1051 async def update( 1a
1052 self,
1053 session: AsyncSession,
1054 locker: Locker,
1055 subscription: Subscription,
1056 *,
1057 update: SubscriptionUpdate,
1058 ) -> Subscription:
1059 if isinstance(update, SubscriptionUpdateProduct):
1060 return await self.update_product(
1061 session,
1062 subscription,
1063 product_id=update.product_id,
1064 proration_behavior=update.proration_behavior,
1065 )
1067 if isinstance(update, SubscriptionUpdateDiscount):
1068 return await self.update_discount(
1069 session,
1070 locker,
1071 subscription,
1072 discount_id=update.discount_id,
1073 )
1075 if isinstance(update, SubscriptionUpdateTrial):
1076 return await self.update_trial(
1077 session, subscription, trial_end=update.trial_end
1078 )
1080 if isinstance(update, SubscriptionUpdateSeats):
1081 return await self.update_seats(
1082 session,
1083 subscription,
1084 seats=update.seats,
1085 proration_behavior=update.proration_behavior,
1086 )
1088 if isinstance(update, SubscriptionUpdateBillingPeriod):
1089 return await self.update_currrent_billing_period_end(
1090 session,
1091 subscription,
1092 new_period_end=update.current_billing_period_end,
1093 )
1095 if isinstance(update, SubscriptionCancel):
1096 uncancel = update.cancel_at_period_end is False
1098 if uncancel:
1099 return await self.uncancel(session, subscription)
1101 return await self.cancel(
1102 session,
1103 subscription,
1104 customer_reason=update.customer_cancellation_reason,
1105 customer_comment=update.customer_cancellation_comment,
1106 )
1108 if isinstance(update, SubscriptionRevoke):
1109 return await self._perform_cancellation(
1110 session,
1111 subscription,
1112 customer_reason=update.customer_cancellation_reason,
1113 customer_comment=update.customer_cancellation_comment,
1114 immediately=True,
1115 )
1117 async def update_product( 1a
1118 self,
1119 session: AsyncSession,
1120 subscription: Subscription,
1121 *,
1122 product_id: uuid.UUID,
1123 proration_behavior: SubscriptionProrationBehavior | None = None,
1124 ) -> Subscription:
1125 if subscription.revoked or subscription.cancel_at_period_end:
1126 raise AlreadyCanceledSubscription(subscription)
1128 if subscription.trialing:
1129 raise TrialingSubscription(subscription)
1131 previous_product = subscription.product
1132 previous_status = subscription.status
1133 previous_is_canceled = subscription.canceled
1134 previous_prices = [*subscription.prices]
1136 product_repository = ProductRepository.from_session(session)
1137 product = await product_repository.get_by_id_and_organization(
1138 product_id,
1139 subscription.product.organization_id,
1140 options=product_repository.get_eager_options(),
1141 )
1143 if product is None:
1144 raise PolarRequestValidationError(
1145 [
1146 {
1147 "type": "value_error",
1148 "loc": ("body", "product_id"),
1149 "msg": "Product does not exist.",
1150 "input": product_id,
1151 }
1152 ]
1153 )
1155 if product.is_archived:
1156 raise PolarRequestValidationError(
1157 [
1158 {
1159 "type": "value_error",
1160 "loc": ("body", "product_id"),
1161 "msg": "Product is archived.",
1162 "input": product_id,
1163 }
1164 ]
1165 )
1167 if not product.is_recurring:
1168 raise PolarRequestValidationError(
1169 [
1170 {
1171 "type": "value_error",
1172 "loc": ("body", "product_id"),
1173 "msg": "Product is not recurring.",
1174 "input": product_id,
1175 }
1176 ]
1177 )
1179 if product.is_legacy_recurring_price:
1180 raise PolarRequestValidationError(
1181 [
1182 {
1183 "type": "value_error",
1184 "loc": ("body", "product_id"),
1185 "msg": "Product has legacy recurring prices.",
1186 "input": product_id,
1187 }
1188 ]
1189 )
1190 assert previous_product.recurring_interval is not None
1191 assert product.recurring_interval is not None
1193 prices = product.prices
1195 for price in prices:
1196 if is_custom_price(price):
1197 raise PolarRequestValidationError(
1198 [
1199 {
1200 "type": "value_error",
1201 "loc": ("body", "product_id"),
1202 "msg": "Can't update to a product with custom prices.",
1203 "input": product_id,
1204 }
1205 ]
1206 )
1208 # Add event for the subscription plan change
1209 event = await event_service.create_event(
1210 session,
1211 build_system_event(
1212 SystemEvent.subscription_product_updated,
1213 customer=subscription.customer,
1214 organization=subscription.organization,
1215 metadata={
1216 "subscription_id": str(subscription.id),
1217 "old_product_id": str(previous_product.id),
1218 "new_product_id": str(product.id),
1219 },
1220 ),
1221 )
1223 organization_repository = OrganizationRepository.from_session(session)
1224 organization = await organization_repository.get_by_id(product.organization_id)
1225 assert organization is not None
1227 subscription.product = product
1228 subscription.subscription_product_prices = [
1229 SubscriptionProductPrice.from_price(price, seats=subscription.seats)
1230 for price in prices
1231 ]
1232 assert product.recurring_interval is not None
1233 assert product.recurring_interval_count is not None
1234 subscription.recurring_interval = product.recurring_interval
1235 subscription.recurring_interval_count = product.recurring_interval_count
1237 if proration_behavior is None:
1238 proration_behavior = organization.proration_behavior
1240 if subscription.stripe_subscription_id:
1241 # Stripe behavior
1242 stripe_price_ids = [
1243 price.stripe_price_id for price in prices if is_static_price(price)
1244 ]
1246 # If no static prices (only metered), create a placeholder price
1247 if len(stripe_price_ids) == 0:
1248 placeholder_price = await stripe_service.create_placeholder_price(
1249 product,
1250 subscription.currency,
1251 idempotency_key=f"subscription_update_{subscription.id}_placeholder",
1252 )
1253 stripe_price_ids.append(placeholder_price.id)
1255 await stripe_service.update_subscription_price(
1256 subscription.stripe_subscription_id,
1257 new_prices=stripe_price_ids,
1258 proration_behavior=proration_behavior.to_stripe(),
1259 metadata={
1260 "type": ProductType.product,
1261 "product_id": str(product.id),
1262 },
1263 )
1265 session.add(subscription)
1266 await session.flush()
1267 else:
1268 now = datetime.now(UTC)
1270 # Cycle end can change in the case of e.g. monthly to yearly
1271 old_cycle_start = subscription.current_period_start
1272 old_cycle_end = previous_product.recurring_interval.get_next_period(
1273 subscription.current_period_start, subscription.recurring_interval_count
1274 )
1276 if previous_product.recurring_interval != product.recurring_interval:
1277 # If switching from monthly to yearly or yearly to monthly, we
1278 # set the cycle start to now
1279 subscription.current_period_start = now
1281 new_cycle_start = subscription.current_period_start
1282 new_cycle_end = subscription.recurring_interval.get_next_period(
1283 subscription.current_period_start, subscription.recurring_interval_count
1284 )
1286 old_cycle_pct_remaining = self._calculate_time_proration(
1287 old_cycle_start, old_cycle_end, now
1288 )
1289 new_cycle_pct_remaining = self._calculate_time_proration(
1290 new_cycle_start, new_cycle_end, now
1291 )
1293 # If no time remaining, skip prorations
1294 if old_cycle_pct_remaining is None or new_cycle_pct_remaining is None:
1295 old_cycle_pct_remaining = Decimal(0)
1296 new_cycle_pct_remaining = Decimal(0)
1298 subscription.current_period_end = new_cycle_end
1300 # Admittedly, this gets a little crazy, but in theory you could go
1301 # from a product with 1 static price to one with 2 static prices or
1302 # the other way around. We don't generally support multiple static
1303 # prices.
1304 #
1305 # But should we get there, we'll debit you for both of those prices.
1306 # Similarly, if going from 2 static prices to 1 static price, we'll
1307 # credit you for both prices and debit you for the 1 price.
1308 #
1309 # Metered prices are ignored for prorations.
1310 old_static_prices = [p for p in previous_prices if is_static_price(p)]
1311 new_static_prices = [p for p in product.prices if is_static_price(p)]
1313 for old_price in old_static_prices:
1314 # Free prices don't get prorated
1315 if not is_fixed_price(old_price):
1316 continue
1318 base_amount = old_price.price_amount
1319 discount_amount = 0
1320 if subscription.discount:
1321 discount_amount = subscription.discount.get_discount_amount(
1322 base_amount
1323 )
1325 # Prorations have discounts applied to the `BillingEntry.amount`
1326 # immediately.
1327 # This is because we're really applying the discount from "this" cycle
1328 # whereas the `cycle` and `meter` BillingEntries should use the
1329 # discount from the _next_ cycle -- the discount that applies to
1330 # that upcoming order. applies to next order applies to the
1331 # For example, if you have a flat "$20 off" discount, part of that
1332 # $20 discount should _not_ apply to the prorations because the
1333 # prorations are happening "this cycle" and shouldn't take away
1334 # from next cycle's discount.
1335 entry_unused_time = BillingEntry(
1336 type=BillingEntryType.proration,
1337 direction=BillingEntryDirection.credit,
1338 start_timestamp=now,
1339 end_timestamp=old_cycle_end,
1340 amount=round(
1341 (base_amount - discount_amount) * old_cycle_pct_remaining
1342 ),
1343 discount_amount=discount_amount,
1344 currency=subscription.currency,
1345 customer=subscription.customer,
1346 product_price=old_price,
1347 subscription=subscription,
1348 event=event,
1349 )
1350 session.add(entry_unused_time)
1352 if previous_product.recurring_interval == product.recurring_interval:
1353 # If switching from monthly to yearly or yearly to monthly, we trigger a cycle immediately
1354 # that means a debit billing entry for the new cycle will be added automatically.
1355 # So debit prorations only apply when the cycle interval is the same.
1356 for new_price in new_static_prices:
1357 # Free prices don't get prorated
1358 if not is_fixed_price(new_price):
1359 continue
1361 base_amount = new_price.price_amount
1362 discount_amount = 0
1363 if subscription.discount and subscription.discount.is_applicable(
1364 new_price.product
1365 ):
1366 discount_amount = subscription.discount.get_discount_amount(
1367 base_amount
1368 )
1369 entry_remaining_time = BillingEntry(
1370 type=BillingEntryType.proration,
1371 direction=BillingEntryDirection.debit,
1372 start_timestamp=now,
1373 end_timestamp=new_cycle_end,
1374 amount=round(
1375 (base_amount - discount_amount) * new_cycle_pct_remaining
1376 ),
1377 discount_amount=discount_amount,
1378 currency=subscription.currency,
1379 customer=subscription.customer,
1380 product_price=new_price,
1381 subscription=subscription,
1382 event=event,
1383 )
1384 session.add(entry_remaining_time)
1386 session.add(subscription)
1387 await session.flush()
1389 if previous_product.recurring_interval != product.recurring_interval:
1390 # If switching from monthly to yearly or yearly to monthly, we trigger a cycle immediately
1391 await self.cycle(session, subscription, update_cycle_dates=False)
1392 elif proration_behavior == SubscriptionProrationBehavior.invoice:
1393 # Invoice immediately
1394 enqueue_job(
1395 "order.create_subscription_order",
1396 subscription.id,
1397 OrderBillingReasonInternal.subscription_update,
1398 )
1399 elif proration_behavior == SubscriptionProrationBehavior.prorate:
1400 # Add prorations to next invoice
1401 pass
1403 await self.enqueue_benefits_grants(session, subscription)
1405 # Send product change email notification
1406 await self.send_subscription_updated_email(
1407 session, subscription, product, proration_behavior
1408 )
1410 # Trigger subscription updated events and re-evaluate benefits
1411 await self._after_subscription_updated(
1412 session,
1413 subscription,
1414 previous_status=previous_status,
1415 previous_is_canceled=previous_is_canceled,
1416 )
1418 return subscription
1420 async def update_discount( 1a
1421 self,
1422 session: AsyncSession,
1423 locker: Locker,
1424 subscription: Subscription,
1425 *,
1426 discount_id: uuid.UUID | None = None,
1427 ) -> Subscription:
1428 discount: Discount | None = None
1430 if discount_id is not None:
1431 discount = await discount_service.get_by_id_and_organization(
1432 session,
1433 discount_id,
1434 subscription.organization,
1435 products=[subscription.product],
1436 )
1437 if discount is None:
1438 raise PolarRequestValidationError(
1439 [
1440 {
1441 "type": "value_error",
1442 "loc": ("body", "discount_id"),
1443 "msg": (
1444 "Discount does not exist, "
1445 "is not applicable to this product "
1446 "or is not redeemable."
1447 ),
1448 "input": discount_id,
1449 }
1450 ]
1451 )
1452 if discount == subscription.discount:
1453 raise PolarRequestValidationError(
1454 [
1455 {
1456 "type": "value_error",
1457 "loc": ("body", "discount_id"),
1458 "msg": "This discount is already applied to the subscription.",
1459 "input": discount_id,
1460 }
1461 ]
1462 )
1464 async def _update_discount(
1465 session: AsyncSession,
1466 subscription: Subscription,
1467 discount: Discount | None,
1468 ) -> Subscription:
1469 if subscription.stripe_subscription_id is not None:
1470 old_coupon_id = (
1471 subscription.discount.stripe_coupon_id
1472 if subscription.discount is not None
1473 else None
1474 )
1475 new_coupon_id = (
1476 discount.stripe_coupon_id if discount is not None else None
1477 )
1478 await stripe_service.update_subscription_discount(
1479 subscription.stripe_subscription_id, old_coupon_id, new_coupon_id
1480 )
1481 repository = SubscriptionRepository.from_session(session)
1482 return await repository.update(
1483 subscription, update_dict={"discount": discount}, flush=True
1484 )
1486 if discount is None:
1487 return await _update_discount(session, subscription, None)
1489 async with discount_service.redeem_discount(
1490 session, locker, discount
1491 ) as discount_redemption:
1492 discount_redemption.subscription = subscription
1493 return await _update_discount(session, subscription, discount)
1495 async def update_trial( 1a
1496 self,
1497 session: AsyncSession,
1498 subscription: Subscription,
1499 *,
1500 trial_end: datetime | Literal["now"],
1501 ) -> Subscription:
1502 if subscription.stripe_subscription_id is not None:
1503 raise SubscriptionManagedByStripe(subscription)
1505 if not subscription.active:
1506 raise InactiveSubscription(subscription)
1508 previous_status = subscription.status
1509 previous_is_canceled = subscription.canceled
1511 # Already trialing
1512 if subscription.trialing:
1513 # End trial immediately
1514 if trial_end == "now":
1515 subscription.trial_end = subscription.current_period_end = utc_now()
1516 # Make sure to cycle the subscription immediately to update status and trigger order
1517 subscription = await self.cycle(session, subscription)
1518 # Set new trial end date
1519 else:
1520 subscription.trial_end = subscription.current_period_end = cast(
1521 datetime, trial_end
1522 )
1523 # Active subscription
1524 else:
1525 # Can't end trial if not trialing
1526 if trial_end == "now":
1527 raise PolarRequestValidationError(
1528 [
1529 {
1530 "type": "value_error",
1531 "loc": ("body", "trial_end"),
1532 "msg": "The subscription is not currently trialing.",
1533 "input": trial_end,
1534 }
1535 ]
1536 )
1537 # Set a new trial
1538 else:
1539 trial_end_datetime = cast(datetime, trial_end)
1540 # Ensure trial_end is after current_period_end to prevent customer loss
1541 if (
1542 subscription.current_period_end is not None
1543 and trial_end_datetime <= subscription.current_period_end
1544 ):
1545 raise PolarRequestValidationError(
1546 [
1547 {
1548 "type": "value_error",
1549 "loc": ("body", "trial_end"),
1550 "msg": "Trial end must be after the current period end.",
1551 "input": trial_end_datetime,
1552 }
1553 ]
1554 )
1555 subscription.status = SubscriptionStatus.trialing
1556 subscription.trial_end = subscription.current_period_end = (
1557 trial_end_datetime
1558 )
1560 repository = SubscriptionRepository.from_session(session)
1561 subscription = await repository.update(subscription)
1563 await self._after_subscription_updated(
1564 session,
1565 subscription,
1566 previous_status=previous_status,
1567 previous_is_canceled=previous_is_canceled,
1568 )
1570 return subscription
1572 async def update_seats( 1a
1573 self,
1574 session: AsyncSession,
1575 subscription: Subscription,
1576 *,
1577 seats: int,
1578 proration_behavior: SubscriptionProrationBehavior | None = None,
1579 ) -> Subscription:
1580 """
1581 Update the number of seats for a seat-based subscription.
1583 Validates:
1584 - Subscription is seat-based
1585 - Subscription is active (not canceled/trialing)
1586 - New seat count >= minimum from pricing tiers
1587 - New seat count >= currently assigned seats
1589 Creates proration billing entry for the cost difference.
1590 """
1591 if subscription.stripe_subscription_id is not None:
1592 raise SubscriptionManagedByStripe(subscription)
1594 if subscription.revoked or subscription.cancel_at_period_end:
1595 raise AlreadyCanceledSubscription(subscription)
1597 if subscription.trialing:
1598 raise TrialingSubscription(subscription)
1600 seat_price = self._get_seat_based_price(subscription)
1601 if seat_price is None:
1602 raise NotASeatBasedSubscription(subscription)
1604 minimum_seats = self._get_minimum_seats_from_tiers(seat_price)
1605 if seats < minimum_seats:
1606 raise BelowMinimumSeats(subscription, minimum_seats, seats)
1608 assigned_count = await seat_service.count_assigned_seats_for_subscription(
1609 session, subscription
1610 )
1612 if seats < assigned_count:
1613 raise SeatsAlreadyAssigned(subscription, assigned_count, seats)
1615 old_seats = subscription.seats or 1
1616 old_amount = subscription.amount
1618 subscription.seats = seats
1620 subscription.subscription_product_prices = [
1621 SubscriptionProductPrice.from_price(spp.product_price, seats=seats)
1622 for spp in subscription.subscription_product_prices
1623 ]
1625 organization_repository = OrganizationRepository.from_session(session)
1626 organization = await organization_repository.get_by_id(
1627 subscription.product.organization_id
1628 )
1629 assert organization is not None
1631 if proration_behavior is None:
1632 proration_behavior = organization.proration_behavior
1634 event = await event_service.create_event(
1635 session,
1636 build_system_event(
1637 SystemEvent.subscription_seats_updated,
1638 customer=subscription.customer,
1639 organization=subscription.organization,
1640 metadata={
1641 "subscription_id": str(subscription.id),
1642 "old_seats": old_seats,
1643 "new_seats": seats,
1644 "proration_behavior": proration_behavior.value,
1645 },
1646 ),
1647 )
1649 await self._create_seat_proration_entry(
1650 session,
1651 subscription,
1652 old_seats=old_seats,
1653 new_seats=seats,
1654 old_amount=old_amount,
1655 new_amount=subscription.amount,
1656 proration_behavior=proration_behavior,
1657 event=event,
1658 )
1660 session.add(subscription)
1661 await session.flush()
1663 log.info(
1664 "subscription.seats_updated",
1665 subscription_id=subscription.id,
1666 old_seats=old_seats,
1667 new_seats=seats,
1668 old_amount=old_amount,
1669 new_amount=subscription.amount,
1670 )
1672 # Send webhooks and notifications
1673 previous_status = subscription.status
1674 previous_is_canceled = subscription.canceled
1676 await self._after_subscription_updated(
1677 session,
1678 subscription,
1679 previous_status=previous_status,
1680 previous_is_canceled=previous_is_canceled,
1681 )
1683 return subscription
1685 async def update_currrent_billing_period_end( 1a
1686 self,
1687 session: AsyncSession,
1688 subscription: Subscription,
1689 *,
1690 new_period_end: datetime,
1691 ) -> Subscription:
1692 if subscription.revoked:
1693 raise AlreadyCanceledSubscription(subscription)
1695 if not subscription.active:
1696 raise InactiveSubscription(subscription)
1698 if subscription.cancel_at_period_end:
1699 raise AlreadyCanceledSubscription(subscription)
1701 if subscription.current_period_end is None:
1702 raise PolarRequestValidationError(
1703 [
1704 {
1705 "type": "value_error",
1706 "loc": ("body", "current_billing_period_end"),
1707 "msg": "Subscription has no current period end",
1708 "input": new_period_end,
1709 }
1710 ]
1711 )
1713 if new_period_end < subscription.current_period_end:
1714 raise PolarRequestValidationError(
1715 [
1716 {
1717 "type": "value_error",
1718 "loc": ("body", "current_billing_period_end"),
1719 "msg": "New period end is earlier than the current period end",
1720 "input": new_period_end,
1721 }
1722 ]
1723 )
1725 previous_status = subscription.status
1726 previous_is_canceled = subscription.canceled
1727 old_period_end = subscription.current_period_end
1729 subscription.current_period_end = new_period_end
1731 await event_service.create_event(
1732 session,
1733 build_system_event(
1734 SystemEvent.subscription_billing_period_updated,
1735 customer=subscription.customer,
1736 organization=subscription.organization,
1737 metadata={
1738 "subscription_id": str(subscription.id),
1739 "old_period_end": old_period_end.isoformat(),
1740 "new_period_end": new_period_end.isoformat(),
1741 },
1742 ),
1743 )
1745 repository = SubscriptionRepository.from_session(session)
1746 subscription = await repository.update(subscription)
1748 await self._after_subscription_updated(
1749 session,
1750 subscription,
1751 previous_status=previous_status,
1752 previous_is_canceled=previous_is_canceled,
1753 )
1755 return subscription
1757 async def _create_seat_proration_entry( 1a
1758 self,
1759 session: AsyncSession,
1760 subscription: Subscription,
1761 *,
1762 old_seats: int,
1763 new_seats: int,
1764 old_amount: int,
1765 new_amount: int,
1766 proration_behavior: SubscriptionProrationBehavior,
1767 event: "Event",
1768 ) -> None:
1769 """
1770 Create a billing entry for the seat quantity change proration.
1772 Prorates based on remaining time in current billing period.
1773 """
1774 now = datetime.now(UTC)
1775 proration_factor = self._calculate_proration_factor(subscription, now=now)
1777 if proration_factor is None:
1778 log.warning(
1779 "subscription.seats_proration_skipped",
1780 subscription_id=subscription.id,
1781 reason="no_time_remaining",
1782 )
1783 return
1785 period_end = subscription.current_period_end
1786 assert period_end is not None # Already checked by _calculate_proration_factor
1788 # Calculate the raw amounts for the seat counts (before discount)
1789 seat_price = self._get_seat_based_price(subscription)
1790 assert seat_price is not None
1792 old_base_amount = seat_price.calculate_amount(old_seats)
1793 new_base_amount = seat_price.calculate_amount(new_seats)
1794 base_amount_delta = new_base_amount - old_base_amount
1796 # Calculate discount on the delta amount
1797 discount_amount = 0
1798 if subscription.discount and subscription.discount.is_applicable(
1799 subscription.product
1800 ):
1801 discount_amount = subscription.discount.get_discount_amount(
1802 abs(base_amount_delta)
1803 )
1805 # Calculate the net amount delta after discount
1806 if base_amount_delta > 0:
1807 # Increase: reduce the charge by discount
1808 amount_delta = base_amount_delta - discount_amount
1809 else:
1810 # Decrease: reduce the credit by discount
1811 amount_delta = base_amount_delta + discount_amount
1813 prorated_amount = int(Decimal(amount_delta) * proration_factor)
1815 if prorated_amount == 0:
1816 return
1818 if prorated_amount > 0:
1819 direction = BillingEntryDirection.debit
1820 entry_type = BillingEntryType.subscription_seats_increase
1821 else:
1822 direction = BillingEntryDirection.credit
1823 entry_type = BillingEntryType.subscription_seats_decrease
1824 prorated_amount = abs(prorated_amount)
1826 # Calculate prorated discount amount
1827 prorated_discount_amount = 0
1828 if discount_amount > 0:
1829 prorated_discount_amount = int(Decimal(discount_amount) * proration_factor)
1831 billing_entry = BillingEntry(
1832 start_timestamp=now,
1833 end_timestamp=period_end,
1834 subscription=subscription,
1835 customer=subscription.customer,
1836 product_price=seat_price,
1837 amount=prorated_amount,
1838 discount_amount=prorated_discount_amount
1839 if prorated_discount_amount > 0
1840 else None,
1841 discount=subscription.discount if discount_amount > 0 else None,
1842 currency=subscription.currency,
1843 direction=direction,
1844 type=entry_type,
1845 event=event,
1846 )
1848 session.add(billing_entry)
1850 if proration_behavior == SubscriptionProrationBehavior.invoice:
1851 enqueue_job(
1852 "order.create_subscription_order",
1853 subscription.id,
1854 OrderBillingReasonInternal.subscription_update,
1855 )
1857 async def uncancel( 1a
1858 self, session: AsyncSession, subscription: Subscription
1859 ) -> Subscription:
1860 if subscription.ended_at:
1861 raise ResourceUnavailable()
1863 if not (
1864 subscription.status in SubscriptionStatus.billable_statuses()
1865 and subscription.cancel_at_period_end
1866 ):
1867 raise BadRequest()
1869 previous_status = subscription.status
1870 previous_is_canceled = subscription.canceled
1872 # Managed by Stripe
1873 if subscription.stripe_subscription_id is not None:
1874 stripe_subscription = await stripe_service.uncancel_subscription(
1875 subscription.stripe_subscription_id,
1876 )
1877 self.update_cancellation_from_stripe(subscription, stripe_subscription)
1878 # Managed by our billing
1879 else:
1880 subscription.cancel_at_period_end = False
1881 subscription.ends_at = None
1883 subscription.canceled_at = None
1884 subscription.customer_cancellation_reason = None
1885 subscription.customer_cancellation_comment = None
1886 session.add(subscription)
1888 await self._after_subscription_updated(
1889 session,
1890 subscription,
1891 previous_status=previous_status,
1892 previous_is_canceled=previous_is_canceled,
1893 )
1894 return subscription
1896 async def revoke( 1a
1897 self,
1898 session: AsyncSession,
1899 subscription: Subscription,
1900 *,
1901 customer_reason: CustomerCancellationReason | None = None,
1902 customer_comment: str | None = None,
1903 ) -> Subscription:
1904 return await self._perform_cancellation(
1905 session,
1906 subscription,
1907 customer_reason=customer_reason,
1908 customer_comment=customer_comment,
1909 immediately=True,
1910 )
1912 async def cancel( 1a
1913 self,
1914 session: AsyncSession,
1915 subscription: Subscription,
1916 *,
1917 customer_reason: CustomerCancellationReason | None = None,
1918 customer_comment: str | None = None,
1919 ) -> Subscription:
1920 return await self._perform_cancellation(
1921 session,
1922 subscription,
1923 customer_reason=customer_reason,
1924 customer_comment=customer_comment,
1925 )
1927 async def cancel_customer( 1a
1928 self, session: AsyncSession, customer_id: uuid.UUID
1929 ) -> None:
1930 subscription_repository = SubscriptionRepository.from_session(session)
1931 subscriptions = await subscription_repository.list_active_by_customer(
1932 customer_id
1933 )
1934 for subscription in subscriptions:
1935 await self._perform_cancellation(session, subscription, immediately=True)
1937 async def update_from_stripe( 1a
1938 self,
1939 session: AsyncSession,
1940 locker: Locker,
1941 *,
1942 stripe_subscription: stripe_lib.Subscription,
1943 ) -> Subscription:
1944 """
1945 Since Stripe manages the billing cycle, listen for their webhooks and update the
1946 status and dates accordingly.
1947 """
1948 repository = SubscriptionRepository.from_session(session)
1949 subscription = await repository.get_by_stripe_subscription_id(
1950 stripe_subscription.id, options=repository.get_eager_options()
1951 )
1953 if subscription is None:
1954 raise SubscriptionDoesNotExist(stripe_subscription.id)
1956 # Subscription that has been migrated to Polar, ignore the update
1957 if subscription.stripe_subscription_id is None:
1958 log.info(
1959 "Received Stripe update for a subscription that has been migrated to Polar, ignoring.",
1960 id=subscription.id,
1961 )
1962 return subscription
1964 async with self.lock(locker, subscription):
1965 previous_status = subscription.status
1966 previous_is_canceled = subscription.canceled
1968 subscription.status = SubscriptionStatus(stripe_subscription.status)
1969 subscription.current_period_start = _from_timestamp(
1970 stripe_subscription.current_period_start
1971 )
1972 subscription.current_period_end = _from_timestamp(
1973 stripe_subscription.current_period_end
1974 )
1975 subscription.set_started_at()
1976 self.update_cancellation_from_stripe(subscription, stripe_subscription)
1977 # Reset discount if it has expired
1978 if (
1979 len(stripe_subscription.discounts) == 0
1980 and subscription.discount is not None
1981 ):
1982 subscription.discount = None
1984 # Update payment method
1985 if stripe_subscription.default_payment_method is not None:
1986 stripe_payment_method = await stripe_service.get_payment_method(
1987 get_expandable_id(stripe_subscription.default_payment_method)
1988 )
1989 payment_method = await payment_method_service.upsert_from_stripe(
1990 session, subscription.customer, stripe_payment_method
1991 )
1992 subscription.payment_method = payment_method
1994 subscription = await repository.update(subscription)
1996 await self.enqueue_benefits_grants(session, subscription)
1997 await self._after_subscription_updated(
1998 session,
1999 subscription,
2000 previous_status=previous_status,
2001 previous_is_canceled=previous_is_canceled,
2002 )
2003 return subscription
2005 async def _perform_cancellation( 1a
2006 self,
2007 session: AsyncSession,
2008 subscription: Subscription,
2009 *,
2010 customer_reason: CustomerCancellationReason | None = None,
2011 customer_comment: str | None = None,
2012 immediately: bool = False,
2013 ) -> Subscription:
2014 if not subscription.can_cancel(immediately):
2015 raise AlreadyCanceledSubscription(subscription)
2017 previous_status = subscription.status
2018 previous_is_canceled = subscription.canceled
2020 now = utc_now()
2021 subscription.canceled_at = now
2023 if customer_reason:
2024 subscription.customer_cancellation_reason = customer_reason
2026 if customer_comment:
2027 subscription.customer_cancellation_comment = customer_comment
2029 # Managed by Stripe
2030 if subscription.stripe_subscription_id is not None:
2031 reason = customer_reason.value if customer_reason else None
2032 if immediately:
2033 stripe_subscription = await stripe_service.revoke_subscription(
2034 subscription.stripe_subscription_id,
2035 customer_reason=reason, # type: ignore
2036 customer_comment=customer_comment,
2037 )
2038 else:
2039 stripe_subscription = await stripe_service.cancel_subscription(
2040 subscription.stripe_subscription_id,
2041 customer_reason=reason, # type: ignore
2042 customer_comment=customer_comment,
2043 )
2045 subscription.status = SubscriptionStatus(stripe_subscription.status)
2046 self.update_cancellation_from_stripe(subscription, stripe_subscription)
2047 # Managed by our billing
2048 else:
2049 if immediately:
2050 subscription.ends_at = now
2051 subscription.ended_at = now
2052 subscription.status = SubscriptionStatus.canceled
2053 await self.enqueue_benefits_grants(session, subscription)
2054 else:
2055 subscription.cancel_at_period_end = True
2056 subscription.ends_at = subscription.current_period_end
2058 log.info(
2059 "subscription.canceled",
2060 id=subscription.id,
2061 status=subscription.status,
2062 immediately=immediately,
2063 ends_at=subscription.ends_at,
2064 ended_at=subscription.ended_at,
2065 reason=customer_reason,
2066 )
2067 session.add(subscription)
2069 # Trigger hooks since we update subscriptions directly upon cancellation
2070 # Doing so upon Stripe webhooks would prevent us from truly
2071 # knowing/identifying changes made, i.e cancellations.
2072 await self._after_subscription_updated(
2073 session,
2074 subscription,
2075 previous_status=previous_status,
2076 previous_is_canceled=previous_is_canceled,
2077 )
2078 return subscription
2080 def update_cancellation_from_stripe( 1a
2081 self, subscription: Subscription, stripe_subscription: stripe_lib.Subscription
2082 ) -> None:
2083 previous_ends_at = subscription.ends_at
2085 subscription.cancel_at_period_end = stripe_subscription.cancel_at_period_end
2086 subscription.ended_at = _from_timestamp(stripe_subscription.ended_at)
2088 is_canceled = subscription.cancel_at_period_end or subscription.ended_at
2089 is_uncanceled = previous_ends_at and not is_canceled
2090 if not is_canceled or is_uncanceled:
2091 subscription.ends_at = None
2092 subscription.canceled_at = None
2093 return
2095 if subscription.ended_at:
2096 subscription.ends_at = subscription.ended_at
2097 elif subscription.cancel_at_period_end:
2098 subscription.ends_at = subscription.current_period_end
2100 # Use our own if set already (more accurate).
2101 canceled_at = _from_timestamp(stripe_subscription.canceled_at)
2102 if canceled_at and not subscription.canceled_at:
2103 subscription.canceled_at = canceled_at
2105 async def update_meters( 1a
2106 self, session: AsyncSession, subscription: Subscription
2107 ) -> Subscription:
2108 # First reset all meters, since we're computing from every entry
2109 for subscription_meter in subscription.meters:
2110 subscription_meter.reset()
2112 async for (
2113 line_item,
2114 _,
2115 ) in billing_entry_service.compute_pending_subscription_line_items(
2116 session, subscription
2117 ):
2118 if not isinstance(line_item, MeteredLineItem):
2119 continue
2120 subscription_meter_line = subscription.get_meter(line_item.price.meter)
2121 if subscription_meter_line is not None:
2122 subscription_meter_line.consumed_units += Decimal(
2123 line_item.consumed_units
2124 )
2125 subscription_meter_line.credited_units += line_item.credited_units
2126 subscription_meter_line.amount += line_item.amount
2128 session.add(subscription)
2129 await self._after_subscription_updated(
2130 session,
2131 subscription,
2132 previous_status=subscription.status,
2133 previous_is_canceled=subscription.canceled,
2134 )
2136 return subscription
2138 async def calculate_charge_preview( 1a
2139 self, session: AsyncSession, subscription: Subscription
2140 ) -> SubscriptionChargePreview:
2141 """
2142 Calculate a preview of the next charge for a subscription.
2144 Args:
2145 session: Database session
2146 subscription: The subscription to calculate the preview for
2148 Returns:
2149 SubscriptionChargePreview with breakdown of charges
2150 """
2151 # If subscription is set to cancel at period end, there's no base charge
2152 # Only metered charges accumulated during the period will be billed
2153 if subscription.cancel_at_period_end or subscription.ends_at:
2154 base_price = 0
2155 else:
2156 base_price = sum(p.amount for p in subscription.subscription_product_prices)
2158 metered_amount = sum(meter.amount for meter in subscription.meters)
2160 subtotal_amount = base_price + metered_amount
2162 discount_amount = 0
2164 applicable_discount = None
2166 # Ensure the discount has not expired yet for the next charge (so at current_period_end)
2167 if subscription.discount is not None:
2168 assert subscription.started_at is not None
2169 assert subscription.current_period_end is not None
2170 if not subscription.discount.is_repetition_expired(
2171 subscription.started_at,
2172 subscription.current_period_end,
2173 subscription.status == SubscriptionStatus.trialing,
2174 ):
2175 applicable_discount = subscription.discount
2177 if applicable_discount is not None:
2178 discount_amount = applicable_discount.get_discount_amount(subtotal_amount)
2180 taxable_amount = subtotal_amount - discount_amount
2182 tax_amount = 0
2184 if (
2185 taxable_amount > 0
2186 and subscription.product.is_tax_applicable
2187 and subscription.customer.billing_address is not None
2188 ):
2189 tax = await calculate_tax(
2190 subscription.id,
2191 subscription.currency,
2192 taxable_amount,
2193 subscription.product.tax_code,
2194 subscription.customer.billing_address,
2195 [subscription.customer.tax_id]
2196 if subscription.customer.tax_id is not None
2197 else [],
2198 subscription.tax_exempted,
2199 )
2201 tax_amount = tax["amount"]
2203 total = taxable_amount + tax_amount
2205 return SubscriptionChargePreview(
2206 base_amount=base_price,
2207 metered_amount=metered_amount,
2208 subtotal_amount=subtotal_amount,
2209 discount_amount=discount_amount,
2210 tax_amount=tax_amount,
2211 total_amount=total,
2212 )
2214 async def _after_subscription_updated( 1a
2215 self,
2216 session: AsyncSession,
2217 subscription: Subscription,
2218 *,
2219 previous_status: SubscriptionStatus,
2220 previous_is_canceled: bool,
2221 ) -> None:
2222 await self._on_subscription_updated(session, subscription)
2224 became_activated = subscription.active and not SubscriptionStatus.is_active(
2225 previous_status
2226 )
2227 became_reactivated = (
2228 became_activated and previous_status == SubscriptionStatus.past_due
2229 )
2230 became_past_due = (
2231 subscription.status == SubscriptionStatus.past_due
2232 and previous_status != SubscriptionStatus.past_due
2233 )
2234 became_canceled = subscription.canceled and not previous_is_canceled
2235 became_uncanceled = not subscription.canceled and previous_is_canceled
2236 became_revoked = subscription.revoked and not SubscriptionStatus.is_revoked(
2237 previous_status
2238 )
2240 if became_activated:
2241 await self._on_subscription_activated(
2242 session, subscription, became_reactivated
2243 )
2245 if became_uncanceled:
2246 await self._on_subscription_uncanceled(session, subscription)
2248 if became_past_due:
2249 await self._on_subscription_past_due(session, subscription)
2251 if became_canceled:
2252 await self._on_subscription_canceled(
2253 session, subscription, revoked=became_revoked
2254 )
2256 if became_revoked:
2257 await self._on_subscription_revoked(
2258 session, subscription, past_due=became_past_due
2259 )
2261 enqueue_job(
2262 "customer.webhook",
2263 WebhookEventType.customer_state_changed,
2264 subscription.customer_id,
2265 )
2267 async def _on_subscription_updated( 1a
2268 self,
2269 session: AsyncSession,
2270 subscription: Subscription,
2271 ) -> None:
2272 await self._send_webhook(
2273 session, subscription, WebhookEventType.subscription_updated
2274 )
2276 async def _on_subscription_activated( 1a
2277 self,
2278 session: AsyncSession,
2279 subscription: Subscription,
2280 reactivated: bool,
2281 ) -> None:
2282 await self._send_webhook(
2283 session, subscription, WebhookEventType.subscription_active
2284 )
2286 # Only send merchant notification if the subscription is a new one,
2287 # not a past due that has been reactivated.
2288 if not reactivated:
2289 await self._send_new_subscription_notification(session, subscription)
2291 async def _on_subscription_past_due( 1a
2292 self, session: AsyncSession, subscription: Subscription
2293 ) -> None:
2294 await self.send_past_due_email(session, subscription)
2296 async def _on_subscription_uncanceled( 1a
2297 self,
2298 session: AsyncSession,
2299 subscription: Subscription,
2300 ) -> None:
2301 await self._send_webhook(
2302 session, subscription, WebhookEventType.subscription_uncanceled
2303 )
2304 await self.send_uncanceled_email(session, subscription)
2306 async def _on_subscription_canceled( 1a
2307 self,
2308 session: AsyncSession,
2309 subscription: Subscription,
2310 revoked: bool,
2311 ) -> None:
2312 await self._send_webhook(
2313 session, subscription, WebhookEventType.subscription_canceled
2314 )
2316 # Only send cancellation email if the subscription is not revoked,
2317 # as revocation has its own email.
2318 if not revoked:
2319 await self.send_cancellation_email(session, subscription)
2321 async def _on_subscription_revoked( 1a
2322 self,
2323 session: AsyncSession,
2324 subscription: Subscription,
2325 past_due: bool,
2326 ) -> None:
2327 await self._send_webhook(
2328 session, subscription, WebhookEventType.subscription_revoked
2329 )
2331 # Only send revoked email if the subscription is not past due,
2332 # as past due has its own email.
2333 if not past_due:
2334 await self.send_revoked_email(session, subscription)
2336 async def _send_new_subscription_notification( 1a
2337 self, session: AsyncSession, subscription: Subscription
2338 ) -> None:
2339 product = subscription.product
2341 if product.organization.notification_settings["new_subscription"]:
2342 await notifications_service.send_to_org_members(
2343 session,
2344 org_id=product.organization_id,
2345 notif=PartialNotification(
2346 type=NotificationType.maintainer_new_paid_subscription,
2347 payload=MaintainerNewPaidSubscriptionNotificationPayload(
2348 subscriber_name=subscription.customer.email,
2349 tier_name=product.name,
2350 tier_price_amount=subscription.amount,
2351 tier_price_recurring_interval=subscription.recurring_interval,
2352 tier_organization_name=subscription.organization.name,
2353 ),
2354 ),
2355 )
2357 async def _send_webhook( 1a
2358 self,
2359 session: AsyncSession,
2360 subscription: Subscription,
2361 event_type: Literal[
2362 WebhookEventType.subscription_created,
2363 WebhookEventType.subscription_updated,
2364 WebhookEventType.subscription_active,
2365 WebhookEventType.subscription_canceled,
2366 WebhookEventType.subscription_uncanceled,
2367 WebhookEventType.subscription_revoked,
2368 ],
2369 ) -> None:
2370 repository = SubscriptionRepository.from_session(session)
2371 subscription = cast(
2372 Subscription,
2373 await repository.get_by_id(
2374 subscription.id, options=repository.get_eager_options()
2375 ),
2376 )
2377 product_repository = ProductRepository.from_session(session)
2378 product = await product_repository.get_by_id(
2379 subscription.product_id, options=product_repository.get_eager_options()
2380 )
2381 if product is not None:
2382 await webhook_service.send(
2383 session, product.organization, event_type, subscription
2384 )
2386 async def _is_within_revocation_grace_period( 1a
2387 self,
2388 session: AsyncSession,
2389 subscription: Subscription,
2390 organization: Organization,
2391 ) -> bool:
2392 """Check if a subscription is within its benefit revocation grace period.
2394 Returns True if within grace period (benefits should not be revoked yet).
2395 Returns False if grace period has expired or doesn't apply.
2396 """
2397 if subscription.status not in {
2398 SubscriptionStatus.past_due,
2399 SubscriptionStatus.unpaid,
2400 }:
2401 return False
2403 grace_period_days = int(organization.benefit_revocation_grace_period)
2405 if grace_period_days == 0:
2406 return False
2408 if not subscription.past_due_at:
2409 return False
2411 grace_period_ends_at = subscription.past_due_at + timedelta(
2412 days=grace_period_days
2413 )
2414 now = utc_now()
2416 if now < grace_period_ends_at:
2417 log.info(
2418 "Subscription is within benefit revocation grace period",
2419 subscription_id=str(subscription.id),
2420 customer_id=str(subscription.customer_id),
2421 past_due_at=subscription.past_due_at.isoformat(),
2422 grace_period_ends_at=grace_period_ends_at.isoformat(),
2423 days_remaining=(grace_period_ends_at - now).days,
2424 )
2425 return True
2427 return False
2429 async def enqueue_benefits_grants( 1a
2430 self, session: AsyncSession, subscription: Subscription
2431 ) -> None:
2432 product_repository = ProductRepository.from_session(session)
2433 product = await product_repository.get_by_id(subscription.product_id)
2434 assert product is not None
2436 if subscription.is_incomplete():
2437 return
2439 task = "grant" if subscription.active else "revoke"
2441 # Check grace period for benefit revocation
2442 if task == "revoke":
2443 organization_repository = OrganizationRepository.from_session(session)
2444 organization = await organization_repository.get_by_id(
2445 product.organization_id
2446 )
2447 assert organization is not None
2449 if await self._is_within_revocation_grace_period(
2450 session, subscription, organization
2451 ):
2452 # Don't enqueue revocation yet, still within grace period
2453 return
2455 # For seat-based products, handle benefits through seats
2456 if product.has_seat_based_price:
2457 # When subscription is cancelled/revoked, revoke all seats
2458 # which will in turn revoke benefits for each seat holder
2459 if not subscription.active:
2460 await seat_service.revoke_all_seats_for_subscription(
2461 session, subscription
2462 )
2463 # When subscription is active, benefits are granted when seats are claimed
2464 # So we don't need to do anything here
2465 return
2467 enqueue_job(
2468 "benefit.enqueue_benefits_grants",
2469 task=task,
2470 customer_id=subscription.customer_id,
2471 product_id=product.id,
2472 subscription_id=subscription.id,
2473 )
2475 async def update_product_benefits_grants( 1a
2476 self, session: AsyncSession, product: Product
2477 ) -> None:
2478 statement = select(Subscription).where(
2479 Subscription.product_id == product.id, Subscription.deleted_at.is_(None)
2480 )
2481 subscriptions = await session.stream_scalars(
2482 statement,
2483 execution_options={"yield_per": settings.DATABASE_STREAM_YIELD_PER},
2484 )
2485 async for subscription in subscriptions:
2486 await self.enqueue_benefits_grants(session, subscription)
2488 async def send_uncanceled_email( 1a
2489 self, session: AsyncSession, subscription: Subscription
2490 ) -> None:
2491 return await self._send_customer_email(
2492 session,
2493 subscription,
2494 subject_template="Your {product.name} subscription is uncanceled",
2495 template_name="subscription_uncanceled",
2496 )
2498 async def send_cancellation_email( 1a
2499 self, session: AsyncSession, subscription: Subscription
2500 ) -> None:
2501 return await self._send_customer_email(
2502 session,
2503 subscription,
2504 subject_template="Your {product.name} subscription cancellation",
2505 template_name="subscription_cancellation",
2506 )
2508 async def send_revoked_email( 1a
2509 self, session: AsyncSession, subscription: Subscription
2510 ) -> None:
2511 return await self._send_customer_email(
2512 session,
2513 subscription,
2514 subject_template="Your {product.name} subscription has ended",
2515 template_name="subscription_revoked",
2516 )
2518 async def send_past_due_email( 1a
2519 self, session: AsyncSession, subscription: Subscription
2520 ) -> None:
2521 """Send past due email to customer with optional payment link."""
2522 payment_url = None
2524 # Try to get payment link from Stripe if available
2525 if subscription.stripe_subscription_id:
2526 try:
2527 stripe_subscription = await stripe_lib.Subscription.retrieve_async(
2528 subscription.stripe_subscription_id
2529 )
2530 if stripe_subscription.latest_invoice:
2531 invoice_id = get_expandable_id(stripe_subscription.latest_invoice)
2532 invoice = await stripe_service.get_invoice(invoice_id)
2533 if invoice.hosted_invoice_url:
2534 payment_url = invoice.hosted_invoice_url
2535 except Exception:
2536 # If we can't get the payment link, continue without it
2537 pass
2539 # Only include payment_url if it's not None
2540 extra_context: dict[str, Any] = {}
2541 if payment_url is not None:
2542 extra_context["payment_url"] = payment_url
2544 return await self._send_customer_email(
2545 session,
2546 subscription,
2547 subject_template="Your {product.name} subscription payment is past due",
2548 template_name="subscription_past_due",
2549 extra_context=extra_context if extra_context else None,
2550 )
2552 async def send_subscription_updated_email( 1a
2553 self,
2554 session: AsyncSession,
2555 subscription: Subscription,
2556 new_product: Product,
2557 proration_behavior: SubscriptionProrationBehavior,
2558 ) -> None:
2559 # Don't send email if invoicing immediately
2560 # It'll be sent after the Order has been created
2561 if proration_behavior == SubscriptionProrationBehavior.invoice:
2562 return
2564 subject = f"Your subscription has changed to {new_product.name}"
2566 return await self._send_customer_email(
2567 session,
2568 subscription,
2569 subject_template=subject,
2570 template_name="subscription_updated",
2571 extra_context={
2572 "order": None,
2573 },
2574 )
2576 async def _send_customer_email( 1a
2577 self,
2578 session: AsyncSession,
2579 subscription: Subscription,
2580 *,
2581 subject_template: str,
2582 template_name: Literal[
2583 "subscription_cancellation",
2584 "subscription_past_due",
2585 "subscription_revoked",
2586 "subscription_uncanceled",
2587 "subscription_updated",
2588 ],
2589 extra_context: dict[str, Any] | None = None,
2590 ) -> None:
2591 product_repository = ProductRepository.from_session(session)
2592 product = await product_repository.get_by_id(
2593 subscription.product_id, options=product_repository.get_eager_options()
2594 )
2595 assert product is not None
2596 product = subscription.product
2597 organization_repository = OrganizationRepository.from_session(session)
2598 organization = await organization_repository.get_by_id(
2599 product.organization_id,
2600 # We block organizations in case of fraud and then refund/cancel
2601 # so make sure we can still fetch them for the purpose of sending
2602 # customer emails.
2603 include_deleted=True,
2604 include_blocked=True,
2605 )
2606 assert organization is not None
2608 if not organization.customer_email_settings[template_name]:
2609 return
2611 customer = subscription.customer
2612 token, _ = await customer_session_service.create_customer_session(
2613 session, customer
2614 )
2616 # Build query parameters with proper URL encoding
2617 query_string = urlencode(
2618 {
2619 "customer_session_token": token,
2620 "id": str(subscription.id),
2621 "email": customer.email,
2622 }
2623 )
2624 portal_url = settings.generate_frontend_url(
2625 f"/{organization.slug}/portal?{query_string}"
2626 )
2628 email = EmailAdapter.validate_python(
2629 {
2630 "template": template_name,
2631 "props": {
2632 "email": subscription.customer.email,
2633 "organization": organization,
2634 "product": product,
2635 "subscription": subscription,
2636 "url": portal_url,
2637 **(extra_context or {}),
2638 },
2639 }
2640 )
2642 body = render_email_template(email)
2644 subject = subject_template.format(product=product)
2646 enqueue_email(
2647 **organization.email_from_reply,
2648 to_email_addr=subscription.customer.email,
2649 subject=subject,
2650 html_content=body,
2651 )
2653 async def _get_outdated_grants( 1a
2654 self,
2655 session: AsyncSession,
2656 subscription: Subscription,
2657 current_subscription_tier: Product,
2658 ) -> Sequence[BenefitGrant]:
2659 subscription_tier_benefits_statement = (
2660 select(Benefit.id)
2661 .join(ProductBenefit)
2662 .where(ProductBenefit.product_id == current_subscription_tier.id)
2663 )
2665 statement = select(BenefitGrant).where(
2666 BenefitGrant.subscription_id == subscription.id,
2667 BenefitGrant.benefit_id.not_in(subscription_tier_benefits_statement),
2668 BenefitGrant.is_granted.is_(True),
2669 BenefitGrant.deleted_at.is_(None),
2670 )
2672 result = await session.execute(statement)
2673 return result.scalars().all()
2675 async def mark_past_due( 1a
2676 self, session: AsyncSession, subscription: Subscription
2677 ) -> Subscription:
2678 """Mark a subscription as past due. Main use case is to set it when payment fails.
2679 When this happens the customer will be notified and lose access to the benefits"""
2681 previous_status = subscription.status
2682 previous_is_canceled = subscription.canceled
2684 repository = SubscriptionRepository.from_session(session)
2685 update_dict: dict[str, Any] = {"status": SubscriptionStatus.past_due}
2686 if subscription.past_due_at is None:
2687 update_dict["past_due_at"] = utc_now()
2688 subscription = await repository.update(subscription, update_dict=update_dict)
2690 # Trigger subscription updated events
2691 await self._after_subscription_updated(
2692 session,
2693 subscription,
2694 previous_status=previous_status,
2695 previous_is_canceled=previous_is_canceled,
2696 )
2697 # Cancel all grants for this subscription
2698 await self.enqueue_benefits_grants(session, subscription)
2700 return subscription
2702 async def mark_active( 1a
2703 self, session: AsyncSession, subscription: Subscription
2704 ) -> Subscription:
2705 """Mark a subscription as active. Used when payment succeeds after being past due."""
2707 previous_status = subscription.status
2708 previous_is_canceled = subscription.canceled
2710 repository = SubscriptionRepository.from_session(session)
2711 subscription = await repository.update(
2712 subscription,
2713 update_dict={"status": SubscriptionStatus.active, "past_due_at": None},
2714 )
2716 await self._after_subscription_updated(
2717 session,
2718 subscription,
2719 previous_status=previous_status,
2720 previous_is_canceled=previous_is_canceled,
2721 )
2722 await self.enqueue_benefits_grants(session, subscription)
2724 return subscription
2726 async def update_payment_method_from_retry( 1a
2727 self,
2728 session: AsyncSession,
2729 subscription: Subscription,
2730 payment_method: PaymentMethod,
2731 ) -> Subscription:
2732 """
2733 Update subscription payment method after successful retry payment.
2735 This method updates both the local subscription record and the Stripe
2736 subscription (if Stripe-managed) to use the new payment method as default.
2737 """
2738 if subscription.stripe_subscription_id:
2739 await stripe_service.set_automatically_charged_subscription(
2740 subscription.stripe_subscription_id, payment_method.processor_id
2741 )
2743 subscription.payment_method = payment_method
2744 repository = SubscriptionRepository.from_session(session)
2745 return await repository.update(subscription)
2747 async def migrate_stripe_subscription( 1a
2748 self, session: AsyncSession, subscription: Subscription
2749 ) -> Subscription:
2750 # Subscription is already migrated, do nothing
2751 if subscription.legacy_stripe_subscription_id is not None:
2752 return subscription
2754 stripe_subscription_id = subscription.stripe_subscription_id
2755 if stripe_subscription_id is None:
2756 raise SubscriptionNotActiveOnStripe(subscription)
2758 # Subscription is already canceled, nothing to do
2759 if subscription.status == SubscriptionStatus.canceled:
2760 subscription.legacy_stripe_subscription_id = stripe_subscription_id
2761 subscription.stripe_subscription_id = None
2762 session.add(subscription)
2763 return subscription
2765 # Ensure the latest invoice is paid
2766 stripe_subscription = await stripe_lib.Subscription.retrieve_async(
2767 stripe_subscription_id, expand=["latest_invoice"]
2768 )
2769 latest_invoice = cast(
2770 stripe_lib.Invoice | None, stripe_subscription.latest_invoice
2771 )
2772 if latest_invoice is not None and not latest_invoice.paid:
2773 raise SubscriptionNotReadyForMigration(subscription)
2775 # Ensure there are no pending prorations
2776 try:
2777 upcoming_invoice = await stripe_lib.Invoice.create_preview_async(
2778 subscription=stripe_subscription_id
2779 )
2780 async for item in upcoming_invoice.lines.auto_paging_iter():
2781 if item.proration:
2782 raise SubscriptionNotReadyForMigration(subscription)
2783 except stripe_lib.InvalidRequestError as e:
2784 if "no upcoming invoices" in str(e).lower():
2785 # No upcoming invoice, so no prorations
2786 pass
2787 else:
2788 raise
2790 subscription.legacy_stripe_subscription_id = stripe_subscription_id
2791 subscription.stripe_subscription_id = None
2792 session.add(subscription)
2793 await session.commit() # Commit now so we stop handling Stripe webhooks
2795 try:
2796 await stripe_lib.Subscription.cancel_async(
2797 stripe_subscription_id,
2798 cancellation_details={
2799 "feedback": "other",
2800 "comment": "Migrated to Polar",
2801 },
2802 invoice_now=False,
2803 prorate=False,
2804 )
2806 # Handle cases where the payment method is NULL
2807 if subscription.payment_method_id is None:
2808 payment_method_repository = PaymentMethodRepository.from_session(
2809 session
2810 )
2811 payment_method = await payment_method_repository.get_one_or_none(
2812 payment_method_repository.get_base_statement()
2813 .where(
2814 PaymentMethod.customer_id == subscription.customer_id,
2815 PaymentMethod.deleted_at.is_(None),
2816 )
2817 .order_by(PaymentMethod.created_at.desc())
2818 .limit(1)
2819 )
2820 subscription.payment_method = payment_method
2821 session.add(subscription)
2822 except Exception:
2823 # Revert changes
2824 subscription.stripe_subscription_id = stripe_subscription_id
2825 subscription.legacy_stripe_subscription_id = None
2826 session.add(subscription)
2827 await session.commit()
2828 raise
2830 return subscription
2833subscription = SubscriptionService() 1a