Coverage for polar/customer_portal/service/organization.py: 73%

11 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 17:15 +0000

1from sqlalchemy import select 1a

2from sqlalchemy.orm import selectinload 1a

3 

4from polar.kit.services import ResourceServiceReader 1a

5from polar.models import Organization, Product 1a

6from polar.postgres import AsyncSession 1a

7 

8 

class CustomerOrganizationService(ResourceServiceReader[Organization]):
    """Read-only organization lookups used by the customer portal."""

    async def get_by_slug(
        self, session: AsyncSession, slug: str
    ) -> Organization | None:
        """Fetch the organization identified by *slug*.

        Only organizations whose ``deleted_at`` and ``blocked_at`` columns
        are both NULL are considered. The organization's products, and each
        product's ``product_medias``, are loaded eagerly alongside it.

        Args:
            session: Async database session used to run the query.
            slug: Slug to match against ``Organization.slug``.

        Returns:
            The matching organization, or ``None`` when no row qualifies.
        """
        # Eager-load products together with their medias via select-in
        # loading, nested under the products relationship.
        products_loader = selectinload(Organization.products).options(
            selectinload(Product.product_medias),
        )

        query = select(Organization).where(
            Organization.deleted_at.is_(None),
            Organization.blocked_at.is_(None),
            Organization.slug == slug,
        )
        query = query.options(products_loader)

        rows = await session.execute(query)
        return rows.unique().scalar_one_or_none()

28 

29 

# Module-level service instance, constructed with the Organization model.
30customer_organization = CustomerOrganizationService(Organization) 1a