Coverage for polar/customer_meter/scheduler.py: 0%
68 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 17:15 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 17:15 +0000
1import datetime
2import uuid
3from operator import or_
5import dramatiq
6import structlog
7from apscheduler.job import Job
8from apscheduler.jobstores.base import BaseJobStore
9from apscheduler.triggers.date import DateTrigger
10from sqlalchemy import Select, func, select, update
11from sqlalchemy.orm import Session
13from polar.config import settings
14from polar.kit.utils import utc_now
15from polar.logging import Logger
16from polar.models import Customer
17from polar.postgres import create_sync_engine
def enqueue_update_customer(customer_id: uuid.UUID) -> None:
    """
    Send a message to the ``customer_meter.update_customer`` actor.

    The actor is looked up by name on the current dramatiq broker and the
    customer ID is passed to it as the ``customer_id`` keyword argument.

    :param customer_id: ID of the customer forwarded to the actor.
    """
    broker = dramatiq.get_broker()
    update_actor = broker.get_actor("customer_meter.update_customer")
    update_actor.send(customer_id=customer_id)
class CustomerMeterJobStore(BaseJobStore):
    """
    A custom job store for APScheduler that creates jobs for customers that
    have a meters_dirtied_at set for more than a given threshold.

    Jobs are never stored: they are rebuilt from ``Customer`` rows on every
    query. Removing a job resets the customer's ``meters_dirtied_at`` to
    ``None``; adding, updating or bulk-removing jobs is not supported.
    """

    def __init__(self, executor: str = "default") -> None:
        """
        Create the store with its own synchronous database engine.

        :param executor: Executor alias assigned to every generated job.
        """
        self.engine = create_sync_engine("scheduler")
        self.executor = executor
        self.log: Logger = structlog.get_logger()

    def shutdown(self) -> None:
        """Dispose of the engine, then run the base class shutdown."""
        self.engine.dispose()
        return super().shutdown()

    def lookup_job(self, job_id: str) -> Job | None:
        """Always return ``None``: individual jobs cannot be looked up."""
        return None

    def get_due_jobs(self, now: datetime.datetime) -> list[Job]:
        """
        Build one job per customer whose meters are due for an update.

        :param now: Reference time used for the minimum debounce threshold.
        :return: Jobs ordered by ascending ``meters_dirtied_at``.
        """
        jobs = self._list_jobs_from_statement(self._due_customers_statement(now))
        self.log.debug("Due jobs", count=len(jobs))
        return jobs

    def get_next_run_time(self) -> datetime.datetime:
        """
        Return the time at which the scheduler should next poll this store.

        For the customer with the oldest ``meters_dirtied_at``, this is the
        earlier of ``meters_dirtied_at`` plus the minimum threshold and the
        last update (``meters_updated_at``, else ``created_at``) plus the
        maximum threshold. When no customer is dirty, it falls back to the
        current time plus the minimum threshold.
        """
        min_threshold = settings.CUSTOMER_METER_UPDATE_DEBOUNCE_MIN_THRESHOLD
        max_threshold = settings.CUSTOMER_METER_UPDATE_DEBOUNCE_MAX_THRESHOLD
        last_update = func.coalesce(Customer.meters_updated_at, Customer.created_at)
        # NOTE(review): only the earliest-dirtied customer is considered, not
        # the minimum over all dirty customers -- confirm this is intended.
        statement = (
            select(
                func.least(
                    Customer.meters_dirtied_at + min_threshold,
                    last_update + max_threshold,
                )
            )
            .where(Customer.meters_dirtied_at.is_not(None))
            .order_by(Customer.meters_dirtied_at.asc())
            .limit(1)
        )
        with self.engine.connect() as connection:
            result = connection.execute(statement)
            next_run_time = result.scalar_one_or_none() or (
                utc_now() + min_threshold
            )
            self.log.debug("Next run time", next_run_time=next_run_time)
            return next_run_time

    def get_all_jobs(self) -> list[Job]:
        """
        Build one job per customer that is due as of the current time.

        Uses the same selection as :meth:`get_due_jobs`, evaluated against
        ``utc_now()``.
        """
        jobs = self._list_jobs_from_statement(
            self._due_customers_statement(utc_now())
        )
        self.log.debug("All jobs", count=len(jobs))
        return jobs

    def remove_job(self, job_id: str) -> None:
        """
        Clear ``meters_dirtied_at`` for the customer encoded in ``job_id``.

        :param job_id: Job ID whose last ``:``-separated segment is the
            customer ID.
        """
        customer_id = job_id.rpartition(":")[2]
        statement = (
            update(Customer)
            .where(Customer.id == customer_id)
            .values(meters_dirtied_at=None)
        )
        with self.engine.begin() as connection:
            connection.execute(statement)

    def add_job(self, job: Job) -> None:
        """Unsupported. :raises RuntimeError: always."""
        raise RuntimeError("This job store does not support managing jobs directly.")

    def update_job(self, job: Job) -> None:
        """Unsupported. :raises RuntimeError: always."""
        raise RuntimeError("This job store does not support managing jobs directly.")

    def remove_all_jobs(self) -> None:
        """Unsupported. :raises RuntimeError: always."""
        raise RuntimeError("This job store does not support managing jobs directly.")

    def _due_customers_statement(
        self, now: datetime.datetime
    ) -> Select[tuple[Customer]]:
        """
        Build the query selecting customers whose meters are due for an update.

        A customer is selected when ``meters_dirtied_at`` is set and either
        it is older than ``now`` minus the minimum threshold, or it is later
        than the last update (``meters_updated_at``, else ``created_at``)
        plus the maximum threshold. Rows are ordered by ascending
        ``meters_dirtied_at``.

        :param now: Reference time for the minimum threshold comparison.
        """
        min_threshold = settings.CUSTOMER_METER_UPDATE_DEBOUNCE_MIN_THRESHOLD
        max_threshold = settings.CUSTOMER_METER_UPDATE_DEBOUNCE_MAX_THRESHOLD
        last_update = func.coalesce(Customer.meters_updated_at, Customer.created_at)
        return (
            select(Customer)
            .where(
                # Explicit guard so both callers share the same predicate;
                # comparisons against NULL would already exclude these rows.
                Customer.meters_dirtied_at.is_not(None),
                or_(
                    Customer.meters_dirtied_at < now - min_threshold,
                    Customer.meters_dirtied_at > last_update + max_threshold,
                ),
            )
            .order_by(Customer.meters_dirtied_at.asc())
        )

    def _list_jobs_from_statement(
        self, statement: Select[tuple[Customer]]
    ) -> list[Job]:
        """
        Execute ``statement`` and turn every returned customer into a job.

        Only ``Customer.id`` and ``Customer.meters_dirtied_at`` are fetched,
        and rows are streamed in batches of 250. Each job calls
        :func:`enqueue_update_customer` with the customer ID and is
        scheduled at the customer's ``meters_dirtied_at``.

        :param statement: Customer query; its columns are replaced before
            execution.
        :return: One job per returned row, in query order.
        """
        # Scheduler-wide defaults are the same for every row: read them once.
        job_defaults = self._scheduler._job_defaults if self._scheduler else {}
        jobs: list[Job] = []
        with Session(self.engine) as session:
            results = session.execute(
                statement.with_only_columns(
                    Customer.id, Customer.meters_dirtied_at
                ).execution_options(stream_results=True, max_row_buffer=250)
            )
            for customer_id, meters_dirtied_at in results.yield_per(250):
                trigger = DateTrigger(meters_dirtied_at, datetime.UTC)
                job_kwargs = {
                    **job_defaults,
                    "trigger": trigger,
                    "executor": self.executor,
                    "func": enqueue_update_customer,
                    "args": (customer_id,),
                    "kwargs": {},
                    "id": f"customers:meter_update:{customer_id}",
                    "name": None,
                    "next_run_time": trigger.run_date,
                    "misfire_grace_time": None,
                }
                jobs.append(Job(self._scheduler, **job_kwargs))
        return jobs