Coverage for polar/customer_seat/endpoints.py: 19%
156 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 17:15 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 17:15 +0000
1from typing import Annotated, cast 1a
3from fastapi import Depends, Query, Request 1a
4from pydantic import UUID4 1a
5from sqlalchemy.orm import joinedload 1a
6from sse_starlette import EventSourceResponse 1a
8from polar.auth.models import Anonymous, Organization, User 1a
9from polar.auth.models import AuthSubject as AuthSubjectType 1a
10from polar.checkout.repository import CheckoutRepository 1a
11from polar.eventstream.endpoints import subscribe 1a
12from polar.eventstream.service import Receivers 1a
13from polar.exceptions import BadRequest, NotPermitted, ResourceNotFound 1a
14from polar.models import CustomerSeat, Order, Product, Subscription 1a
15from polar.models.customer_seat import SeatStatus 1a
16from polar.openapi import APITag 1a
17from polar.order.repository import OrderRepository 1a
18from polar.organization.repository import OrganizationRepository 1a
19from polar.postgres import AsyncSession, get_db_session 1a
20from polar.redis import Redis, get_redis 1a
21from polar.routing import APIRouter 1a
22from polar.subscription.repository import SubscriptionRepository 1a
24from .auth import SeatWriteOrAnonymous 1a
25from .repository import CustomerSeatRepository 1a
26from .schemas import CustomerSeat as CustomerSeatSchema 1a
27from .schemas import ( 1a
28 CustomerSeatClaimResponse,
29 SeatAssign,
30 SeatClaim,
31 SeatClaimInfo,
32 SeatsList,
33)
34from .service import seat_service 1a
# Router for the customer-seat endpoints. Every route declared on it is served
# under the "/customer-seats" prefix and carries the "customer-seats" and
# APITag.public tags.
router = APIRouter(
    tags=["customer-seats", APITag.public],
    prefix="/customer-seats",
)
@router.post(
    "",
    summary="Assign Seat",
    response_model=CustomerSeatSchema,
    responses={
        400: {"description": "No available seats or customer already has a seat"},
        401: {
            "description": "Authentication required for direct subscription or order assignment"
        },
        403: {"description": "Not permitted or seat-based pricing not enabled"},
        404: {"description": "Subscription, order, checkout, or customer not found"},
    },
)
async def assign_seat(
    seat_assign: SeatAssign,
    auth_subject: SeatWriteOrAnonymous,
    session: AsyncSession = Depends(get_db_session),
) -> CustomerSeatSchema:
    # Assign a seat on the subscription or order identified by the payload.
    #
    # The container is resolved from exactly one of the payload identifiers,
    # tried in this order: subscription_id, checkout_id, order_id.
    #   * subscription_id / order_id: the caller must be authenticated and the
    #     record must be returned by the repository's readable statement.
    #   * checkout_id: usable anonymously; the checkout's subscription is
    #     preferred and its earliest order is the fallback.
    #
    # Raises NotPermitted for anonymous callers using immediate_claim or a
    # direct subscription/order id, ResourceNotFound when a lookup comes back
    # empty, and BadRequest when no identifier was supplied.
    is_anonymous = isinstance(auth_subject.subject, Anonymous)

    # immediate_claim is reserved for authenticated API access.
    if is_anonymous and seat_assign.immediate_claim:
        raise NotPermitted(
            "Anonymous users cannot use immediate_claim. This feature is only available for authenticated API access."
        )

    subscription: Subscription | None = None
    order: Order | None = None

    if seat_assign.subscription_id:
        if is_anonymous:
            raise NotPermitted(
                "Authentication required for subscription-based assignment"
            )

        subscription_repo = SubscriptionRepository.from_session(session)
        # Restrict the lookup to subscriptions this subject may read.
        readable_subscriptions = subscription_repo.get_readable_statement(
            cast(AuthSubjectType[User | Organization], auth_subject)
        )
        subscription_query = readable_subscriptions.options(
            *subscription_repo.get_eager_options()
        ).where(Subscription.id == seat_assign.subscription_id)

        subscription = await subscription_repo.get_one_or_none(subscription_query)
        if subscription is None:
            raise ResourceNotFound("Subscription not found")

    elif seat_assign.checkout_id:
        subscription_repo = SubscriptionRepository.from_session(session)
        order_repo = OrderRepository.from_session(session)
        checkout_repo = CheckoutRepository.from_session(session)

        checkout = await checkout_repo.get_by_id(seat_assign.checkout_id)
        if checkout is None:
            raise ResourceNotFound("Checkout not found")

        # Recurring purchases produce a subscription; look for that first.
        subscription = await subscription_repo.get_by_checkout_id(
            seat_assign.checkout_id,
            options=(
                joinedload(Subscription.product).joinedload(Product.organization),
                joinedload(Subscription.customer),
            ),
        )

        if subscription is None:
            # One-time purchases only produce an order; fall back to it.
            order = await order_repo.get_earliest_by_checkout_id(
                seat_assign.checkout_id,
                options=order_repo.get_eager_options(),
            )
            if order is None:
                raise ResourceNotFound(
                    "No subscription or order found for this checkout"
                )

    elif seat_assign.order_id:
        if is_anonymous:
            raise NotPermitted("Authentication required for order-based assignment")

        order_repo = OrderRepository.from_session(session)
        # Restrict the lookup to orders this subject may read.
        readable_orders = order_repo.get_readable_statement(
            cast(AuthSubjectType[User | Organization], auth_subject)
        )
        order_query = readable_orders.options(
            *order_repo.get_eager_options()
        ).where(Order.id == seat_assign.order_id)

        order = await order_repo.get_one_or_none(order_query)
        if order is None:
            raise ResourceNotFound("Order not found")

    # Neither branch ran, i.e. the payload carried no usable identifier.
    container = subscription or order
    if container is None:
        raise BadRequest(
            "Either subscription_id, checkout_id, or order_id must be provided"
        )

    assigned_seat = await seat_service.assign_seat(
        session,
        container,
        email=seat_assign.email,
        external_customer_id=seat_assign.external_customer_id,
        customer_id=seat_assign.customer_id,
        metadata=seat_assign.metadata,
        immediate_claim=seat_assign.immediate_claim,
    )

    return CustomerSeatSchema.model_validate(assigned_seat)
@router.get(
    "",
    summary="List Seats",
    response_model=SeatsList,
    responses={
        401: {"description": "Authentication required"},
        403: {"description": "Not permitted or seat-based pricing not enabled"},
        404: {"description": "Subscription or order not found"},
    },
)
async def list_seats(
    auth_subject: SeatWriteOrAnonymous,
    session: AsyncSession = Depends(get_db_session),
    subscription_id: Annotated[UUID4 | None, Query()] = None,
    order_id: Annotated[UUID4 | None, Query()] = None,
) -> SeatsList:
    # List the seats of a subscription or an order, together with the number
    # of seats still available and the container's total seat count.
    #
    # subscription_id takes precedence over order_id when both are given.
    # Raises NotPermitted for anonymous callers, BadRequest when neither id is
    # supplied, and ResourceNotFound when the readable lookup returns nothing.
    if isinstance(auth_subject.subject, Anonymous):
        raise NotPermitted("Authentication required")

    if not subscription_id and not order_id:
        raise BadRequest("Either subscription_id or order_id must be provided")

    authenticated_subject = cast(AuthSubjectType[User | Organization], auth_subject)

    container: Subscription | Order
    if subscription_id:
        subscription_repo = SubscriptionRepository.from_session(session)
        # Only subscriptions readable by this subject are considered.
        subscription_query = (
            subscription_repo.get_readable_statement(authenticated_subject)
            .options(*subscription_repo.get_eager_options())
            .where(Subscription.id == subscription_id)
        )
        found_subscription = await subscription_repo.get_one_or_none(
            subscription_query
        )
        if found_subscription is None:
            raise ResourceNotFound("Subscription not found")
        container = found_subscription
    else:
        order_repo = OrderRepository.from_session(session)
        # Only orders readable by this subject are considered.
        order_query = (
            order_repo.get_readable_statement(authenticated_subject)
            .options(*order_repo.get_eager_options())
            .where(Order.id == order_id)
        )
        found_order = await order_repo.get_one_or_none(order_query)
        if found_order is None:
            raise ResourceNotFound("Order not found")
        container = found_order

    # A missing seat count is reported as zero.
    total_seats = container.seats or 0

    container_seats = await seat_service.list_seats(session, container)
    remaining = await seat_service.get_available_seats_count(session, container)

    serialized = [CustomerSeatSchema.model_validate(item) for item in container_seats]
    return SeatsList(
        seats=serialized,
        available_seats=remaining,
        total_seats=total_seats,
    )
@router.delete(
    "/{seat_id}",
    summary="Revoke Seat",
    response_model=CustomerSeatSchema,
    responses={
        401: {"description": "Authentication required"},
        403: {"description": "Not permitted or seat-based pricing not enabled"},
        404: {"description": "Seat not found"},
    },
)
async def revoke_seat(
    seat_id: UUID4,
    auth_subject: SeatWriteOrAnonymous,
    session: AsyncSession = Depends(get_db_session),
) -> CustomerSeat:
    # Revoke the seat identified by seat_id on behalf of an authenticated
    # subject.
    #
    # Raises NotPermitted for anonymous callers and ResourceNotFound when the
    # seat cannot be found for this subject or is attached to neither a
    # subscription nor an order. The seat feature check runs against the
    # owning organization before the seat is revoked.
    if isinstance(auth_subject.subject, Anonymous):
        raise NotPermitted("Authentication required")

    seat_repo = CustomerSeatRepository.from_session(session)
    seat = await seat_repo.get_by_id_and_auth_subject(
        cast(AuthSubjectType[User | Organization], auth_subject),
        seat_id,
        options=seat_repo.get_eager_options(),
    )
    if not seat:
        raise ResourceNotFound("Seat not found")

    # Resolve the owning organization: through the subscription's product when
    # there is a subscription, otherwise through the order.
    subscription = seat.subscription
    if subscription:
        organization_id = subscription.product.organization_id
    else:
        order = seat.order
        if not order:
            raise ResourceNotFound("Seat has no subscription or order")
        organization_id = order.organization.id

    await seat_service.check_seat_feature_enabled(session, organization_id)

    revoked_seat = await seat_service.revoke_seat(session, seat)
    return revoked_seat
@router.post(
    "/{seat_id}/resend",
    summary="Resend Invitation",
    response_model=CustomerSeatSchema,
    responses={
        400: {"description": "Seat is not pending or already claimed"},
        401: {"description": "Authentication required"},
        403: {"description": "Not permitted or seat-based pricing not enabled"},
        404: {"description": "Seat not found"},
    },
)
async def resend_invitation(
    seat_id: UUID4,
    auth_subject: SeatWriteOrAnonymous,
    session: AsyncSession = Depends(get_db_session),
) -> CustomerSeat:
    # Resend the invitation for the seat identified by seat_id on behalf of an
    # authenticated subject.
    #
    # Raises NotPermitted for anonymous callers and ResourceNotFound when the
    # seat cannot be found for this subject or is attached to neither a
    # subscription nor an order. The seat feature check runs against the
    # owning organization before the service is asked to resend.
    if isinstance(auth_subject.subject, Anonymous):
        raise NotPermitted("Authentication required")

    seat_repo = CustomerSeatRepository.from_session(session)
    seat = await seat_repo.get_by_id_and_auth_subject(
        cast(AuthSubjectType[User | Organization], auth_subject),
        seat_id,
        options=seat_repo.get_eager_options(),
    )
    if not seat:
        raise ResourceNotFound("Seat not found")

    # Resolve the owning organization: through the subscription's product when
    # there is a subscription, otherwise through the order.
    subscription = seat.subscription
    if subscription:
        organization_id = subscription.product.organization_id
    else:
        order = seat.order
        if not order:
            raise ResourceNotFound("Seat has no subscription or order")
        organization_id = order.organization.id

    await seat_service.check_seat_feature_enabled(session, organization_id)

    updated_seat = await seat_service.resend_invitation(session, seat)
    return updated_seat
@router.get(
    "/claim/{invitation_token}",
    summary="Get Claim Info",
    response_model=SeatClaimInfo,
    responses={
        400: {"description": "Invalid or expired invitation token"},
        403: {"description": "Seat-based pricing not enabled for organization"},
        404: {"description": "Seat not found"},
    },
)
async def get_claim_info(
    invitation_token: str,
    session: AsyncSession = Depends(get_db_session),
) -> SeatClaimInfo:
    # Describe the seat behind an invitation token: product, organization,
    # invited customer's email, and whether the seat is still claimable.
    #
    # Takes no auth subject; the invitation token is the only lookup key.
    # Raises ResourceNotFound when the token matches no seat, when the seat
    # has no product reachable via its subscription or order, or when the
    # product's organization cannot be loaded.
    seat = await seat_service.get_seat_by_token(session, invitation_token)
    if not seat:
        raise ResourceNotFound("Invalid or expired invitation token")

    # The product comes from the subscription when there is one, otherwise
    # from the order (which must itself have a product).
    subscription = seat.subscription
    if subscription:
        product = subscription.product
    else:
        order = seat.order
        if not (order and order.product):
            raise ResourceNotFound("Seat has no subscription or order")
        product = order.product

    await seat_service.check_seat_feature_enabled(session, product.organization_id)

    organization = await OrganizationRepository.from_session(session).get_by_id(
        product.organization_id
    )
    if not organization:
        raise ResourceNotFound("Organization not found")

    customer = seat.customer
    # A seat without a customer is reported with an empty email.
    invited_email = customer.email if customer else ""
    # Only pending seats can still be claimed.
    is_claimable = seat.status == SeatStatus.pending

    return SeatClaimInfo(
        product_name=product.name,
        product_id=product.id,
        organization_name=organization.name,
        organization_slug=organization.slug,
        customer_email=invited_email,
        can_claim=is_claimable,
    )
@router.get("/claim/{invitation_token}/stream", include_in_schema=False)
async def claim_stream(
    request: Request,
    invitation_token: str,
    session: AsyncSession = Depends(get_db_session),
    redis: Redis = Depends(get_redis),
) -> EventSourceResponse:
    # Open a server-sent-events stream on the channels of the customer that
    # the invitation token's seat belongs to.
    #
    # The token is accepted only while the seat exists, has a customer_id and
    # is still pending; any other case raises ResourceNotFound.
    seat = await seat_service.get_seat_by_token(session, invitation_token)

    token_is_usable = bool(
        seat and seat.customer_id and seat.status == SeatStatus.pending
    )
    if not token_is_usable:
        raise ResourceNotFound("Invalid or expired invitation token")

    customer_receivers = Receivers(customer_id=seat.customer_id)
    event_source = subscribe(redis, customer_receivers.get_channels(), request)
    return EventSourceResponse(event_source)
@router.post(
    "/claim",
    summary="Claim Seat",
    response_model=CustomerSeatClaimResponse,
    responses={
        400: {"description": "Invalid, expired, or already claimed token"},
        403: {"description": "Seat-based pricing not enabled for organization"},
    },
)
async def claim_seat(
    seat_claim: SeatClaim,
    request: Request,
    session: AsyncSession = Depends(get_db_session),
) -> CustomerSeatClaimResponse:
    # Claim the seat behind the submitted invitation token and return the
    # seat together with the customer session token produced by the service.
    #
    # Token validation is delegated entirely to seat_service.claim_seat.

    # Request metadata handed to the service for audit logging; the IP is
    # None when the request carries no client information.
    caller = request.client
    request_metadata = {
        "user_agent": request.headers.get("user-agent"),
        "ip": caller.host if caller else None,
    }

    claim_result = await seat_service.claim_seat(
        session,
        seat_claim.invitation_token,
        request_metadata,
    )
    claimed_seat, session_token = claim_result

    return CustomerSeatClaimResponse(
        seat=CustomerSeatSchema.model_validate(claimed_seat),
        customer_session_token=session_token,
    )