Coverage for polar/metrics/endpoints.py: 79%
26 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 17:15 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 17:15 +0000
1from datetime import date 1a
2from zoneinfo import ZoneInfo 1a
4from fastapi import Depends, Query 1a
5from pydantic_extra_types.timezone_name import TimeZoneName 1a
7from polar.customer.schemas.customer import CustomerID 1a
8from polar.exceptions import PolarRequestValidationError 1a
9from polar.kit.schemas import MultipleQueryFilter 1a
10from polar.kit.time_queries import ( 1a
11 MAX_INTERVAL_DAYS,
12 MIN_DATE,
13 MIN_INTERVAL_DAYS,
14 TimeInterval,
15 is_under_limits,
16)
17from polar.models.product import ProductBillingType 1a
18from polar.openapi import APITag 1a
19from polar.organization.schemas import OrganizationID 1a
20from polar.postgres import AsyncReadSession, get_db_read_session 1a
21from polar.product.schemas import ProductID 1a
22from polar.routing import APIRouter 1a
24from . import auth 1a
25from .schemas import MetricsLimits, MetricsResponse 1a
26from .service import metrics as metrics_service 1a
# Router that the endpoints of this module are registered on; it is created
# with the "/metrics" prefix and tagged as metrics / public / MCP.
router = APIRouter(
    prefix="/metrics",
    tags=["metrics", APITag.public, APITag.mcp],
)
@router.get("/", summary="Get Metrics", response_model=MetricsResponse)
async def get(
    auth_subject: auth.MetricsRead,
    start_date: date = Query(
        ...,
        description="Start date.",
    ),
    end_date: date = Query(..., description="End date."),
    timezone: TimeZoneName = Query(
        default="UTC",
        description="Timezone to use for the timestamps. Default is UTC.",
    ),
    interval: TimeInterval = Query(..., description="Interval between two timestamps."),
    organization_id: MultipleQueryFilter[OrganizationID] | None = Query(
        None, title="OrganizationID Filter", description="Filter by organization ID."
    ),
    product_id: MultipleQueryFilter[ProductID] | None = Query(
        None, title="ProductID Filter", description="Filter by product ID."
    ),
    billing_type: MultipleQueryFilter[ProductBillingType] | None = Query(
        None,
        title="ProductBillingType Filter",
        description=(
            "Filter by billing type. "
            "`recurring` will filter data corresponding "
            "to subscriptions creations or renewals. "
            "`one_time` will filter data corresponding to one-time purchases."
        ),
    ),
    customer_id: MultipleQueryFilter[CustomerID] | None = Query(
        None, title="CustomerID Filter", description="Filter by customer ID."
    ),
    session: AsyncReadSession = Depends(get_db_read_session),
) -> MetricsResponse:
    """
    Get metrics about your orders and subscriptions.

    Currency values are output in cents.
    """
    # The (start, end, interval) triple is both what gets validated and what
    # is echoed back as the offending input when validation fails.
    requested_range = (start_date, end_date, interval)

    # Guard: refuse the request before touching the service when the
    # combination is rejected by is_under_limits.
    if not is_under_limits(*requested_range):
        limit_error = {
            "loc": ("query",),
            "msg": (
                "The interval is too big. "
                "Try to change the interval or reduce the date range."
            ),
            "type": "value_error",
            "input": requested_range,
        }
        raise PolarRequestValidationError([limit_error])

    # The service is handed a ZoneInfo object built from the validated
    # timezone name; every filter is forwarded unchanged.
    tz = ZoneInfo(timezone)
    return await metrics_service.get_metrics(
        session,
        auth_subject,
        start_date=start_date,
        end_date=end_date,
        timezone=tz,
        interval=interval,
        organization_id=organization_id,
        product_id=product_id,
        billing_type=billing_type,
        customer_id=customer_id,
    )
@router.get("/limits", summary="Get Metrics Limits", response_model=MetricsLimits)
async def limits(auth_subject: auth.MetricsRead) -> MetricsLimits:
    """Get the interval limits for the metrics endpoint."""
    # One entry per TimeInterval member, keyed by the member's value, holding
    # the bounds looked up in MIN_INTERVAL_DAYS / MAX_INTERVAL_DAYS.
    per_interval = {}
    for member in TimeInterval:
        per_interval[member.value] = {
            "min_days": MIN_INTERVAL_DAYS[member],
            "max_days": MAX_INTERVAL_DAYS[member],
        }

    # Validate the assembled mapping into the response schema.
    payload = {"min_date": MIN_DATE, "intervals": per_interval}
    return MetricsLimits.model_validate(payload)