Coverage for polar/checkout/endpoints.py: 68%

73 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 16:17 +0000

1from typing import Annotated 1a

2 

3from fastapi import Depends, Path, Query, Request 1a

4from pydantic import UUID4 1a

5from sse_starlette.sse import EventSourceResponse 1a

6 

7from polar.customer.schemas.customer import CustomerID 1a

8from polar.eventstream.endpoints import subscribe 1a

9from polar.eventstream.service import Receivers 1a

10from polar.exceptions import PaymentNotReady, ResourceNotFound 1a

11from polar.kit.pagination import ListResource, PaginationParamsQuery 1a

12from polar.kit.schemas import ( 1a

13 MultipleQueryFilter, 

14 SetSchemaReference, 

15) 

16from polar.locker import Locker, get_locker 1a

17from polar.models import Checkout 1a

18from polar.models.checkout import CheckoutStatus 1a

19from polar.openapi import APITag 1a

20from polar.organization.schemas import OrganizationID 1a

21from polar.postgres import ( 1a

22 AsyncReadSession, 

23 AsyncSession, 

24 get_db_read_session, 

25 get_db_session, 

26) 

27from polar.product.schemas import ProductID 1a

28from polar.redis import Redis, get_redis 1a

29from polar.routing import APIRouter 1a

30 

31from . import auth, ip_geolocation, sorting 1a

32from .schemas import Checkout as CheckoutSchema 1a

33from .schemas import ( 1a

34 CheckoutConfirm, 

35 CheckoutCreate, 

36 CheckoutCreatePublic, 

37 CheckoutPublic, 

38 CheckoutPublicConfirmed, 

39 CheckoutUpdate, 

40 CheckoutUpdatePublic, 

41) 

42from .service import ( 1a

43 AlreadyActiveSubscriptionError, 

44 ExpiredCheckoutError, 

45 NotOpenCheckout, 

46 PaymentError, 

47 TrialAlreadyRedeemed, 

48) 

49from .service import checkout as checkout_service 1a

50 

# Router holding every checkout endpoint; it is mounted (twice) on the public
# `router` at the bottom of this module.
inner_router = APIRouter(
    tags=["checkouts", APITag.public],
)


# Path parameter aliases shared by the endpoints below.
CheckoutID = Annotated[
    UUID4,
    Path(description="The checkout session ID."),
]
CheckoutClientSecret = Annotated[
    str,
    Path(description="The checkout session client secret."),
]

# Reusable entries for the `responses=` argument of the route decorators
# below. Each one pairs a human-readable description with an error schema.

# Used for 404 responses.
CheckoutNotFound = {
    "description": "Checkout session not found.",
    "model": ResourceNotFound.schema(),
}

# Used for 410 responses.
CheckoutExpired = {
    "description": "The checkout session is expired.",
    "model": ExpiredCheckoutError.schema(),
}

# Used for 400 responses on the confirm endpoint.
CheckoutPaymentError = {
    "description": "The payment failed.",
    "model": PaymentError.schema(),
}

# Union of every error schema that the 403 response may carry.
_checkout_forbidden_models = (
    AlreadyActiveSubscriptionError.schema()
    | NotOpenCheckout.schema()
    | PaymentNotReady.schema()
    | TrialAlreadyRedeemed.schema()
)

# Used for 403 responses; the union is published under a single schema
# reference name.
CheckoutForbiddenError = {
    "description": "The checkout is expired, the customer already has an active subscription, or the organization is not ready to accept payments.",
    "model": Annotated[
        _checkout_forbidden_models,
        SetSchemaReference("CheckoutForbiddenError"),
    ],
}

80 

81 

@inner_router.get(
    "/",
    response_model=ListResource[CheckoutSchema],
    summary="List Checkout Sessions",
)
async def list(
    auth_subject: auth.CheckoutRead,
    pagination: PaginationParamsQuery,
    sorting: sorting.ListSorting,
    organization_id: MultipleQueryFilter[OrganizationID] | None = Query(
        None, title="OrganizationID Filter", description="Filter by organization ID."
    ),
    product_id: MultipleQueryFilter[ProductID] | None = Query(
        None, title="ProductID Filter", description="Filter by product ID."
    ),
    customer_id: MultipleQueryFilter[CustomerID] | None = Query(
        None, title="CustomerID Filter", description="Filter by customer ID."
    ),
    status: MultipleQueryFilter[CheckoutStatus] | None = Query(
        None,
        title="Status Filter",
        description="Filter by checkout session status.",
    ),
    query: str | None = Query(None, description="Filter by customer email."),
    session: AsyncReadSession = Depends(get_db_read_session),
) -> ListResource[CheckoutSchema]:
    """List checkout sessions."""
    # NOTE: this endpoint is named `list`, so the builtin `list` is shadowed
    # from here on in the module; do not call `list(...)` below.

    # All filters are optional and forwarded untouched to the service, which
    # returns the page of rows together with the total row count.
    rows, total = await checkout_service.list(
        session,
        auth_subject,
        pagination=pagination,
        sorting=sorting,
        organization_id=organization_id,
        product_id=product_id,
        customer_id=customer_id,
        status=status,
        query=query,
    )

    # Convert each returned row to the response schema before wrapping the
    # page in the paginated envelope.
    items = [CheckoutSchema.model_validate(row) for row in rows]
    return ListResource.from_paginated_results(items, total, pagination)

124 

125 

@inner_router.get(
    "/{id}",
    response_model=CheckoutSchema,
    summary="Get Checkout Session",
    responses={404: CheckoutNotFound},
)
async def get(
    id: CheckoutID,
    auth_subject: auth.CheckoutRead,
    session: AsyncReadSession = Depends(get_db_read_session),
) -> Checkout:
    """Get a checkout session by ID."""
    # The lookup takes the auth subject alongside the ID; a `None` result is
    # reported as a 404 through `ResourceNotFound`.
    found = await checkout_service.get_by_id(session, auth_subject, id)
    if found is not None:
        return found

    raise ResourceNotFound()

144 

145 

@inner_router.post(
    "/",
    summary="Create Checkout Session",
    status_code=201,
    response_model=CheckoutSchema,
    responses={201: {"description": "Checkout session created."}},
)
async def create(
    checkout_create: CheckoutCreate,
    auth_subject: auth.CheckoutWrite,
    ip_geolocation_client: ip_geolocation.IPGeolocationClient,
    session: AsyncSession = Depends(get_db_session),
) -> Checkout:
    """Create a checkout session."""
    # Creation is delegated entirely to the service layer; the endpoint only
    # wires the request payload and dependencies together.
    created = await checkout_service.create(
        session,
        checkout_create,
        auth_subject,
        ip_geolocation_client,
    )
    return created

163 

164 

@inner_router.patch(
    "/{id}",
    summary="Update Checkout Session",
    response_model=CheckoutSchema,
    responses={
        200: {"description": "Checkout session updated."},
        404: CheckoutNotFound,
        403: CheckoutForbiddenError,
    },
)
async def update(
    id: CheckoutID,
    checkout_update: CheckoutUpdate,
    auth_subject: auth.CheckoutWrite,
    ip_geolocation_client: ip_geolocation.IPGeolocationClient,
    session: AsyncSession = Depends(get_db_session),
    locker: Locker = Depends(get_locker),
) -> Checkout:
    """Update a checkout session."""
    # Resolve the target first; a `None` result becomes a 404.
    existing = await checkout_service.get_by_id(session, auth_subject, id)
    if existing is None:
        raise ResourceNotFound()

    # The locker is handed to the service together with the update payload.
    updated = await checkout_service.update(
        session,
        locker,
        existing,
        checkout_update,
        ip_geolocation_client,
    )
    return updated

192 

193 

@inner_router.get(
    "/client/{client_secret}",
    response_model=CheckoutPublic,
    summary="Get Checkout Session from Client",
    responses={404: CheckoutNotFound, 410: CheckoutExpired},
)
async def client_get(
    client_secret: CheckoutClientSecret,
    session: AsyncSession = Depends(get_db_session),
) -> Checkout:
    """Get a checkout session by client secret."""
    # No auth subject is involved here: the client secret is the only lookup
    # key. NOTE(review): the declared 404/410 responses are presumably raised
    # inside the service call — confirm against `get_by_client_secret`.
    checkout = await checkout_service.get_by_client_secret(session, client_secret)
    return checkout

206 

207 

@inner_router.post(
    "/client/",
    response_model=CheckoutPublic,
    status_code=201,
    summary="Create Checkout Session from Client",
    tags=[APITag.private],
)
async def client_create(
    request: Request,
    checkout_create: CheckoutCreatePublic,
    auth_subject: auth.CheckoutWeb,
    ip_geolocation_client: ip_geolocation.IPGeolocationClient,
    session: AsyncSession = Depends(get_db_session),
) -> Checkout:
    """Create a checkout session from a client. Suitable to build checkout links."""
    # `request.client` may be absent, in which case no IP address is passed on
    # to the service.
    peer = request.client
    if peer:
        ip_address = peer.host
    else:
        ip_address = None

    created = await checkout_service.client_create(
        session,
        checkout_create,
        auth_subject,
        ip_geolocation_client,
        ip_address,
    )
    return created

227 

228 

@inner_router.patch(
    "/client/{client_secret}",
    summary="Update Checkout Session from Client",
    response_model=CheckoutPublic,
    responses={
        200: {"description": "Checkout session updated."},
        404: CheckoutNotFound,
        403: CheckoutForbiddenError,
        410: CheckoutExpired,
    },
)
async def client_update(
    client_secret: CheckoutClientSecret,
    checkout_update: CheckoutUpdatePublic,
    ip_geolocation_client: ip_geolocation.IPGeolocationClient,
    session: AsyncSession = Depends(get_db_session),
    locker: Locker = Depends(get_locker),
) -> Checkout:
    """Update a checkout session by client secret."""
    # The session is resolved from the client secret alone, then updated via
    # the same service method the authenticated `update` endpoint uses.
    existing = await checkout_service.get_by_client_secret(session, client_secret)

    updated = await checkout_service.update(
        session,
        locker,
        existing,
        checkout_update,
        ip_geolocation_client,
    )
    return updated

253 

254 

@inner_router.post(
    "/client/{client_secret}/confirm",
    summary="Confirm Checkout Session from Client",
    response_model=CheckoutPublicConfirmed,
    responses={
        200: {"description": "Checkout session confirmed."},
        400: CheckoutPaymentError,
        404: CheckoutNotFound,
        403: CheckoutForbiddenError,
        410: CheckoutExpired,
    },
)
async def client_confirm(
    client_secret: CheckoutClientSecret,
    checkout_confirm: CheckoutConfirm,
    auth_subject: auth.CheckoutWeb,
    session: AsyncSession = Depends(get_db_session),
    locker: Locker = Depends(get_locker),
) -> Checkout:
    """
    Confirm a checkout session by client secret.

    Orders and subscriptions will be processed.
    """
    # Resolve the session from its client secret, then hand it to the service
    # together with the locker, the auth subject and the confirmation payload.
    pending = await checkout_service.get_by_client_secret(session, client_secret)

    confirmed = await checkout_service.confirm(
        session,
        locker,
        auth_subject,
        pending,
        checkout_confirm,
    )
    return confirmed

284 

285 

@inner_router.get(
    "/client/{client_secret}/stream",
    include_in_schema=False,
)
async def client_stream(
    request: Request,
    client_secret: CheckoutClientSecret,
    session: AsyncSession = Depends(get_db_session),
    redis: Redis = Depends(get_redis),
) -> EventSourceResponse:
    # Server-sent events stream for one checkout session, excluded from the
    # OpenAPI schema. The session is looked up first, and the channels are
    # derived from the client secret stored on the resolved checkout.
    checkout = await checkout_service.get_by_client_secret(session, client_secret)

    channels = Receivers(
        checkout_client_secret=checkout.client_secret
    ).get_channels()
    event_source = subscribe(redis, channels, request)
    return EventSourceResponse(event_source)

297 

298 

# Public router for this module: every endpoint of `inner_router` is exposed
# under `/checkouts`.
router = APIRouter(prefix="/checkouts")

# The same endpoints are also mounted under `/checkouts/custom`, hidden from
# the OpenAPI schema.
# NOTE(review): presumably a legacy path kept for backward compatibility —
# confirm before removing. Registration order is kept as-is.
router.include_router(
    inner_router,
    include_in_schema=False,
    prefix="/custom",
)
router.include_router(inner_router)