Coverage for polar/worker/scheduler.py: 0%

34 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 15:52 +0000

1import logfire 

2from apscheduler.jobstores.memory import MemoryJobStore 

3from apscheduler.schedulers.base import STATE_STOPPED 

4from apscheduler.schedulers.blocking import BlockingScheduler 

5 

6from polar import tasks 

7from polar.customer_meter.scheduler import CustomerMeterJobStore 

8from polar.logfire import configure_logfire 

9from polar.logging import configure as configure_logging 

10from polar.sentry import configure_sentry 

11from polar.subscription.scheduler import SubscriptionJobStore 

12from polar.worker import scheduler_middleware 

13 

# Process-wide setup, executed once when this module is imported.
# The call order is kept as-is: Sentry first, then logfire (called with
# "worker"), then logging with its logfire flag enabled.
configure_sentry()
configure_logfire("worker")
configure_logging(logfire=True)

17 

18 

class LogfireBlockingScheduler(BlockingScheduler):
    """Blocking scheduler whose main loop iterations are traced with logfire."""

    def _main_loop(self) -> None:
        """Run the wait/process cycle until the scheduler state is stopped.

        Each iteration runs inside a ``"Scheduler wakeup"`` logfire span: it
        waits on the scheduler's internal event, clears it, and processes jobs.
        The first wait uses a 1-second timeout; every later wait uses the
        value returned by ``_process_jobs()``.
        """
        timeout = 1
        while True:
            # Re-check the state before every iteration so a stopped
            # scheduler leaves the loop without opening another span.
            if self.state == STATE_STOPPED:
                return

            with logfire.span("Scheduler wakeup"):
                self._event.wait(timeout)
                self._event.clear()
                # The result of processing is used as the next wait timeout.
                timeout = self._process_jobs()

27 

28 

def start() -> None:
    """Create the scheduler, register its job stores and cron jobs, and run it.

    Three job stores are registered under the aliases ``"memory"``,
    ``"subscription"`` and ``"customer_meter"``. Every ``(func, trigger)``
    pair found in ``scheduler_middleware.cron_triggers`` is added as a job
    in the ``"memory"`` store. If ``KeyboardInterrupt`` is raised while the
    scheduler is running, the scheduler is shut down.
    """
    scheduler = LogfireBlockingScheduler()

    # (alias, factory) pairs: each store is instantiated right before it is
    # registered, in exactly this order.
    jobstore_factories = (
        ("memory", MemoryJobStore),
        ("subscription", SubscriptionJobStore),
        ("customer_meter", CustomerMeterJobStore),
    )
    for alias, make_store in jobstore_factories:
        scheduler.add_jobstore(make_store(), alias)

    # Cron-triggered jobs all go to the "memory" store.
    for job_func, trigger in scheduler_middleware.cron_triggers:
        scheduler.add_job(job_func, trigger, jobstore="memory")

    try:
        scheduler.start()
    except KeyboardInterrupt:
        scheduler.shutdown()

43 

44 

# Public names of this module. `tasks` is listed so the otherwise-unused
# import counts as an intentional re-export
# (NOTE(review): presumably imported for its side effects — confirm).
__all__ = [
    "tasks",
    "start",
]