Coverage for polar/integrations/stripe/endpoints.py: 51%
43 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 16:17 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 16:17 +0000
1import stripe 1a
2import structlog 1a
3from fastapi import Depends, HTTPException, Query, Request 1a
4from starlette.responses import RedirectResponse 1a
6from polar.config import settings 1a
7from polar.external_event.service import external_event as external_event_service 1a
8from polar.models.external_event import ExternalEventSource 1a
9from polar.postgres import AsyncSession, get_db_session 1a
10from polar.routing import APIRouter 1a
12log = structlog.get_logger() 1a
14stripe.api_key = settings.STRIPE_SECRET_KEY 1a
16router = APIRouter( 1a
17 prefix="/integrations/stripe", tags=["integrations_stripe"], include_in_schema=False
18)
21DIRECT_IMPLEMENTED_WEBHOOKS = { 1a
22 "payment_intent.succeeded",
23 "payment_intent.payment_failed",
24 "setup_intent.succeeded",
25 "setup_intent.setup_failed",
26 "charge.pending",
27 "charge.failed",
28 "charge.succeeded",
29 "charge.dispute.closed",
30 "refund.created",
31 "refund.updated",
32 "refund.failed",
33 "customer.subscription.updated",
34 "customer.subscription.deleted",
35 "invoice.created",
36 "invoice.paid",
37 "identity.verification_session.verified",
38 "identity.verification_session.processing",
39 "identity.verification_session.requires_input",
40}
41CONNECT_IMPLEMENTED_WEBHOOKS = {"account.updated", "payout.updated", "payout.paid"} 1a
44async def enqueue(session: AsyncSession, event: stripe.Event) -> None: 1a
45 event_type: str = event["type"]
46 task_name = f"stripe.webhook.{event_type}"
47 await external_event_service.enqueue(
48 session, ExternalEventSource.stripe, task_name, event.id, event
49 )
52@router.get("/refresh", name="integrations.stripe.refresh") 1a
53async def stripe_connect_refresh( 1ab
54 return_path: str | None = Query(None),
55) -> RedirectResponse:
56 if return_path is None:
57 raise HTTPException(404)
58 return RedirectResponse(settings.generate_frontend_url(return_path))
61class WebhookEventGetter: 1a
62 def __init__(self, secret: str) -> None: 1a
63 self.secret = secret 1a
65 async def __call__(self, request: Request) -> stripe.Event: 1a
66 payload = await request.body()
67 sig_header = request.headers["Stripe-Signature"]
69 try:
70 return stripe.Webhook.construct_event(payload, sig_header, self.secret)
71 except ValueError as e:
72 raise HTTPException(status_code=400) from e
73 except stripe.SignatureVerificationError as e:
74 raise HTTPException(status_code=401) from e
77@router.post("/webhook", status_code=202, name="integrations.stripe.webhook") 1a
78async def webhook( 1ab
79 session: AsyncSession = Depends(get_db_session),
80 event: stripe.Event = Depends(WebhookEventGetter(settings.STRIPE_WEBHOOK_SECRET)),
81) -> None:
82 if event["type"] in DIRECT_IMPLEMENTED_WEBHOOKS:
83 await enqueue(session, event)
86@router.post( 1a
87 "/webhook-connect", status_code=202, name="integrations.stripe.webhook_connect"
88)
89async def webhook_connect( 1ab
90 session: AsyncSession = Depends(get_db_session),
91 event: stripe.Event = Depends(
92 WebhookEventGetter(settings.STRIPE_CONNECT_WEBHOOK_SECRET)
93 ),
94) -> None:
95 if event["type"] in CONNECT_IMPLEMENTED_WEBHOOKS:
96 return await enqueue(session, event)