Coverage for polar/worker/_redis.py: 45%
25 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 15:52 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 15:52 +0000
1import dramatiq 1ab
2import structlog 1ab
3from dramatiq.asyncio import get_event_loop_thread 1ab
5from polar.logging import Logger 1ab
6from polar.redis import Redis, create_redis 1ab
8log: Logger = structlog.get_logger() 1ab
11_redis: Redis | None = None 1ab
14async def _close_redis() -> None: 1ab
15 global _redis
16 if _redis is not None:
17 await _redis.close(True)
18 log.info("Closed Redis client")
19 _redis = None
22class RedisMiddleware(dramatiq.Middleware): 1ab
23 """
24 Middleware managing the lifecycle of the Redis connection.
25 """
27 @classmethod 1ab
28 def get(cls) -> Redis: 1ab
29 global _redis
30 if _redis is None:
31 raise RuntimeError("Redis not initialized")
32 return _redis
34 def before_worker_boot( 1ab
35 self, broker: dramatiq.Broker, worker: dramatiq.Worker
36 ) -> None:
37 global _redis
38 _redis = create_redis("worker")
39 log.info("Created Redis client")
41 def after_worker_shutdown( 1ab
42 self, broker: dramatiq.Broker, worker: dramatiq.Worker
43 ) -> None:
44 event_loop_thread = get_event_loop_thread()
45 assert event_loop_thread is not None
46 event_loop_thread.run_coroutine(_close_redis())