Coverage for polar/webhook/service.py: 41%
251 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 17:15 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 17:15 +0000
1import datetime 1a
2import json 1a
3from collections.abc import Sequence 1a
4from typing import Literal, cast, overload 1a
5from uuid import UUID 1a
7import structlog 1a
8from sqlalchemy import CursorResult, Select, desc, func, select, text, update 1a
9from sqlalchemy.orm import contains_eager, joinedload 1a
11from polar.auth.models import AuthSubject, is_organization, is_user 1a
12from polar.checkout.eventstream import CheckoutEvent, publish_checkout_event 1a
13from polar.checkout.repository import CheckoutRepository 1a
14from polar.config import settings 1a
15from polar.customer.schemas.state import CustomerState 1a
16from polar.email.react import render_email_template 1a
17from polar.email.schemas import EmailAdapter 1a
18from polar.email.sender import enqueue_email 1a
19from polar.exceptions import PolarError, ResourceNotFound 1a
20from polar.integrations.loops.service import loops as loops_service 1a
21from polar.kit.crypto import generate_token 1a
22from polar.kit.db.postgres import AsyncSession 1a
23from polar.kit.pagination import PaginationParams, paginate 1a
24from polar.kit.utils import utc_now 1a
25from polar.logging import Logger 1a
26from polar.models import ( 1a
27 Benefit,
28 BenefitGrant,
29 Checkout,
30 Customer,
31 CustomerSeat,
32 Order,
33 Organization,
34 Product,
35 Refund,
36 Subscription,
37 User,
38 UserOrganization,
39 WebhookDelivery,
40 WebhookEvent,
41)
42from polar.models.webhook_endpoint import ( 1a
43 WebhookEndpoint,
44 WebhookEventType,
45 WebhookFormat,
46)
47from polar.oauth2.constants import WEBHOOK_SECRET_PREFIX 1a
48from polar.organization.resolver import get_payload_organization 1a
49from polar.user_organization.service import ( 1a
50 user_organization as user_organization_service,
51)
52from polar.webhook.repository import ( 1a
53 WebhookEndpointRepository,
54 WebhookEventRepository,
55)
56from polar.webhook.schemas import ( 1a
57 WebhookEndpointCreate,
58 WebhookEndpointUpdate,
59)
60from polar.worker import enqueue_job 1a
62from .webhooks import SkipEvent, UnsupportedTarget, WebhookPayloadTypeAdapter 1a
64log: Logger = structlog.get_logger() 1a
67class WebhookError(PolarError): ... 1a
70class EventDoesNotExist(WebhookError): 1a
71 def __init__(self, event_id: UUID) -> None: 1a
72 self.event_id = event_id
73 message = f"Event with ID {event_id} does not exist."
74 super().__init__(message)
77class EventNotSuccessul(WebhookError): 1a
78 def __init__(self, event_id: UUID) -> None: 1a
79 self.event_id = event_id
80 message = f"Event with ID {event_id} is not successful."
81 super().__init__(message)
84class WebhookService: 1a
85 async def list_endpoints( 1a
86 self,
87 session: AsyncSession,
88 auth_subject: AuthSubject[User | Organization],
89 *,
90 organization_id: Sequence[UUID] | None,
91 pagination: PaginationParams,
92 ) -> tuple[Sequence[WebhookEndpoint], int]:
93 statement = self._get_readable_endpoints_statement(auth_subject)
95 if organization_id is not None:
96 statement = statement.where(
97 WebhookEndpoint.organization_id.in_(organization_id)
98 )
100 statement = statement.order_by(WebhookEndpoint.created_at.desc())
102 results, count = await paginate(session, statement, pagination=pagination)
104 return results, count
106 async def get_endpoint( 1a
107 self,
108 session: AsyncSession,
109 auth_subject: AuthSubject[User | Organization],
110 id: UUID,
111 ) -> WebhookEndpoint | None:
112 statement = self._get_readable_endpoints_statement(auth_subject).where(
113 WebhookEndpoint.id == id
114 )
115 res = await session.execute(statement)
116 return res.scalars().unique().one_or_none()
118 async def create_endpoint( 1a
119 self,
120 session: AsyncSession,
121 auth_subject: AuthSubject[User | Organization],
122 create_schema: WebhookEndpointCreate,
123 ) -> WebhookEndpoint:
124 organization = await get_payload_organization(
125 session, auth_subject, create_schema
126 )
127 if create_schema.secret is not None:
128 secret = create_schema.secret
129 else:
130 secret = generate_token(prefix=WEBHOOK_SECRET_PREFIX)
131 endpoint = WebhookEndpoint(
132 **create_schema.model_dump(exclude={"secret"}, by_alias=True),
133 secret=secret,
134 organization=organization,
135 )
136 session.add(endpoint)
138 # Store it in Loops in case we need to announce technical things regarding webhooks
139 user_organizations = await user_organization_service.list_by_org(
140 session, organization.id
141 )
142 for user_organization in user_organizations:
143 await loops_service.user_update(
144 session, user_organization.user, webhooksCreated=True
145 )
147 return endpoint
149 async def update_endpoint( 1a
150 self,
151 session: AsyncSession,
152 *,
153 endpoint: WebhookEndpoint,
154 update_schema: WebhookEndpointUpdate,
155 ) -> WebhookEndpoint:
156 for attr, value in update_schema.model_dump(
157 exclude_unset=True, exclude_none=True
158 ).items():
159 setattr(endpoint, attr, value)
160 session.add(endpoint)
161 return endpoint
163 async def reset_endpoint_secret( 1a
164 self, session: AsyncSession, *, endpoint: WebhookEndpoint
165 ) -> WebhookEndpoint:
166 endpoint.secret = generate_token(prefix=WEBHOOK_SECRET_PREFIX)
167 session.add(endpoint)
168 return endpoint
170 async def delete_endpoint( 1a
171 self,
172 session: AsyncSession,
173 endpoint: WebhookEndpoint,
174 ) -> WebhookEndpoint:
175 endpoint.deleted_at = utc_now()
176 session.add(endpoint)
177 return endpoint
179 async def list_deliveries( 1a
180 self,
181 session: AsyncSession,
182 auth_subject: AuthSubject[User | Organization],
183 *,
184 endpoint_id: Sequence[UUID] | None = None,
185 start_timestamp: datetime.datetime | None = None,
186 end_timestamp: datetime.datetime | None = None,
187 pagination: PaginationParams,
188 ) -> tuple[Sequence[WebhookDelivery], int]:
189 readable_endpoints_statement = self._get_readable_endpoints_statement(
190 auth_subject
191 )
192 statement = (
193 select(WebhookDelivery)
194 .join(WebhookEndpoint)
195 .where(
196 WebhookDelivery.deleted_at.is_(None),
197 WebhookEndpoint.id.in_(
198 readable_endpoints_statement.with_only_columns(WebhookEndpoint.id)
199 ),
200 )
201 .options(joinedload(WebhookDelivery.webhook_event))
202 .order_by(desc(WebhookDelivery.created_at))
203 )
205 if endpoint_id is not None:
206 statement = statement.where(
207 WebhookDelivery.webhook_endpoint_id.in_(endpoint_id)
208 )
210 if start_timestamp is not None:
211 statement = statement.where(WebhookDelivery.created_at > start_timestamp)
213 if end_timestamp is not None:
214 statement = statement.where(WebhookDelivery.created_at < end_timestamp)
216 return await paginate(session, statement, pagination=pagination)
218 async def redeliver_event( 1a
219 self,
220 session: AsyncSession,
221 auth_subject: AuthSubject[User | Organization],
222 id: UUID,
223 ) -> None:
224 readable_endpoints_statement = self._get_readable_endpoints_statement(
225 auth_subject
226 )
227 statement = (
228 select(WebhookEvent)
229 .join(WebhookEndpoint)
230 .where(
231 WebhookEvent.id == id,
232 WebhookEvent.deleted_at.is_(None),
233 WebhookEvent.is_archived.is_(False),
234 WebhookEndpoint.id.in_(
235 readable_endpoints_statement.with_only_columns(WebhookEndpoint.id)
236 ),
237 )
238 .options(contains_eager(WebhookEvent.webhook_endpoint))
239 )
241 res = await session.execute(statement)
242 event = res.scalars().unique().one_or_none()
243 if event is None:
244 raise ResourceNotFound()
246 enqueue_job("webhook_event.send", webhook_event_id=event.id, redeliver=True)
248 async def on_event_success(self, session: AsyncSession, id: UUID) -> None: 1a
249 """
250 Helper to hook into the event success event.
252 Useful to trigger logic that might wait for an event to be delivered.
253 """
254 event = await self.get_event_by_id(session, id)
255 if event is None:
256 raise EventDoesNotExist(id)
258 if not event.succeeded:
259 raise EventNotSuccessul(id)
261 if event.webhook_endpoint.format != WebhookFormat.raw:
262 return
264 if event.payload is None:
265 return
267 if event.type == WebhookEventType.checkout_updated:
268 checkout_repository = CheckoutRepository.from_session(session)
269 payload = json.loads(event.payload)
270 checkout = await checkout_repository.get_by_id(UUID(payload["data"]["id"]))
271 assert checkout is not None
272 await publish_checkout_event(
273 checkout.client_secret,
274 CheckoutEvent.webhook_event_delivered,
275 {"status": checkout.status},
276 )
278 async def on_event_failed(self, session: AsyncSession, id: UUID) -> None: 1a
279 """
280 Helper to hook into the event failed event.
282 Detects consecutive failures and disables the endpoint if threshold is exceeded.
283 """
284 event = await self.get_event_by_id(session, id)
285 if event is None:
286 raise EventDoesNotExist(id)
288 if event.succeeded is not False:
289 return
291 endpoint = event.webhook_endpoint
292 if not endpoint.enabled:
293 return
295 # Get recent events to count the streak
296 webhook_event_repository = WebhookEventRepository.from_session(session)
297 recent_events = await webhook_event_repository.get_recent_by_endpoint(
298 endpoint.id, limit=settings.WEBHOOK_FAILURE_THRESHOLD
299 )
301 # Check if all recent events are failures
302 if len(recent_events) >= settings.WEBHOOK_FAILURE_THRESHOLD and all(
303 event.succeeded is False for event in recent_events
304 ):
305 log.warning(
306 "Disabling webhook endpoint due to consecutive failures",
307 webhook_endpoint_id=endpoint.id,
308 failure_count=len(recent_events),
309 )
310 webhook_endpoint_repository = WebhookEndpointRepository.from_session(
311 session
312 )
313 await webhook_endpoint_repository.update(
314 endpoint, update_dict={"enabled": False}, flush=True
315 )
317 # Send email to all organization members
318 organization_id = endpoint.organization_id
319 user_organizations = await user_organization_service.list_by_org(
320 session, organization_id
321 )
323 if user_organizations:
324 # User and Organization are eagerly loaded
325 organization = user_organizations[0].organization
326 dashboard_url = f"{settings.FRONTEND_BASE_URL}/dashboard/{organization.slug}/settings/webhooks"
328 for user_org in user_organizations:
329 user = user_org.user
330 email = EmailAdapter.validate_python(
331 {
332 "template": "webhook_endpoint_disabled",
333 "props": {
334 "email": user.email,
335 "organization": organization,
336 "webhook_endpoint_url": endpoint.url,
337 "dashboard_url": dashboard_url,
338 },
339 }
340 )
342 body = render_email_template(email)
344 enqueue_email(
345 to_email_addr=user.email,
346 subject=f"Webhook endpoint disabled for {organization.name}",
347 html_content=body,
348 )
350 async def get_event_by_id( 1a
351 self, session: AsyncSession, id: UUID
352 ) -> WebhookEvent | None:
353 statement = (
354 select(WebhookEvent)
355 .where(WebhookEvent.deleted_at.is_(None), WebhookEvent.id == id)
356 .options(joinedload(WebhookEvent.webhook_endpoint))
357 )
358 res = await session.execute(statement)
359 return res.scalars().unique().one_or_none()
361 async def is_latest_event(self, session: AsyncSession, event: WebhookEvent) -> bool: 1a
362 age_limit = utc_now() - datetime.timedelta(minutes=1)
363 statement = (
364 select(func.count(WebhookEvent.id))
365 .join(
366 WebhookDelivery,
367 WebhookDelivery.webhook_event_id == WebhookEvent.id,
368 isouter=True,
369 )
370 .where(
371 WebhookEvent.deleted_at.is_(None),
372 WebhookEvent.webhook_endpoint_id == event.webhook_endpoint_id,
373 WebhookEvent.id != event.id, # Not the current event
374 WebhookDelivery.id.is_(None), # Not delivered yet
375 WebhookEvent.created_at
376 < event.created_at, # Older than the current event
377 WebhookEvent.created_at >= age_limit, # Not too old
378 )
379 .limit(1)
380 )
381 res = await session.execute(statement)
382 count = res.scalar_one()
383 return count == 0
385 @overload 1a
386 async def send( 386 ↛ exitline 386 didn't return from function 'send' because 1a
387 self,
388 session: AsyncSession,
389 target: Organization,
390 event: Literal[WebhookEventType.checkout_created],
391 data: Checkout,
392 ) -> list[WebhookEvent]: ...
394 @overload 1a
395 async def send( 395 ↛ exitline 395 didn't return from function 'send' because 1a
396 self,
397 session: AsyncSession,
398 target: Organization,
399 event: Literal[WebhookEventType.checkout_updated],
400 data: Checkout,
401 ) -> list[WebhookEvent]: ...
403 @overload 1a
404 async def send( 404 ↛ exitline 404 didn't return from function 'send' because 1a
405 self,
406 session: AsyncSession,
407 target: Organization,
408 event: Literal[WebhookEventType.customer_created],
409 data: Customer,
410 ) -> list[WebhookEvent]: ...
412 @overload 1a
413 async def send( 413 ↛ exitline 413 didn't return from function 'send' because 1a
414 self,
415 session: AsyncSession,
416 target: Organization,
417 event: Literal[WebhookEventType.customer_updated],
418 data: Customer,
419 ) -> list[WebhookEvent]: ...
421 @overload 1a
422 async def send( 422 ↛ exitline 422 didn't return from function 'send' because 1a
423 self,
424 session: AsyncSession,
425 target: Organization,
426 event: Literal[WebhookEventType.customer_deleted],
427 data: Customer,
428 ) -> list[WebhookEvent]: ...
430 @overload 1a
431 async def send( 431 ↛ exitline 431 didn't return from function 'send' because 1a
432 self,
433 session: AsyncSession,
434 target: Organization,
435 event: Literal[WebhookEventType.customer_state_changed],
436 data: CustomerState,
437 ) -> list[WebhookEvent]: ...
439 @overload 1a
440 async def send( 440 ↛ exitline 440 didn't return from function 'send' because 1a
441 self,
442 session: AsyncSession,
443 target: Organization,
444 event: Literal[WebhookEventType.customer_seat_assigned],
445 data: CustomerSeat,
446 ) -> list[WebhookEvent]: ...
448 @overload 1a
449 async def send( 449 ↛ exitline 449 didn't return from function 'send' because 1a
450 self,
451 session: AsyncSession,
452 target: Organization,
453 event: Literal[WebhookEventType.customer_seat_claimed],
454 data: CustomerSeat,
455 ) -> list[WebhookEvent]: ...
457 @overload 1a
458 async def send( 458 ↛ exitline 458 didn't return from function 'send' because 1a
459 self,
460 session: AsyncSession,
461 target: Organization,
462 event: Literal[WebhookEventType.customer_seat_revoked],
463 data: CustomerSeat,
464 ) -> list[WebhookEvent]: ...
466 @overload 1a
467 async def send( 467 ↛ exitline 467 didn't return from function 'send' because 1a
468 self,
469 session: AsyncSession,
470 target: Organization,
471 event: Literal[WebhookEventType.order_created],
472 data: Order,
473 ) -> list[WebhookEvent]: ...
475 @overload 1a
476 async def send( 476 ↛ exitline 476 didn't return from function 'send' because 1a
477 self,
478 session: AsyncSession,
479 target: Organization,
480 event: Literal[WebhookEventType.order_updated],
481 data: Order,
482 ) -> list[WebhookEvent]: ...
484 @overload 1a
485 async def send( 485 ↛ exitline 485 didn't return from function 'send' because 1a
486 self,
487 session: AsyncSession,
488 target: Organization,
489 event: Literal[WebhookEventType.order_paid],
490 data: Order,
491 ) -> list[WebhookEvent]: ...
493 @overload 1a
494 async def send( 494 ↛ exitline 494 didn't return from function 'send' because 1a
495 self,
496 session: AsyncSession,
497 target: Organization,
498 event: Literal[WebhookEventType.order_refunded],
499 data: Order,
500 ) -> list[WebhookEvent]: ...
502 @overload 1a
503 async def send( 503 ↛ exitline 503 didn't return from function 'send' because 1a
504 self,
505 session: AsyncSession,
506 target: Organization,
507 event: Literal[WebhookEventType.subscription_created],
508 data: Subscription,
509 ) -> list[WebhookEvent]: ...
511 @overload 1a
512 async def send( 512 ↛ exitline 512 didn't return from function 'send' because 1a
513 self,
514 session: AsyncSession,
515 target: Organization,
516 event: Literal[WebhookEventType.subscription_updated],
517 data: Subscription,
518 ) -> list[WebhookEvent]: ...
520 @overload 1a
521 async def send( 521 ↛ exitline 521 didn't return from function 'send' because 1a
522 self,
523 session: AsyncSession,
524 target: Organization,
525 event: Literal[WebhookEventType.subscription_active],
526 data: Subscription,
527 ) -> list[WebhookEvent]: ...
529 @overload 1a
530 async def send( 530 ↛ exitline 530 didn't return from function 'send' because 1a
531 self,
532 session: AsyncSession,
533 target: Organization,
534 event: Literal[WebhookEventType.subscription_canceled],
535 data: Subscription,
536 ) -> list[WebhookEvent]: ...
538 @overload 1a
539 async def send( 539 ↛ exitline 539 didn't return from function 'send' because 1a
540 self,
541 session: AsyncSession,
542 target: Organization,
543 event: Literal[WebhookEventType.subscription_uncanceled],
544 data: Subscription,
545 ) -> list[WebhookEvent]: ...
547 @overload 1a
548 async def send( 548 ↛ exitline 548 didn't return from function 'send' because 1a
549 self,
550 session: AsyncSession,
551 target: Organization,
552 event: Literal[WebhookEventType.subscription_revoked],
553 data: Subscription,
554 ) -> list[WebhookEvent]: ...
556 @overload 1a
557 async def send( 557 ↛ exitline 557 didn't return from function 'send' because 1a
558 self,
559 session: AsyncSession,
560 target: Organization,
561 event: Literal[WebhookEventType.refund_created],
562 data: Refund,
563 ) -> list[WebhookEvent]: ...
565 @overload 1a
566 async def send( 566 ↛ exitline 566 didn't return from function 'send' because 1a
567 self,
568 session: AsyncSession,
569 target: Organization,
570 event: Literal[WebhookEventType.refund_updated],
571 data: Refund,
572 ) -> list[WebhookEvent]: ...
574 @overload 1a
575 async def send( 575 ↛ exitline 575 didn't return from function 'send' because 1a
576 self,
577 session: AsyncSession,
578 target: Organization,
579 event: Literal[WebhookEventType.product_created],
580 data: Product,
581 ) -> list[WebhookEvent]: ...
583 @overload 1a
584 async def send( 584 ↛ exitline 584 didn't return from function 'send' because 1a
585 self,
586 session: AsyncSession,
587 target: Organization,
588 event: Literal[WebhookEventType.product_updated],
589 data: Product,
590 ) -> list[WebhookEvent]: ...
592 @overload 1a
593 async def send( 593 ↛ exitline 593 didn't return from function 'send' because 1a
594 self,
595 session: AsyncSession,
596 target: Organization,
597 event: Literal[WebhookEventType.organization_updated],
598 data: Organization,
599 ) -> list[WebhookEvent]: ...
601 @overload 1a
602 async def send( 602 ↛ exitline 602 didn't return from function 'send' because 1a
603 self,
604 session: AsyncSession,
605 target: Organization,
606 event: Literal[WebhookEventType.benefit_created],
607 data: Benefit,
608 ) -> list[WebhookEvent]: ...
610 @overload 1a
611 async def send( 611 ↛ exitline 611 didn't return from function 'send' because 1a
612 self,
613 session: AsyncSession,
614 target: Organization,
615 event: Literal[WebhookEventType.benefit_updated],
616 data: Benefit,
617 ) -> list[WebhookEvent]: ...
619 @overload 1a
620 async def send( 620 ↛ exitline 620 didn't return from function 'send' because 1a
621 self,
622 session: AsyncSession,
623 target: Organization,
624 event: Literal[WebhookEventType.benefit_grant_created],
625 data: BenefitGrant,
626 ) -> list[WebhookEvent]: ...
628 @overload 1a
629 async def send( 629 ↛ exitline 629 didn't return from function 'send' because 1a
630 self,
631 session: AsyncSession,
632 target: Organization,
633 event: Literal[WebhookEventType.benefit_grant_updated],
634 data: BenefitGrant,
635 ) -> list[WebhookEvent]: ...
637 @overload 1a
638 async def send( 638 ↛ exitline 638 didn't return from function 'send' because 1a
639 self,
640 session: AsyncSession,
641 target: Organization,
642 event: Literal[WebhookEventType.benefit_grant_cycled],
643 data: BenefitGrant,
644 ) -> list[WebhookEvent]: ...
646 @overload 1a
647 async def send( 647 ↛ exitline 647 didn't return from function 'send' because 1a
648 self,
649 session: AsyncSession,
650 target: Organization,
651 event: Literal[WebhookEventType.benefit_grant_revoked],
652 data: BenefitGrant,
653 ) -> list[WebhookEvent]: ...
655 async def send( 1a
656 self,
657 session: AsyncSession,
658 target: Organization,
659 event: WebhookEventType,
660 data: object,
661 ) -> list[WebhookEvent]:
662 now = utc_now()
663 payload = WebhookPayloadTypeAdapter.validate_python(
664 {"type": event, "timestamp": now, "data": data}
665 )
667 events: list[WebhookEvent] = []
668 for endpoint in await self._get_event_target_endpoints(
669 session, event=event, target=target
670 ):
671 try:
672 payload_data = payload.get_payload(endpoint.format, target)
673 event_type = WebhookEvent(
674 created_at=payload.timestamp,
675 webhook_endpoint=endpoint,
676 type=event,
677 payload=payload_data,
678 )
679 session.add(event_type)
680 events.append(event_type)
681 await session.flush()
682 enqueue_job("webhook_event.send", webhook_event_id=event_type.id)
683 except UnsupportedTarget as e:
684 # Log the error but do not raise to not fail the whole request
685 log.error(e.message)
686 continue
687 except SkipEvent:
688 continue
690 return events
692 async def archive_events( 1a
693 self,
694 session: AsyncSession,
695 older_than: datetime.datetime,
696 batch_size: int = 5000,
697 ) -> None:
698 log.debug(
699 "Archive webhook events", older_than=older_than, batch_size=batch_size
700 )
702 while True:
703 batch_subquery = (
704 select(WebhookEvent.id)
705 .where(
706 WebhookEvent.created_at < older_than,
707 WebhookEvent.payload.is_not(None),
708 )
709 .order_by(WebhookEvent.created_at.asc())
710 .limit(batch_size)
711 )
712 statement = (
713 update(WebhookEvent)
714 .where(WebhookEvent.id.in_(batch_subquery))
715 .values(payload=None)
716 )
718 # https://github.com/sqlalchemy/sqlalchemy/commit/67f62aac5b49b6d048ca39019e5bd123d3c9cfb2
719 result = cast(CursorResult[WebhookEvent], await session.execute(statement))
720 updated_count = result.rowcount
722 await session.commit()
724 log.debug("Archived webhook events batch", updated_count=updated_count)
726 if updated_count < batch_size:
727 break
729 def _get_readable_endpoints_statement( 1a
730 self, auth_subject: AuthSubject[User | Organization]
731 ) -> Select[tuple[WebhookEndpoint]]:
732 statement = select(WebhookEndpoint).where(WebhookEndpoint.deleted_at.is_(None))
734 if is_user(auth_subject):
735 user = auth_subject.subject
736 statement = statement.where(
737 WebhookEndpoint.organization_id.in_(
738 select(UserOrganization.organization_id).where(
739 UserOrganization.user_id == user.id,
740 UserOrganization.deleted_at.is_(None),
741 )
742 )
743 )
744 elif is_organization(auth_subject):
745 statement = statement.where(
746 WebhookEndpoint.organization_id == auth_subject.subject.id
747 )
749 return statement
751 async def _get_event_target_endpoints( 1a
752 self,
753 session: AsyncSession,
754 *,
755 event: WebhookEventType,
756 target: Organization,
757 ) -> Sequence[WebhookEndpoint]:
758 statement = select(WebhookEndpoint).where(
759 WebhookEndpoint.deleted_at.is_(None),
760 WebhookEndpoint.enabled.is_(True),
761 WebhookEndpoint.events.bool_op("@>")(text(f"'[\"{event}\"]'")),
762 WebhookEndpoint.organization_id == target.id,
763 )
764 res = await session.execute(statement)
765 return res.scalars().unique().all()
768webhook = WebhookService() 1a