Coverage for polar/customer_meter/scheduler.py: 0%

68 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 16:17 +0000

1import datetime 

2import uuid 

3from operator import or_ 

4 

5import dramatiq 

6import structlog 

7from apscheduler.job import Job 

8from apscheduler.jobstores.base import BaseJobStore 

9from apscheduler.triggers.date import DateTrigger 

10from sqlalchemy import Select, func, select, update 

11from sqlalchemy.orm import Session 

12 

13from polar.config import settings 

14from polar.kit.utils import utc_now 

15from polar.logging import Logger 

16from polar.models import Customer 

17from polar.postgres import create_sync_engine 

18 

19 

def enqueue_update_customer(customer_id: uuid.UUID) -> None:
    """Send a ``customer_meter.update_customer`` message for one customer.

    The actor is resolved by name on the current Dramatiq broker rather than
    imported, then sent ``customer_id`` as a keyword argument.

    Args:
        customer_id: ID of the customer whose meters should be updated.
    """
    broker = dramatiq.get_broker()
    broker.get_actor("customer_meter.update_customer").send(customer_id=customer_id)

23 

24 

class CustomerMeterJobStore(BaseJobStore):
    """
    A custom job store for APScheduler that creates jobs for customers that
    have a meters_dirtied_at set for more than a given threshold.

    Jobs are not stored anywhere by this class: they are built on the fly from
    rows of the ``Customer`` table. Removing a job clears the customer's
    ``meters_dirtied_at``; adding, updating or bulk-removing jobs raises
    ``RuntimeError``.
    """

    def __init__(self, executor: str = "default") -> None:
        """
        Args:
            executor: Value set as the ``executor`` of every job this store
                generates.
        """
        super().__init__()
        self.engine = create_sync_engine("scheduler")
        self.executor = executor
        self.log: Logger = structlog.get_logger()

    def shutdown(self) -> None:
        """Dispose of the database engine, then run the base class shutdown."""
        self.engine.dispose()
        return super().shutdown()

    def lookup_job(self, job_id: str) -> Job | None:
        """Always return ``None``; jobs are not looked up individually."""
        return None

    def get_due_jobs(self, now: datetime.datetime) -> list[Job]:
        """Return one job per customer whose meters are due for update at ``now``.

        See ``_due_customers_statement`` for the exact "due" criteria.
        """
        jobs = self._list_jobs_from_statement(self._due_customers_statement(now))
        self.log.debug("Due jobs", count=len(jobs))
        return jobs

    def get_next_run_time(self) -> datetime.datetime:
        """Return the time at which the scheduler should next wake up.

        Looks at the single customer with the oldest non-null
        ``meters_dirtied_at`` and returns the earlier of:

        * ``meters_dirtied_at`` + the minimum debounce threshold, and
        * ``meters_updated_at`` (or ``created_at`` when never updated)
          + the maximum debounce threshold.

        When the query yields no value, falls back to the current UTC time
        plus the minimum debounce threshold.
        """
        statement = (
            select(
                func.least(
                    Customer.meters_dirtied_at
                    + settings.CUSTOMER_METER_UPDATE_DEBOUNCE_MIN_THRESHOLD,
                    func.coalesce(Customer.meters_updated_at, Customer.created_at)
                    + settings.CUSTOMER_METER_UPDATE_DEBOUNCE_MAX_THRESHOLD,
                )
            )
            .where(Customer.meters_dirtied_at.is_not(None))
            .order_by(Customer.meters_dirtied_at.asc())
            .limit(1)
        )
        with self.engine.connect() as connection:
            result = connection.execute(statement)
            next_run_time = result.scalar_one_or_none() or (
                utc_now() + settings.CUSTOMER_METER_UPDATE_DEBOUNCE_MIN_THRESHOLD
            )
        self.log.debug("Next run time", next_run_time=next_run_time)
        return next_run_time

    def get_all_jobs(self) -> list[Job]:
        """Return the jobs that are due as of the current UTC time.

        Uses the same criteria as ``get_due_jobs``, evaluated at ``utc_now()``.
        """
        jobs = self._list_jobs_from_statement(
            self._due_customers_statement(utc_now())
        )
        self.log.debug("All jobs", count=len(jobs))
        return jobs

    def remove_job(self, job_id: str) -> None:
        """Clear ``meters_dirtied_at`` for the customer encoded in ``job_id``.

        The customer ID is taken as the last ``:``-separated segment of
        ``job_id`` (job ids are generated as ``customers:meter_update:<id>``).
        """
        customer_id = job_id.split(":")[-1]
        statement = (
            update(Customer)
            .where(Customer.id == customer_id)
            .values(meters_dirtied_at=None)
        )
        # engine.begin() commits the UPDATE when the block exits normally.
        with self.engine.begin() as connection:
            connection.execute(statement)

    def add_job(self, job: Job) -> None:
        """Unsupported: always raises ``RuntimeError``."""
        raise RuntimeError("This job store does not support managing jobs directly.")

    def update_job(self, job: Job) -> None:
        """Unsupported: always raises ``RuntimeError``."""
        raise RuntimeError("This job store does not support managing jobs directly.")

    def remove_all_jobs(self) -> None:
        """Unsupported: always raises ``RuntimeError``."""
        raise RuntimeError("This job store does not support managing jobs directly.")

    def _due_customers_statement(
        self, now: datetime.datetime
    ) -> Select[tuple[Customer]]:
        """Build the query selecting customers whose meters are due at ``now``.

        A customer is due when ``meters_dirtied_at`` is set and either:

        * it is older than ``now`` minus the minimum debounce threshold, or
        * it is later than ``meters_updated_at`` (or ``created_at`` when never
          updated) plus the maximum debounce threshold.

        Rows are ordered by ``meters_dirtied_at`` ascending.
        """
        return (
            select(Customer)
            .where(
                # Explicit guard; NULL comparisons below would already exclude
                # these rows, so this does not change the result set.
                Customer.meters_dirtied_at.is_not(None),
                # `or_` is operator.or_ (see file imports), i.e. `a | b`, which
                # SQLAlchemy column expressions render as SQL OR.
                or_(
                    Customer.meters_dirtied_at
                    < now - settings.CUSTOMER_METER_UPDATE_DEBOUNCE_MIN_THRESHOLD,
                    Customer.meters_dirtied_at
                    > func.coalesce(Customer.meters_updated_at, Customer.created_at)
                    + settings.CUSTOMER_METER_UPDATE_DEBOUNCE_MAX_THRESHOLD,
                ),
            )
            .order_by(Customer.meters_dirtied_at.asc())
        )

    def _list_jobs_from_statement(
        self, statement: Select[tuple[Customer]]
    ) -> list[Job]:
        """Execute ``statement`` and build one ``Job`` per returned customer.

        Only ``Customer.id`` and ``Customer.meters_dirtied_at`` are fetched,
        and rows are streamed in batches of 250. Each job calls
        ``enqueue_update_customer`` with the customer ID, with its trigger and
        ``next_run_time`` set from ``meters_dirtied_at``.
        """
        jobs: list[Job] = []
        with Session(self.engine) as session:
            results = session.execute(
                statement.with_only_columns(
                    Customer.id, Customer.meters_dirtied_at
                ).execution_options(stream_results=True, max_row_buffer=250)
            )
            for result in results.yield_per(250):
                customer_id, meters_dirtied_at = result._tuple()
                trigger = DateTrigger(meters_dirtied_at, datetime.UTC)
                job_kwargs = {
                    # Scheduler-wide defaults first so the explicit values
                    # below take precedence.
                    **(self._scheduler._job_defaults if self._scheduler else {}),
                    "trigger": trigger,
                    "executor": self.executor,
                    "func": enqueue_update_customer,
                    "args": (customer_id,),
                    "kwargs": {},
                    "id": f"customers:meter_update:{customer_id}",
                    "name": None,
                    "next_run_time": trigger.run_date,
                    "misfire_grace_time": None,
                }
                job = Job(self._scheduler, **job_kwargs)
                jobs.append(job)
        return jobs