Coverage for polar/oauth2/service/oauth2_grant.py: 29%
38 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 15:52 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 15:52 +0000
1import uuid 1a
3from sqlalchemy import select 1a
5from polar.exceptions import PolarError 1a
6from polar.kit.db.postgres import Session as SyncSession 1a
7from polar.kit.services import ResourceServiceReader 1a
8from polar.models import OAuth2Grant 1a
10from ..sub_type import SubType 1a
13class OAuth2GrantError(PolarError): ... 1a
16class OAuth2GrantService(ResourceServiceReader[OAuth2Grant]): 1a
17 def create_or_update_grant( 1a
18 self,
19 session: SyncSession,
20 *,
21 sub_type: SubType,
22 sub_id: uuid.UUID,
23 client_id: str,
24 scope: str,
25 ) -> OAuth2Grant:
26 grant = self._get_by_sub_and_client_id(
27 session, sub_type=sub_type, sub_id=sub_id, client_id=client_id
28 )
29 if grant is None:
30 grant = OAuth2Grant(client_id=client_id, scope=scope)
31 if sub_type == SubType.user:
32 grant.user_id = sub_id
33 elif sub_type == SubType.organization:
34 grant.organization_id = sub_id
35 else:
36 raise NotImplementedError()
37 else:
38 grant.scope = scope
40 session.add(grant)
41 session.flush()
42 return grant
44 def has_granted_scope( 1a
45 self,
46 session: SyncSession,
47 *,
48 sub_type: SubType,
49 sub_id: uuid.UUID,
50 client_id: str,
51 scope: str,
52 ) -> bool:
53 grant = self._get_by_sub_and_client_id(
54 session, sub_type=sub_type, sub_id=sub_id, client_id=client_id
55 )
56 if grant is None:
57 return False
59 scopes = set(scope.strip().split())
60 return scopes.issubset(grant.scopes)
62 def _get_by_sub_and_client_id( 1a
63 self,
64 session: SyncSession,
65 *,
66 sub_type: SubType,
67 sub_id: uuid.UUID,
68 client_id: str,
69 ) -> OAuth2Grant | None:
70 statement = select(OAuth2Grant).where(OAuth2Grant.client_id == client_id)
71 if sub_type == SubType.user:
72 statement = statement.where(OAuth2Grant.user_id == sub_id)
73 elif sub_type == SubType.organization:
74 statement = statement.where(OAuth2Grant.organization_id == sub_id)
75 else:
76 raise NotImplementedError()
77 result = session.execute(statement)
78 return result.unique().scalar_one_or_none()
81oauth2_grant = OAuth2GrantService(OAuth2Grant) 1a