Coverage for polar/benefit/service.py: 33%
80 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 16:17 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 16:17 +0000
1import uuid 1a
2from collections.abc import Sequence 1a
4from pydantic import BaseModel 1a
5from sqlalchemy import delete 1a
7from polar.auth.models import AuthSubject 1a
8from polar.exceptions import NotPermitted, PolarError, PolarRequestValidationError 1a
9from polar.kit.db.postgres import AsyncSession 1a
10from polar.kit.metadata import MetadataQuery, apply_metadata_clause 1a
11from polar.kit.pagination import PaginationParams 1a
12from polar.kit.sorting import Sorting 1a
13from polar.models import Benefit, Organization, ProductBenefit, User 1a
14from polar.models.benefit import BenefitType 1a
15from polar.models.webhook_endpoint import WebhookEventType 1a
16from polar.organization.resolver import get_payload_organization 1a
17from polar.redis import Redis 1a
18from polar.webhook.service import webhook as webhook_service 1a
19from polar.worker import enqueue_job 1a
21from .grant.service import benefit_grant as benefit_grant_service 1a
22from .registry import get_benefit_strategy 1a
23from .repository import BenefitRepository 1a
24from .schemas import BenefitCreate, BenefitUpdate 1a
25from .sorting import BenefitSortProperty 1a
28class BenefitError(PolarError): ... 1a
31class BenefitService: 1a
32 async def list( 1a
33 self,
34 session: AsyncSession,
35 auth_subject: AuthSubject[User | Organization],
36 *,
37 type: Sequence[BenefitType] | None = None,
38 organization_id: Sequence[uuid.UUID] | None = None,
39 metadata: MetadataQuery | None = None,
40 pagination: PaginationParams,
41 sorting: list[Sorting[BenefitSortProperty]] = [
42 (BenefitSortProperty.created_at, True)
43 ],
44 query: str | None = None,
45 ) -> tuple[Sequence[Benefit], int]:
46 repository = BenefitRepository.from_session(session)
47 statement = repository.get_readable_statement(auth_subject)
49 if type is not None:
50 statement = statement.where(Benefit.type.in_(type))
52 if organization_id is not None:
53 statement = statement.where(Benefit.organization_id.in_(organization_id))
55 if query is not None:
56 statement = statement.where(Benefit.description.ilike(f"%{query}%"))
58 if metadata is not None:
59 statement = apply_metadata_clause(Benefit, statement, metadata)
61 statement = repository.apply_sorting(statement, sorting)
63 return await repository.paginate(
64 statement, limit=pagination.limit, page=pagination.page
65 )
67 async def get( 1a
68 self,
69 session: AsyncSession,
70 auth_subject: AuthSubject[User | Organization],
71 id: uuid.UUID,
72 ) -> Benefit | None:
73 repository = BenefitRepository.from_session(session)
74 statement = (
75 repository.get_readable_statement(auth_subject)
76 .where(Benefit.id == id)
77 .options(*repository.get_eager_options())
78 )
79 return await repository.get_one_or_none(statement)
81 async def user_create( 1a
82 self,
83 session: AsyncSession,
84 redis: Redis,
85 create_schema: BenefitCreate,
86 auth_subject: AuthSubject[User | Organization],
87 ) -> Benefit:
88 organization = await get_payload_organization(
89 session, auth_subject, create_schema
90 )
92 try:
93 is_tax_applicable = getattr(create_schema, "is_tax_applicable")
94 except AttributeError:
95 is_tax_applicable = create_schema.type.is_tax_applicable()
97 benefit_strategy = get_benefit_strategy(create_schema.type, session, redis)
98 properties = await benefit_strategy.validate_properties(
99 auth_subject,
100 create_schema.properties.model_dump(mode="json", by_alias=True),
101 )
103 benefit = Benefit(
104 organization=organization,
105 is_tax_applicable=is_tax_applicable,
106 properties=properties,
107 **create_schema.model_dump(
108 by_alias=True,
109 exclude={
110 "organization_id",
111 "is_tax_applicable",
112 "properties",
113 },
114 ),
115 )
116 session.add(benefit)
117 await session.flush()
119 await webhook_service.send(
120 session, organization, WebhookEventType.benefit_created, benefit
121 )
123 return benefit
125 async def update( 1a
126 self,
127 session: AsyncSession,
128 redis: Redis,
129 benefit: Benefit,
130 benefit_update: BenefitUpdate,
131 auth_subject: AuthSubject[User | Organization],
132 ) -> Benefit:
133 if benefit_update.type != benefit.type:
134 raise PolarRequestValidationError(
135 [
136 {
137 "type": "value_error",
138 "loc": ("body", "type"),
139 "msg": "Benefit type cannot be changed.",
140 "input": benefit.type,
141 }
142 ]
143 )
145 update_dict = benefit_update.model_dump(
146 by_alias=True, exclude_unset=True, exclude={"type", "properties"}
147 )
149 properties_update: BaseModel | None = getattr(
150 benefit_update, "properties", None
151 )
152 if properties_update is not None:
153 benefit_strategy = get_benefit_strategy(benefit.type, session, redis)
154 update_dict["properties"] = await benefit_strategy.validate_properties(
155 auth_subject,
156 properties_update.model_dump(mode="json", by_alias=True),
157 )
159 previous_properties = benefit.properties
161 for key, value in update_dict.items():
162 setattr(benefit, key, value)
163 session.add(benefit)
165 await benefit_grant_service.enqueue_benefit_grant_updates(
166 session, redis, benefit, previous_properties
167 )
169 await webhook_service.send(
170 session, benefit.organization, WebhookEventType.benefit_updated, benefit
171 )
173 return benefit
175 async def delete(self, session: AsyncSession, benefit: Benefit) -> Benefit: 1a
176 if not benefit.deletable:
177 raise NotPermitted()
179 repository = BenefitRepository.from_session(session)
180 await repository.soft_delete(benefit)
181 statement = delete(ProductBenefit).where(
182 ProductBenefit.benefit_id == benefit.id
183 )
184 await session.execute(statement)
186 enqueue_job("benefit.delete", benefit_id=benefit.id)
188 await webhook_service.send(
189 session, benefit.organization, WebhookEventType.benefit_updated, benefit
190 )
192 return benefit
195benefit = BenefitService() 1a