Coverage for polar/storefront/service.py: 50%

24 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 16:17 +0000

1from collections.abc import Sequence 1a

2 

3from sqlalchemy import select 1a

4from sqlalchemy.orm import selectinload 1a

5 

6from polar.kit.pagination import PaginationParams, paginate 1a

7from polar.models import Customer, Order, Organization, Product, Subscription 1a

8from polar.postgres import AsyncSession 1a

9 

10 

class StorefrontService:
    """Query helpers for storefront lookups and customer listings."""

    async def get(self, session: AsyncSession, slug: str) -> Organization | None:
        """Load the storefront organization identified by ``slug``.

        An organization matches only when it is not deleted, not blocked,
        has the given slug and has its storefront enabled. Its products,
        and each product's medias, are loaded eagerly via ``selectinload``.

        Args:
            session: Database session used to run the query.
            slug: Organization slug to look up.

        Returns:
            The matching organization, or ``None`` when no row matches.
        """
        # Nested eager load: products first, then the medias of each product.
        product_loader = selectinload(Organization.products).options(
            selectinload(Product.product_medias)
        )
        criteria = (
            Organization.deleted_at.is_(None),
            Organization.blocked_at.is_(None),
            Organization.slug == slug,
            Organization.storefront_enabled.is_(True),
        )
        query = select(Organization).where(*criteria).options(product_loader)

        rows = await session.execute(query)
        return rows.unique().scalar_one_or_none()

    async def get_organization_slug_by_product_id(
        self, session: AsyncSession, product_id: str
    ) -> str | None:
        """Resolve an organization slug from a product ID (legacy redirects).

        The product must not be deleted, and its organization must be
        neither deleted nor blocked.

        Args:
            session: Database session used to run the query.
            product_id: ID of the product whose organization is wanted.

        Returns:
            The organization slug, or ``None`` when no row matches.
        """
        criteria = (
            Product.id == product_id,
            Product.deleted_at.is_(None),
            Organization.deleted_at.is_(None),
            Organization.blocked_at.is_(None),
        )
        query = (
            select(Organization.slug)
            .join(Product, Product.organization_id == Organization.id)
            .where(*criteria)
        )

        rows = await session.execute(query)
        return rows.scalar_one_or_none()

    async def get_organization_slug_by_subscription_id(
        self, session: AsyncSession, subscription_id: str
    ) -> str | None:
        """Resolve an organization slug from a subscription ID (legacy redirects).

        The lookup goes subscription -> product -> organization. The
        subscription and product must not be deleted, and the organization
        must be neither deleted nor blocked.

        Args:
            session: Database session used to run the query.
            subscription_id: ID of the subscription to start from.

        Returns:
            The organization slug, or ``None`` when no row matches.
        """
        criteria = (
            Subscription.id == subscription_id,
            Subscription.deleted_at.is_(None),
            Product.deleted_at.is_(None),
            Organization.deleted_at.is_(None),
            Organization.blocked_at.is_(None),
        )
        query = (
            select(Organization.slug)
            .join(Product, Product.organization_id == Organization.id)
            .join(Subscription, Subscription.product_id == Product.id)
            .where(*criteria)
        )

        rows = await session.execute(query)
        return rows.scalar_one_or_none()

    async def list_customers(
        self,
        session: AsyncSession,
        organization: Organization,
        *,
        pagination: PaginationParams,
    ) -> tuple[Sequence[Customer], int]:
        """List customers with a non-deleted order on the organization's products.

        Args:
            session: Database session used to run the query.
            organization: Organization whose products' orders are considered.
            pagination: Pagination parameters forwarded to ``paginate``.

        Returns:
            The ``(results, count)`` pair produced by ``paginate``.
        """
        # NOTE(review): the join is declared as an outer join, but the filter
        # on Product.organization_id drops rows without a matching product,
        # so it appears to behave like an inner join -- confirm the intent.
        ordering_customer_ids = (
            select(Order.customer_id)
            .join(Product, Product.id == Order.product_id, isouter=True)
            .where(
                Order.deleted_at.is_(None),
                Product.organization_id == organization.id,
            )
        )
        query = select(Customer).where(Customer.id.in_(ordering_customer_ids))

        customers, total = await paginate(session, query, pagination=pagination)
        return customers, total

85 

86 

# Module-level StorefrontService instance, created once when the module is imported.
storefront = StorefrontService()