Coverage for polar/notifications/endpoints.py: 71%

35 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 16:17 +0000

1from typing import Annotated 1a

2 

3from fastapi import Depends, Path, Query 1a

4from pydantic import UUID4 1a

5 

6from polar.kit.pagination import ListResource, PaginationParamsQuery 1a

7from polar.notification_recipient import ( 1a

8 auth as notification_recipient_auth, 

9) 

10from polar.notification_recipient.schemas import ( 1a

11 NotificationRecipientCreate, 

12 NotificationRecipientPlatform, 

13 NotificationRecipientSchema, 

14) 

15from polar.notification_recipient.service import ( 1a

16 notification_recipient as notification_recipient_service, 

17) 

18from polar.notifications import ( 1a

19 auth as notifications_auth, 

20) 

21from polar.openapi import APITag 1a

22from polar.postgres import AsyncSession, get_db_session 1a

23from polar.routing import APIRouter 1a

24 

25from .schemas import NotificationsList, NotificationsMarkRead 1a

26from .service import notifications 1a

27 

# Path-parameter type shared by routes that address a single notification
# recipient by its UUID; the description is attached to the path parameter.
NotificationRecipientID = Annotated[
    UUID4,
    Path(description="The notification recipient ID."),
]

# Router that every endpoint in this module registers on. All routes carry the
# "notifications" tag plus APITag.private.
router = APIRouter(
    tags=["notifications", APITag.private],
)

33 

34 

# GET /notifications
# Builds a NotificationsList from two service lookups made for the
# authenticated subject: `get_for_user` and `get_user_last_read`.
# NOTE: kept as `#` comments rather than a docstring, since FastAPI would
# publish an endpoint docstring as the operation description.
@router.get("/notifications", response_model=NotificationsList)
async def get(
    auth_subject: notifications_auth.NotificationsRead,
    session: AsyncSession = Depends(get_db_session),
) -> NotificationsList:
    subject_id = auth_subject.subject.id

    # Both lookups use the same subject ID and the request-scoped session.
    user_notifications = await notifications.get_for_user(session, subject_id)
    last_read_id = await notifications.get_user_last_read(session, subject_id)

    return NotificationsList(
        notifications=user_notifications,  # type: ignore
        last_read_notification_id=last_read_id,
    )

49 

50 

# POST /notifications/read
# Passes `read.notification_id` to `set_user_last_read` for the authenticated
# subject. The handler itself produces no value (returns None).
# NOTE: kept as `#` comments rather than a docstring, since FastAPI would
# publish an endpoint docstring as the operation description.
@router.post("/notifications/read")
async def mark_read(
    read: NotificationsMarkRead,
    auth_subject: notifications_auth.NotificationsWrite,
    session: AsyncSession = Depends(get_db_session),
) -> None:
    subject_id = auth_subject.subject.id
    await notifications.set_user_last_read(session, subject_id, read.notification_id)

61 

62 

# POST /notifications/recipients
# Delegates creation to the notification-recipient service and returns the
# result validated against NotificationRecipientSchema (HTTP 201 on success).
@router.post(
    "/notifications/recipients",
    response_model=NotificationRecipientSchema,
    status_code=201,
    summary="Subscribes a device to notifications",
    responses={201: {"description": "Device subscribed to notifications."}},
)
async def create(
    notification_recipient_create: NotificationRecipientCreate,
    auth_subject: notification_recipient_auth.NotificationRecipientWrite,
    session: AsyncSession = Depends(get_db_session),
) -> NotificationRecipientSchema:
    """Create a notification recipient."""
    # The service receives the whole auth subject here, not just its ID.
    created = await notification_recipient_service.create(
        session, notification_recipient_create, auth_subject
    )

    # Convert the service result into the response schema explicitly.
    response = NotificationRecipientSchema.model_validate(created)
    return response

80 

81 

# GET /notifications/recipients
# Lists the authenticated subject's notification recipients, optionally
# filtered by Expo push token and/or platform.
@router.get(
    "/notifications/recipients",
    response_model=ListResource[NotificationRecipientSchema],
    status_code=200,
    summary="Lists all notification recipients subscribed to notifications",
)
async def list(
    auth_subject: notification_recipient_auth.NotificationRecipientRead,
    pagination: PaginationParamsQuery,
    session: AsyncSession = Depends(get_db_session),
    expo_push_token: str | None = Query(
        None, description="Filter by Expo push token."
    ),
    platform: NotificationRecipientPlatform | None = Query(
        None, description="Filter by platform."
    ),
) -> ListResource[NotificationRecipientSchema]:
    """List all devices subscribed to notifications."""
    recipients = await notification_recipient_service.list_by_user(
        session, auth_subject.subject.id, expo_push_token, platform
    )

    # NOTE(review): `pagination` is not passed to the service call above; the
    # total reported below is simply the length of whatever the service
    # returned. Confirm whether page/limit are meant to restrict the results.
    items = [NotificationRecipientSchema.model_validate(row) for row in recipients]
    total_count = len(recipients)

    return ListResource.from_paginated_results(items, total_count, pagination)

110 

111 

# DELETE /notifications/recipients/{id}
# Hands the recipient ID and the auth subject to the service's `delete`.
# NOTE(review): a 404 response is declared below, but nothing in this handler
# raises it — presumably the service does; confirm against the service code.
@router.delete(
    "/notifications/recipients/{id}",
    status_code=204,
    responses={
        204: {
            "description": "Notification recipient unsubscribed from notifications."
        },
        404: {"description": "Notification recipient not found."},
    },
)
async def delete(
    id: NotificationRecipientID,
    auth_subject: notification_recipient_auth.NotificationRecipientWrite,
    session: AsyncSession = Depends(get_db_session),
) -> None:
    """Delete a notification recipient."""
    # The parameter must stay named `id` to match the `{id}` path placeholder.
    await notification_recipient_service.delete(session, auth_subject, id)