Coverage for polar/subscription/endpoints.py: 43%
75 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 17:15 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 17:15 +0000
1from collections.abc import AsyncGenerator 1a
3import structlog 1a
4from fastapi import Depends, Query, Response 1a
5from fastapi.responses import StreamingResponse 1a
7from polar.customer.schemas.customer import CustomerID, ExternalCustomerID 1a
8from polar.exceptions import ResourceNotFound 1a
9from polar.kit.csv import IterableCSVWriter 1a
10from polar.kit.metadata import MetadataQuery, get_metadata_query_openapi_schema 1a
11from polar.kit.pagination import ListResource, PaginationParams, PaginationParamsQuery 1a
12from polar.kit.schemas import MultipleQueryFilter 1a
13from polar.locker import Locker, get_locker 1a
14from polar.models import Subscription 1a
15from polar.openapi import APITag 1a
16from polar.organization.schemas import OrganizationID 1a
17from polar.postgres import ( 1a
18 AsyncReadSession,
19 AsyncSession,
20 get_db_read_session,
21 get_db_session,
22)
23from polar.product.schemas import ProductID 1a
24from polar.routing import APIRouter 1a
26from . import auth, sorting 1a
27from .schemas import Subscription as SubscriptionSchema 1a
28from .schemas import ( 1a
29 SubscriptionChargePreview,
30 SubscriptionCreate,
31 SubscriptionID,
32 SubscriptionUpdate,
33)
34from .service import AlreadyCanceledSubscription, SubscriptionLocked 1a
35from .service import subscription as subscription_service 1a
# Module-level structlog logger used by the write endpoints below.
log = structlog.get_logger()

# Every route declared in this module is mounted under /subscriptions.
router = APIRouter(
    prefix="/subscriptions",
    tags=["subscriptions", APITag.public, APITag.mcp],
)

# Shared OpenAPI `responses` entry for the 404 case, reused by the
# endpoints that look a subscription up by ID.
SubscriptionNotFound = {
    "description": "Subscription not found.",
    "model": ResourceNotFound.schema(),
}
@router.get(
    "/",
    summary="List Subscriptions",
    response_model=ListResource[SubscriptionSchema],
    openapi_extra={"parameters": [get_metadata_query_openapi_schema()]},
)
async def list(
    auth_subject: auth.SubscriptionsRead,
    pagination: PaginationParamsQuery,
    sorting: sorting.ListSorting,
    metadata: MetadataQuery,
    organization_id: MultipleQueryFilter[OrganizationID] | None = Query(
        None, title="OrganizationID Filter", description="Filter by organization ID."
    ),
    product_id: MultipleQueryFilter[ProductID] | None = Query(
        None, title="ProductID Filter", description="Filter by product ID."
    ),
    customer_id: MultipleQueryFilter[CustomerID] | None = Query(
        None, title="CustomerID Filter", description="Filter by customer ID."
    ),
    external_customer_id: MultipleQueryFilter[ExternalCustomerID] | None = Query(
        None,
        title="ExternalCustomerID Filter",
        description="Filter by customer external ID.",
    ),
    # NOTE(review): this filter is typed with ProductID although it filters on
    # discount IDs -- looks like a copy-paste; confirm whether a dedicated
    # discount ID type should be used here.
    discount_id: MultipleQueryFilter[ProductID] | None = Query(
        None, title="DiscountID Filter", description="Filter by discount ID."
    ),
    active: bool | None = Query(
        None, description="Filter by active or inactive subscription."
    ),
    cancel_at_period_end: bool | None = Query(
        None,
        description="Filter by subscriptions that are set to cancel at period end.",
    ),
    session: AsyncReadSession = Depends(get_db_read_session),
) -> ListResource[SubscriptionSchema]:
    """List subscriptions."""
    # The docstring above is kept verbatim: FastAPI publishes it as the
    # operation description.
    #
    # Every filter, the metadata query, the sorting and the pagination are
    # forwarded unchanged to the service, which returns the page of records
    # together with a count.
    page_records, total_count = await subscription_service.list(
        session,
        auth_subject,
        organization_id=organization_id,
        product_id=product_id,
        customer_id=customer_id,
        external_customer_id=external_customer_id,
        discount_id=discount_id,
        active=active,
        cancel_at_period_end=cancel_at_period_end,
        metadata=metadata,
        pagination=pagination,
        sorting=sorting,
    )

    # Validate each record into the public schema before wrapping the page.
    items = [SubscriptionSchema.model_validate(record) for record in page_records]
    return ListResource.from_paginated_results(items, total_count, pagination)
@router.get("/export", summary="Export Subscriptions")
async def export(
    auth_subject: auth.SubscriptionsRead,
    organization_id: MultipleQueryFilter[OrganizationID] | None = Query(
        None, description="Filter by organization ID."
    ),
    session: AsyncReadSession = Depends(get_db_read_session),
) -> Response:
    """Export subscriptions as a CSV file."""
    # The docstring above is kept verbatim: FastAPI publishes it as the
    # operation description.
    header_row = (
        "Email",
        "Created At",
        "Active",
        "Product",
        "Price",
        "Currency",
        "Interval",
    )

    async def stream_rows() -> AsyncGenerator[str, None]:
        # Rows are produced lazily; the header is emitted before the
        # subscriptions are queried.
        writer = IterableCSVWriter(dialect="excel")
        yield writer.getrow(header_row)

        # A single page with a very large limit is requested, so all matching
        # subscriptions (up to 1,000,000) come back from one service call.
        subscribers, _ = await subscription_service.list(
            session,
            auth_subject,
            organization_id=organization_id,
            pagination=PaginationParams(limit=1000000, page=1),
        )

        for subscription in subscribers:
            email = subscription.customer.email
            created_at = subscription.created_at.isoformat()
            active_flag = "true" if subscription.active else "false"
            product_name = subscription.product.name
            # presumably `amount` is stored in minor units (cents) -- TODO confirm
            price = subscription.amount / 100
            currency = subscription.currency
            interval = subscription.recurring_interval
            yield writer.getrow(
                (
                    email,
                    created_at,
                    active_flag,
                    product_name,
                    price,
                    currency,
                    interval,
                )
            )

    filename = "polar-subscribers.csv"
    response_headers = {"Content-Disposition": f"attachment; filename={filename}"}
    return StreamingResponse(
        stream_rows(),
        media_type="text/csv",
        headers=response_headers,
    )
@router.get(
    "/{id}",
    response_model=SubscriptionSchema,
    summary="Get Subscription",
    responses={404: SubscriptionNotFound},
)
async def get(
    id: SubscriptionID,
    auth_subject: auth.SubscriptionsRead,
    session: AsyncReadSession = Depends(get_db_read_session),
) -> Subscription:
    """Get a subscription by ID."""
    # The docstring above is kept verbatim: FastAPI publishes it as the
    # operation description.
    found = await subscription_service.get(session, auth_subject, id)
    if found is not None:
        return found

    # The service returned nothing for this ID and auth subject.
    raise ResourceNotFound()
@router.get(
    "/{id}/charge-preview",
    response_model=SubscriptionChargePreview,
    summary="Preview Next Charge For Subscription",
    responses={404: SubscriptionNotFound},
    tags=[APITag.private],
)
async def get_charge_preview(
    id: SubscriptionID,
    auth_subject: auth.SubscriptionsRead,
    session: AsyncSession = Depends(get_db_session),
) -> SubscriptionChargePreview:
    """
    Get a preview of the next charge for an active or trialing subscription.

    Returns a breakdown of:
    - Base subscription amount
    - Metered usage charges
    - Applied discounts
    - Calculated taxes
    - Total amount

    For trialing subscriptions, shows what the first charge will be when the trial ends.
    For subscriptions set to cancel at period end, shows the final charge.
    Only available for active or trialing subscriptions, including those set to cancel.
    """
    # The docstring above is kept verbatim: FastAPI publishes it as the
    # operation description.
    subscription = await subscription_service.get(session, auth_subject, id)

    # A missing subscription and one that is neither active nor trialing are
    # reported the same way: as "not found".
    if subscription is None or subscription.status not in ("active", "trialing"):
        raise ResourceNotFound()

    # When the subscription is scheduled to end (cancel_at_period_end or
    # ends_at is set), a preview is only served while `ended_at` is unset.
    scheduled_to_end = subscription.cancel_at_period_end or subscription.ends_at
    if scheduled_to_end and subscription.ended_at:
        raise ResourceNotFound()

    preview = await subscription_service.calculate_charge_preview(
        session, subscription
    )
    return preview
@router.post(
    "/",
    status_code=201,
    response_model=SubscriptionSchema,
    summary="Create Subscription",
    responses={201: {"description": "Subscription created."}},
)
async def create(
    subscription_create: SubscriptionCreate,
    auth_subject: auth.SubscriptionsWrite,
    session: AsyncSession = Depends(get_db_session),
) -> Subscription:
    """
    Create a subscription programmatically.

    This endpoint only allows to create subscription on free products.
    For paid products, use the checkout flow.

    No initial order will be created and no confirmation email will be sent.
    """
    # The docstring above is kept verbatim: FastAPI publishes it as the
    # operation description.
    #
    # The endpoint is a thin pass-through: all the work happens in the
    # service's `create` call.
    created = await subscription_service.create(
        session, subscription_create, auth_subject
    )
    return created
@router.patch(
    "/{id}",
    response_model=SubscriptionSchema,
    summary="Update Subscription",
    responses={
        200: {"description": "Subscription updated."},
        403: {
            "description": (
                "Subscription is already canceled or will be at the end of the period."
            ),
            "model": AlreadyCanceledSubscription.schema(),
        },
        404: SubscriptionNotFound,
        409: {
            "description": "Subscription is pending an update.",
            "model": SubscriptionLocked.schema(),
        },
    },
)
async def update(
    id: SubscriptionID,
    subscription_update: SubscriptionUpdate,
    auth_subject: auth.SubscriptionsWrite,
    session: AsyncSession = Depends(get_db_session),
    locker: Locker = Depends(get_locker),
) -> Subscription:
    """Update a subscription."""
    # The docstring above is kept verbatim: FastAPI publishes it as the
    # operation description.
    target = await subscription_service.get(session, auth_subject, id)
    if target is None:
        raise ResourceNotFound()

    # NOTE(review): the `customer_id` log field carries the authenticated
    # subject's id, while `revoke` logs the same value as `admin_id` --
    # confirm which key is intended.
    log.info(
        "subscription.update",
        id=id,
        customer_id=auth_subject.subject.id,
        updates=subscription_update,
    )

    # The update runs while the service's lock for this subscription is held.
    async with subscription_service.lock(locker, target):
        updated = await subscription_service.update(
            session, locker, target, update=subscription_update
        )
        return updated
@router.delete(
    "/{id}",
    response_model=SubscriptionSchema,
    summary="Revoke Subscription",
    responses={
        200: {"description": "Subscription revoked."},
        403: {
            "description": "This subscription is already revoked.",
            "model": AlreadyCanceledSubscription.schema(),
        },
        404: SubscriptionNotFound,
        409: {
            "description": "Subscription is pending an update.",
            "model": SubscriptionLocked.schema(),
        },
    },
)
async def revoke(
    id: SubscriptionID,
    auth_subject: auth.SubscriptionsWrite,
    session: AsyncSession = Depends(get_db_session),
    locker: Locker = Depends(get_locker),
) -> Subscription:
    """Revoke a subscription, i.e cancel immediately."""
    # The docstring above is kept verbatim: FastAPI publishes it as the
    # operation description.
    target = await subscription_service.get(session, auth_subject, id)
    if target is None:
        raise ResourceNotFound()

    # Record who asked for the revocation before acting on it.
    log.info(
        "subscription.revoke",
        id=id,
        admin_id=auth_subject.subject.id,
        immediate=True,
    )

    # The revocation runs while the service's lock for this subscription is held.
    async with subscription_service.lock(locker, target):
        revoked = await subscription_service.revoke(session, target)
        return revoked