Coverage for polar/external_event/service.py: 41%

53 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 17:15 +0000

1import contextlib 1a

2import uuid 1a

3from collections.abc import AsyncIterator 1a

4from typing import Any, cast 1a

5 

6from polar.exceptions import PolarError 1a

7from polar.kit.utils import utc_now 1a

8from polar.models import ExternalEvent 1a

9from polar.models.external_event import ExternalEventSource, StripeEvent 1a

10from polar.postgres import AsyncSession 1a

11from polar.worker import enqueue_job 1a

12 

13from .repository import ExternalEventRepository 1a

14 

15 

16class ExternalEventError(PolarError): ... 1a

17 

18 

19class ExternalEventDoesNotExist(ExternalEventError): 1a

20 def __init__(self, event_id: uuid.UUID) -> None: 1a

21 self.event_id = event_id 

22 message = f"External event {event_id} does not exist." 

23 super().__init__(message) 

24 

25 

26class ExternalEventAlreadyHandled(ExternalEventError): 1a

27 def __init__(self, event_id: uuid.UUID) -> None: 1a

28 self.event_id = event_id 

29 message = f"External event {event_id} has already been handled." 

30 super().__init__(message) 

31 

32 

33class ExternalEventService: 1a

34 async def enqueue( 1a

35 self, 

36 session: AsyncSession, 

37 source: ExternalEventSource, 

38 task_name: str, 

39 external_id: str, 

40 data: dict[str, Any], 

41 ) -> ExternalEvent: 

42 repository = ExternalEventRepository.from_session(session) 

43 

44 event = await repository.get_by_source_and_external_id(source, external_id) 

45 if event is not None: 

46 return event 

47 

48 event = await repository.create( 

49 ExternalEvent( 

50 source=source, task_name=task_name, external_id=external_id, data=data 

51 ), 

52 flush=True, 

53 ) 

54 enqueue_job(task_name, event.id) 

55 return event 

56 

57 async def resend(self, event: ExternalEvent) -> None: 1a

58 if event.is_handled: 

59 raise ExternalEventAlreadyHandled(event.id) 

60 enqueue_job(event.task_name, event.id) 

61 

62 @contextlib.asynccontextmanager 1a

63 async def handle( 1a

64 self, session: AsyncSession, source: ExternalEventSource, event_id: uuid.UUID 

65 ) -> AsyncIterator[ExternalEvent]: 

66 repository = ExternalEventRepository.from_session(session) 

67 event = await repository.get_by_source_and_id(source, event_id) 

68 if event is None: 

69 raise ExternalEventDoesNotExist(event_id) 

70 if event.is_handled: 

71 raise ExternalEventAlreadyHandled(event_id) 

72 

73 try: 

74 yield event 

75 except Exception: 

76 raise 

77 else: 

78 await repository.update(event, update_dict={"handled_at": utc_now()}) 

79 

80 @contextlib.asynccontextmanager 1a

81 async def handle_stripe( 1a

82 self, session: AsyncSession, event_id: uuid.UUID 

83 ) -> AsyncIterator[StripeEvent]: 

84 async with self.handle(session, ExternalEventSource.stripe, event_id) as event: 

85 yield cast(StripeEvent, event) 

86 

87 

88external_event = ExternalEventService() 1a