Coverage for polar/benefit/strategies/meter_credit/service.py: 30%
59 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 17:15 +0000
1import uuid 1a
2from datetime import datetime 1a
3from typing import Any, cast 1a
5from polar.auth.models import AuthSubject 1a
6from polar.customer_meter.service import customer_meter as customer_meter_service 1a
7from polar.event.repository import EventRepository 1a
8from polar.event.service import event as event_service 1a
9from polar.event.system import SystemEvent, build_system_event 1a
10from polar.kit.utils import utc_now 1a
11from polar.locker import Locker 1a
12from polar.meter.repository import MeterRepository 1a
13from polar.models import Benefit, Customer, Organization, User 1a
15from ..base.service import BenefitPropertiesValidationError, BenefitServiceProtocol 1a
16from .properties import BenefitGrantMeterCreditProperties, BenefitMeterCreditProperties 1a
class BenefitMeterCreditService(
    BenefitServiceProtocol[
        BenefitMeterCreditProperties, BenefitGrantMeterCreditProperties
    ]
):
    """Benefit strategy that credits units on a customer's meter.

    Credits are recorded as ``meter_credited`` system events. A revocation is
    recorded as the same kind of event carrying a negated unit count.
    """

    async def grant(
        self,
        benefit: Benefit,
        customer: Customer,
        grant_properties: BenefitGrantMeterCreditProperties,
        *,
        update: bool = False,
        attempt: int = 1,
    ) -> BenefitGrantMeterCreditProperties:
        """Credit the benefit's configured units to the customer.

        ``grant_properties``, ``update`` and ``attempt`` are not read by this
        implementation.

        Returns:
            Grant properties describing the credit that was just recorded
            (meter id, units and timestamp).
        """
        config = self._get_properties(benefit)
        credited_meter_id = uuid.UUID(config["meter_id"])
        return await self._create_event(
            customer,
            benefit.organization,
            meter_id=credited_meter_id,
            units=config["units"],
            rollover=config["rollover"],
        )

    async def cycle(
        self,
        benefit: Benefit,
        customer: Customer,
        grant_properties: BenefitGrantMeterCreditProperties,
        *,
        attempt: int = 1,
    ) -> BenefitGrantMeterCreditProperties:
        """Open a new cycle: reset the meter if needed, then credit it again.

        The reset is skipped when a meter reset was already ingested at or
        after ``grant_properties["last_credited_at"]``. When the benefit has
        rollover enabled, the units reported by
        ``customer_meter_service.get_rollover_units`` are credited back right
        after the reset, flagged as rollover.

        ``attempt`` is not read by this implementation.

        Returns:
            Grant properties describing the fresh credit.

        Raises:
            KeyError: If ``grant_properties`` has no ``last_credited_at``
                entry; a missing timestamp is not handled here.
            AssertionError: If rollover is enabled and the meter cannot be
                found.
        """
        config = self._get_properties(benefit)
        credited_meter_id = uuid.UUID(config["meter_id"])

        events = EventRepository.from_session(self.session)
        previous_reset = await events.get_latest_meter_reset(
            customer, credited_meter_id
        )
        credited_at = datetime.fromisoformat(grant_properties["last_credited_at"])

        # The billing logic triggers a reset after generating the invoice, so
        # a reset may already exist for the cycle that just ended. Only reset
        # here when that did not happen.
        already_reset = (
            previous_reset is not None and previous_reset.ingested_at >= credited_at
        )
        if not already_reset:
            carried_over = 0
            if config["rollover"]:
                # Read the leftover balance before the reset event is created.
                meters = MeterRepository.from_session(self.session)
                meter = await meters.get_by_id(credited_meter_id)
                assert meter is not None
                carried_over = await customer_meter_service.get_rollover_units(
                    self.session, customer, meter
                )

            reset_event = build_system_event(
                SystemEvent.meter_reset,
                customer=customer,
                organization=benefit.organization,
                metadata={
                    "meter_id": str(credited_meter_id),
                },
            )
            await event_service.create_event(self.session, reset_event)

            if carried_over > 0:
                rollover_event = build_system_event(
                    SystemEvent.meter_credited,
                    customer=customer,
                    organization=benefit.organization,
                    metadata={
                        "meter_id": str(credited_meter_id),
                        "units": carried_over,
                        "rollover": True,
                    },
                )
                await event_service.create_event(self.session, rollover_event)

        return await self._create_event(
            customer,
            benefit.organization,
            meter_id=credited_meter_id,
            units=config["units"],
            rollover=config["rollover"],
        )

    async def revoke(
        self,
        benefit: Benefit,
        customer: Customer,
        grant_properties: BenefitGrantMeterCreditProperties,
        *,
        attempt: int = 1,
    ) -> BenefitGrantMeterCreditProperties:
        """Undo the last credit by recording a negative credit.

        The negative credit targets the meter stored under
        ``last_credited_meter_id``. Its size is the negation of
        ``last_credited_units``, falling back to the benefit's configured
        ``units`` when that entry is absent.

        ``attempt`` is not read by this implementation.

        Returns:
            ``grant_properties`` unchanged when no ``last_credited_meter_id``
            is recorded; otherwise the properties describing the negative
            credit.
        """
        config = self._get_properties(benefit)

        # Nothing was ever credited for this grant, so there is nothing to undo.
        previous_meter_id = grant_properties.get("last_credited_meter_id")
        if previous_meter_id is None:
            return grant_properties

        revoked_units = -grant_properties.get("last_credited_units", config["units"])
        return await self._create_event(
            customer,
            benefit.organization,
            meter_id=uuid.UUID(previous_meter_id),
            units=revoked_units,
            rollover=config["rollover"],
        )

    async def requires_update(
        self, benefit: Benefit, previous_properties: BenefitMeterCreditProperties
    ) -> bool:
        """Always report that no update is required."""
        return False

    async def validate_properties(
        self, auth_subject: AuthSubject[User | Organization], properties: dict[str, Any]
    ) -> BenefitMeterCreditProperties:
        """Check that ``properties["meter_id"]`` is a meter the subject can read.

        Returns:
            ``properties`` itself, cast to ``BenefitMeterCreditProperties``.

        Raises:
            BenefitPropertiesValidationError: If the lookup through
                ``get_readable_by_id`` yields no meter.
        """
        meters = MeterRepository.from_session(self.session)
        requested_meter_id = properties["meter_id"]
        found = await meters.get_readable_by_id(requested_meter_id, auth_subject)
        if found is not None:
            return cast(BenefitMeterCreditProperties, properties)

        raise BenefitPropertiesValidationError(
            [
                {
                    "type": "value_error",
                    "msg": "This meter does not exist.",
                    "loc": ("meter_id",),
                    "input": requested_meter_id,
                }
            ]
        )

    async def _create_event(
        self,
        customer: Customer,
        organization: Organization,
        *,
        meter_id: uuid.UUID,
        units: int,
        rollover: bool,
    ) -> BenefitGrantMeterCreditProperties:
        """Record a ``meter_credited`` event and refresh the customer meter.

        The customer meter is refreshed only when the meter can still be
        loaded by id; otherwise that step is skipped silently.

        Returns:
            Grant properties holding the credited meter id, the credited
            units and the current UTC time in ISO format.
        """
        credit_event = build_system_event(
            SystemEvent.meter_credited,
            customer=customer,
            organization=organization,
            metadata={
                "meter_id": str(meter_id),
                "units": units,
                "rollover": rollover,
            },
        )
        await event_service.create_event(self.session, credit_event)

        meters = MeterRepository.from_session(self.session)
        credited_meter = await meters.get_by_id(meter_id)
        if credited_meter is not None:
            await customer_meter_service.update_customer_meter(
                self.session, Locker(self.redis), customer, credited_meter
            )

        credited: BenefitGrantMeterCreditProperties = {
            "last_credited_meter_id": str(meter_id),
            "last_credited_units": units,
            "last_credited_at": utc_now().isoformat(),
        }
        return credited