Coverage for polar/campaign/service.py: 36%
18 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 16:17 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 16:17 +0000
1from datetime import UTC, datetime 1a
3from sqlalchemy import select 1a
5from polar.kit.services import ResourceServiceReader 1a
6from polar.models import Campaign, User 1a
7from polar.postgres import AsyncSession 1a
10class CampaignService(ResourceServiceReader[Campaign]): 1a
11 async def get_eligible(self, session: AsyncSession, user: User) -> Campaign | None: 1a
12 code = user.campaign_code
13 if not code:
14 return None
16 now = datetime.now(UTC)
17 stmt = select(Campaign).where(
18 Campaign.code == code,
19 Campaign.starts_at <= now,
20 Campaign.ends_at > now,
21 Campaign.deleted_at.is_(None),
22 )
23 res = await session.execute(stmt)
24 campaign = res.scalars().unique().one_or_none()
25 if not campaign:
26 return None
28 # TODO: Check account threshold & user limits
29 return campaign
32campaign = CampaignService(Campaign) 1a