Coverage for polar/subscription/endpoints.py: 43%

75 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 16:17 +0000

1from collections.abc import AsyncGenerator 1a

2 

3import structlog 1a

4from fastapi import Depends, Query, Response 1a

5from fastapi.responses import StreamingResponse 1a

6 

7from polar.customer.schemas.customer import CustomerID, ExternalCustomerID 1a

8from polar.exceptions import ResourceNotFound 1a

9from polar.kit.csv import IterableCSVWriter 1a

10from polar.kit.metadata import MetadataQuery, get_metadata_query_openapi_schema 1a

11from polar.kit.pagination import ListResource, PaginationParams, PaginationParamsQuery 1a

12from polar.kit.schemas import MultipleQueryFilter 1a

13from polar.locker import Locker, get_locker 1a

14from polar.models import Subscription 1a

15from polar.openapi import APITag 1a

16from polar.organization.schemas import OrganizationID 1a

17from polar.postgres import ( 1a

18 AsyncReadSession, 

19 AsyncSession, 

20 get_db_read_session, 

21 get_db_session, 

22) 

23from polar.product.schemas import ProductID 1a

24from polar.routing import APIRouter 1a

25 

26from . import auth, sorting 1a

27from .schemas import Subscription as SubscriptionSchema 1a

28from .schemas import ( 1a

29 SubscriptionChargePreview, 

30 SubscriptionCreate, 

31 SubscriptionID, 

32 SubscriptionUpdate, 

33) 

34from .service import AlreadyCanceledSubscription, SubscriptionLocked 1a

35from .service import subscription as subscription_service 1a

36 

37log = structlog.get_logger() 1a

38 

39router = APIRouter( 1a

40 prefix="/subscriptions", tags=["subscriptions", APITag.public, APITag.mcp] 

41) 

42 

43SubscriptionNotFound = { 1a

44 "description": "Subscription not found.", 

45 "model": ResourceNotFound.schema(), 

46} 

47 

48 

49@router.get( 1a

50 "/", 

51 response_model=ListResource[SubscriptionSchema], 

52 summary="List Subscriptions", 

53 openapi_extra={"parameters": [get_metadata_query_openapi_schema()]}, 

54) 

55async def list( 1a

56 auth_subject: auth.SubscriptionsRead, 

57 pagination: PaginationParamsQuery, 

58 sorting: sorting.ListSorting, 

59 metadata: MetadataQuery, 

60 organization_id: MultipleQueryFilter[OrganizationID] | None = Query( 

61 None, title="OrganizationID Filter", description="Filter by organization ID." 

62 ), 

63 product_id: MultipleQueryFilter[ProductID] | None = Query( 

64 None, title="ProductID Filter", description="Filter by product ID." 

65 ), 

66 customer_id: MultipleQueryFilter[CustomerID] | None = Query( 

67 None, title="CustomerID Filter", description="Filter by customer ID." 

68 ), 

69 external_customer_id: MultipleQueryFilter[ExternalCustomerID] | None = Query( 

70 None, 

71 title="ExternalCustomerID Filter", 

72 description="Filter by customer external ID.", 

73 ), 

74 discount_id: MultipleQueryFilter[ProductID] | None = Query( 

75 None, title="DiscountID Filter", description="Filter by discount ID." 

76 ), 

77 active: bool | None = Query( 

78 None, description="Filter by active or inactive subscription." 

79 ), 

80 cancel_at_period_end: bool | None = Query( 

81 None, 

82 description="Filter by subscriptions that are set to cancel at period end.", 

83 ), 

84 session: AsyncReadSession = Depends(get_db_read_session), 

85) -> ListResource[SubscriptionSchema]: 

86 """List subscriptions.""" 

87 results, count = await subscription_service.list( 

88 session, 

89 auth_subject, 

90 organization_id=organization_id, 

91 product_id=product_id, 

92 customer_id=customer_id, 

93 external_customer_id=external_customer_id, 

94 discount_id=discount_id, 

95 active=active, 

96 cancel_at_period_end=cancel_at_period_end, 

97 metadata=metadata, 

98 pagination=pagination, 

99 sorting=sorting, 

100 ) 

101 

102 return ListResource.from_paginated_results( 

103 [SubscriptionSchema.model_validate(result) for result in results], 

104 count, 

105 pagination, 

106 ) 

107 

108 

109@router.get("/export", summary="Export Subscriptions") 1a

110async def export( 1a

111 auth_subject: auth.SubscriptionsRead, 

112 organization_id: MultipleQueryFilter[OrganizationID] | None = Query( 

113 None, description="Filter by organization ID." 

114 ), 

115 session: AsyncReadSession = Depends(get_db_read_session), 

116) -> Response: 

117 """Export subscriptions as a CSV file.""" 

118 

119 async def create_csv() -> AsyncGenerator[str, None]: 

120 csv_writer = IterableCSVWriter(dialect="excel") 

121 # CSV header 

122 yield csv_writer.getrow( 

123 ( 

124 "Email", 

125 "Created At", 

126 "Active", 

127 "Product", 

128 "Price", 

129 "Currency", 

130 "Interval", 

131 ) 

132 ) 

133 

134 (subscribers, _) = await subscription_service.list( 

135 session, 

136 auth_subject, 

137 organization_id=organization_id, 

138 pagination=PaginationParams(limit=1000000, page=1), 

139 ) 

140 

141 for sub in subscribers: 

142 yield csv_writer.getrow( 

143 ( 

144 sub.customer.email, 

145 sub.created_at.isoformat(), 

146 "true" if sub.active else "false", 

147 sub.product.name, 

148 sub.amount / 100, 

149 sub.currency, 

150 sub.recurring_interval, 

151 ) 

152 ) 

153 

154 filename = "polar-subscribers.csv" 

155 return StreamingResponse( 

156 create_csv(), 

157 media_type="text/csv", 

158 headers={"Content-Disposition": f"attachment; filename={filename}"}, 

159 ) 

160 

161 

162@router.get( 1a

163 "/{id}", 

164 summary="Get Subscription", 

165 response_model=SubscriptionSchema, 

166 responses={404: SubscriptionNotFound}, 

167) 

168async def get( 1a

169 id: SubscriptionID, 

170 auth_subject: auth.SubscriptionsRead, 

171 session: AsyncReadSession = Depends(get_db_read_session), 

172) -> Subscription: 

173 """Get a subscription by ID.""" 

174 subscription = await subscription_service.get(session, auth_subject, id) 

175 

176 if subscription is None: 

177 raise ResourceNotFound() 

178 

179 return subscription 

180 

181 

182@router.get( 1a

183 "/{id}/charge-preview", 

184 summary="Preview Next Charge For Subscription", 

185 response_model=SubscriptionChargePreview, 

186 responses={404: SubscriptionNotFound}, 

187 tags=[APITag.private], 

188) 

189async def get_charge_preview( 1a

190 id: SubscriptionID, 

191 auth_subject: auth.SubscriptionsRead, 

192 session: AsyncSession = Depends(get_db_session), 

193) -> SubscriptionChargePreview: 

194 """ 

195 Get a preview of the next charge for an active or trialing subscription. 

196 

197 Returns a breakdown of: 

198 - Base subscription amount 

199 - Metered usage charges 

200 - Applied discounts 

201 - Calculated taxes 

202 - Total amount 

203 

204 For trialing subscriptions, shows what the first charge will be when the trial ends. 

205 For subscriptions set to cancel at period end, shows the final charge. 

206 Only available for active or trialing subscriptions, including those set to cancel. 

207 """ 

208 subscription = await subscription_service.get(session, auth_subject, id) 

209 

210 if subscription is None: 

211 raise ResourceNotFound() 

212 

213 # Allow active, trialing, and subscriptions set to cancel at period end 

214 if subscription.status not in ("active", "trialing"): 

215 raise ResourceNotFound() 

216 

217 # If subscription will end (cancel_at_period_end or ends_at), ensure there's still a charge coming 

218 if subscription.cancel_at_period_end or subscription.ends_at: 

219 # Only show preview if we haven't reached the end date yet 

220 if subscription.ended_at: 

221 raise ResourceNotFound() 

222 

223 return await subscription_service.calculate_charge_preview(session, subscription) 

224 

225 

226@router.post( 1a

227 "/", 

228 response_model=SubscriptionSchema, 

229 status_code=201, 

230 summary="Create Subscription", 

231 responses={201: {"description": "Subscription created."}}, 

232) 

233async def create( 1a

234 subscription_create: SubscriptionCreate, 

235 auth_subject: auth.SubscriptionsWrite, 

236 session: AsyncSession = Depends(get_db_session), 

237) -> Subscription: 

238 """ 

239 Create a subscription programmatically. 

240 

241 This endpoint only allows to create subscription on free products. 

242 For paid products, use the checkout flow. 

243 

244 No initial order will be created and no confirmation email will be sent. 

245 """ 

246 return await subscription_service.create(session, subscription_create, auth_subject) 

247 

248 

249@router.patch( 1a

250 "/{id}", 

251 summary="Update Subscription", 

252 response_model=SubscriptionSchema, 

253 responses={ 

254 200: {"description": "Subscription updated."}, 

255 403: { 

256 "description": ( 

257 "Subscription is already canceled or will be at the end of the period." 

258 ), 

259 "model": AlreadyCanceledSubscription.schema(), 

260 }, 

261 404: SubscriptionNotFound, 

262 409: { 

263 "description": "Subscription is pending an update.", 

264 "model": SubscriptionLocked.schema(), 

265 }, 

266 }, 

267) 

268async def update( 1a

269 id: SubscriptionID, 

270 subscription_update: SubscriptionUpdate, 

271 auth_subject: auth.SubscriptionsWrite, 

272 session: AsyncSession = Depends(get_db_session), 

273 locker: Locker = Depends(get_locker), 

274) -> Subscription: 

275 """Update a subscription.""" 

276 subscription = await subscription_service.get(session, auth_subject, id) 

277 if subscription is None: 

278 raise ResourceNotFound() 

279 

280 log.info( 

281 "subscription.update", 

282 id=id, 

283 customer_id=auth_subject.subject.id, 

284 updates=subscription_update, 

285 ) 

286 async with subscription_service.lock(locker, subscription): 

287 return await subscription_service.update( 

288 session, locker, subscription, update=subscription_update 

289 ) 

290 

291 

292@router.delete( 1a

293 "/{id}", 

294 summary="Revoke Subscription", 

295 response_model=SubscriptionSchema, 

296 responses={ 

297 200: {"description": "Subscription revoked."}, 

298 403: { 

299 "description": "This subscription is already revoked.", 

300 "model": AlreadyCanceledSubscription.schema(), 

301 }, 

302 404: SubscriptionNotFound, 

303 409: { 

304 "description": "Subscription is pending an update.", 

305 "model": SubscriptionLocked.schema(), 

306 }, 

307 }, 

308) 

309async def revoke( 1a

310 id: SubscriptionID, 

311 auth_subject: auth.SubscriptionsWrite, 

312 session: AsyncSession = Depends(get_db_session), 

313 locker: Locker = Depends(get_locker), 

314) -> Subscription: 

315 """Revoke a subscription, i.e cancel immediately.""" 

316 subscription = await subscription_service.get(session, auth_subject, id) 

317 if subscription is None: 

318 raise ResourceNotFound() 

319 

320 log.info( 

321 "subscription.revoke", id=id, admin_id=auth_subject.subject.id, immediate=True 

322 ) 

323 async with subscription_service.lock(locker, subscription): 

324 return await subscription_service.revoke(session, subscription)