Coverage for polar/customer_portal/service/organization.py: 73%
11 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 16:17 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 16:17 +0000
1from sqlalchemy import select 1a
2from sqlalchemy.orm import selectinload 1a
4from polar.kit.services import ResourceServiceReader 1a
5from polar.models import Organization, Product 1a
6from polar.postgres import AsyncSession 1a
9class CustomerOrganizationService(ResourceServiceReader[Organization]): 1a
10 async def get_by_slug( 1a
11 self, session: AsyncSession, slug: str
12 ) -> Organization | None:
13 statement = (
14 select(Organization)
15 .where(
16 Organization.deleted_at.is_(None),
17 Organization.blocked_at.is_(None),
18 Organization.slug == slug,
19 )
20 .options(
21 selectinload(Organization.products).options(
22 selectinload(Product.product_medias),
23 )
24 )
25 )
26 result = await session.execute(statement)
27 return result.unique().scalar_one_or_none()
30customer_organization = CustomerOrganizationService(Organization) 1a