Coverage for polar/benefit/service.py: 33%

80 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 17:15 +0000

1import uuid 1a

2from collections.abc import Sequence 1a

3 

4from pydantic import BaseModel 1a

5from sqlalchemy import delete 1a

6 

7from polar.auth.models import AuthSubject 1a

8from polar.exceptions import NotPermitted, PolarError, PolarRequestValidationError 1a

9from polar.kit.db.postgres import AsyncSession 1a

10from polar.kit.metadata import MetadataQuery, apply_metadata_clause 1a

11from polar.kit.pagination import PaginationParams 1a

12from polar.kit.sorting import Sorting 1a

13from polar.models import Benefit, Organization, ProductBenefit, User 1a

14from polar.models.benefit import BenefitType 1a

15from polar.models.webhook_endpoint import WebhookEventType 1a

16from polar.organization.resolver import get_payload_organization 1a

17from polar.redis import Redis 1a

18from polar.webhook.service import webhook as webhook_service 1a

19from polar.worker import enqueue_job 1a

20 

21from .grant.service import benefit_grant as benefit_grant_service 1a

22from .registry import get_benefit_strategy 1a

23from .repository import BenefitRepository 1a

24from .schemas import BenefitCreate, BenefitUpdate 1a

25from .sorting import BenefitSortProperty 1a

26 

27 

class BenefitError(PolarError):
    """Benefit-specific error type derived from `PolarError`.

    Adds no behavior of its own; nothing in this module's visible code
    raises it directly.
    """

29 

30 

class BenefitService:
    """Service-layer operations on `Benefit` records.

    Provides listing, retrieval, creation, update and deletion, delegating
    persistence to `BenefitRepository` and notifying webhooks on changes.
    """

    async def list(
        self,
        session: AsyncSession,
        auth_subject: AuthSubject[User | Organization],
        *,
        type: Sequence[BenefitType] | None = None,
        organization_id: Sequence[uuid.UUID] | None = None,
        metadata: MetadataQuery | None = None,
        pagination: PaginationParams,
        sorting: list[Sorting[BenefitSortProperty]] | None = None,
        query: str | None = None,
    ) -> tuple[Sequence[Benefit], int]:
        """Return one page of benefits readable by ``auth_subject``.

        Args:
            session: Database session used to build the repository.
            auth_subject: Subject whose readable benefits are queried.
            type: If given, keep only benefits whose type is in this sequence.
            organization_id: If given, keep only benefits belonging to one of
                these organizations.
            metadata: If given, applied through ``apply_metadata_clause``.
            pagination: Supplies the ``limit`` and ``page`` of the result.
            sorting: Sort criteria handed to the repository. When omitted,
                ``[(BenefitSortProperty.created_at, True)]`` is used.
            query: If given, case-insensitive substring filter on the
                benefit description.

        Returns:
            Whatever ``repository.paginate`` returns — annotated as a
            sequence of benefits plus an integer (presumably the total
            count; confirm against the repository).
        """
        if sorting is None:
            # Built per call so that no list object is shared between
            # invocations (a list literal as default would be).
            sorting = [(BenefitSortProperty.created_at, True)]

        repository = BenefitRepository.from_session(session)
        statement = repository.get_readable_statement(auth_subject)

        if type is not None:
            statement = statement.where(Benefit.type.in_(type))

        if organization_id is not None:
            statement = statement.where(Benefit.organization_id.in_(organization_id))

        if query is not None:
            # NOTE(review): `%` and `_` inside `query` are not escaped, so
            # they act as LIKE wildcards rather than literal characters.
            statement = statement.where(Benefit.description.ilike(f"%{query}%"))

        if metadata is not None:
            statement = apply_metadata_clause(Benefit, statement, metadata)

        statement = repository.apply_sorting(statement, sorting)

        return await repository.paginate(
            statement, limit=pagination.limit, page=pagination.page
        )

    async def get(
        self,
        session: AsyncSession,
        auth_subject: AuthSubject[User | Organization],
        id: uuid.UUID,
    ) -> Benefit | None:
        """Fetch a single benefit by ID, restricted to what ``auth_subject`` can read.

        The repository's eager-loading options are applied to the statement.

        Returns:
            The matching benefit, or ``None`` when the readable statement
            yields no row for ``id``.
        """
        repository = BenefitRepository.from_session(session)
        statement = (
            repository.get_readable_statement(auth_subject)
            .where(Benefit.id == id)
            .options(*repository.get_eager_options())
        )
        return await repository.get_one_or_none(statement)

    async def user_create(
        self,
        session: AsyncSession,
        redis: Redis,
        create_schema: BenefitCreate,
        auth_subject: AuthSubject[User | Organization],
    ) -> Benefit:
        """Create a benefit from ``create_schema`` and send a ``benefit_created`` webhook.

        Args:
            session: Database session the new benefit is added to.
            redis: Passed to the benefit strategy lookup.
            create_schema: Creation payload; its ``properties`` are validated
                by the strategy registered for ``create_schema.type``.
            auth_subject: Subject performing the creation; used to resolve
                the target organization and to validate properties.

        Returns:
            The newly added benefit, after the session has been flushed.
        """
        organization = await get_payload_organization(
            session, auth_subject, create_schema
        )

        # Use the schema's own value when it defines the attribute; otherwise
        # fall back to what the benefit type reports.
        try:
            is_tax_applicable = getattr(create_schema, "is_tax_applicable")
        except AttributeError:
            is_tax_applicable = create_schema.type.is_tax_applicable()

        benefit_strategy = get_benefit_strategy(create_schema.type, session, redis)
        properties = await benefit_strategy.validate_properties(
            auth_subject,
            create_schema.properties.model_dump(mode="json", by_alias=True),
        )

        benefit = Benefit(
            organization=organization,
            is_tax_applicable=is_tax_applicable,
            properties=properties,
            # The excluded fields are supplied explicitly above (or, for
            # `organization_id`, replaced by the resolved `organization`).
            **create_schema.model_dump(
                by_alias=True,
                exclude={
                    "organization_id",
                    "is_tax_applicable",
                    "properties",
                },
            ),
        )
        session.add(benefit)
        await session.flush()

        await webhook_service.send(
            session, organization, WebhookEventType.benefit_created, benefit
        )

        return benefit

    async def update(
        self,
        session: AsyncSession,
        redis: Redis,
        benefit: Benefit,
        benefit_update: BenefitUpdate,
        auth_subject: AuthSubject[User | Organization],
    ) -> Benefit:
        """Apply ``benefit_update`` to ``benefit`` and notify dependents.

        After the fields are set, benefit grant updates are enqueued with the
        previous properties and a ``benefit_updated`` webhook is sent.

        Args:
            session: Database session the benefit is added to.
            redis: Passed to the strategy lookup and the grant service.
            benefit: The benefit to modify in place.
            benefit_update: Update payload; only explicitly set fields are
                applied (``exclude_unset=True``).
            auth_subject: Subject performing the update; used when
                validating new properties.

        Returns:
            The same ``benefit`` instance, updated.

        Raises:
            PolarRequestValidationError: If ``benefit_update.type`` differs
                from the benefit's current type.
        """
        if benefit_update.type != benefit.type:
            raise PolarRequestValidationError(
                [
                    {
                        "type": "value_error",
                        "loc": ("body", "type"),
                        "msg": "Benefit type cannot be changed.",
                        # Report the rejected value from the request, not the
                        # benefit's stored type.
                        "input": benefit_update.type,
                    }
                ]
            )

        update_dict = benefit_update.model_dump(
            by_alias=True, exclude_unset=True, exclude={"type", "properties"}
        )

        # `properties` is read defensively: a missing attribute or a None
        # value both mean "leave properties untouched".
        properties_update: BaseModel | None = getattr(
            benefit_update, "properties", None
        )
        if properties_update is not None:
            benefit_strategy = get_benefit_strategy(benefit.type, session, redis)
            update_dict["properties"] = await benefit_strategy.validate_properties(
                auth_subject,
                properties_update.model_dump(mode="json", by_alias=True),
            )

        # Captured before the attributes are overwritten so the grant service
        # receives the pre-update properties.
        previous_properties = benefit.properties

        for key, value in update_dict.items():
            setattr(benefit, key, value)
        session.add(benefit)

        await benefit_grant_service.enqueue_benefit_grant_updates(
            session, redis, benefit, previous_properties
        )

        await webhook_service.send(
            session, benefit.organization, WebhookEventType.benefit_updated, benefit
        )

        return benefit

    async def delete(self, session: AsyncSession, benefit: Benefit) -> Benefit:
        """Soft-delete ``benefit`` and detach it from products.

        Removes the ``ProductBenefit`` rows that reference the benefit,
        enqueues the ``"benefit.delete"`` job and sends a webhook.

        Returns:
            The same ``benefit`` instance.

        Raises:
            NotPermitted: If ``benefit.deletable`` is falsy.
        """
        if not benefit.deletable:
            raise NotPermitted()

        repository = BenefitRepository.from_session(session)
        await repository.soft_delete(benefit)
        # Inside a method body, `delete` resolves to the module-level
        # SQLAlchemy import, not to this method.
        statement = delete(ProductBenefit).where(
            ProductBenefit.benefit_id == benefit.id
        )
        await session.execute(statement)

        enqueue_job("benefit.delete", benefit_id=benefit.id)

        # NOTE(review): deletion is announced with the `benefit_updated`
        # event — presumably intentional for a soft delete; confirm.
        await webhook_service.send(
            session, benefit.organization, WebhookEventType.benefit_updated, benefit
        )

        return benefit

193 

194 

# Module-level `BenefitService` instance, created once at import time.
benefit: BenefitService = BenefitService()