Coverage for polar/subscription/scheduler.py: 0%
65 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 15:52 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 15:52 +0000
1import datetime
2import uuid
4import dramatiq
5import structlog
6from apscheduler.job import Job
7from apscheduler.jobstores.base import BaseJobStore
8from apscheduler.triggers.date import DateTrigger
9from sqlalchemy import Select, select, update
10from sqlalchemy.orm import Session
12from polar.kit.utils import utc_now
13from polar.logging import Logger
14from polar.models import Subscription
15from polar.postgres import create_sync_engine
18def enqueue_subscription_cycle(subscription_id: uuid.UUID) -> None:
19 actor = dramatiq.get_broker().get_actor("subscription.cycle")
20 actor.send(subscription_id=subscription_id)
23class SubscriptionJobStore(BaseJobStore):
24 """
25 A custom job store for APScheduler that uses our subscription data to trigger
26 cycle jobs based on subscription dates
27 """
29 def __init__(self, executor: str = "default") -> None:
30 self.engine = create_sync_engine("scheduler")
31 self.executor = executor
32 self.log: Logger = structlog.get_logger()
34 def shutdown(self) -> None:
35 self.engine.dispose()
36 return super().shutdown()
38 def lookup_job(self, job_id: str) -> Job | None:
39 return None
41 def get_due_jobs(self, now: datetime.datetime) -> list[Job]:
42 statement = (
43 select(Subscription)
44 .where(
45 Subscription.scheduler_locked_at.is_(None),
46 Subscription.stripe_subscription_id.is_(None),
47 Subscription.active.is_(True),
48 Subscription.current_period_end.is_not(None),
49 Subscription.current_period_end <= now,
50 )
51 .order_by(Subscription.current_period_end.asc())
52 )
53 jobs = self._list_jobs_from_statement(statement)
54 self.log.debug("Due jobs", count=len(jobs))
55 return jobs
57 def get_next_run_time(self) -> datetime.datetime | None:
58 statement = (
59 select(Subscription.current_period_end)
60 .where(
61 Subscription.scheduler_locked_at.is_(None),
62 Subscription.stripe_subscription_id.is_(None),
63 Subscription.active.is_(True),
64 Subscription.current_period_end.is_not(None),
65 )
66 .order_by(Subscription.current_period_end.asc())
67 .limit(1)
68 )
69 with self.engine.connect() as connection:
70 result = connection.execute(statement)
71 next_run_time = result.scalar_one_or_none()
72 self.log.debug("Next run time", next_run_time=next_run_time)
73 return next_run_time
75 def get_all_jobs(self) -> list[Job]:
76 statement = (
77 select(Subscription)
78 .where(
79 Subscription.scheduler_locked_at.is_(None),
80 Subscription.stripe_subscription_id.is_(None),
81 Subscription.active.is_(True),
82 Subscription.current_period_end.is_not(None),
83 )
84 .order_by(Subscription.current_period_end.asc())
85 )
86 jobs = self._list_jobs_from_statement(statement)
87 self.log.debug("All jobs", count=len(jobs))
88 return jobs
90 def remove_job(self, job_id: str) -> None:
91 subscription_id = job_id.split(":")[-1]
92 statement = (
93 update(Subscription)
94 .where(Subscription.id == subscription_id)
95 .values(scheduler_locked_at=utc_now())
96 )
97 with self.engine.begin() as connection:
98 connection.execute(statement)
100 def add_job(self, job: Job) -> None:
101 raise RuntimeError("This job store does not support managing jobs directly.")
103 def update_job(self, job: Job) -> None:
104 raise RuntimeError("This job store does not support managing jobs directly.")
106 def remove_all_jobs(self) -> None:
107 raise RuntimeError("This job store does not support managing jobs directly.")
109 def _list_jobs_from_statement(
110 self, statement: Select[tuple[Subscription]]
111 ) -> list[Job]:
112 jobs: list[Job] = []
113 with Session(self.engine) as session:
114 results = session.execute(
115 statement.with_only_columns(
116 Subscription.id, Subscription.current_period_end
117 ).execution_options(stream_results=True, max_row_buffer=250)
118 )
119 for result in results.yield_per(250):
120 subscription_id, current_period_end = result._tuple()
121 trigger = DateTrigger(current_period_end, datetime.UTC)
122 job_kwargs = {
123 **(self._scheduler._job_defaults if self._scheduler else {}),
124 "trigger": trigger,
125 "executor": self.executor,
126 "func": enqueue_subscription_cycle,
127 "args": (subscription_id,),
128 "kwargs": {},
129 "id": f"subscriptions:cycle:{subscription_id}",
130 "name": None,
131 "next_run_time": trigger.run_date,
132 "misfire_grace_time": None,
133 }
134 job = Job(self._scheduler, **job_kwargs)
135 jobs.append(job)
136 return jobs