Coverage for polar/subscription/scheduler.py: 0%

65 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 16:17 +0000

1import datetime 

2import uuid 

3 

4import dramatiq 

5import structlog 

6from apscheduler.job import Job 

7from apscheduler.jobstores.base import BaseJobStore 

8from apscheduler.triggers.date import DateTrigger 

9from sqlalchemy import Select, select, update 

10from sqlalchemy.orm import Session 

11 

12from polar.kit.utils import utc_now 

13from polar.logging import Logger 

14from polar.models import Subscription 

15from polar.postgres import create_sync_engine 

16 

17 

18def enqueue_subscription_cycle(subscription_id: uuid.UUID) -> None: 

19 actor = dramatiq.get_broker().get_actor("subscription.cycle") 

20 actor.send(subscription_id=subscription_id) 

21 

22 

23class SubscriptionJobStore(BaseJobStore): 

24 """ 

25 A custom job store for APScheduler that uses our subscription data to trigger 

26 cycle jobs based on subscription dates 

27 """ 

28 

29 def __init__(self, executor: str = "default") -> None: 

30 self.engine = create_sync_engine("scheduler") 

31 self.executor = executor 

32 self.log: Logger = structlog.get_logger() 

33 

34 def shutdown(self) -> None: 

35 self.engine.dispose() 

36 return super().shutdown() 

37 

38 def lookup_job(self, job_id: str) -> Job | None: 

39 return None 

40 

41 def get_due_jobs(self, now: datetime.datetime) -> list[Job]: 

42 statement = ( 

43 select(Subscription) 

44 .where( 

45 Subscription.scheduler_locked_at.is_(None), 

46 Subscription.stripe_subscription_id.is_(None), 

47 Subscription.active.is_(True), 

48 Subscription.current_period_end.is_not(None), 

49 Subscription.current_period_end <= now, 

50 ) 

51 .order_by(Subscription.current_period_end.asc()) 

52 ) 

53 jobs = self._list_jobs_from_statement(statement) 

54 self.log.debug("Due jobs", count=len(jobs)) 

55 return jobs 

56 

57 def get_next_run_time(self) -> datetime.datetime | None: 

58 statement = ( 

59 select(Subscription.current_period_end) 

60 .where( 

61 Subscription.scheduler_locked_at.is_(None), 

62 Subscription.stripe_subscription_id.is_(None), 

63 Subscription.active.is_(True), 

64 Subscription.current_period_end.is_not(None), 

65 ) 

66 .order_by(Subscription.current_period_end.asc()) 

67 .limit(1) 

68 ) 

69 with self.engine.connect() as connection: 

70 result = connection.execute(statement) 

71 next_run_time = result.scalar_one_or_none() 

72 self.log.debug("Next run time", next_run_time=next_run_time) 

73 return next_run_time 

74 

75 def get_all_jobs(self) -> list[Job]: 

76 statement = ( 

77 select(Subscription) 

78 .where( 

79 Subscription.scheduler_locked_at.is_(None), 

80 Subscription.stripe_subscription_id.is_(None), 

81 Subscription.active.is_(True), 

82 Subscription.current_period_end.is_not(None), 

83 ) 

84 .order_by(Subscription.current_period_end.asc()) 

85 ) 

86 jobs = self._list_jobs_from_statement(statement) 

87 self.log.debug("All jobs", count=len(jobs)) 

88 return jobs 

89 

90 def remove_job(self, job_id: str) -> None: 

91 subscription_id = job_id.split(":")[-1] 

92 statement = ( 

93 update(Subscription) 

94 .where(Subscription.id == subscription_id) 

95 .values(scheduler_locked_at=utc_now()) 

96 ) 

97 with self.engine.begin() as connection: 

98 connection.execute(statement) 

99 

100 def add_job(self, job: Job) -> None: 

101 raise RuntimeError("This job store does not support managing jobs directly.") 

102 

103 def update_job(self, job: Job) -> None: 

104 raise RuntimeError("This job store does not support managing jobs directly.") 

105 

106 def remove_all_jobs(self) -> None: 

107 raise RuntimeError("This job store does not support managing jobs directly.") 

108 

109 def _list_jobs_from_statement( 

110 self, statement: Select[tuple[Subscription]] 

111 ) -> list[Job]: 

112 jobs: list[Job] = [] 

113 with Session(self.engine) as session: 

114 results = session.execute( 

115 statement.with_only_columns( 

116 Subscription.id, Subscription.current_period_end 

117 ).execution_options(stream_results=True, max_row_buffer=250) 

118 ) 

119 for result in results.yield_per(250): 

120 subscription_id, current_period_end = result._tuple() 

121 trigger = DateTrigger(current_period_end, datetime.UTC) 

122 job_kwargs = { 

123 **(self._scheduler._job_defaults if self._scheduler else {}), 

124 "trigger": trigger, 

125 "executor": self.executor, 

126 "func": enqueue_subscription_cycle, 

127 "args": (subscription_id,), 

128 "kwargs": {}, 

129 "id": f"subscriptions:cycle:{subscription_id}", 

130 "name": None, 

131 "next_run_time": trigger.run_date, 

132 "misfire_grace_time": None, 

133 } 

134 job = Job(self._scheduler, **job_kwargs) 

135 jobs.append(job) 

136 return jobs