Coverage for polar/worker/_health.py: 47%

80 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 15:52 +0000

1import asyncio 1ab

2import contextlib 1ab

3import os 1ab

4from collections.abc import AsyncGenerator, Callable, Mapping 1ab

5from datetime import timedelta 1ab

6from typing import Any 1ab

7 

8import structlog 1ab

9import uvicorn 1ab

10from dramatiq.middleware import Middleware 1ab

11from redis import RedisError 1ab

12from starlette.applications import Starlette 1ab

13from starlette.exceptions import HTTPException 1ab

14from starlette.requests import Request 1ab

15from starlette.responses import JSONResponse 1ab

16from starlette.routing import Route 1ab

17 

18from polar.config import settings 1ab

19from polar.external_event.repository import ExternalEventRepository 1ab

20from polar.kit.db.postgres import AsyncSessionMaker, create_async_sessionmaker 1ab

21from polar.kit.utils import utc_now 1ab

22from polar.logging import Logger 1ab

23from polar.postgres import create_async_engine, create_async_read_engine 1ab

24from polar.redis import Redis, create_redis 1ab

25from polar.webhook.repository import WebhookEventRepository 1ab

26 

# Module-level structured logger shared by everything in this file.
log: Logger = structlog.get_logger()

28 

# Bind address and port of the health HTTP server. Both are read once, at
# import time, from the (lowercase) environment variables below; the port
# must parse as an integer or the import fails with ValueError.
HTTP_HOST: str = os.environ.get("dramatiq_prom_host", "0.0.0.0")
HTTP_PORT: int = int(os.environ.get("dramatiq_prom_port", "9191"))

31 

32 

class HealthMiddleware(Middleware):
    """Dramatiq middleware that registers the health HTTP server as a fork."""

    @property
    def forks(self) -> list[Callable[[], int]]:
        """Return the fork entry points contributed by this middleware.

        The only entry is the function that runs the health server.
        NOTE(review): presumably dramatiq runs each entry in its own forked
        process and uses the returned int as the exit code -- confirm against
        the dramatiq middleware documentation.
        """
        fork_targets: list[Callable[[], int]] = [_run_exposition_server]
        return fork_targets

37 

38 

async def health(request: Request) -> JSONResponse:
    """Liveness endpoint: report whether Redis answers a ping.

    Returns:
        A JSON body ``{"status": "ok"}`` when the ping succeeds.

    Raises:
        HTTPException: with status 503 when the ping raises ``RedisError``.
    """
    # The client is placed on the request state by the application lifespan.
    client: Redis = request.state.redis
    try:
        await client.ping()
    except RedisError as exc:
        raise HTTPException(status_code=503, detail="Redis is not available") from exc

    return JSONResponse({"status": "ok"})

47 

48 

# Webhook backlog check: only events older than the minimum age are counted,
# and the endpoint reports an error once the count exceeds the threshold.
UNDELIVERED_WEBHOOKS_MINIMUM_AGE: timedelta = timedelta(minutes=5)
UNDELIVERED_WEBHOOKS_ALERT_THRESHOLD: int = 10

# External-event backlog check: same scheme as the webhook check above.
UNHANDLED_EXTERNAL_EVENTS_MINIMUM_AGE: timedelta = timedelta(minutes=5)
UNHANDLED_EXTERNAL_EVENTS_ALERT_THRESHOLD: int = 10

54 

55 

async def webhooks(request: Request) -> JSONResponse:
    """Report whether too many webhook events are still undelivered.

    Queries the webhook event repository with a cutoff of "now minus
    ``UNDELIVERED_WEBHOOKS_MINIMUM_AGE``" and compares the number of rows
    returned against ``UNDELIVERED_WEBHOOKS_ALERT_THRESHOLD``.

    Returns:
        ``{"status": "ok"}`` when the count is at or below the threshold;
        otherwise a 503 response carrying the count.
    """
    session_factory: AsyncSessionMaker = request.state.async_sessionmaker
    cutoff = utc_now() - UNDELIVERED_WEBHOOKS_MINIMUM_AGE

    async with session_factory() as session:
        pending = await WebhookEventRepository(session).get_all_undelivered(
            older_than=cutoff
        )
        pending_count = len(pending)

    # Strictly greater than: exactly the threshold is still considered healthy.
    if pending_count <= UNDELIVERED_WEBHOOKS_ALERT_THRESHOLD:
        return JSONResponse({"status": "ok"})

    return JSONResponse(
        {"status": "error", "undelivered_webhooks": pending_count},
        status_code=503,
    )

73 

74 

async def external_events(request: Request) -> JSONResponse:
    """Report whether too many external events are still unhandled.

    Queries the external event repository with a cutoff of "now minus
    ``UNHANDLED_EXTERNAL_EVENTS_MINIMUM_AGE``" and compares the number of
    rows returned against ``UNHANDLED_EXTERNAL_EVENTS_ALERT_THRESHOLD``.

    Returns:
        ``{"status": "ok"}`` when the count is at or below the threshold;
        otherwise a 503 response carrying the count.
    """
    session_factory: AsyncSessionMaker = request.state.async_sessionmaker
    cutoff = utc_now() - UNHANDLED_EXTERNAL_EVENTS_MINIMUM_AGE

    async with session_factory() as session:
        backlog = await ExternalEventRepository(session).get_all_unhandled(
            older_than=cutoff
        )
        backlog_count = len(backlog)

    # Strictly greater than: exactly the threshold is still considered healthy.
    if backlog_count <= UNHANDLED_EXTERNAL_EVENTS_ALERT_THRESHOLD:
        return JSONResponse({"status": "ok"})

    return JSONResponse(
        {"status": "error", "unhandled_external_events": backlog_count},
        status_code=503,
    )

92 

93 

@contextlib.asynccontextmanager
async def lifespan(app: Starlette) -> AsyncGenerator[Mapping[str, Any]]:
    """Create the resources shared by the health endpoints and release them.

    Builds a database engine (the read engine when a read replica is
    configured, the regular engine otherwise), a session maker bound to it,
    and a Redis client. These are yielded as the lifespan state under the
    keys ``"redis"`` and ``"async_sessionmaker"``, which the endpoints read
    from ``request.state``.

    Cleanup runs in ``finally`` blocks so the Redis client is closed and the
    engine disposed even when an exception or cancellation is thrown into
    the generator at the ``yield``, and so the engine is disposed if creating
    the Redis client fails.
    """
    if settings.is_read_replica_configured():
        async_engine = create_async_read_engine("worker")
    else:
        async_engine = create_async_engine("worker")

    try:
        async_sessionmaker = create_async_sessionmaker(async_engine)
        redis = await create_redis("worker")
        try:
            yield {
                "redis": redis,
                "async_sessionmaker": async_sessionmaker,
            }
        finally:
            # Same teardown order as before: Redis first, then the engine.
            await redis.close()
    finally:
        await async_engine.dispose()

108 

109 

def create_app() -> Starlette:
    """Build the Starlette application that serves the health endpoints.

    Every endpoint is registered for GET only, and the application uses
    ``lifespan`` to set up and tear down its shared resources.
    """
    endpoints = (
        ("/", health),
        ("/webhooks", webhooks),
        ("/unhandled-external-events", external_events),
    )
    return Starlette(
        routes=[Route(path, handler, methods=["GET"]) for path, handler in endpoints],
        lifespan=lifespan,
    )

117 

118 

def _run_exposition_server() -> int:
    """Run the health HTTP server until it stops, then return 0.

    Serves the application from ``create_app`` with uvicorn on
    ``HTTP_HOST``:``HTTP_PORT``, with access logging off and the log level
    set to "error".

    Returns:
        Always 0.
    """
    log.debug("Starting exposition server...")
    server = uvicorn.Server(
        uvicorn.Config(
            create_app(),
            host=HTTP_HOST,
            port=HTTP_PORT,
            log_level="error",
            access_log=False,
        )
    )

    # A cancellation surfacing from the server run is deliberately ignored.
    with contextlib.suppress(asyncio.CancelledError):
        server.run()

    return 0