Coverage for polar/external_event/service.py: 41%
53 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 16:17 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 16:17 +0000
1import contextlib 1a
2import uuid 1a
3from collections.abc import AsyncIterator 1a
4from typing import Any, cast 1a
6from polar.exceptions import PolarError 1a
7from polar.kit.utils import utc_now 1a
8from polar.models import ExternalEvent 1a
9from polar.models.external_event import ExternalEventSource, StripeEvent 1a
10from polar.postgres import AsyncSession 1a
11from polar.worker import enqueue_job 1a
13from .repository import ExternalEventRepository 1a
16class ExternalEventError(PolarError): ... 1a
19class ExternalEventDoesNotExist(ExternalEventError): 1a
20 def __init__(self, event_id: uuid.UUID) -> None: 1a
21 self.event_id = event_id
22 message = f"External event {event_id} does not exist."
23 super().__init__(message)
26class ExternalEventAlreadyHandled(ExternalEventError): 1a
27 def __init__(self, event_id: uuid.UUID) -> None: 1a
28 self.event_id = event_id
29 message = f"External event {event_id} has already been handled."
30 super().__init__(message)
33class ExternalEventService: 1a
34 async def enqueue( 1a
35 self,
36 session: AsyncSession,
37 source: ExternalEventSource,
38 task_name: str,
39 external_id: str,
40 data: dict[str, Any],
41 ) -> ExternalEvent:
42 repository = ExternalEventRepository.from_session(session)
44 event = await repository.get_by_source_and_external_id(source, external_id)
45 if event is not None:
46 return event
48 event = await repository.create(
49 ExternalEvent(
50 source=source, task_name=task_name, external_id=external_id, data=data
51 ),
52 flush=True,
53 )
54 enqueue_job(task_name, event.id)
55 return event
57 async def resend(self, event: ExternalEvent) -> None: 1a
58 if event.is_handled:
59 raise ExternalEventAlreadyHandled(event.id)
60 enqueue_job(event.task_name, event.id)
62 @contextlib.asynccontextmanager 1a
63 async def handle( 1a
64 self, session: AsyncSession, source: ExternalEventSource, event_id: uuid.UUID
65 ) -> AsyncIterator[ExternalEvent]:
66 repository = ExternalEventRepository.from_session(session)
67 event = await repository.get_by_source_and_id(source, event_id)
68 if event is None:
69 raise ExternalEventDoesNotExist(event_id)
70 if event.is_handled:
71 raise ExternalEventAlreadyHandled(event_id)
73 try:
74 yield event
75 except Exception:
76 raise
77 else:
78 await repository.update(event, update_dict={"handled_at": utc_now()})
80 @contextlib.asynccontextmanager 1a
81 async def handle_stripe( 1a
82 self, session: AsyncSession, event_id: uuid.UUID
83 ) -> AsyncIterator[StripeEvent]:
84 async with self.handle(session, ExternalEventSource.stripe, event_id) as event:
85 yield cast(StripeEvent, event)
88external_event = ExternalEventService() 1a