Coverage for polar/eventstream/service.py: 41%
49 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 17:15 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 17:15 +0000
1from typing import Any 1a
2from uuid import UUID 1a
4import structlog 1a
5from pydantic import BaseModel 1a
7from polar.kit.utils import generate_uuid 1a
8from polar.logging import Logger 1a
9from polar.postgres import AsyncSession 1a
10from polar.redis import Redis 1a
11from polar.user_organization.service import ( 1a
12 user_organization as user_organization_service,
13)
14from polar.worker import enqueue_job 1a
16log: Logger = structlog.get_logger() 1a
19class Receivers(BaseModel): 1a
20 user_id: UUID | None = None 1a
21 organization_id: UUID | None = None 1a
22 checkout_client_secret: str | None = None 1a
23 customer_id: UUID | None = None 1a
25 def generate_channel_name(self, scope: str, resource_id: UUID | str) -> str: 1a
26 return f"{scope}:{resource_id}"
28 def get_channels(self) -> list[str]: 1a
29 channels = []
30 if self.user_id:
31 channels.append(self.generate_channel_name("user", self.user_id))
33 if self.organization_id:
34 channels.append(self.generate_channel_name("org", self.organization_id))
36 if self.checkout_client_secret:
37 channels.append(
38 self.generate_channel_name("checkout", self.checkout_client_secret)
39 )
41 if self.customer_id:
42 channels.append(self.generate_channel_name("customer", self.customer_id))
44 return channels
47class Event(BaseModel): 1a
48 id: UUID 1a
49 key: str 1a
50 payload: dict[str, Any] 1a
53async def send_event(redis: Redis, event_json: str, channels: list[str]) -> None: 1a
54 for channel in channels:
55 await redis.publish(channel, event_json)
56 log.debug(
57 "Published event to eventstream", event_json=event_json, channels=channels
58 )
61async def publish( 1a
62 key: str,
63 payload: dict[str, Any],
64 user_id: UUID | None = None,
65 organization_id: UUID | None = None,
66 checkout_client_secret: str | None = None,
67 customer_id: UUID | None = None,
68) -> None:
69 receivers = Receivers(
70 user_id=user_id,
71 organization_id=organization_id,
72 checkout_client_secret=checkout_client_secret,
73 customer_id=customer_id,
74 )
75 channels = receivers.get_channels()
76 event = Event(
77 id=generate_uuid(),
78 key=key,
79 payload=payload,
80 ).model_dump_json()
82 enqueue_job("eventstream.publish", event, channels)
85async def publish_members( 1a
86 session: AsyncSession, key: str, payload: dict[str, Any], organization_id: UUID
87) -> None:
88 members = await user_organization_service.list_by_org(
89 session, org_id=organization_id
90 )
92 for m in members:
93 receivers = Receivers(user_id=m.user_id)
94 channels = receivers.get_channels()
95 event = Event(
96 id=generate_uuid(),
97 key=key,
98 payload=payload,
99 ).model_dump_json()
101 enqueue_job("eventstream.publish", event, channels)