Coverage for polar/worker/_redis.py: 45%

25 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 15:52 +0000

1import dramatiq 1ab

2import structlog 1ab

3from dramatiq.asyncio import get_event_loop_thread 1ab

4 

5from polar.logging import Logger 1ab

6from polar.redis import Redis, create_redis 1ab

7 

8log: Logger = structlog.get_logger() 1ab

9 

10 

11_redis: Redis | None = None 1ab

12 

13 

14async def _close_redis() -> None: 1ab

15 global _redis 

16 if _redis is not None: 

17 await _redis.close(True) 

18 log.info("Closed Redis client") 

19 _redis = None 

20 

21 

22class RedisMiddleware(dramatiq.Middleware): 1ab

23 """ 

24 Middleware managing the lifecycle of the Redis connection. 

25 """ 

26 

27 @classmethod 1ab

28 def get(cls) -> Redis: 1ab

29 global _redis 

30 if _redis is None: 

31 raise RuntimeError("Redis not initialized") 

32 return _redis 

33 

34 def before_worker_boot( 1ab

35 self, broker: dramatiq.Broker, worker: dramatiq.Worker 

36 ) -> None: 

37 global _redis 

38 _redis = create_redis("worker") 

39 log.info("Created Redis client") 

40 

41 def after_worker_shutdown( 1ab

42 self, broker: dramatiq.Broker, worker: dramatiq.Worker 

43 ) -> None: 

44 event_loop_thread = get_event_loop_thread() 

45 assert event_loop_thread is not None 

46 event_loop_thread.run_coroutine(_close_redis())