Coverage for polar/integrations/discord/service.py: 34%
59 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 15:52 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 15:52 +0000
1import structlog 1a
3from polar.config import settings 1a
4from polar.exceptions import PolarError 1a
5from polar.logging import Logger 1a
6from polar.models import Customer, User 1a
8from .client import bot_client 1a
9from .schemas import DiscordGuild, DiscordGuildRole 1a
11log: Logger = structlog.get_logger() 1a
14class DiscordError(PolarError): ... 1a
17class DiscordAccountNotConnected(DiscordError): 1a
18 def __init__(self, user: User) -> None: 1a
19 self.user = user
20 message = "You don't have a Discord account connected."
21 super().__init__(message)
24class DiscordExpiredAccessToken(DiscordError): 1a
25 def __init__(self, user: User) -> None: 1a
26 self.user = user
27 message = "The access token is expired and no refresh token is available."
28 super().__init__(message, 401)
31class DiscordCustomerAccountDoesNotExist(DiscordError): 1a
32 def __init__(self, customer: Customer, account_id: str) -> None: 1a
33 self.customer = customer
34 self.account_id = account_id
35 message = (
36 f"The Discord account {account_id} does not exist "
37 f"on customer {customer.id}."
38 )
39 super().__init__(message)
42class DiscordCustomerExpiredAccessToken(DiscordError): 1a
43 def __init__(self, customer: Customer, account_id: str) -> None: 1a
44 self.customer = customer
45 self.account_id = account_id
46 message = "The access token is expired and no refresh token is available."
47 super().__init__(message, 401)
50class DiscordBotService: 1a
51 async def get_guild(self, id: str) -> DiscordGuild: 1a
52 guild = await bot_client.get_guild(id)
54 roles: list[DiscordGuildRole] = []
55 for role in sorted(guild["roles"], key=lambda r: r["position"], reverse=True):
56 # Keep standard roles
57 if not role["managed"]:
58 roles.append(
59 DiscordGuildRole.model_validate({**role, "is_polar_bot": False})
60 )
61 continue
63 # Keep only our bot role
64 if tags := role.get("tags"):
65 if tags.get("bot_id") == settings.DISCORD_CLIENT_ID:
66 roles.append(
67 DiscordGuildRole.model_validate({**role, "is_polar_bot": True})
68 )
70 return DiscordGuild(name=guild["name"], roles=roles)
72 async def add_member( 1a
73 self, guild_id: str, role_id: str, account_id: str, access_token: str
74 ) -> None:
75 await bot_client.add_member(
76 guild_id=guild_id,
77 discord_user_id=account_id,
78 discord_user_access_token=access_token,
79 role_id=role_id,
80 )
82 async def remove_member_role( 1a
83 self, guild_id: str, role_id: str, account_id: str
84 ) -> None:
85 await bot_client.remove_member_role(
86 guild_id=guild_id,
87 discord_user_id=account_id,
88 role_id=role_id,
89 )
91 async def remove_member(self, guild_id: str, account_id: str) -> None: 1a
92 await bot_client.remove_member(guild_id=guild_id, discord_user_id=account_id)
94 async def is_bot_role_above_role(self, guild_id: str, role_id: str) -> bool: 1a
95 """
96 Checks if our bot's role has a higher position than the one we want to grant.
98 There is a hierarchy in Discord roles. For our bot to grant a specific role,
99 it has to be *above* this role.
100 """
101 guild = await bot_client.get_guild(guild_id)
102 for role in sorted(guild["roles"], key=lambda r: r["position"]):
103 if tags := role.get("tags"):
104 if tags.get("bot_id") == settings.DISCORD_CLIENT_ID:
105 return False
106 if role["id"] == role_id:
107 return True
109 raise DiscordError("Roles not found")
112discord_bot = DiscordBotService() 1a