Coverage for polar/worker/scheduler.py: 0%
34 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 15:52 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 15:52 +0000
1import logfire
2from apscheduler.jobstores.memory import MemoryJobStore
3from apscheduler.schedulers.base import STATE_STOPPED
4from apscheduler.schedulers.blocking import BlockingScheduler
6from polar import tasks
7from polar.customer_meter.scheduler import CustomerMeterJobStore
8from polar.logfire import configure_logfire
9from polar.logging import configure as configure_logging
10from polar.sentry import configure_sentry
11from polar.subscription.scheduler import SubscriptionJobStore
12from polar.worker import scheduler_middleware
# Import-time setup: these run once when the module is first imported, in
# this order, before any scheduler object is created.
configure_sentry()
configure_logfire("worker")
# Logging is configured last and is told that Logfire is enabled.
configure_logging(logfire=True)
class LogfireBlockingScheduler(BlockingScheduler):
    """Blocking scheduler that wraps each wakeup cycle in a Logfire span."""

    def _main_loop(self) -> None:
        """Run wait/process cycles until the scheduler state is stopped.

        Each cycle opens a ``"Scheduler wakeup"`` span, waits on the
        scheduler's internal event for the current timeout, clears the
        event, and processes jobs. The value returned by
        ``_process_jobs()`` becomes the timeout of the next cycle.
        """
        # The very first wait uses a fixed timeout of one (passed as-is to
        # ``self._event.wait`` -- presumably seconds; confirm against the
        # base class).
        timeout = 1
        while True:
            if self.state == STATE_STOPPED:
                break
            # NOTE(review): the span is assumed to cover the wait as well as
            # the job processing -- confirm against the original indentation.
            with logfire.span("Scheduler wakeup"):
                self._event.wait(timeout)
                self._event.clear()
                timeout = self._process_jobs()
def start() -> None:
    """Create the scheduler, register its job stores and cron jobs, run it.

    Three job stores are registered under the aliases ``"memory"``,
    ``"subscription"`` and ``"customer_meter"``. Every ``(func, trigger)``
    pair found in ``scheduler_middleware.cron_triggers`` is added as a job
    in the ``"memory"`` store. The scheduler is then started; a
    ``KeyboardInterrupt`` raised while it runs triggers a scheduler
    shutdown instead of propagating.
    """
    blocking_scheduler = LogfireBlockingScheduler()

    # Each store is instantiated right before it is registered, so the
    # construct/register interleaving stays: memory, subscription, meter.
    store_factories = (
        (MemoryJobStore, "memory"),
        (SubscriptionJobStore, "subscription"),
        (CustomerMeterJobStore, "customer_meter"),
    )
    for make_store, alias in store_factories:
        blocking_scheduler.add_jobstore(make_store(), alias)

    # Cron-triggered jobs all live in the in-memory store.
    for job_func, trigger in scheduler_middleware.cron_triggers:
        blocking_scheduler.add_job(job_func, trigger, jobstore="memory")

    try:
        blocking_scheduler.start()
    except KeyboardInterrupt:
        # Ctrl-C: stop the scheduler rather than letting the interrupt escape.
        blocking_scheduler.shutdown()
# Public names of this module. ``tasks`` is listed alongside ``start`` --
# presumably so the otherwise-unused ``polar.tasks`` import is treated as an
# intentional re-export; verify against how the worker loads its tasks.
__all__ = [
    "tasks",
    "start",
]