Coverage for polar/webhook/service.py: 41%

251 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 15:52 +0000

1import datetime 1a

2import json 1a

3from collections.abc import Sequence 1a

4from typing import Literal, cast, overload 1a

5from uuid import UUID 1a

6 

7import structlog 1a

8from sqlalchemy import CursorResult, Select, desc, func, select, text, update 1a

9from sqlalchemy.orm import contains_eager, joinedload 1a

10 

11from polar.auth.models import AuthSubject, is_organization, is_user 1a

12from polar.checkout.eventstream import CheckoutEvent, publish_checkout_event 1a

13from polar.checkout.repository import CheckoutRepository 1a

14from polar.config import settings 1a

15from polar.customer.schemas.state import CustomerState 1a

16from polar.email.react import render_email_template 1a

17from polar.email.schemas import EmailAdapter 1a

18from polar.email.sender import enqueue_email 1a

19from polar.exceptions import PolarError, ResourceNotFound 1a

20from polar.integrations.loops.service import loops as loops_service 1a

21from polar.kit.crypto import generate_token 1a

22from polar.kit.db.postgres import AsyncSession 1a

23from polar.kit.pagination import PaginationParams, paginate 1a

24from polar.kit.utils import utc_now 1a

25from polar.logging import Logger 1a

26from polar.models import ( 1a

27 Benefit, 

28 BenefitGrant, 

29 Checkout, 

30 Customer, 

31 CustomerSeat, 

32 Order, 

33 Organization, 

34 Product, 

35 Refund, 

36 Subscription, 

37 User, 

38 UserOrganization, 

39 WebhookDelivery, 

40 WebhookEvent, 

41) 

42from polar.models.webhook_endpoint import ( 1a

43 WebhookEndpoint, 

44 WebhookEventType, 

45 WebhookFormat, 

46) 

47from polar.oauth2.constants import WEBHOOK_SECRET_PREFIX 1a

48from polar.organization.resolver import get_payload_organization 1a

49from polar.user_organization.service import ( 1a

50 user_organization as user_organization_service, 

51) 

52from polar.webhook.repository import ( 1a

53 WebhookEndpointRepository, 

54 WebhookEventRepository, 

55) 

56from polar.webhook.schemas import ( 1a

57 WebhookEndpointCreate, 

58 WebhookEndpointUpdate, 

59) 

60from polar.worker import enqueue_job 1a

61 

62from .webhooks import SkipEvent, UnsupportedTarget, WebhookPayloadTypeAdapter 1a

63 

# Module-level structlog logger used by the webhook service defined below.
log: Logger = structlog.get_logger()

65 

66 

class WebhookError(PolarError):
    """Base class for the webhook-specific errors defined in this module."""

68 

69 

class EventDoesNotExist(WebhookError):
    """Raised when no webhook event can be found for a given ID."""

    def __init__(self, event_id: UUID) -> None:
        # Keep the ID on the instance so handlers can inspect it.
        self.event_id = event_id
        super().__init__(f"Event with ID {event_id} does not exist.")

75 

76 

class EventNotSuccessul(WebhookError):
    """
    Raised when a success hook is invoked for an event that did not succeed.

    NOTE: the class name is misspelled ("Successul"); it is kept as-is because
    renaming it would break any code importing it.
    """

    def __init__(self, event_id: UUID) -> None:
        # Keep the ID on the instance so handlers can inspect it.
        self.event_id = event_id
        super().__init__(f"Event with ID {event_id} is not successful.")

82 

83 

84class WebhookService: 1a

85 async def list_endpoints( 1a

86 self, 

87 session: AsyncSession, 

88 auth_subject: AuthSubject[User | Organization], 

89 *, 

90 organization_id: Sequence[UUID] | None, 

91 pagination: PaginationParams, 

92 ) -> tuple[Sequence[WebhookEndpoint], int]: 

93 statement = self._get_readable_endpoints_statement(auth_subject) 

94 

95 if organization_id is not None: 

96 statement = statement.where( 

97 WebhookEndpoint.organization_id.in_(organization_id) 

98 ) 

99 

100 statement = statement.order_by(WebhookEndpoint.created_at.desc()) 

101 

102 results, count = await paginate(session, statement, pagination=pagination) 

103 

104 return results, count 

105 

106 async def get_endpoint( 1a

107 self, 

108 session: AsyncSession, 

109 auth_subject: AuthSubject[User | Organization], 

110 id: UUID, 

111 ) -> WebhookEndpoint | None: 

112 statement = self._get_readable_endpoints_statement(auth_subject).where( 

113 WebhookEndpoint.id == id 

114 ) 

115 res = await session.execute(statement) 

116 return res.scalars().unique().one_or_none() 

117 

118 async def create_endpoint( 1a

119 self, 

120 session: AsyncSession, 

121 auth_subject: AuthSubject[User | Organization], 

122 create_schema: WebhookEndpointCreate, 

123 ) -> WebhookEndpoint: 

124 organization = await get_payload_organization( 

125 session, auth_subject, create_schema 

126 ) 

127 if create_schema.secret is not None: 

128 secret = create_schema.secret 

129 else: 

130 secret = generate_token(prefix=WEBHOOK_SECRET_PREFIX) 

131 endpoint = WebhookEndpoint( 

132 **create_schema.model_dump(exclude={"secret"}, by_alias=True), 

133 secret=secret, 

134 organization=organization, 

135 ) 

136 session.add(endpoint) 

137 

138 # Store it in Loops in case we need to announce technical things regarding webhooks 

139 user_organizations = await user_organization_service.list_by_org( 

140 session, organization.id 

141 ) 

142 for user_organization in user_organizations: 

143 await loops_service.user_update( 

144 session, user_organization.user, webhooksCreated=True 

145 ) 

146 

147 return endpoint 

148 

149 async def update_endpoint( 1a

150 self, 

151 session: AsyncSession, 

152 *, 

153 endpoint: WebhookEndpoint, 

154 update_schema: WebhookEndpointUpdate, 

155 ) -> WebhookEndpoint: 

156 for attr, value in update_schema.model_dump( 

157 exclude_unset=True, exclude_none=True 

158 ).items(): 

159 setattr(endpoint, attr, value) 

160 session.add(endpoint) 

161 return endpoint 

162 

163 async def reset_endpoint_secret( 1a

164 self, session: AsyncSession, *, endpoint: WebhookEndpoint 

165 ) -> WebhookEndpoint: 

166 endpoint.secret = generate_token(prefix=WEBHOOK_SECRET_PREFIX) 

167 session.add(endpoint) 

168 return endpoint 

169 

170 async def delete_endpoint( 1a

171 self, 

172 session: AsyncSession, 

173 endpoint: WebhookEndpoint, 

174 ) -> WebhookEndpoint: 

175 endpoint.deleted_at = utc_now() 

176 session.add(endpoint) 

177 return endpoint 

178 

179 async def list_deliveries( 1a

180 self, 

181 session: AsyncSession, 

182 auth_subject: AuthSubject[User | Organization], 

183 *, 

184 endpoint_id: Sequence[UUID] | None = None, 

185 start_timestamp: datetime.datetime | None = None, 

186 end_timestamp: datetime.datetime | None = None, 

187 pagination: PaginationParams, 

188 ) -> tuple[Sequence[WebhookDelivery], int]: 

189 readable_endpoints_statement = self._get_readable_endpoints_statement( 

190 auth_subject 

191 ) 

192 statement = ( 

193 select(WebhookDelivery) 

194 .join(WebhookEndpoint) 

195 .where( 

196 WebhookDelivery.deleted_at.is_(None), 

197 WebhookEndpoint.id.in_( 

198 readable_endpoints_statement.with_only_columns(WebhookEndpoint.id) 

199 ), 

200 ) 

201 .options(joinedload(WebhookDelivery.webhook_event)) 

202 .order_by(desc(WebhookDelivery.created_at)) 

203 ) 

204 

205 if endpoint_id is not None: 

206 statement = statement.where( 

207 WebhookDelivery.webhook_endpoint_id.in_(endpoint_id) 

208 ) 

209 

210 if start_timestamp is not None: 

211 statement = statement.where(WebhookDelivery.created_at > start_timestamp) 

212 

213 if end_timestamp is not None: 

214 statement = statement.where(WebhookDelivery.created_at < end_timestamp) 

215 

216 return await paginate(session, statement, pagination=pagination) 

217 

218 async def redeliver_event( 1a

219 self, 

220 session: AsyncSession, 

221 auth_subject: AuthSubject[User | Organization], 

222 id: UUID, 

223 ) -> None: 

224 readable_endpoints_statement = self._get_readable_endpoints_statement( 

225 auth_subject 

226 ) 

227 statement = ( 

228 select(WebhookEvent) 

229 .join(WebhookEndpoint) 

230 .where( 

231 WebhookEvent.id == id, 

232 WebhookEvent.deleted_at.is_(None), 

233 WebhookEvent.is_archived.is_(False), 

234 WebhookEndpoint.id.in_( 

235 readable_endpoints_statement.with_only_columns(WebhookEndpoint.id) 

236 ), 

237 ) 

238 .options(contains_eager(WebhookEvent.webhook_endpoint)) 

239 ) 

240 

241 res = await session.execute(statement) 

242 event = res.scalars().unique().one_or_none() 

243 if event is None: 

244 raise ResourceNotFound() 

245 

246 enqueue_job("webhook_event.send", webhook_event_id=event.id, redeliver=True) 

247 

248 async def on_event_success(self, session: AsyncSession, id: UUID) -> None: 1a

249 """ 

250 Helper to hook into the event success event. 

251 

252 Useful to trigger logic that might wait for an event to be delivered. 

253 """ 

254 event = await self.get_event_by_id(session, id) 

255 if event is None: 

256 raise EventDoesNotExist(id) 

257 

258 if not event.succeeded: 

259 raise EventNotSuccessul(id) 

260 

261 if event.webhook_endpoint.format != WebhookFormat.raw: 

262 return 

263 

264 if event.payload is None: 

265 return 

266 

267 if event.type == WebhookEventType.checkout_updated: 

268 checkout_repository = CheckoutRepository.from_session(session) 

269 payload = json.loads(event.payload) 

270 checkout = await checkout_repository.get_by_id(UUID(payload["data"]["id"])) 

271 assert checkout is not None 

272 await publish_checkout_event( 

273 checkout.client_secret, 

274 CheckoutEvent.webhook_event_delivered, 

275 {"status": checkout.status}, 

276 ) 

277 

278 async def on_event_failed(self, session: AsyncSession, id: UUID) -> None: 1a

279 """ 

280 Helper to hook into the event failed event. 

281 

282 Detects consecutive failures and disables the endpoint if threshold is exceeded. 

283 """ 

284 event = await self.get_event_by_id(session, id) 

285 if event is None: 

286 raise EventDoesNotExist(id) 

287 

288 if event.succeeded is not False: 

289 return 

290 

291 endpoint = event.webhook_endpoint 

292 if not endpoint.enabled: 

293 return 

294 

295 # Get recent events to count the streak 

296 webhook_event_repository = WebhookEventRepository.from_session(session) 

297 recent_events = await webhook_event_repository.get_recent_by_endpoint( 

298 endpoint.id, limit=settings.WEBHOOK_FAILURE_THRESHOLD 

299 ) 

300 

301 # Check if all recent events are failures 

302 if len(recent_events) >= settings.WEBHOOK_FAILURE_THRESHOLD and all( 

303 event.succeeded is False for event in recent_events 

304 ): 

305 log.warning( 

306 "Disabling webhook endpoint due to consecutive failures", 

307 webhook_endpoint_id=endpoint.id, 

308 failure_count=len(recent_events), 

309 ) 

310 webhook_endpoint_repository = WebhookEndpointRepository.from_session( 

311 session 

312 ) 

313 await webhook_endpoint_repository.update( 

314 endpoint, update_dict={"enabled": False}, flush=True 

315 ) 

316 

317 # Send email to all organization members 

318 organization_id = endpoint.organization_id 

319 user_organizations = await user_organization_service.list_by_org( 

320 session, organization_id 

321 ) 

322 

323 if user_organizations: 

324 # User and Organization are eagerly loaded 

325 organization = user_organizations[0].organization 

326 dashboard_url = f"{settings.FRONTEND_BASE_URL}/dashboard/{organization.slug}/settings/webhooks" 

327 

328 for user_org in user_organizations: 

329 user = user_org.user 

330 email = EmailAdapter.validate_python( 

331 { 

332 "template": "webhook_endpoint_disabled", 

333 "props": { 

334 "email": user.email, 

335 "organization": organization, 

336 "webhook_endpoint_url": endpoint.url, 

337 "dashboard_url": dashboard_url, 

338 }, 

339 } 

340 ) 

341 

342 body = render_email_template(email) 

343 

344 enqueue_email( 

345 to_email_addr=user.email, 

346 subject=f"Webhook endpoint disabled for {organization.name}", 

347 html_content=body, 

348 ) 

349 

350 async def get_event_by_id( 1a

351 self, session: AsyncSession, id: UUID 

352 ) -> WebhookEvent | None: 

353 statement = ( 

354 select(WebhookEvent) 

355 .where(WebhookEvent.deleted_at.is_(None), WebhookEvent.id == id) 

356 .options(joinedload(WebhookEvent.webhook_endpoint)) 

357 ) 

358 res = await session.execute(statement) 

359 return res.scalars().unique().one_or_none() 

360 

361 async def is_latest_event(self, session: AsyncSession, event: WebhookEvent) -> bool: 1a

362 age_limit = utc_now() - datetime.timedelta(minutes=1) 

363 statement = ( 

364 select(func.count(WebhookEvent.id)) 

365 .join( 

366 WebhookDelivery, 

367 WebhookDelivery.webhook_event_id == WebhookEvent.id, 

368 isouter=True, 

369 ) 

370 .where( 

371 WebhookEvent.deleted_at.is_(None), 

372 WebhookEvent.webhook_endpoint_id == event.webhook_endpoint_id, 

373 WebhookEvent.id != event.id, # Not the current event 

374 WebhookDelivery.id.is_(None), # Not delivered yet 

375 WebhookEvent.created_at 

376 < event.created_at, # Older than the current event 

377 WebhookEvent.created_at >= age_limit, # Not too old 

378 ) 

379 .limit(1) 

380 ) 

381 res = await session.execute(statement) 

382 count = res.scalar_one() 

383 return count == 0 

384 

385 @overload 1a

386 async def send( 386 ↛ exitline 386 didn't return from function 'send' because 1a

387 self, 

388 session: AsyncSession, 

389 target: Organization, 

390 event: Literal[WebhookEventType.checkout_created], 

391 data: Checkout, 

392 ) -> list[WebhookEvent]: ... 

393 

394 @overload 1a

395 async def send( 395 ↛ exitline 395 didn't return from function 'send' because 1a

396 self, 

397 session: AsyncSession, 

398 target: Organization, 

399 event: Literal[WebhookEventType.checkout_updated], 

400 data: Checkout, 

401 ) -> list[WebhookEvent]: ... 

402 

403 @overload 1a

404 async def send( 404 ↛ exitline 404 didn't return from function 'send' because 1a

405 self, 

406 session: AsyncSession, 

407 target: Organization, 

408 event: Literal[WebhookEventType.customer_created], 

409 data: Customer, 

410 ) -> list[WebhookEvent]: ... 

411 

412 @overload 1a

413 async def send( 413 ↛ exitline 413 didn't return from function 'send' because 1a

414 self, 

415 session: AsyncSession, 

416 target: Organization, 

417 event: Literal[WebhookEventType.customer_updated], 

418 data: Customer, 

419 ) -> list[WebhookEvent]: ... 

420 

421 @overload 1a

422 async def send( 422 ↛ exitline 422 didn't return from function 'send' because 1a

423 self, 

424 session: AsyncSession, 

425 target: Organization, 

426 event: Literal[WebhookEventType.customer_deleted], 

427 data: Customer, 

428 ) -> list[WebhookEvent]: ... 

429 

430 @overload 1a

431 async def send( 431 ↛ exitline 431 didn't return from function 'send' because 1a

432 self, 

433 session: AsyncSession, 

434 target: Organization, 

435 event: Literal[WebhookEventType.customer_state_changed], 

436 data: CustomerState, 

437 ) -> list[WebhookEvent]: ... 

438 

439 @overload 1a

440 async def send( 440 ↛ exitline 440 didn't return from function 'send' because 1a

441 self, 

442 session: AsyncSession, 

443 target: Organization, 

444 event: Literal[WebhookEventType.customer_seat_assigned], 

445 data: CustomerSeat, 

446 ) -> list[WebhookEvent]: ... 

447 

448 @overload 1a

449 async def send( 449 ↛ exitline 449 didn't return from function 'send' because 1a

450 self, 

451 session: AsyncSession, 

452 target: Organization, 

453 event: Literal[WebhookEventType.customer_seat_claimed], 

454 data: CustomerSeat, 

455 ) -> list[WebhookEvent]: ... 

456 

457 @overload 1a

458 async def send( 458 ↛ exitline 458 didn't return from function 'send' because 1a

459 self, 

460 session: AsyncSession, 

461 target: Organization, 

462 event: Literal[WebhookEventType.customer_seat_revoked], 

463 data: CustomerSeat, 

464 ) -> list[WebhookEvent]: ... 

465 

466 @overload 1a

467 async def send( 467 ↛ exitline 467 didn't return from function 'send' because 1a

468 self, 

469 session: AsyncSession, 

470 target: Organization, 

471 event: Literal[WebhookEventType.order_created], 

472 data: Order, 

473 ) -> list[WebhookEvent]: ... 

474 

475 @overload 1a

476 async def send( 476 ↛ exitline 476 didn't return from function 'send' because 1a

477 self, 

478 session: AsyncSession, 

479 target: Organization, 

480 event: Literal[WebhookEventType.order_updated], 

481 data: Order, 

482 ) -> list[WebhookEvent]: ... 

483 

484 @overload 1a

485 async def send( 485 ↛ exitline 485 didn't return from function 'send' because 1a

486 self, 

487 session: AsyncSession, 

488 target: Organization, 

489 event: Literal[WebhookEventType.order_paid], 

490 data: Order, 

491 ) -> list[WebhookEvent]: ... 

492 

493 @overload 1a

494 async def send( 494 ↛ exitline 494 didn't return from function 'send' because 1a

495 self, 

496 session: AsyncSession, 

497 target: Organization, 

498 event: Literal[WebhookEventType.order_refunded], 

499 data: Order, 

500 ) -> list[WebhookEvent]: ... 

501 

502 @overload 1a

503 async def send( 503 ↛ exitline 503 didn't return from function 'send' because 1a

504 self, 

505 session: AsyncSession, 

506 target: Organization, 

507 event: Literal[WebhookEventType.subscription_created], 

508 data: Subscription, 

509 ) -> list[WebhookEvent]: ... 

510 

511 @overload 1a

512 async def send( 512 ↛ exitline 512 didn't return from function 'send' because 1a

513 self, 

514 session: AsyncSession, 

515 target: Organization, 

516 event: Literal[WebhookEventType.subscription_updated], 

517 data: Subscription, 

518 ) -> list[WebhookEvent]: ... 

519 

520 @overload 1a

521 async def send( 521 ↛ exitline 521 didn't return from function 'send' because 1a

522 self, 

523 session: AsyncSession, 

524 target: Organization, 

525 event: Literal[WebhookEventType.subscription_active], 

526 data: Subscription, 

527 ) -> list[WebhookEvent]: ... 

528 

529 @overload 1a

530 async def send( 530 ↛ exitline 530 didn't return from function 'send' because 1a

531 self, 

532 session: AsyncSession, 

533 target: Organization, 

534 event: Literal[WebhookEventType.subscription_canceled], 

535 data: Subscription, 

536 ) -> list[WebhookEvent]: ... 

537 

538 @overload 1a

539 async def send( 539 ↛ exitline 539 didn't return from function 'send' because 1a

540 self, 

541 session: AsyncSession, 

542 target: Organization, 

543 event: Literal[WebhookEventType.subscription_uncanceled], 

544 data: Subscription, 

545 ) -> list[WebhookEvent]: ... 

546 

547 @overload 1a

548 async def send( 548 ↛ exitline 548 didn't return from function 'send' because 1a

549 self, 

550 session: AsyncSession, 

551 target: Organization, 

552 event: Literal[WebhookEventType.subscription_revoked], 

553 data: Subscription, 

554 ) -> list[WebhookEvent]: ... 

555 

556 @overload 1a

557 async def send( 557 ↛ exitline 557 didn't return from function 'send' because 1a

558 self, 

559 session: AsyncSession, 

560 target: Organization, 

561 event: Literal[WebhookEventType.refund_created], 

562 data: Refund, 

563 ) -> list[WebhookEvent]: ... 

564 

565 @overload 1a

566 async def send( 566 ↛ exitline 566 didn't return from function 'send' because 1a

567 self, 

568 session: AsyncSession, 

569 target: Organization, 

570 event: Literal[WebhookEventType.refund_updated], 

571 data: Refund, 

572 ) -> list[WebhookEvent]: ... 

573 

574 @overload 1a

575 async def send( 575 ↛ exitline 575 didn't return from function 'send' because 1a

576 self, 

577 session: AsyncSession, 

578 target: Organization, 

579 event: Literal[WebhookEventType.product_created], 

580 data: Product, 

581 ) -> list[WebhookEvent]: ... 

582 

583 @overload 1a

584 async def send( 584 ↛ exitline 584 didn't return from function 'send' because 1a

585 self, 

586 session: AsyncSession, 

587 target: Organization, 

588 event: Literal[WebhookEventType.product_updated], 

589 data: Product, 

590 ) -> list[WebhookEvent]: ... 

591 

592 @overload 1a

593 async def send( 593 ↛ exitline 593 didn't return from function 'send' because 1a

594 self, 

595 session: AsyncSession, 

596 target: Organization, 

597 event: Literal[WebhookEventType.organization_updated], 

598 data: Organization, 

599 ) -> list[WebhookEvent]: ... 

600 

601 @overload 1a

602 async def send( 602 ↛ exitline 602 didn't return from function 'send' because 1a

603 self, 

604 session: AsyncSession, 

605 target: Organization, 

606 event: Literal[WebhookEventType.benefit_created], 

607 data: Benefit, 

608 ) -> list[WebhookEvent]: ... 

609 

610 @overload 1a

611 async def send( 611 ↛ exitline 611 didn't return from function 'send' because 1a

612 self, 

613 session: AsyncSession, 

614 target: Organization, 

615 event: Literal[WebhookEventType.benefit_updated], 

616 data: Benefit, 

617 ) -> list[WebhookEvent]: ... 

618 

619 @overload 1a

620 async def send( 620 ↛ exitline 620 didn't return from function 'send' because 1a

621 self, 

622 session: AsyncSession, 

623 target: Organization, 

624 event: Literal[WebhookEventType.benefit_grant_created], 

625 data: BenefitGrant, 

626 ) -> list[WebhookEvent]: ... 

627 

628 @overload 1a

629 async def send( 629 ↛ exitline 629 didn't return from function 'send' because 1a

630 self, 

631 session: AsyncSession, 

632 target: Organization, 

633 event: Literal[WebhookEventType.benefit_grant_updated], 

634 data: BenefitGrant, 

635 ) -> list[WebhookEvent]: ... 

636 

637 @overload 1a

638 async def send( 638 ↛ exitline 638 didn't return from function 'send' because 1a

639 self, 

640 session: AsyncSession, 

641 target: Organization, 

642 event: Literal[WebhookEventType.benefit_grant_cycled], 

643 data: BenefitGrant, 

644 ) -> list[WebhookEvent]: ... 

645 

646 @overload 1a

647 async def send( 647 ↛ exitline 647 didn't return from function 'send' because 1a

648 self, 

649 session: AsyncSession, 

650 target: Organization, 

651 event: Literal[WebhookEventType.benefit_grant_revoked], 

652 data: BenefitGrant, 

653 ) -> list[WebhookEvent]: ... 

654 

655 async def send( 1a

656 self, 

657 session: AsyncSession, 

658 target: Organization, 

659 event: WebhookEventType, 

660 data: object, 

661 ) -> list[WebhookEvent]: 

662 now = utc_now() 

663 payload = WebhookPayloadTypeAdapter.validate_python( 

664 {"type": event, "timestamp": now, "data": data} 

665 ) 

666 

667 events: list[WebhookEvent] = [] 

668 for endpoint in await self._get_event_target_endpoints( 

669 session, event=event, target=target 

670 ): 

671 try: 

672 payload_data = payload.get_payload(endpoint.format, target) 

673 event_type = WebhookEvent( 

674 created_at=payload.timestamp, 

675 webhook_endpoint=endpoint, 

676 type=event, 

677 payload=payload_data, 

678 ) 

679 session.add(event_type) 

680 events.append(event_type) 

681 await session.flush() 

682 enqueue_job("webhook_event.send", webhook_event_id=event_type.id) 

683 except UnsupportedTarget as e: 

684 # Log the error but do not raise to not fail the whole request 

685 log.error(e.message) 

686 continue 

687 except SkipEvent: 

688 continue 

689 

690 return events 

691 

692 async def archive_events( 1a

693 self, 

694 session: AsyncSession, 

695 older_than: datetime.datetime, 

696 batch_size: int = 5000, 

697 ) -> None: 

698 log.debug( 

699 "Archive webhook events", older_than=older_than, batch_size=batch_size 

700 ) 

701 

702 while True: 

703 batch_subquery = ( 

704 select(WebhookEvent.id) 

705 .where( 

706 WebhookEvent.created_at < older_than, 

707 WebhookEvent.payload.is_not(None), 

708 ) 

709 .order_by(WebhookEvent.created_at.asc()) 

710 .limit(batch_size) 

711 ) 

712 statement = ( 

713 update(WebhookEvent) 

714 .where(WebhookEvent.id.in_(batch_subquery)) 

715 .values(payload=None) 

716 ) 

717 

718 # https://github.com/sqlalchemy/sqlalchemy/commit/67f62aac5b49b6d048ca39019e5bd123d3c9cfb2 

719 result = cast(CursorResult[WebhookEvent], await session.execute(statement)) 

720 updated_count = result.rowcount 

721 

722 await session.commit() 

723 

724 log.debug("Archived webhook events batch", updated_count=updated_count) 

725 

726 if updated_count < batch_size: 

727 break 

728 

729 def _get_readable_endpoints_statement( 1a

730 self, auth_subject: AuthSubject[User | Organization] 

731 ) -> Select[tuple[WebhookEndpoint]]: 

732 statement = select(WebhookEndpoint).where(WebhookEndpoint.deleted_at.is_(None)) 

733 

734 if is_user(auth_subject): 

735 user = auth_subject.subject 

736 statement = statement.where( 

737 WebhookEndpoint.organization_id.in_( 

738 select(UserOrganization.organization_id).where( 

739 UserOrganization.user_id == user.id, 

740 UserOrganization.deleted_at.is_(None), 

741 ) 

742 ) 

743 ) 

744 elif is_organization(auth_subject): 

745 statement = statement.where( 

746 WebhookEndpoint.organization_id == auth_subject.subject.id 

747 ) 

748 

749 return statement 

750 

751 async def _get_event_target_endpoints( 1a

752 self, 

753 session: AsyncSession, 

754 *, 

755 event: WebhookEventType, 

756 target: Organization, 

757 ) -> Sequence[WebhookEndpoint]: 

758 statement = select(WebhookEndpoint).where( 

759 WebhookEndpoint.deleted_at.is_(None), 

760 WebhookEndpoint.enabled.is_(True), 

761 WebhookEndpoint.events.bool_op("@>")(text(f"'[\"{event}\"]'")), 

762 WebhookEndpoint.organization_id == target.id, 

763 ) 

764 res = await session.execute(statement) 

765 return res.scalars().unique().all() 

766 

767 

# Module-level `WebhookService` instance, created once at import time.
webhook = WebhookService()