Coverage for polar/metrics/queries.py: 18%
131 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 17:15 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 17:15 +0000
1import uuid 1a
2from collections.abc import Generator, Sequence 1a
3from datetime import datetime 1a
4from enum import StrEnum 1a
5from typing import TYPE_CHECKING, Protocol, cast 1a
7from sqlalchemy import ( 1a
8 CTE,
9 ColumnElement,
10 Select,
11 SQLColumnExpression,
12 and_,
13 cte,
14 func,
15 literal,
16 or_,
17 select,
18)
20from polar.auth.models import AuthSubject, is_organization, is_user 1a
21from polar.kit.time_queries import TimeInterval 1a
22from polar.models import ( 1a
23 Checkout,
24 CheckoutProduct,
25 Customer,
26 Event,
27 Order,
28 Organization,
29 Product,
30 Subscription,
31 User,
32 UserOrganization,
33)
34from polar.models.product import ProductBillingType 1a
36if TYPE_CHECKING: 36 ↛ 37line 36 didn't jump to line 37 because the condition on line 36 was never true1a
37 from .metrics import SQLMetric
40class MetricQuery(StrEnum): 1a
41 orders = "orders" 1a
42 active_subscriptions = "active_subscriptions" 1a
43 checkouts = "checkouts" 1a
44 canceled_subscriptions = "canceled_subscriptions" 1a
45 churned_subscriptions = "churned_subscriptions" 1a
46 events = "events" 1a
def _get_metrics_columns(
    metric_cte: MetricQuery,
    timestamp_column: ColumnElement[datetime],
    interval: TimeInterval,
    metrics: list["type[SQLMetric]"],
    now: datetime,
) -> Generator[ColumnElement[int] | ColumnElement[float], None, None]:
    """Lazily build the labelled select columns for one query's metrics.

    Only the entries of ``metrics`` whose ``query`` equals ``metric_cte`` are
    used. For each of them, the expression returned by
    ``get_sql_expression(timestamp_column, interval, now)`` is wrapped in
    ``COALESCE(..., 0)`` and labelled with the metric's ``slug``.

    Returns a generator, so the columns are produced on iteration.
    """
    # First narrow the metrics to this query, then turn each one into a column.
    selected = (candidate for candidate in metrics if candidate.query == metric_cte)
    return (
        func.coalesce(
            chosen.get_sql_expression(timestamp_column, interval, now), 0
        ).label(chosen.slug)
        for chosen in selected
    )
class QueryCallable(Protocol):
    """Structural type shared by every CTE builder listed in ``QUERIES``.

    A conforming callable takes the timestamp series CTE, the time interval,
    the authenticated subject, the requested metric classes and the current
    time positionally, plus keyword-only ``bounds`` and optional filters, and
    returns a CTE.
    """

    def __call__(
        self,
        timestamp_series: CTE,
        interval: TimeInterval,
        auth_subject: AuthSubject[User | Organization],
        metrics: list["type[SQLMetric]"],
        now: datetime,
        *,
        bounds: tuple[datetime, datetime],
        organization_id: Sequence[uuid.UUID] | None = None,
        product_id: Sequence[uuid.UUID] | None = None,
        billing_type: Sequence[ProductBillingType] | None = None,
        customer_id: Sequence[uuid.UUID] | None = None,
    ) -> CTE:
        """Build and return the CTE for one query family."""
        ...
def _get_readable_orders_statement(
    auth_subject: AuthSubject[User | Organization],
    *,
    organization_id: Sequence[uuid.UUID] | None = None,
    product_id: Sequence[uuid.UUID] | None = None,
    billing_type: Sequence[ProductBillingType] | None = None,
    customer_id: Sequence[uuid.UUID] | None = None,
) -> Select[tuple[uuid.UUID]]:
    """Select the ids of the orders the subject may read, narrowed by filters.

    Orders are outer-joined to their product. A user subject is limited to
    products of organizations they have a non-deleted ``UserOrganization``
    row for; an organization subject is limited to its own products. Each
    optional filter that is not ``None`` adds a further restriction.
    """
    query = select(Order.id).join(
        Product, onclause=Order.product_id == Product.id, isouter=True
    )

    # Collect predicates in the order they are meant to be applied.
    predicates: list[ColumnElement[bool]] = []

    if is_user(auth_subject):
        member_organizations = select(UserOrganization.organization_id).where(
            UserOrganization.user_id == auth_subject.subject.id,
            UserOrganization.deleted_at.is_(None),
        )
        predicates.append(Product.organization_id.in_(member_organizations))
    elif is_organization(auth_subject):
        predicates.append(Product.organization_id == auth_subject.subject.id)

    if organization_id is not None:
        predicates.append(Product.organization_id.in_(organization_id))

    if product_id is not None:
        predicates.append(Order.product_id.in_(product_id))

    if billing_type is not None:
        predicates.append(Product.billing_type.in_(billing_type))

    if customer_id is not None:
        # The customer filter goes through an explicit join on Customer.
        query = query.join(Customer, onclause=Order.customer_id == Customer.id)
        predicates.append(Customer.id.in_(customer_id))

    if predicates:
        query = query.where(*predicates)
    return query
def get_orders_metrics_cte(
    timestamp_series: CTE,
    interval: TimeInterval,
    auth_subject: AuthSubject[User | Organization],
    metrics: list["type[SQLMetric]"],
    now: datetime,
    *,
    bounds: tuple[datetime, datetime],
    organization_id: Sequence[uuid.UUID] | None = None,
    product_id: Sequence[uuid.UUID] | None = None,
    billing_type: Sequence[ProductBillingType] | None = None,
    customer_id: Sequence[uuid.UUID] | None = None,
) -> CTE:
    """Build the CTE holding every ``MetricQuery.orders`` metric per timestamp.

    Paid, readable orders created within ``bounds`` are aggregated per
    truncated interval and left-joined onto ``timestamp_series`` so that every
    timestamp yields a row. Metrics whose slug is ``cumulative_revenue`` or
    ``net_cumulative_revenue`` are emitted as a running sum over the series,
    offset by the totals of paid, readable orders created before the start of
    ``bounds``.

    Args:
        timestamp_series: CTE exposing a ``timestamp`` column to report on.
        interval: Interval used to truncate order creation dates.
        auth_subject: Subject whose readable orders are considered.
        metrics: Requested metric classes; only those whose ``query`` is
            ``MetricQuery.orders`` produce columns here.
        now: Forwarded to each metric's ``get_sql_expression``.
        bounds: ``(start, end)`` timestamps, both inclusive for the per-interval
            aggregation.
        organization_id: Optional organization filter.
        product_id: Optional product filter.
        billing_type: Optional product billing type filter.
        customer_id: Optional customer filter.

    Returns:
        A CTE with a ``timestamp`` column and one column per orders metric,
        labelled with the metric's slug and ordered by timestamp.
    """
    start_timestamp, end_timestamp = bounds
    timestamp_column: ColumnElement[datetime] = timestamp_series.c.timestamp

    readable_orders_statement = _get_readable_orders_statement(
        auth_subject,
        organization_id=organization_id,
        product_id=product_id,
        billing_type=billing_type,
        customer_id=customer_id,
    )

    day_column = interval.sql_date_trunc(Order.created_at)

    # Only metrics belonging to this query ever produce a column below.
    orders_metrics = [m for m in metrics if m.query == MetricQuery.orders]

    cumulative_metrics = ["cumulative_revenue", "net_cumulative_revenue"]
    cumulative_metrics_to_compute = [
        m for m in orders_metrics if m.slug in cumulative_metrics
    ]

    # Only create the historical baseline CTE when an orders metric actually
    # needs it; the baseline columns are never read otherwise.
    historical_baseline: CTE | None = None
    if cumulative_metrics_to_compute:
        historical_baseline = cte(
            select(
                func.coalesce(func.sum(Order.net_amount), 0).label(
                    "hist_cumulative_revenue"
                ),
                func.coalesce(func.sum(Order.payout_amount), 0).label(
                    "hist_net_cumulative_revenue"
                ),
            )
            .select_from(Order)
            .where(
                Order.paid.is_(True),
                Order.id.in_(readable_orders_statement),
                Order.created_at < start_timestamp,
            )
        )

    daily_metrics = cte(
        select(
            day_column.label("day"),
            *[
                func.coalesce(
                    metric.get_sql_expression(day_column, interval, now), 0
                ).label(metric.slug)
                for metric in orders_metrics
            ],
        )
        .select_from(Order)
        .join(
            Subscription,
            isouter=True,
            onclause=Order.subscription_id == Subscription.id,
        )
        .where(
            Order.paid.is_(True),
            Order.id.in_(readable_orders_statement),
            Order.created_at >= start_timestamp,
            Order.created_at <= end_timestamp,
        )
        .group_by(day_column)
    )

    # Build from clause with conditional cross join
    from_clause = timestamp_series.join(
        daily_metrics,
        onclause=daily_metrics.c.day == timestamp_column,
        isouter=True,
    )

    if historical_baseline is not None:
        # Cross join: every row gets the historical baseline values
        from_clause = from_clause.join(
            historical_baseline,
            isouter=False,
            onclause=literal(True),  # This creates a cross join (cartesian product)
        )

    return cte(
        select(
            timestamp_column.label("timestamp"),
            *[
                (
                    # Cumulative metrics: running sum over the series plus the
                    # pre-range baseline. Other metrics: the interval's value.
                    func.coalesce(
                        func.sum(getattr(daily_metrics.c, metric.slug)).over(
                            order_by=timestamp_column
                        ),
                        0,
                    )
                    + (
                        getattr(historical_baseline.c, f"hist_{metric.slug}")
                        if historical_baseline is not None
                        else 0
                    )
                    if metric.slug in cumulative_metrics
                    else func.coalesce(getattr(daily_metrics.c, metric.slug), 0)
                ).label(metric.slug)
                for metric in orders_metrics
            ],
        )
        .select_from(from_clause)
        .order_by(timestamp_column.asc())
    )
def get_active_subscriptions_cte(
    timestamp_series: CTE,
    interval: TimeInterval,
    auth_subject: AuthSubject[User | Organization],
    metrics: list["type[SQLMetric]"],
    now: datetime,
    *,
    bounds: tuple[datetime, datetime],
    organization_id: Sequence[uuid.UUID] | None = None,
    product_id: Sequence[uuid.UUID] | None = None,
    billing_type: Sequence[ProductBillingType] | None = None,
    customer_id: Sequence[uuid.UUID] | None = None,
) -> CTE:
    """Build the CTE holding ``MetricQuery.active_subscriptions`` metrics.

    Each timestamp of ``timestamp_series`` is left-joined to the readable
    subscriptions that have started by that interval (or have no start date),
    have not reached their end in that interval (or have no end date), and
    overlap ``bounds``. The end date is ``ended_at`` falling back to
    ``ends_at``. Rows are grouped and ordered by timestamp.
    """
    range_start, range_end = bounds
    bucket: ColumnElement[datetime] = timestamp_series.c.timestamp

    visible_subscriptions = _get_readable_subscriptions_statement(
        auth_subject,
        organization_id=organization_id,
        product_id=product_id,
        billing_type=billing_type,
        customer_id=customer_id,
    )

    # Effective end of a subscription: actual end, else scheduled end.
    effective_end = func.coalesce(Subscription.ended_at, Subscription.ends_at)

    started_by_bucket = or_(
        Subscription.started_at.is_(None),
        interval.sql_date_trunc(
            cast(SQLColumnExpression[datetime], Subscription.started_at)
        )
        <= interval.sql_date_trunc(bucket),
    )
    not_ended_in_bucket = or_(
        effective_end.is_(None),
        interval.sql_date_trunc(
            cast(SQLColumnExpression[datetime], effective_end)
        )
        > interval.sql_date_trunc(bucket),
    )
    # Filter to only include subscriptions that overlap with the original bounds
    starts_before_range_end = or_(
        Subscription.started_at.is_(None),
        Subscription.started_at <= range_end,
    )
    ends_after_range_start = or_(
        effective_end.is_(None),
        effective_end >= range_start,
    )

    series_with_subscriptions = timestamp_series.join(
        Subscription,
        isouter=True,
        onclause=and_(
            started_by_bucket,
            not_ended_in_bucket,
            Subscription.id.in_(visible_subscriptions),
            starts_before_range_end,
            ends_after_range_start,
        ),
    )

    metric_columns = _get_metrics_columns(
        MetricQuery.active_subscriptions,
        bucket,
        interval,
        metrics,
        now,
    )

    statement = (
        select(bucket.label("timestamp"), *metric_columns)
        .select_from(series_with_subscriptions)
        .group_by(bucket)
        .order_by(bucket.asc())
    )
    return cte(statement)
def _get_readable_subscriptions_statement(
    auth_subject: AuthSubject[User | Organization],
    *,
    organization_id: Sequence[uuid.UUID] | None = None,
    product_id: Sequence[uuid.UUID] | None = None,
    billing_type: Sequence[ProductBillingType] | None = None,
    customer_id: Sequence[uuid.UUID] | None = None,
) -> Select[tuple[uuid.UUID]]:
    """Select the ids of the subscriptions the subject may read.

    Subscriptions are joined to their product. A user subject is limited to
    products of organizations they have a non-deleted ``UserOrganization``
    row for; an organization subject is limited to its own products. Each
    optional filter that is not ``None`` narrows the result further.
    """
    base = select(Subscription.id).join(
        Product, onclause=Subscription.product_id == Product.id
    )

    # Predicates are gathered first and applied together, in this order.
    criteria: list[ColumnElement[bool]] = []

    if is_user(auth_subject):
        user_organization_ids = select(UserOrganization.organization_id).where(
            UserOrganization.user_id == auth_subject.subject.id,
            UserOrganization.deleted_at.is_(None),
        )
        criteria.append(Product.organization_id.in_(user_organization_ids))
    elif is_organization(auth_subject):
        criteria.append(Product.organization_id == auth_subject.subject.id)

    if organization_id is not None:
        criteria.append(Product.organization_id.in_(organization_id))

    if product_id is not None:
        criteria.append(Subscription.product_id.in_(product_id))

    if billing_type is not None:
        criteria.append(Product.billing_type.in_(billing_type))

    if customer_id is not None:
        # The customer filter goes through an explicit join on Customer.
        base = base.join(Customer, onclause=Subscription.customer_id == Customer.id)
        criteria.append(Customer.id.in_(customer_id))

    if criteria:
        base = base.where(*criteria)
    return base
def get_checkouts_cte(
    timestamp_series: CTE,
    interval: TimeInterval,
    auth_subject: AuthSubject[User | Organization],
    metrics: list["type[SQLMetric]"],
    now: datetime,
    *,
    bounds: tuple[datetime, datetime],
    organization_id: Sequence[uuid.UUID] | None = None,
    product_id: Sequence[uuid.UUID] | None = None,
    billing_type: Sequence[ProductBillingType] | None = None,
    customer_id: Sequence[uuid.UUID] | None = None,
) -> CTE:
    """Build the CTE holding ``MetricQuery.checkouts`` metrics per timestamp.

    Each timestamp of ``timestamp_series`` is left-joined to the readable
    checkouts created in the same truncated interval and within ``bounds``.
    Readability is decided through the products attached to the checkout via
    ``CheckoutProduct``. Rows are grouped and ordered by timestamp.
    """
    range_start, range_end = bounds
    bucket: ColumnElement[datetime] = timestamp_series.c.timestamp

    # Predicates restricting which checkouts the subject may see.
    visibility: list[ColumnElement[bool]] = []

    if is_user(auth_subject):
        member_organizations = select(UserOrganization.organization_id).where(
            UserOrganization.user_id == auth_subject.subject.id,
            UserOrganization.deleted_at.is_(None),
        )
        visibility.append(Product.organization_id.in_(member_organizations))
    elif is_organization(auth_subject):
        visibility.append(Product.organization_id == auth_subject.subject.id)

    if organization_id is not None:
        visibility.append(Product.organization_id.in_(organization_id))

    if product_id is not None:
        visibility.append(CheckoutProduct.product_id.in_(product_id))

    if billing_type is not None:
        visibility.append(Product.billing_type.in_(billing_type))

    if customer_id is not None:
        visibility.append(Checkout.customer_id.in_(customer_id))

    visible_checkouts = (
        select(Checkout.id)
        .join(CheckoutProduct, CheckoutProduct.checkout_id == Checkout.id)
        .join(Product, onclause=CheckoutProduct.product_id == Product.id)
    )
    if visibility:
        visible_checkouts = visible_checkouts.where(*visibility)

    series_with_checkouts = timestamp_series.join(
        Checkout,
        isouter=True,
        onclause=and_(
            interval.sql_date_trunc(Checkout.created_at)
            == interval.sql_date_trunc(bucket),
            Checkout.id.in_(visible_checkouts),
            Checkout.created_at >= range_start,
            Checkout.created_at <= range_end,
        ),
    )

    metric_columns = _get_metrics_columns(
        MetricQuery.checkouts, bucket, interval, metrics, now
    )

    statement = (
        select(bucket.label("timestamp"), *metric_columns)
        .select_from(series_with_checkouts)
        .group_by(bucket)
        .order_by(bucket.asc())
    )
    return cte(statement)
def get_canceled_subscriptions_cte(
    timestamp_series: CTE,
    interval: TimeInterval,
    auth_subject: AuthSubject[User | Organization],
    metrics: list["type[SQLMetric]"],
    now: datetime,
    *,
    bounds: tuple[datetime, datetime],
    organization_id: Sequence[uuid.UUID] | None = None,
    product_id: Sequence[uuid.UUID] | None = None,
    billing_type: Sequence[ProductBillingType] | None = None,
    customer_id: Sequence[uuid.UUID] | None = None,
) -> CTE:
    """Build the CTE holding ``MetricQuery.canceled_subscriptions`` metrics.

    Each timestamp of ``timestamp_series`` is left-joined to the readable
    subscriptions whose ``canceled_at`` is set, falls in the same truncated
    interval as the timestamp, and lies within ``bounds`` (inclusive). Rows
    are grouped and ordered by timestamp.
    """
    range_start, range_end = bounds
    bucket: ColumnElement[datetime] = timestamp_series.c.timestamp

    visible_subscriptions = _get_readable_subscriptions_statement(
        auth_subject,
        organization_id=organization_id,
        product_id=product_id,
        billing_type=billing_type,
        customer_id=customer_id,
    )

    canceled_in_bucket = interval.sql_date_trunc(
        cast(SQLColumnExpression[datetime], Subscription.canceled_at)
    ) == interval.sql_date_trunc(bucket)

    series_with_cancellations = timestamp_series.join(
        Subscription,
        isouter=True,
        onclause=and_(
            Subscription.canceled_at.is_not(None),
            canceled_in_bucket,
            Subscription.id.in_(visible_subscriptions),
            Subscription.canceled_at >= range_start,
            Subscription.canceled_at <= range_end,
        ),
    )

    metric_columns = _get_metrics_columns(
        MetricQuery.canceled_subscriptions,
        bucket,
        interval,
        metrics,
        now,
    )

    statement = (
        select(bucket.label("timestamp"), *metric_columns)
        .select_from(series_with_cancellations)
        .group_by(bucket)
        .order_by(bucket.asc())
    )
    return cte(statement)
def get_churned_subscriptions_cte(
    timestamp_series: CTE,
    interval: TimeInterval,
    auth_subject: AuthSubject[User | Organization],
    metrics: list["type[SQLMetric]"],
    now: datetime,
    *,
    bounds: tuple[datetime, datetime],
    organization_id: Sequence[uuid.UUID] | None = None,
    product_id: Sequence[uuid.UUID] | None = None,
    billing_type: Sequence[ProductBillingType] | None = None,
    customer_id: Sequence[uuid.UUID] | None = None,
) -> CTE:
    """Build the CTE holding ``MetricQuery.churned_subscriptions`` metrics.

    The end date of a subscription is ``ended_at`` falling back to
    ``ends_at``. Each timestamp of ``timestamp_series`` is left-joined to the
    readable subscriptions whose end date is set, falls in the same truncated
    interval as the timestamp, and lies within ``bounds`` (inclusive). Rows
    are grouped and ordered by timestamp.
    """
    range_start, range_end = bounds
    bucket: ColumnElement[datetime] = timestamp_series.c.timestamp

    visible_subscriptions = _get_readable_subscriptions_statement(
        auth_subject,
        organization_id=organization_id,
        product_id=product_id,
        billing_type=billing_type,
        customer_id=customer_id,
    )

    # Effective end of a subscription: actual end, else scheduled end.
    effective_end = func.coalesce(Subscription.ended_at, Subscription.ends_at)

    ended_in_bucket = interval.sql_date_trunc(
        cast(SQLColumnExpression[datetime], effective_end)
    ) == interval.sql_date_trunc(bucket)

    series_with_churn = timestamp_series.join(
        Subscription,
        isouter=True,
        onclause=and_(
            effective_end.is_not(None),
            ended_in_bucket,
            Subscription.id.in_(visible_subscriptions),
            effective_end >= range_start,
            effective_end <= range_end,
        ),
    )

    metric_columns = _get_metrics_columns(
        MetricQuery.churned_subscriptions,
        bucket,
        interval,
        metrics,
        now,
    )

    statement = (
        select(bucket.label("timestamp"), *metric_columns)
        .select_from(series_with_churn)
        .group_by(bucket)
        .order_by(bucket.asc())
    )
    return cte(statement)
def _get_readable_cost_events_statement(
    *,
    auth_subject: AuthSubject[User | Organization],
    organization_id: Sequence[uuid.UUID] | None = None,
    customer_id: Sequence[uuid.UUID] | None = None,
) -> Select[tuple[uuid.UUID]]:
    """Select the ids of readable events whose metadata has a ``_cost`` entry.

    A user subject is limited to events of organizations they have a
    non-deleted ``UserOrganization`` row for; an organization subject is
    limited to its own events. ``organization_id`` and ``customer_id`` narrow
    the result further when given. All arguments are keyword-only.
    """
    # The cost requirement always applies and comes first.
    criteria: list[ColumnElement[bool]] = [
        Event.user_metadata["_cost"].is_not(None)
    ]

    if is_user(auth_subject):
        user_organization_ids = select(UserOrganization.organization_id).where(
            UserOrganization.user_id == auth_subject.subject.id,
            UserOrganization.deleted_at.is_(None),
        )
        criteria.append(Event.organization_id.in_(user_organization_ids))
    elif is_organization(auth_subject):
        criteria.append(Event.organization_id == auth_subject.subject.id)

    if organization_id is not None:
        criteria.append(Event.organization_id.in_(organization_id))

    query = select(Event.id)

    if customer_id is not None:
        # An event matches a customer either by internal id, or by external
        # id within the same organization.
        matches_by_external_id = and_(
            Customer.external_id.is_not(None),
            Event.external_customer_id == Customer.external_id,
            Event.organization_id == Customer.organization_id,
        )
        query = query.join(
            Customer,
            onclause=or_(Event.customer_id == Customer.id, matches_by_external_id),
        )
        criteria.append(Customer.id.in_(customer_id))

    return query.where(*criteria)
def get_events_metrics_cte(
    timestamp_series: CTE,
    interval: TimeInterval,
    auth_subject: AuthSubject[User | Organization],
    metrics: list["type[SQLMetric]"],
    now: datetime,
    *,
    bounds: tuple[datetime, datetime],
    organization_id: Sequence[uuid.UUID] | None = None,
    customer_id: Sequence[uuid.UUID] | None = None,
    product_id: Sequence[uuid.UUID] | None = None,
    billing_type: Sequence[ProductBillingType] | None = None,
) -> CTE:
    """Build the CTE holding every ``MetricQuery.events`` metric per timestamp.

    Readable events whose timestamp lies within ``bounds`` (inclusive) are
    aggregated per truncated interval and left-joined onto
    ``timestamp_series``. The ``cumulative_costs`` metric is emitted as a
    running sum over the series; other metrics use the interval's own value.
    Missing values become 0.

    ``product_id`` and ``billing_type`` are accepted but not used here.
    """
    range_start, range_end = bounds
    bucket: ColumnElement[datetime] = timestamp_series.c.timestamp

    visible_events = _get_readable_events_statement(
        auth_subject,
        organization_id=organization_id,
        customer_id=customer_id,
    )

    event_bucket = interval.sql_date_trunc(Event.timestamp)

    # Per-interval aggregates of the events metrics.
    per_interval_columns = [
        func.coalesce(
            metric.get_sql_expression(event_bucket, interval, now), 0
        ).label(metric.slug)
        for metric in metrics
        if metric.query == MetricQuery.events
    ]
    per_interval = cte(
        select(event_bucket.label("day"), *per_interval_columns)
        .select_from(Event)
        .where(
            Event.id.in_(visible_events),
            Event.timestamp >= range_start,
            Event.timestamp <= range_end,
        )
        .group_by(event_bucket)
    )

    # One output column per events metric, defaulting to 0 when absent.
    output_columns = []
    for metric in metrics:
        if metric.query != MetricQuery.events:
            continue
        interval_value = getattr(per_interval.c, metric.slug)
        if metric.slug in ["cumulative_costs"]:
            value = func.sum(interval_value).over(order_by=bucket)
        else:
            value = interval_value
        output_columns.append(func.coalesce(value, 0).label(metric.slug))

    series_with_metrics = timestamp_series.join(
        per_interval,
        onclause=per_interval.c.day == bucket,
        isouter=True,
    )

    statement = (
        select(bucket.label("timestamp"), *output_columns)
        .select_from(series_with_metrics)
        .order_by(bucket.asc())
    )
    return cte(statement)
def _get_readable_events_statement(
    auth_subject: AuthSubject[User | Organization],
    *,
    organization_id: Sequence[uuid.UUID] | None = None,
    customer_id: Sequence[uuid.UUID] | None = None,
) -> Select[tuple[uuid.UUID]]:
    """Select the ids of the events the subject may read, narrowed by filters.

    A user subject is limited to events of organizations they have a
    non-deleted ``UserOrganization`` row for; an organization subject is
    limited to its own events. ``organization_id`` and ``customer_id`` narrow
    the result further when given.
    """
    query = select(Event.id)

    # Predicates are gathered first and applied together, in this order.
    predicates: list[ColumnElement[bool]] = []

    if is_user(auth_subject):
        member_organizations = select(UserOrganization.organization_id).where(
            UserOrganization.user_id == auth_subject.subject.id,
            UserOrganization.deleted_at.is_(None),
        )
        predicates.append(Event.organization_id.in_(member_organizations))
    elif is_organization(auth_subject):
        predicates.append(Event.organization_id == auth_subject.subject.id)

    if organization_id is not None:
        predicates.append(Event.organization_id.in_(organization_id))

    if customer_id is not None:
        # An event matches a customer either by internal id, or by external
        # id within the same organization.
        same_external_customer = and_(
            Customer.external_id.is_not(None),
            Event.external_customer_id == Customer.external_id,
            Event.organization_id == Customer.organization_id,
        )
        query = query.join(
            Customer,
            onclause=or_(Event.customer_id == Customer.id, same_external_customer),
        )
        predicates.append(Customer.id.in_(customer_id))

    if predicates:
        query = query.where(*predicates)
    return query
# Every CTE builder defined in this module, one per ``MetricQuery`` member and
# in the same order as the members are declared.
QUERIES: list[QueryCallable] = [
    get_orders_metrics_cte,
    get_active_subscriptions_cte,
    get_checkouts_cte,
    get_canceled_subscriptions_cte,
    get_churned_subscriptions_cte,
    get_events_metrics_cte,
]