Coverage for polar/customer_portal/service/downloadables.py: 34%

90 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 17:15 +0000

1from collections.abc import Sequence 1a

2from datetime import timedelta 1a

3from uuid import UUID 1a

4 

5import structlog 1a

6from itsdangerous import BadSignature, SignatureExpired, URLSafeTimedSerializer 1a

7from sqlalchemy.orm import contains_eager, joinedload 1a

8 

9from polar.auth.models import AuthSubject 1a

10from polar.config import settings 1a

11from polar.exceptions import ( 1a

12 BadRequest, 

13 ResourceNotFound, 

14 ResourceUnavailable, 

15) 

16from polar.file.repository import FileRepository 1a

17from polar.file.schemas import FileDownload 1a

18from polar.file.service import file as file_service 1a

19from polar.kit.pagination import PaginationParams, paginate 1a

20from polar.kit.services import ResourceService 1a

21from polar.kit.utils import utc_now 1a

22from polar.models import Benefit, Customer 1a

23from polar.models.downloadable import Downloadable, DownloadableStatus 1a

24from polar.models.file import File 1a

25from polar.postgres import AsyncSession, sql 1a

26 

27from ..schemas.downloadables import ( 1a

28 DownloadableCreate, 

29 DownloadableRead, 

30 DownloadableUpdate, 

31 DownloadableURL, 

32) 

33 

# Module-level structured logger shared by every method of the service below.
log = structlog.get_logger()

# Signs and verifies the opaque tokens embedded in customer download URLs.
# Secret and salt both come from the application settings; tokens are
# timestamped so they can be checked against a max age when loaded.
token_serializer = URLSafeTimedSerializer(
    secret_key=settings.S3_FILES_DOWNLOAD_SECRET,
    salt=settings.S3_FILES_DOWNLOAD_SALT,
)

39 

40 

class DownloadableService(
    ResourceService[Downloadable, DownloadableCreate, DownloadableUpdate]
):
    """Customer-portal operations on ``Downloadable`` records.

    Covers listing a customer's downloadables, granting and revoking them per
    benefit, issuing signed download tokens, and resolving such a token back
    to its record while counting the download.
    """

    async def get_list(
        self,
        session: AsyncSession,
        auth_subject: AuthSubject[Customer],
        *,
        pagination: PaginationParams,
        benefit_id: Sequence[UUID] | None = None,
    ) -> tuple[Sequence[Downloadable], int]:
        """Return one page of the authenticated customer's downloadables.

        Args:
            session: Database session the query runs on.
            auth_subject: Authenticated customer; results are restricted to
                this customer's records (see ``_get_base_query``).
            pagination: Paging parameters forwarded to ``paginate``.
            benefit_id: Optional benefit ids to restrict the listing to. An
                empty sequence or ``None`` applies no benefit filter.

        Returns:
            Whatever ``paginate`` yields for the statement: the page of
            records and an integer (presumably the total count -- confirm
            against ``polar.kit.pagination``).
        """
        statement = self._get_base_query(auth_subject)

        if benefit_id:
            statement = statement.where(Downloadable.benefit_id.in_(benefit_id))

        return await paginate(session, statement, pagination=pagination)

    async def grant_for_benefit_file(
        self,
        session: AsyncSession,
        customer: Customer,
        benefit_id: UUID,
        file_id: UUID,
    ) -> Downloadable | None:
        """Grant ``customer`` access to a file through a benefit.

        Upserts a downloadable keyed on (file, customer, benefit) with status
        ``granted``; ``status`` is the only key passed as mutable, so an
        existing row for the same triple is presumably flipped back to
        granted rather than duplicated.

        Returns:
            The upserted downloadable, or ``None`` when no file with
            ``file_id`` is found (logged, not raised).
        """
        file_repository = FileRepository.from_session(session)
        file = await file_repository.get_by_id(file_id)
        if not file:
            log.info(
                "downloadables.grant.file_not_found",
                file_id=file_id,
                customer_id=customer.id,
                benefit_id=benefit_id,
                granted=False,
            )
            return None

        create_schema = DownloadableCreate(
            file_id=file.id,
            customer_id=customer.id,
            benefit_id=benefit_id,
            status=DownloadableStatus.granted,
        )
        records = await self.upsert_many(
            session,
            create_schemas=[create_schema],
            constraints=[
                Downloadable.file_id,
                Downloadable.customer_id,
                Downloadable.benefit_id,
            ],
            mutable_keys={
                "status",
            },
            autocommit=False,
        )
        # Flush (without committing) so the row is written before it is logged
        # and handed back to the caller.
        await session.flush()
        instance = records[0]
        assert instance.id is not None

        log.info(
            "downloadables.grant",
            file_id=file.id,
            customer_id=customer.id,
            downloadables_id=instance.id,
            benefit_id=benefit_id,
            granted=True,
        )
        return instance

    async def revoke_for_benefit(
        self,
        session: AsyncSession,
        customer: Customer,
        benefit_id: UUID,
    ) -> None:
        """Mark the customer's granted downloadables for a benefit as revoked.

        Issues a single UPDATE over non-deleted rows that are currently
        ``granted`` for this customer and benefit, setting ``status`` to
        ``revoked`` and refreshing ``modified_at``.
        """
        statement = (
            sql.update(Downloadable)
            .where(
                Downloadable.customer_id == customer.id,
                Downloadable.benefit_id == benefit_id,
                Downloadable.status == DownloadableStatus.granted,
                Downloadable.deleted_at.is_(None),
            )
            .values(
                status=DownloadableStatus.revoked,
                modified_at=utc_now(),
            )
        )
        await session.execute(statement)
        # Log only once the UPDATE has been executed, so a failing statement
        # does not leave a "revoked" event behind.
        log.info(
            "downloadables.revoked",
            customer_id=customer.id,
            benefit_id=benefit_id,
        )

    async def increment_download_count(
        self,
        session: AsyncSession,
        downloadable: Downloadable,
    ) -> Downloadable:
        """Bump the download counter and timestamp, then flush.

        NOTE(review): the increment is computed in Python on the loaded
        instance, so concurrent downloads of the same record may undercount
        unless the row is locked elsewhere -- confirm whether that matters.

        Returns:
            The same ``downloadable`` instance, updated in place.
        """
        downloadable.downloaded += 1
        downloadable.last_downloaded_at = utc_now()
        session.add(downloadable)
        await session.flush()
        return downloadable

    def generate_downloadable_schemas(
        self, downloadables: Sequence[Downloadable]
    ) -> list[DownloadableRead]:
        """Build a ``DownloadableRead`` for each record, preserving order."""
        return [
            self.generate_downloadable_schema(downloadable)
            for downloadable in downloadables
        ]

    def generate_downloadable_schema(
        self, downloadable: Downloadable
    ) -> DownloadableRead:
        """Build the read schema whose file URL is a signed token URL.

        The URL and expiry come from ``create_download_token``; they are
        passed to ``FileDownload.from_presigned`` together with the record's
        file.
        """
        token = self.create_download_token(downloadable)
        file_download = FileDownload.from_presigned(
            downloadable.file,
            url=token.url,
            expires_at=token.expires_at,
        )
        return DownloadableRead(
            id=downloadable.id,
            benefit_id=downloadable.benefit_id,
            file=file_download,
        )

    def create_download_token(self, downloadable: Downloadable) -> DownloadableURL:
        """Create a signed, time-limited download URL for ``downloadable``.

        Returns:
            A ``DownloadableURL`` holding the customer-portal URL that embeds
            the signed token, and an expiry ``S3_FILES_PRESIGN_TTL`` seconds
            from now (the same TTL is enforced as ``max_age`` when the token
            is loaded in ``get_from_token_or_raise``).
        """
        expires_at = utc_now() + timedelta(seconds=settings.S3_FILES_PRESIGN_TTL)

        # 0.0 stands for "never downloaded" in the token payload.
        last_downloaded_at = (
            downloadable.last_downloaded_at.timestamp()
            if downloadable.last_downloaded_at
            else 0.0
        )

        token = token_serializer.dumps(
            {
                "id": str(downloadable.id),
                # Not used initially, but good for future rate limiting
                "downloaded": downloadable.downloaded,
                "last_downloaded_at": last_downloaded_at,
            }
        )
        redirect_to = f"{settings.BASE_URL}/v1/customer-portal/downloadables/{token}"
        return DownloadableURL(url=redirect_to, expires_at=expires_at)

    async def get_from_token_or_raise(
        self, session: AsyncSession, token: str
    ) -> Downloadable:
        """Resolve a signed download token and count the download.

        Args:
            session: Database session used to load and update the record.
            token: Token previously produced by ``create_download_token``.

        Returns:
            The downloadable (with its file eagerly loaded) after its
            download counter has been incremented.

        Raises:
            ResourceUnavailable: The token signature has expired.
            ResourceNotFound: The signature is invalid, or no downloadable
                matches the id in the token.
            BadRequest: The signed payload has no usable ``id`` (missing key,
                payload is not a mapping, or the value is not a UUID string).
        """
        try:
            unpacked = token_serializer.loads(
                token, max_age=settings.S3_FILES_PRESIGN_TTL
            )
            downloadable_id = UUID(unpacked["id"])
        except SignatureExpired as e:
            # Must precede BadSignature: SignatureExpired is a subclass of it.
            raise ResourceUnavailable() from e
        except BadSignature as e:
            raise ResourceNotFound() from e
        except (KeyError, TypeError, ValueError) as e:
            # Correctly signed but malformed payload: answer as a client
            # error instead of letting the exception escape unhandled.
            raise BadRequest() from e

        # NOTE(review): the record's status is not checked here, so a token
        # issued before a revoke appears to stay usable until it expires --
        # confirm whether ``self.get`` filters on status or this is intended.
        downloadable = await self.get(
            session, downloadable_id, options=(joinedload(Downloadable.file),)
        )
        if not downloadable:
            raise ResourceNotFound()

        await self.increment_download_count(session, downloadable)
        return downloadable

    def generate_download_schema(self, downloadable: Downloadable) -> DownloadableRead:
        """Build the read schema using the file service's own file schema.

        Unlike ``generate_downloadable_schema``, the file part is produced by
        ``file_service.generate_downloadable_schema`` rather than from a
        signed token URL.
        """
        file_schema = file_service.generate_downloadable_schema(downloadable.file)
        return DownloadableRead(
            id=downloadable.id,
            benefit_id=downloadable.benefit_id,
            file=file_schema,
        )

    def _get_base_query(
        self, auth_subject: AuthSubject[Customer]
    ) -> sql.Select[tuple[Downloadable]]:
        """Select the subject's granted, still-available downloadables.

        Joins ``File`` and ``Benefit`` and keeps only rows where the
        downloadable is granted and not deleted, the file is not deleted,
        uploaded and enabled, and the benefit is not deleted. Results are
        ordered newest first, with the file populated from the join.
        """
        return (
            sql.select(Downloadable)
            .join(File)
            .join(Benefit)
            .options(contains_eager(Downloadable.file))
            .where(
                Downloadable.status == DownloadableStatus.granted,
                Downloadable.deleted_at.is_(None),
                File.deleted_at.is_(None),
                File.is_uploaded == True,  # noqa
                File.is_enabled == True,  # noqa
                Benefit.deleted_at.is_(None),
                Downloadable.customer_id == auth_subject.subject.id,
            )
            .order_by(Downloadable.created_at.desc())
        )

241 

242 

# Shared module-level service instance, constructed with the Downloadable model.
downloadable = DownloadableService(Downloadable)