Coverage for polar/notification_recipient/service.py: 48%
25 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 15:52 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 15:52 +0000
1from collections.abc import Sequence 1a
2from uuid import UUID 1a
4from polar.auth.models import AuthSubject 1a
5from polar.exceptions import PolarRequestValidationError, ValidationError 1a
6from polar.models.notification_recipient import NotificationRecipient 1a
7from polar.models.user import User 1a
8from polar.postgres import AsyncSession 1a
10from .repository import NotificationRecipientRepository 1a
11from .schemas import ( 1a
12 NotificationRecipientCreate,
13 NotificationRecipientPlatform,
14)
class NotificationRecipientService:
    """Service layer for notification recipients.

    Each method builds a ``NotificationRecipientRepository`` from the given
    session and delegates persistence work to it.
    """

    async def list_by_user(
        self,
        session: AsyncSession,
        user_id: UUID,
        expo_push_token: str | None,
        platform: NotificationRecipientPlatform | None,
    ) -> Sequence[NotificationRecipient]:
        """Return the recipients the repository lists for ``user_id``.

        ``expo_push_token`` and ``platform`` are forwarded unchanged to the
        repository as keyword arguments (presumably optional filters; see
        ``NotificationRecipientRepository.list_by_user`` to confirm).
        """
        recipients = await NotificationRecipientRepository.from_session(
            session
        ).list_by_user(user_id, expo_push_token=expo_push_token, platform=platform)
        return recipients

    async def create(
        self,
        session: AsyncSession,
        notification_recipient_create: NotificationRecipientCreate,
        auth_subject: AuthSubject[User],
    ) -> NotificationRecipient:
        """Create a notification recipient for the authenticated subject.

        Raises:
            PolarRequestValidationError: if the repository already returns a
                record for the submitted Expo push token.
        """
        repository = NotificationRecipientRepository.from_session(session)
        token = notification_recipient_create.expo_push_token

        # Guard clause: refuse the request when the token is already known.
        existing = await repository.get_by_expo_token(token)
        if existing:
            duplicate_error: ValidationError = {
                "type": "value_error",
                "loc": ("body", "expo_push_token"),
                "msg": "A notification recipient with this Expo push token already exists.",
                "input": token,
            }
            raise PolarRequestValidationError([duplicate_error])

        recipient = NotificationRecipient(
            user_id=auth_subject.subject.id,
            platform=notification_recipient_create.platform,
            expo_push_token=token,
        )
        return await repository.create(recipient, flush=True)

    async def delete(
        self, session: AsyncSession, auth_subject: AuthSubject[User], id: UUID
    ) -> None:
        """Delete recipient ``id`` via the repository.

        The authenticated subject's id is passed alongside ``id``
        (presumably to scope the delete to the owner; confirm in
        ``NotificationRecipientRepository.delete``).
        """
        owner_id = auth_subject.subject.id
        repository = NotificationRecipientRepository.from_session(session)
        await repository.delete(id, owner_id)
# Module-level instance of the service, created when this module is imported.
notification_recipient = NotificationRecipientService()