Coverage for polar/eventstream/service.py: 41%

49 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 16:17 +0000

1from typing import Any 1a

2from uuid import UUID 1a

3 

4import structlog 1a

5from pydantic import BaseModel 1a

6 

7from polar.kit.utils import generate_uuid 1a

8from polar.logging import Logger 1a

9from polar.postgres import AsyncSession 1a

10from polar.redis import Redis 1a

11from polar.user_organization.service import ( 1a

12 user_organization as user_organization_service, 

13) 

14from polar.worker import enqueue_job 1a

15 

16log: Logger = structlog.get_logger() 1a

17 

18 

19class Receivers(BaseModel): 1a

20 user_id: UUID | None = None 1a

21 organization_id: UUID | None = None 1a

22 checkout_client_secret: str | None = None 1a

23 customer_id: UUID | None = None 1a

24 

25 def generate_channel_name(self, scope: str, resource_id: UUID | str) -> str: 1a

26 return f"{scope}:{resource_id}" 

27 

28 def get_channels(self) -> list[str]: 1a

29 channels = [] 

30 if self.user_id: 

31 channels.append(self.generate_channel_name("user", self.user_id)) 

32 

33 if self.organization_id: 

34 channels.append(self.generate_channel_name("org", self.organization_id)) 

35 

36 if self.checkout_client_secret: 

37 channels.append( 

38 self.generate_channel_name("checkout", self.checkout_client_secret) 

39 ) 

40 

41 if self.customer_id: 

42 channels.append(self.generate_channel_name("customer", self.customer_id)) 

43 

44 return channels 

45 

46 

47class Event(BaseModel): 1a

48 id: UUID 1a

49 key: str 1a

50 payload: dict[str, Any] 1a

51 

52 

53async def send_event(redis: Redis, event_json: str, channels: list[str]) -> None: 1a

54 for channel in channels: 

55 await redis.publish(channel, event_json) 

56 log.debug( 

57 "Published event to eventstream", event_json=event_json, channels=channels 

58 ) 

59 

60 

61async def publish( 1a

62 key: str, 

63 payload: dict[str, Any], 

64 user_id: UUID | None = None, 

65 organization_id: UUID | None = None, 

66 checkout_client_secret: str | None = None, 

67 customer_id: UUID | None = None, 

68) -> None: 

69 receivers = Receivers( 

70 user_id=user_id, 

71 organization_id=organization_id, 

72 checkout_client_secret=checkout_client_secret, 

73 customer_id=customer_id, 

74 ) 

75 channels = receivers.get_channels() 

76 event = Event( 

77 id=generate_uuid(), 

78 key=key, 

79 payload=payload, 

80 ).model_dump_json() 

81 

82 enqueue_job("eventstream.publish", event, channels) 

83 

84 

85async def publish_members( 1a

86 session: AsyncSession, key: str, payload: dict[str, Any], organization_id: UUID 

87) -> None: 

88 members = await user_organization_service.list_by_org( 

89 session, org_id=organization_id 

90 ) 

91 

92 for m in members: 

93 receivers = Receivers(user_id=m.user_id) 

94 channels = receivers.get_channels() 

95 event = Event( 

96 id=generate_uuid(), 

97 key=key, 

98 payload=payload, 

99 ).model_dump_json() 

100 

101 enqueue_job("eventstream.publish", event, channels)