Coverage for polar/checkout/endpoints.py: 68%
73 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 16:17 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 16:17 +0000
1from typing import Annotated 1a
3from fastapi import Depends, Path, Query, Request 1a
4from pydantic import UUID4 1a
5from sse_starlette.sse import EventSourceResponse 1a
7from polar.customer.schemas.customer import CustomerID 1a
8from polar.eventstream.endpoints import subscribe 1a
9from polar.eventstream.service import Receivers 1a
10from polar.exceptions import PaymentNotReady, ResourceNotFound 1a
11from polar.kit.pagination import ListResource, PaginationParamsQuery 1a
12from polar.kit.schemas import ( 1a
13 MultipleQueryFilter,
14 SetSchemaReference,
15)
16from polar.locker import Locker, get_locker 1a
17from polar.models import Checkout 1a
18from polar.models.checkout import CheckoutStatus 1a
19from polar.openapi import APITag 1a
20from polar.organization.schemas import OrganizationID 1a
21from polar.postgres import ( 1a
22 AsyncReadSession,
23 AsyncSession,
24 get_db_read_session,
25 get_db_session,
26)
27from polar.product.schemas import ProductID 1a
28from polar.redis import Redis, get_redis 1a
29from polar.routing import APIRouter 1a
31from . import auth, ip_geolocation, sorting 1a
32from .schemas import Checkout as CheckoutSchema 1a
33from .schemas import ( 1a
34 CheckoutConfirm,
35 CheckoutCreate,
36 CheckoutCreatePublic,
37 CheckoutPublic,
38 CheckoutPublicConfirmed,
39 CheckoutUpdate,
40 CheckoutUpdatePublic,
41)
42from .service import ( 1a
43 AlreadyActiveSubscriptionError,
44 ExpiredCheckoutError,
45 NotOpenCheckout,
46 PaymentError,
47 TrialAlreadyRedeemed,
48)
49from .service import checkout as checkout_service 1a
51inner_router = APIRouter(tags=["checkouts", APITag.public]) 1a
54CheckoutID = Annotated[UUID4, Path(description="The checkout session ID.")] 1a
55CheckoutClientSecret = Annotated[ 1a
56 str, Path(description="The checkout session client secret.")
57]
58CheckoutNotFound = { 1a
59 "description": "Checkout session not found.",
60 "model": ResourceNotFound.schema(),
61}
62CheckoutExpired = { 1a
63 "description": "The checkout session is expired.",
64 "model": ExpiredCheckoutError.schema(),
65}
66CheckoutPaymentError = { 1a
67 "description": "The payment failed.",
68 "model": PaymentError.schema(),
69}
70CheckoutForbiddenError = { 1a
71 "description": "The checkout is expired, the customer already has an active subscription, or the organization is not ready to accept payments.",
72 "model": Annotated[
73 AlreadyActiveSubscriptionError.schema()
74 | NotOpenCheckout.schema()
75 | PaymentNotReady.schema()
76 | TrialAlreadyRedeemed.schema(),
77 SetSchemaReference("CheckoutForbiddenError"),
78 ],
79}
82@inner_router.get( 1a
83 "/", summary="List Checkout Sessions", response_model=ListResource[CheckoutSchema]
84)
85async def list( 1a
86 auth_subject: auth.CheckoutRead,
87 pagination: PaginationParamsQuery,
88 sorting: sorting.ListSorting,
89 organization_id: MultipleQueryFilter[OrganizationID] | None = Query(
90 None, title="OrganizationID Filter", description="Filter by organization ID."
91 ),
92 product_id: MultipleQueryFilter[ProductID] | None = Query(
93 None, title="ProductID Filter", description="Filter by product ID."
94 ),
95 customer_id: MultipleQueryFilter[CustomerID] | None = Query(
96 None, title="CustomerID Filter", description="Filter by customer ID."
97 ),
98 status: MultipleQueryFilter[CheckoutStatus] | None = Query(
99 None,
100 title="Status Filter",
101 description="Filter by checkout session status.",
102 ),
103 query: str | None = Query(None, description="Filter by customer email."),
104 session: AsyncReadSession = Depends(get_db_read_session),
105) -> ListResource[CheckoutSchema]:
106 """List checkout sessions."""
107 results, count = await checkout_service.list(
108 session,
109 auth_subject,
110 organization_id=organization_id,
111 product_id=product_id,
112 customer_id=customer_id,
113 status=status,
114 query=query,
115 pagination=pagination,
116 sorting=sorting,
117 )
119 return ListResource.from_paginated_results(
120 [CheckoutSchema.model_validate(result) for result in results],
121 count,
122 pagination,
123 )
126@inner_router.get( 1a
127 "/{id}",
128 summary="Get Checkout Session",
129 response_model=CheckoutSchema,
130 responses={404: CheckoutNotFound},
131)
132async def get( 1a
133 id: CheckoutID,
134 auth_subject: auth.CheckoutRead,
135 session: AsyncReadSession = Depends(get_db_read_session),
136) -> Checkout:
137 """Get a checkout session by ID."""
138 checkout = await checkout_service.get_by_id(session, auth_subject, id)
140 if checkout is None:
141 raise ResourceNotFound()
143 return checkout
146@inner_router.post( 1a
147 "/",
148 response_model=CheckoutSchema,
149 status_code=201,
150 summary="Create Checkout Session",
151 responses={201: {"description": "Checkout session created."}},
152)
153async def create( 1a
154 checkout_create: CheckoutCreate,
155 auth_subject: auth.CheckoutWrite,
156 ip_geolocation_client: ip_geolocation.IPGeolocationClient,
157 session: AsyncSession = Depends(get_db_session),
158) -> Checkout:
159 """Create a checkout session."""
160 return await checkout_service.create(
161 session, checkout_create, auth_subject, ip_geolocation_client
162 )
165@inner_router.patch( 1a
166 "/{id}",
167 response_model=CheckoutSchema,
168 summary="Update Checkout Session",
169 responses={
170 200: {"description": "Checkout session updated."},
171 404: CheckoutNotFound,
172 403: CheckoutForbiddenError,
173 },
174)
175async def update( 1a
176 id: CheckoutID,
177 checkout_update: CheckoutUpdate,
178 auth_subject: auth.CheckoutWrite,
179 ip_geolocation_client: ip_geolocation.IPGeolocationClient,
180 session: AsyncSession = Depends(get_db_session),
181 locker: Locker = Depends(get_locker),
182) -> Checkout:
183 """Update a checkout session."""
184 checkout = await checkout_service.get_by_id(session, auth_subject, id)
186 if checkout is None:
187 raise ResourceNotFound()
189 return await checkout_service.update(
190 session, locker, checkout, checkout_update, ip_geolocation_client
191 )
194@inner_router.get( 1a
195 "/client/{client_secret}",
196 summary="Get Checkout Session from Client",
197 response_model=CheckoutPublic,
198 responses={404: CheckoutNotFound, 410: CheckoutExpired},
199)
200async def client_get( 1ab
201 client_secret: CheckoutClientSecret,
202 session: AsyncSession = Depends(get_db_session),
203) -> Checkout:
204 """Get a checkout session by client secret."""
205 return await checkout_service.get_by_client_secret(session, client_secret)
208@inner_router.post( 1a
209 "/client/",
210 summary="Create Checkout Session from Client",
211 response_model=CheckoutPublic,
212 status_code=201,
213 tags=[APITag.private],
214)
215async def client_create( 1a
216 request: Request,
217 checkout_create: CheckoutCreatePublic,
218 auth_subject: auth.CheckoutWeb,
219 ip_geolocation_client: ip_geolocation.IPGeolocationClient,
220 session: AsyncSession = Depends(get_db_session),
221) -> Checkout:
222 """Create a checkout session from a client. Suitable to build checkout links."""
223 ip_address = request.client.host if request.client else None
224 return await checkout_service.client_create(
225 session, checkout_create, auth_subject, ip_geolocation_client, ip_address
226 )
229@inner_router.patch( 1a
230 "/client/{client_secret}",
231 response_model=CheckoutPublic,
232 summary="Update Checkout Session from Client",
233 responses={
234 200: {"description": "Checkout session updated."},
235 404: CheckoutNotFound,
236 403: CheckoutForbiddenError,
237 410: CheckoutExpired,
238 },
239)
240async def client_update( 1ab
241 client_secret: CheckoutClientSecret,
242 checkout_update: CheckoutUpdatePublic,
243 ip_geolocation_client: ip_geolocation.IPGeolocationClient,
244 session: AsyncSession = Depends(get_db_session),
245 locker: Locker = Depends(get_locker),
246) -> Checkout:
247 """Update a checkout session by client secret."""
248 checkout = await checkout_service.get_by_client_secret(session, client_secret)
250 return await checkout_service.update(
251 session, locker, checkout, checkout_update, ip_geolocation_client
252 )
255@inner_router.post( 1a
256 "/client/{client_secret}/confirm",
257 response_model=CheckoutPublicConfirmed,
258 summary="Confirm Checkout Session from Client",
259 responses={
260 200: {"description": "Checkout session confirmed."},
261 400: CheckoutPaymentError,
262 404: CheckoutNotFound,
263 403: CheckoutForbiddenError,
264 410: CheckoutExpired,
265 },
266)
267async def client_confirm( 1a
268 client_secret: CheckoutClientSecret,
269 checkout_confirm: CheckoutConfirm,
270 auth_subject: auth.CheckoutWeb,
271 session: AsyncSession = Depends(get_db_session),
272 locker: Locker = Depends(get_locker),
273) -> Checkout:
274 """
275 Confirm a checkout session by client secret.
277 Orders and subscriptions will be processed.
278 """
279 checkout = await checkout_service.get_by_client_secret(session, client_secret)
281 return await checkout_service.confirm(
282 session, locker, auth_subject, checkout, checkout_confirm
283 )
286@inner_router.get("/client/{client_secret}/stream", include_in_schema=False) 1a
287async def client_stream( 1ab
288 request: Request,
289 client_secret: CheckoutClientSecret,
290 session: AsyncSession = Depends(get_db_session),
291 redis: Redis = Depends(get_redis),
292) -> EventSourceResponse:
293 checkout = await checkout_service.get_by_client_secret(session, client_secret)
295 receivers = Receivers(checkout_client_secret=checkout.client_secret)
296 return EventSourceResponse(subscribe(redis, receivers.get_channels(), request))
299router = APIRouter(prefix="/checkouts") 1a
300router.include_router(inner_router, prefix="/custom", include_in_schema=False) 1a
301router.include_router(inner_router) 1a