Coverage for polar/customer_seat/endpoints.py: 19%

156 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 17:15 +0000

1from typing import Annotated, cast 1a

2 

3from fastapi import Depends, Query, Request 1a

4from pydantic import UUID4 1a

5from sqlalchemy.orm import joinedload 1a

6from sse_starlette import EventSourceResponse 1a

7 

8from polar.auth.models import Anonymous, Organization, User 1a

9from polar.auth.models import AuthSubject as AuthSubjectType 1a

10from polar.checkout.repository import CheckoutRepository 1a

11from polar.eventstream.endpoints import subscribe 1a

12from polar.eventstream.service import Receivers 1a

13from polar.exceptions import BadRequest, NotPermitted, ResourceNotFound 1a

14from polar.models import CustomerSeat, Order, Product, Subscription 1a

15from polar.models.customer_seat import SeatStatus 1a

16from polar.openapi import APITag 1a

17from polar.order.repository import OrderRepository 1a

18from polar.organization.repository import OrganizationRepository 1a

19from polar.postgres import AsyncSession, get_db_session 1a

20from polar.redis import Redis, get_redis 1a

21from polar.routing import APIRouter 1a

22from polar.subscription.repository import SubscriptionRepository 1a

23 

24from .auth import SeatWriteOrAnonymous 1a

25from .repository import CustomerSeatRepository 1a

26from .schemas import CustomerSeat as CustomerSeatSchema 1a

27from .schemas import ( 1a

28 CustomerSeatClaimResponse, 

29 SeatAssign, 

30 SeatClaim, 

31 SeatClaimInfo, 

32 SeatsList, 

33) 

34from .service import seat_service 1a

35 

36router = APIRouter( 1a

37 prefix="/customer-seats", 

38 tags=["customer-seats", APITag.public], 

39) 

40 

41 

42@router.post( 1a

43 "", 

44 summary="Assign Seat", 

45 response_model=CustomerSeatSchema, 

46 responses={ 

47 400: {"description": "No available seats or customer already has a seat"}, 

48 401: { 

49 "description": "Authentication required for direct subscription or order assignment" 

50 }, 

51 403: {"description": "Not permitted or seat-based pricing not enabled"}, 

52 404: {"description": "Subscription, order, checkout, or customer not found"}, 

53 }, 

54) 

55async def assign_seat( 1a

56 seat_assign: SeatAssign, 

57 auth_subject: SeatWriteOrAnonymous, 

58 session: AsyncSession = Depends(get_db_session), 

59) -> CustomerSeatSchema: 

60 # Prevent anonymous users from using immediate_claim 

61 if isinstance(auth_subject.subject, Anonymous) and seat_assign.immediate_claim: 

62 raise NotPermitted( 

63 "Anonymous users cannot use immediate_claim. This feature is only available for authenticated API access." 

64 ) 

65 

66 subscription: Subscription | None = None 

67 order: Order | None = None 

68 

69 if seat_assign.subscription_id: 

70 if isinstance(auth_subject.subject, Anonymous): 

71 raise NotPermitted( 

72 "Authentication required for subscription-based assignment" 

73 ) 

74 

75 typed_auth_subject = cast(AuthSubjectType[User | Organization], auth_subject) 

76 subscription_repository = SubscriptionRepository.from_session(session) 

77 

78 statement = ( 

79 subscription_repository.get_readable_statement(typed_auth_subject) 

80 .options(*subscription_repository.get_eager_options()) 

81 .where(Subscription.id == seat_assign.subscription_id) 

82 ) 

83 subscription = await subscription_repository.get_one_or_none(statement) 

84 

85 if not subscription: 

86 raise ResourceNotFound("Subscription not found") 

87 

88 elif seat_assign.checkout_id: 

89 subscription_repository = SubscriptionRepository.from_session(session) 

90 order_repository = OrderRepository.from_session(session) 

91 checkout_repository = CheckoutRepository.from_session(session) 

92 checkout = await checkout_repository.get_by_id(seat_assign.checkout_id) 

93 

94 if not checkout: 

95 raise ResourceNotFound("Checkout not found") 

96 

97 # Try to find subscription first (for recurring purchases) 

98 subscription = await subscription_repository.get_by_checkout_id( 

99 seat_assign.checkout_id, 

100 options=( 

101 joinedload(Subscription.product).joinedload(Product.organization), 

102 joinedload(Subscription.customer), 

103 ), 

104 ) 

105 

106 # If no subscription, try to find order (for one-time purchases) 

107 if not subscription: 

108 order = await order_repository.get_earliest_by_checkout_id( 

109 seat_assign.checkout_id, 

110 options=order_repository.get_eager_options(), 

111 ) 

112 if not order: 

113 raise ResourceNotFound( 

114 "No subscription or order found for this checkout" 

115 ) 

116 

117 elif seat_assign.order_id: 

118 if isinstance(auth_subject.subject, Anonymous): 

119 raise NotPermitted("Authentication required for order-based assignment") 

120 

121 typed_auth_subject = cast(AuthSubjectType[User | Organization], auth_subject) 

122 order_repository = OrderRepository.from_session(session) 

123 

124 order_statement = ( 

125 order_repository.get_readable_statement(typed_auth_subject) 

126 .options(*order_repository.get_eager_options()) 

127 .where(Order.id == seat_assign.order_id) 

128 ) 

129 order = await order_repository.get_one_or_none(order_statement) 

130 

131 if not order: 

132 raise ResourceNotFound("Order not found") 

133 

134 if not subscription and not order: 

135 raise BadRequest( 

136 "Either subscription_id, checkout_id, or order_id must be provided" 

137 ) 

138 

139 container = subscription or order 

140 assert container is not None # Already validated above 

141 

142 seat = await seat_service.assign_seat( 

143 session, 

144 container, 

145 email=seat_assign.email, 

146 external_customer_id=seat_assign.external_customer_id, 

147 customer_id=seat_assign.customer_id, 

148 metadata=seat_assign.metadata, 

149 immediate_claim=seat_assign.immediate_claim, 

150 ) 

151 

152 return CustomerSeatSchema.model_validate(seat) 

153 

154 

155@router.get( 1a

156 "", 

157 summary="List Seats", 

158 response_model=SeatsList, 

159 responses={ 

160 401: {"description": "Authentication required"}, 

161 403: {"description": "Not permitted or seat-based pricing not enabled"}, 

162 404: {"description": "Subscription or order not found"}, 

163 }, 

164) 

165async def list_seats( 1a

166 auth_subject: SeatWriteOrAnonymous, 

167 session: AsyncSession = Depends(get_db_session), 

168 subscription_id: Annotated[UUID4 | None, Query()] = None, 

169 order_id: Annotated[UUID4 | None, Query()] = None, 

170) -> SeatsList: 

171 if isinstance(auth_subject.subject, Anonymous): 

172 raise NotPermitted("Authentication required") 

173 

174 typed_auth_subject = cast(AuthSubjectType[User | Organization], auth_subject) 

175 

176 subscription: Subscription | None = None 

177 order: Order | None = None 

178 total_seats = 0 

179 

180 if subscription_id: 

181 subscription_repository = SubscriptionRepository.from_session(session) 

182 

183 statement = ( 

184 subscription_repository.get_readable_statement(typed_auth_subject) 

185 .options(*subscription_repository.get_eager_options()) 

186 .where(Subscription.id == subscription_id) 

187 ) 

188 subscription = await subscription_repository.get_one_or_none(statement) 

189 

190 if not subscription: 

191 raise ResourceNotFound("Subscription not found") 

192 

193 total_seats = subscription.seats or 0 

194 

195 elif order_id: 

196 order_repository = OrderRepository.from_session(session) 

197 

198 order_statement = ( 

199 order_repository.get_readable_statement(typed_auth_subject) 

200 .options(*order_repository.get_eager_options()) 

201 .where(Order.id == order_id) 

202 ) 

203 order = await order_repository.get_one_or_none(order_statement) 

204 

205 if not order: 

206 raise ResourceNotFound("Order not found") 

207 

208 total_seats = order.seats or 0 

209 

210 else: 

211 raise BadRequest("Either subscription_id or order_id must be provided") 

212 

213 container = subscription or order 

214 assert container is not None # Already validated above 

215 

216 seats = await seat_service.list_seats(session, container) 

217 available_seats = await seat_service.get_available_seats_count(session, container) 

218 

219 return SeatsList( 

220 seats=[CustomerSeatSchema.model_validate(seat) for seat in seats], 

221 available_seats=available_seats, 

222 total_seats=total_seats, 

223 ) 

224 

225 

226@router.delete( 1a

227 "/{seat_id}", 

228 summary="Revoke Seat", 

229 response_model=CustomerSeatSchema, 

230 responses={ 

231 401: {"description": "Authentication required"}, 

232 403: {"description": "Not permitted or seat-based pricing not enabled"}, 

233 404: {"description": "Seat not found"}, 

234 }, 

235) 

236async def revoke_seat( 1a

237 seat_id: UUID4, 

238 auth_subject: SeatWriteOrAnonymous, 

239 session: AsyncSession = Depends(get_db_session), 

240) -> CustomerSeat: 

241 if isinstance(auth_subject.subject, Anonymous): 

242 raise NotPermitted("Authentication required") 

243 

244 typed_auth_subject = cast(AuthSubjectType[User | Organization], auth_subject) 

245 seat_repository = CustomerSeatRepository.from_session(session) 

246 

247 seat = await seat_repository.get_by_id_and_auth_subject( 

248 typed_auth_subject, 

249 seat_id, 

250 options=seat_repository.get_eager_options(), 

251 ) 

252 

253 if not seat: 

254 raise ResourceNotFound("Seat not found") 

255 

256 if seat.subscription: 

257 organization_id = seat.subscription.product.organization_id 

258 elif seat.order: 

259 organization_id = seat.order.organization.id 

260 else: 

261 raise ResourceNotFound("Seat has no subscription or order") 

262 

263 await seat_service.check_seat_feature_enabled(session, organization_id) 

264 

265 return await seat_service.revoke_seat(session, seat) 

266 

267 

268@router.post( 1a

269 "/{seat_id}/resend", 

270 summary="Resend Invitation", 

271 response_model=CustomerSeatSchema, 

272 responses={ 

273 400: {"description": "Seat is not pending or already claimed"}, 

274 401: {"description": "Authentication required"}, 

275 403: {"description": "Not permitted or seat-based pricing not enabled"}, 

276 404: {"description": "Seat not found"}, 

277 }, 

278) 

279async def resend_invitation( 1a

280 seat_id: UUID4, 

281 auth_subject: SeatWriteOrAnonymous, 

282 session: AsyncSession = Depends(get_db_session), 

283) -> CustomerSeat: 

284 if isinstance(auth_subject.subject, Anonymous): 

285 raise NotPermitted("Authentication required") 

286 

287 typed_auth_subject = cast(AuthSubjectType[User | Organization], auth_subject) 

288 seat_repository = CustomerSeatRepository.from_session(session) 

289 

290 seat = await seat_repository.get_by_id_and_auth_subject( 

291 typed_auth_subject, 

292 seat_id, 

293 options=seat_repository.get_eager_options(), 

294 ) 

295 

296 if not seat: 

297 raise ResourceNotFound("Seat not found") 

298 

299 if seat.subscription: 

300 organization_id = seat.subscription.product.organization_id 

301 elif seat.order: 

302 organization_id = seat.order.organization.id 

303 else: 

304 raise ResourceNotFound("Seat has no subscription or order") 

305 

306 await seat_service.check_seat_feature_enabled(session, organization_id) 

307 

308 return await seat_service.resend_invitation(session, seat) 

309 

310 

311@router.get( 1a

312 "/claim/{invitation_token}", 

313 summary="Get Claim Info", 

314 response_model=SeatClaimInfo, 

315 responses={ 

316 400: {"description": "Invalid or expired invitation token"}, 

317 403: {"description": "Seat-based pricing not enabled for organization"}, 

318 404: {"description": "Seat not found"}, 

319 }, 

320) 

321async def get_claim_info( 1ab

322 invitation_token: str, 

323 session: AsyncSession = Depends(get_db_session), 

324) -> SeatClaimInfo: 

325 seat = await seat_service.get_seat_by_token(session, invitation_token) 

326 

327 if not seat: 

328 raise ResourceNotFound("Invalid or expired invitation token") 

329 

330 if seat.subscription: 

331 product = seat.subscription.product 

332 elif seat.order and seat.order.product: 

333 product = seat.order.product 

334 else: 

335 raise ResourceNotFound("Seat has no subscription or order") 

336 

337 await seat_service.check_seat_feature_enabled(session, product.organization_id) 

338 

339 organization_repository = OrganizationRepository.from_session(session) 

340 organization = await organization_repository.get_by_id(product.organization_id) 

341 if not organization: 

342 raise ResourceNotFound("Organization not found") 

343 

344 return SeatClaimInfo( 

345 product_name=product.name, 

346 product_id=product.id, 

347 organization_name=organization.name, 

348 organization_slug=organization.slug, 

349 customer_email=seat.customer.email if seat.customer else "", 

350 can_claim=seat.status == SeatStatus.pending, 

351 ) 

352 

353 

354@router.get("/claim/{invitation_token}/stream", include_in_schema=False) 1a

355async def claim_stream( 1ab

356 request: Request, 

357 invitation_token: str, 

358 session: AsyncSession = Depends(get_db_session), 

359 redis: Redis = Depends(get_redis), 

360) -> EventSourceResponse: 

361 seat = await seat_service.get_seat_by_token(session, invitation_token) 

362 

363 if not seat or not seat.customer_id or seat.status != SeatStatus.pending: 

364 raise ResourceNotFound("Invalid or expired invitation token") 

365 

366 receivers = Receivers(customer_id=seat.customer_id) 

367 return EventSourceResponse(subscribe(redis, receivers.get_channels(), request)) 

368 

369 

370@router.post( 1a

371 "/claim", 

372 summary="Claim Seat", 

373 response_model=CustomerSeatClaimResponse, 

374 responses={ 

375 400: {"description": "Invalid, expired, or already claimed token"}, 

376 403: {"description": "Seat-based pricing not enabled for organization"}, 

377 }, 

378) 

379async def claim_seat( 1ab

380 seat_claim: SeatClaim, 

381 request: Request, 

382 session: AsyncSession = Depends(get_db_session), 

383) -> CustomerSeatClaimResponse: 

384 # Capture request metadata for audit logging 

385 request_metadata = { 

386 "user_agent": request.headers.get("user-agent"), 

387 "ip": request.client.host if request.client else None, 

388 } 

389 

390 seat, customer_session_token = await seat_service.claim_seat( 

391 session, 

392 seat_claim.invitation_token, 

393 request_metadata, 

394 ) 

395 

396 return CustomerSeatClaimResponse( 

397 seat=CustomerSeatSchema.model_validate(seat), 

398 customer_session_token=customer_session_token, 

399 )