Coverage for polar/oauth2/service/oauth2_grant.py: 29%

38 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 16:17 +0000

1import uuid 1a

2 

3from sqlalchemy import select 1a

4 

5from polar.exceptions import PolarError 1a

6from polar.kit.db.postgres import Session as SyncSession 1a

7from polar.kit.services import ResourceServiceReader 1a

8from polar.models import OAuth2Grant 1a

9 

10from ..sub_type import SubType 1a

11 

12 

13class OAuth2GrantError(PolarError): ... 1a

14 

15 

16class OAuth2GrantService(ResourceServiceReader[OAuth2Grant]): 1a

17 def create_or_update_grant( 1a

18 self, 

19 session: SyncSession, 

20 *, 

21 sub_type: SubType, 

22 sub_id: uuid.UUID, 

23 client_id: str, 

24 scope: str, 

25 ) -> OAuth2Grant: 

26 grant = self._get_by_sub_and_client_id( 

27 session, sub_type=sub_type, sub_id=sub_id, client_id=client_id 

28 ) 

29 if grant is None: 

30 grant = OAuth2Grant(client_id=client_id, scope=scope) 

31 if sub_type == SubType.user: 

32 grant.user_id = sub_id 

33 elif sub_type == SubType.organization: 

34 grant.organization_id = sub_id 

35 else: 

36 raise NotImplementedError() 

37 else: 

38 grant.scope = scope 

39 

40 session.add(grant) 

41 session.flush() 

42 return grant 

43 

44 def has_granted_scope( 1a

45 self, 

46 session: SyncSession, 

47 *, 

48 sub_type: SubType, 

49 sub_id: uuid.UUID, 

50 client_id: str, 

51 scope: str, 

52 ) -> bool: 

53 grant = self._get_by_sub_and_client_id( 

54 session, sub_type=sub_type, sub_id=sub_id, client_id=client_id 

55 ) 

56 if grant is None: 

57 return False 

58 

59 scopes = set(scope.strip().split()) 

60 return scopes.issubset(grant.scopes) 

61 

62 def _get_by_sub_and_client_id( 1a

63 self, 

64 session: SyncSession, 

65 *, 

66 sub_type: SubType, 

67 sub_id: uuid.UUID, 

68 client_id: str, 

69 ) -> OAuth2Grant | None: 

70 statement = select(OAuth2Grant).where(OAuth2Grant.client_id == client_id) 

71 if sub_type == SubType.user: 

72 statement = statement.where(OAuth2Grant.user_id == sub_id) 

73 elif sub_type == SubType.organization: 

74 statement = statement.where(OAuth2Grant.organization_id == sub_id) 

75 else: 

76 raise NotImplementedError() 

77 result = session.execute(statement) 

78 return result.unique().scalar_one_or_none() 

79 

80 

81oauth2_grant = OAuth2GrantService(OAuth2Grant) 1a