Coverage for polar/worker/_health.py: 47%
80 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 16:17 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 16:17 +0000
1import asyncio 1ab
2import contextlib 1ab
3import os 1ab
4from collections.abc import AsyncGenerator, Callable, Mapping 1ab
5from datetime import timedelta 1ab
6from typing import Any 1ab
8import structlog 1ab
9import uvicorn 1ab
10from dramatiq.middleware import Middleware 1ab
11from redis import RedisError 1ab
12from starlette.applications import Starlette 1ab
13from starlette.exceptions import HTTPException 1ab
14from starlette.requests import Request 1ab
15from starlette.responses import JSONResponse 1ab
16from starlette.routing import Route 1ab
18from polar.config import settings 1ab
19from polar.external_event.repository import ExternalEventRepository 1ab
20from polar.kit.db.postgres import AsyncSessionMaker, create_async_sessionmaker 1ab
21from polar.kit.utils import utc_now 1ab
22from polar.logging import Logger 1ab
23from polar.postgres import create_async_engine, create_async_read_engine 1ab
24from polar.redis import Redis, create_redis 1ab
25from polar.webhook.repository import WebhookEventRepository 1ab
27log: Logger = structlog.get_logger() 1ab
29HTTP_HOST = os.getenv("dramatiq_prom_host", "0.0.0.0") 1ab
30HTTP_PORT = int(os.getenv("dramatiq_prom_port", "9191")) 1ab
33class HealthMiddleware(Middleware): 1ab
34 @property 1ab
35 def forks(self) -> list[Callable[[], int]]: 1ab
36 return [_run_exposition_server]
39async def health(request: Request) -> JSONResponse: 1ab
40 try:
41 redis: Redis = request.state.redis
42 await redis.ping()
43 except RedisError as e:
44 raise HTTPException(status_code=503, detail="Redis is not available") from e
46 return JSONResponse({"status": "ok"})
49UNDELIVERED_WEBHOOKS_MINIMUM_AGE = timedelta(minutes=5) 1ab
50UNDELIVERED_WEBHOOKS_ALERT_THRESHOLD = 10 1ab
52UNHANDLED_EXTERNAL_EVENTS_MINIMUM_AGE = timedelta(minutes=5) 1ab
53UNHANDLED_EXTERNAL_EVENTS_ALERT_THRESHOLD = 10 1ab
56async def webhooks(request: Request) -> JSONResponse: 1ab
57 async_sessionmaker: AsyncSessionMaker = request.state.async_sessionmaker
58 async with async_sessionmaker() as session:
59 repository = WebhookEventRepository(session)
60 undelivered_webhooks = await repository.get_all_undelivered(
61 older_than=utc_now() - UNDELIVERED_WEBHOOKS_MINIMUM_AGE
62 )
63 if len(undelivered_webhooks) > UNDELIVERED_WEBHOOKS_ALERT_THRESHOLD:
64 return JSONResponse(
65 {
66 "status": "error",
67 "undelivered_webhooks": len(undelivered_webhooks),
68 },
69 status_code=503,
70 )
72 return JSONResponse({"status": "ok"})
75async def external_events(request: Request) -> JSONResponse: 1ab
76 async_sessionmaker: AsyncSessionMaker = request.state.async_sessionmaker
77 async with async_sessionmaker() as session:
78 repository = ExternalEventRepository(session)
79 unhandled_events = await repository.get_all_unhandled(
80 older_than=utc_now() - UNHANDLED_EXTERNAL_EVENTS_MINIMUM_AGE
81 )
82 if len(unhandled_events) > UNHANDLED_EXTERNAL_EVENTS_ALERT_THRESHOLD:
83 return JSONResponse(
84 {
85 "status": "error",
86 "unhandled_external_events": len(unhandled_events),
87 },
88 status_code=503,
89 )
91 return JSONResponse({"status": "ok"})
94@contextlib.asynccontextmanager 1ab
95async def lifespan(app: Starlette) -> AsyncGenerator[Mapping[str, Any]]: 1ab
96 if settings.is_read_replica_configured():
97 async_engine = create_async_read_engine("worker")
98 else:
99 async_engine = create_async_engine("worker")
100 async_sessionmaker = create_async_sessionmaker(async_engine)
101 redis = await create_redis("worker")
102 yield {
103 "redis": redis,
104 "async_sessionmaker": async_sessionmaker,
105 }
106 await redis.close()
107 await async_engine.dispose()
110def create_app() -> Starlette: 1ab
111 routes = [
112 Route("/", health, methods=["GET"]),
113 Route("/webhooks", webhooks, methods=["GET"]),
114 Route("/unhandled-external-events", external_events, methods=["GET"]),
115 ]
116 return Starlette(routes=routes, lifespan=lifespan)
119def _run_exposition_server() -> int: 1ab
120 log.debug("Starting exposition server...")
121 app = create_app()
122 config = uvicorn.Config(
123 app, host=HTTP_HOST, port=HTTP_PORT, log_level="error", access_log=False
124 )
125 server = uvicorn.Server(config)
126 try:
127 server.run()
128 except asyncio.CancelledError:
129 pass
131 return 0