Coverage for polar/notification_recipient/service.py: 48%

25 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 16:17 +0000

1from collections.abc import Sequence 1a

2from uuid import UUID 1a

3 

4from polar.auth.models import AuthSubject 1a

5from polar.exceptions import PolarRequestValidationError, ValidationError 1a

6from polar.models.notification_recipient import NotificationRecipient 1a

7from polar.models.user import User 1a

8from polar.postgres import AsyncSession 1a

9 

10from .repository import NotificationRecipientRepository 1a

11from .schemas import ( 1a

12 NotificationRecipientCreate, 

13 NotificationRecipientPlatform, 

14) 

15 

16 

17class NotificationRecipientService: 1a

18 async def list_by_user( 1a

19 self, 

20 session: AsyncSession, 

21 user_id: UUID, 

22 expo_push_token: str | None, 

23 platform: NotificationRecipientPlatform | None, 

24 ) -> Sequence[NotificationRecipient]: 

25 repository = NotificationRecipientRepository.from_session(session) 

26 return await repository.list_by_user( 

27 user_id, expo_push_token=expo_push_token, platform=platform 

28 ) 

29 

30 async def create( 1a

31 self, 

32 session: AsyncSession, 

33 notification_recipient_create: NotificationRecipientCreate, 

34 auth_subject: AuthSubject[User], 

35 ) -> NotificationRecipient: 

36 repository = NotificationRecipientRepository.from_session(session) 

37 

38 errors: list[ValidationError] = [] 

39 

40 if await repository.get_by_expo_token( 

41 notification_recipient_create.expo_push_token 

42 ): 

43 errors.append( 

44 { 

45 "type": "value_error", 

46 "loc": ("body", "expo_push_token"), 

47 "msg": "A notification recipient with this Expo push token already exists.", 

48 "input": notification_recipient_create.expo_push_token, 

49 } 

50 ) 

51 

52 if errors: 

53 raise PolarRequestValidationError(errors) 

54 

55 return await repository.create( 

56 NotificationRecipient( 

57 user_id=auth_subject.subject.id, 

58 platform=notification_recipient_create.platform, 

59 expo_push_token=notification_recipient_create.expo_push_token, 

60 ), 

61 flush=True, 

62 ) 

63 

64 async def delete( 1a

65 self, session: AsyncSession, auth_subject: AuthSubject[User], id: UUID 

66 ) -> None: 

67 repository = NotificationRecipientRepository.from_session(session) 

68 await repository.delete(id, auth_subject.subject.id) 

69 

70 

71notification_recipient = NotificationRecipientService() 1a