Coverage for polar/metrics/endpoints.py: 79%

26 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 17:15 +0000

1from datetime import date 1a

2from zoneinfo import ZoneInfo 1a

3 

4from fastapi import Depends, Query 1a

5from pydantic_extra_types.timezone_name import TimeZoneName 1a

6 

7from polar.customer.schemas.customer import CustomerID 1a

8from polar.exceptions import PolarRequestValidationError 1a

9from polar.kit.schemas import MultipleQueryFilter 1a

10from polar.kit.time_queries import ( 1a

11 MAX_INTERVAL_DAYS, 

12 MIN_DATE, 

13 MIN_INTERVAL_DAYS, 

14 TimeInterval, 

15 is_under_limits, 

16) 

17from polar.models.product import ProductBillingType 1a

18from polar.openapi import APITag 1a

19from polar.organization.schemas import OrganizationID 1a

20from polar.postgres import AsyncReadSession, get_db_read_session 1a

21from polar.product.schemas import ProductID 1a

22from polar.routing import APIRouter 1a

23 

24from . import auth 1a

25from .schemas import MetricsLimits, MetricsResponse 1a

26from .service import metrics as metrics_service 1a

27 

28router = APIRouter(prefix="/metrics", tags=["metrics", APITag.public, APITag.mcp]) 1a

29 

30 

31@router.get("/", summary="Get Metrics", response_model=MetricsResponse) 1a

32async def get( 1a

33 auth_subject: auth.MetricsRead, 

34 start_date: date = Query( 

35 ..., 

36 description="Start date.", 

37 ), 

38 end_date: date = Query(..., description="End date."), 

39 timezone: TimeZoneName = Query( 

40 default="UTC", 

41 description="Timezone to use for the timestamps. Default is UTC.", 

42 ), 

43 interval: TimeInterval = Query(..., description="Interval between two timestamps."), 

44 organization_id: MultipleQueryFilter[OrganizationID] | None = Query( 

45 None, title="OrganizationID Filter", description="Filter by organization ID." 

46 ), 

47 product_id: MultipleQueryFilter[ProductID] | None = Query( 

48 None, title="ProductID Filter", description="Filter by product ID." 

49 ), 

50 billing_type: MultipleQueryFilter[ProductBillingType] | None = Query( 

51 None, 

52 title="ProductBillingType Filter", 

53 description=( 

54 "Filter by billing type. " 

55 "`recurring` will filter data corresponding " 

56 "to subscriptions creations or renewals. " 

57 "`one_time` will filter data corresponding to one-time purchases." 

58 ), 

59 ), 

60 customer_id: MultipleQueryFilter[CustomerID] | None = Query( 

61 None, title="CustomerID Filter", description="Filter by customer ID." 

62 ), 

63 session: AsyncReadSession = Depends(get_db_read_session), 

64) -> MetricsResponse: 

65 """ 

66 Get metrics about your orders and subscriptions. 

67 

68 Currency values are output in cents. 

69 """ 

70 if not is_under_limits(start_date, end_date, interval): 

71 raise PolarRequestValidationError( 

72 [ 

73 { 

74 "loc": ("query",), 

75 "msg": ( 

76 "The interval is too big. " 

77 "Try to change the interval or reduce the date range." 

78 ), 

79 "type": "value_error", 

80 "input": (start_date, end_date, interval), 

81 } 

82 ] 

83 ) 

84 

85 return await metrics_service.get_metrics( 

86 session, 

87 auth_subject, 

88 start_date=start_date, 

89 end_date=end_date, 

90 timezone=ZoneInfo(timezone), 

91 interval=interval, 

92 organization_id=organization_id, 

93 product_id=product_id, 

94 billing_type=billing_type, 

95 customer_id=customer_id, 

96 ) 

97 

98 

99@router.get("/limits", summary="Get Metrics Limits", response_model=MetricsLimits) 1a

100async def limits(auth_subject: auth.MetricsRead) -> MetricsLimits: 1a

101 """Get the interval limits for the metrics endpoint.""" 

102 return MetricsLimits.model_validate( 

103 { 

104 "min_date": MIN_DATE, 

105 "intervals": { 

106 interval.value: { 

107 "min_days": MIN_INTERVAL_DAYS[interval], 

108 "max_days": MAX_INTERVAL_DAYS[interval], 

109 } 

110 for interval in TimeInterval 

111 }, 

112 } 

113 )