Coverage for polar/webhook/tasks.py: 26%

84 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 15:52 +0000

1import base64 1a

2from collections.abc import Mapping 1a

3from ssl import SSLError 1a

4from uuid import UUID 1a

5 

6import httpx 1a

7import structlog 1a

8from apscheduler.triggers.cron import CronTrigger 1a

9from dramatiq import Retry 1a

10from standardwebhooks.webhooks import Webhook as StandardWebhook 1a

11 

12from polar.config import settings 1a

13from polar.kit.db.postgres import AsyncSession 1a

14from polar.kit.utils import utc_now 1a

15from polar.logging import Logger 1a

16from polar.models.webhook_delivery import WebhookDelivery 1a

17from polar.worker import AsyncSessionMaker, TaskPriority, actor, can_retry, enqueue_job 1a

18 

19from .service import webhook as webhook_service 1a

20 

21log: Logger = structlog.get_logger() 1a

22 

23 

24@actor( 1a

25 actor_name="webhook_event.send", 

26 max_retries=settings.WEBHOOK_MAX_RETRIES, 

27 priority=TaskPriority.MEDIUM, 

28) 

29async def webhook_event_send(webhook_event_id: UUID, redeliver: bool = False) -> None: 1a

30 async with AsyncSessionMaker() as session: 

31 return await _webhook_event_send( 

32 session, webhook_event_id=webhook_event_id, redeliver=redeliver 

33 ) 

34 

35 

36async def _webhook_event_send( 1a

37 session: AsyncSession, *, webhook_event_id: UUID, redeliver: bool = False 

38) -> None: 

39 event = await webhook_service.get_event_by_id(session, webhook_event_id) 

40 if not event: 

41 raise Exception(f"webhook event not found id={webhook_event_id}") 

42 

43 bound_log = log.bind( 

44 id=webhook_event_id, 

45 type=event.type, 

46 webhook_endpoint_id=event.webhook_endpoint_id, 

47 ) 

48 

49 if not event.webhook_endpoint.enabled: 

50 bound_log.info("Webhook endpoint is disabled, skipping") 

51 event.skipped = True 

52 session.add(event) 

53 return 

54 

55 if event.payload is None: 

56 bound_log.info("Archived event, skipping") 

57 return 

58 

59 if event.succeeded and not redeliver: 

60 bound_log.info("Event already succeeded, skipping") 

61 return 

62 

63 if not await webhook_service.is_latest_event(session, event): 

64 log.info( 

65 "Earlier events need to be delivered first, retrying later", 

66 id=event.id, 

67 type=event.type, 

68 webhook_endpoint_id=event.webhook_endpoint_id, 

69 ) 

70 raise Retry() 

71 

72 if event.skipped: 

73 event.skipped = False 

74 session.add(event) 

75 

76 ts = utc_now() 

77 

78 b64secret = base64.b64encode(event.webhook_endpoint.secret.encode("utf-8")).decode( 

79 "utf-8" 

80 ) 

81 

82 # Sign the payload 

83 wh = StandardWebhook(b64secret) 

84 signature = wh.sign(str(event.id), ts, event.payload) 

85 

86 headers: Mapping[str, str] = { 

87 "user-agent": "polar.sh webhooks", 

88 "content-type": "application/json", 

89 "webhook-id": str(event.id), 

90 "webhook-timestamp": str(int(ts.timestamp())), 

91 "webhook-signature": signature, 

92 } 

93 

94 delivery = WebhookDelivery( 

95 webhook_event_id=webhook_event_id, webhook_endpoint_id=event.webhook_endpoint_id 

96 ) 

97 

98 async with httpx.AsyncClient() as client: 

99 try: 

100 response = await client.post( 

101 event.webhook_endpoint.url, 

102 content=event.payload, 

103 headers=headers, 

104 timeout=20.0, 

105 ) 

106 delivery.http_code = response.status_code 

107 delivery.response = ( 

108 # Limit to first 2048 characters to avoid bloating the DB 

109 response.text[:2048] if response.text else None 

110 ) 

111 event.last_http_code = response.status_code 

112 response.raise_for_status() 

113 # Error 

114 except (httpx.HTTPError, SSLError) as e: 

115 bound_log.info("An error occurred while sending a webhook", error=e) 

116 delivery.succeeded = False 

117 if delivery.response is None: 

118 delivery.response = str(e) 

119 

120 # Permanent failure 

121 if not can_retry(): 

122 event.succeeded = False 

123 enqueue_job("webhook_event.failed", webhook_event_id=webhook_event_id) 

124 # Retry 

125 else: 

126 raise Retry() from e 

127 # Success 

128 else: 

129 delivery.succeeded = True 

130 event.succeeded = True 

131 enqueue_job("webhook_event.success", webhook_event_id=webhook_event_id) 

132 # Either way, save the delivery 

133 finally: 

134 assert delivery.succeeded is not None 

135 session.add(delivery) 

136 session.add(event) 

137 await session.commit() 

138 

139 

140@actor(actor_name="webhook_event.success", priority=TaskPriority.HIGH) 1a

141async def webhook_event_success(webhook_event_id: UUID) -> None: 1a

142 async with AsyncSessionMaker() as session: 

143 return await webhook_service.on_event_success(session, webhook_event_id) 

144 

145 

146@actor(actor_name="webhook_event.failed", priority=TaskPriority.HIGH) 1a

147async def webhook_event_failed(webhook_event_id: UUID) -> None: 1a

148 async with AsyncSessionMaker() as session: 

149 return await webhook_service.on_event_failed(session, webhook_event_id) 

150 

151 

152@actor( 1a

153 actor_name="webhook_event.archive", 

154 cron_trigger=CronTrigger(hour=0, minute=0), 

155 priority=TaskPriority.LOW, 

156) 

157async def webhook_event_archive() -> None: 1a

158 async with AsyncSessionMaker() as session: 

159 return await webhook_service.archive_events( 

160 session, older_than=utc_now() - settings.WEBHOOK_EVENT_RETENTION_PERIOD 

161 )