Coverage for polar/order/endpoints.py: 49%

61 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 17:15 +0000

1from collections.abc import AsyncGenerator 1a

2 

3from fastapi import Depends, Query, Response 1a

4from fastapi.responses import StreamingResponse 1a

5from pydantic import UUID4 1a

6 

7from polar.customer.schemas.customer import CustomerID 1a

8from polar.exceptions import ResourceNotFound 1a

9from polar.kit.csv import IterableCSVWriter 1a

10from polar.kit.metadata import MetadataQuery, get_metadata_query_openapi_schema 1a

11from polar.kit.pagination import ListResource, PaginationParams, PaginationParamsQuery 1a

12from polar.kit.schemas import MultipleQueryFilter 1a

13from polar.models import Order 1a

14from polar.models.product import ProductBillingType 1a

15from polar.openapi import APITag 1a

16from polar.organization.schemas import OrganizationID 1a

17from polar.postgres import ( 1a

18 AsyncReadSession, 

19 AsyncSession, 

20 get_db_read_session, 

21 get_db_session, 

22) 

23from polar.product.schemas import ProductID 1a

24from polar.routing import APIRouter 1a

25 

26from . import auth, sorting 1a

27from .schemas import Order as OrderSchema 1a

28from .schemas import OrderID, OrderInvoice, OrderNotFound, OrderUpdate 1a

29from .service import MissingInvoiceBillingDetails, NotPaidOrder 1a

30from .service import order as order_service 1a

31 

# Router for the order endpoints: every route registered on it lives under
# the "/orders" prefix and carries the "orders", APITag.public and APITag.mcp
# tags.
router = APIRouter(prefix="/orders", tags=["orders", APITag.public, APITag.mcp])

33 

34 

@router.get(
    "/",
    summary="List Orders",
    response_model=ListResource[OrderSchema],
    openapi_extra={"parameters": [get_metadata_query_openapi_schema()]},
)
async def list(
    auth_subject: auth.OrdersRead,
    pagination: PaginationParamsQuery,
    sorting: sorting.ListSorting,
    metadata: MetadataQuery,
    organization_id: MultipleQueryFilter[OrganizationID] | None = Query(
        None, title="OrganizationID Filter", description="Filter by organization ID."
    ),
    product_id: MultipleQueryFilter[ProductID] | None = Query(
        None, title="ProductID Filter", description="Filter by product ID."
    ),
    product_billing_type: MultipleQueryFilter[ProductBillingType] | None = Query(
        None,
        title="ProductBillingType Filter",
        description=(
            "Filter by product billing type. "
            "`recurring` will filter data corresponding "
            "to subscriptions creations or renewals. "
            "`one_time` will filter data corresponding to one-time purchases."
        ),
    ),
    discount_id: MultipleQueryFilter[UUID4] | None = Query(
        None, title="DiscountID Filter", description="Filter by discount ID."
    ),
    customer_id: MultipleQueryFilter[CustomerID] | None = Query(
        None, title="CustomerID Filter", description="Filter by customer ID."
    ),
    checkout_id: MultipleQueryFilter[UUID4] | None = Query(
        None, title="CheckoutID Filter", description="Filter by checkout ID."
    ),
    session: AsyncReadSession = Depends(get_db_read_session),
) -> ListResource[OrderSchema]:
    """List orders."""
    # Every query filter, plus metadata, pagination and sorting, is handed to
    # the service untouched; it returns the page of orders and a count.
    orders, total = await order_service.list(
        session,
        auth_subject,
        organization_id=organization_id,
        product_id=product_id,
        product_billing_type=product_billing_type,
        discount_id=discount_id,
        customer_id=customer_id,
        checkout_id=checkout_id,
        metadata=metadata,
        pagination=pagination,
        sorting=sorting,
    )

    # Validate each returned order into the public schema before wrapping the
    # page in the paginated response envelope.
    items = [OrderSchema.model_validate(order) for order in orders]
    return ListResource.from_paginated_results(items, total, pagination)

93 

94 

@router.get("/export", summary="Export Orders")
async def export(
    auth_subject: auth.OrdersRead,
    organization_id: MultipleQueryFilter[OrganizationID] | None = Query(
        None, title="OrganizationID Filter", description="Filter by organization ID."
    ),
    product_id: MultipleQueryFilter[ProductID] | None = Query(
        None, title="ProductID Filter", description="Filter by product ID."
    ),
    session: AsyncReadSession = Depends(get_db_read_session),
) -> Response:
    """Export orders as a CSV file."""

    # Column titles, in the same order as the per-order row built below.
    header = (
        "Email",
        "Created At",
        "Product",
        "Amount",
        "Currency",
        "Status",
        "Invoice number",
    )

    async def create_csv() -> AsyncGenerator[str, None]:
        # Yields the CSV one row at a time so the response can be streamed.
        csv_writer = IterableCSVWriter(dialect="excel")
        yield csv_writer.getrow(header)

        # A single very large page stands in for "all orders"; the count
        # returned alongside the results is not needed here.
        # NOTE(review): the whole result set is fetched in one service call
        # before any order row is emitted -- confirm this is acceptable for
        # large organizations.
        (results, _) = await order_service.list(
            session,
            auth_subject,
            organization_id=organization_id,
            product_id=product_id,
            pagination=PaginationParams(limit=1000000, page=1),
        )

        for order in results:
            yield csv_writer.getrow(
                (
                    order.customer.email,
                    order.created_at.isoformat(),
                    order.description,
                    # presumably net_amount is stored in minor units (cents);
                    # TODO confirm against the Order model.
                    order.net_amount / 100,
                    order.currency,
                    order.status,
                    order.invoice_number,
                )
            )

    filename = "polar-orders.csv"
    return StreamingResponse(
        create_csv(),
        media_type="text/csv",
        headers={"Content-Disposition": f"attachment; filename={filename}"},
    )

150 

151 

@router.get(
    "/{id}",
    summary="Get Order",
    response_model=OrderSchema,
    responses={404: OrderNotFound},
)
async def get(
    id: OrderID,
    auth_subject: auth.OrdersRead,
    session: AsyncReadSession = Depends(get_db_read_session),
) -> Order:
    """Get an order by ID."""
    found = await order_service.get(session, auth_subject, id)

    if found is not None:
        return found

    # The service returned nothing for this ID and subject.
    raise ResourceNotFound()

170 

171 

@router.patch(
    "/{id}",
    summary="Update Order",
    response_model=OrderSchema,
    responses={404: OrderNotFound},
)
async def update(
    id: OrderID,
    order_update: OrderUpdate,
    auth_subject: auth.OrdersWrite,
    session: AsyncSession = Depends(get_db_session),
) -> Order:
    """Update an order."""
    # Look the order up through the same service call as the read endpoints,
    # so a missing order is rejected before any change is attempted.
    existing = await order_service.get(session, auth_subject, id)
    if existing is None:
        raise ResourceNotFound()

    updated = await order_service.update(session, existing, order_update)
    return updated

191 

192 

@router.post(
    "/{id}/invoice",
    status_code=202,
    summary="Generate Order Invoice",
    responses={
        # The handler raises ResourceNotFound when the order lookup returns
        # nothing, so the 404 is declared like on the other "/{id}" routes.
        404: OrderNotFound,
        422: {
            "description": "Order is not paid or is missing billing name or address.",
            "model": MissingInvoiceBillingDetails.schema() | NotPaidOrder.schema(),
        },
    },
)
async def generate_invoice(
    id: OrderID,
    auth_subject: auth.OrdersRead,
    session: AsyncSession = Depends(get_db_session),
) -> None:
    """Trigger generation of an order's invoice."""
    order = await order_service.get(session, auth_subject, id)

    if order is None:
        raise ResourceNotFound()

    # Only triggers the generation; nothing is returned to the caller, which
    # matches the 202 status code declared on the route.
    await order_service.trigger_invoice_generation(session, order)

216 

217 

@router.get(
    "/{id}/invoice",
    summary="Get Order Invoice",
    response_model=OrderInvoice,
    responses={404: OrderNotFound},
)
async def invoice(
    id: OrderID,
    auth_subject: auth.OrdersRead,
    session: AsyncReadSession = Depends(get_db_read_session),
) -> OrderInvoice:
    """Get an order's invoice data."""
    target = await order_service.get(session, auth_subject, id)
    if target is None:
        # The service returned nothing for this ID and subject.
        raise ResourceNotFound()

    invoice_data = await order_service.get_order_invoice(target)
    return invoice_data