Coverage for polar/event_type/repository.py: 35%

31 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 17:15 +0000

1from datetime import datetime 1a

2from uuid import UUID 1a

3 

4from sqlalchemy import Select, func, select 1a

5 

6from polar.auth.models import AuthSubject, Organization, User, is_organization, is_user 1a

7from polar.kit.repository import RepositoryBase, RepositoryIDMixin 1a

8from polar.models import Event, EventType, UserOrganization 1a

9from polar.models.event import EventSource 1a

10 

11 

12class EventTypeRepository( 1a

13 RepositoryBase[EventType], RepositoryIDMixin[EventType, UUID] 

14): 

15 model = EventType 1a

16 

17 def get_readable_statement( 1a

18 self, auth_subject: AuthSubject[User | Organization] 

19 ) -> Select[tuple[EventType]]: 

20 statement = self.get_base_statement() 

21 

22 if is_user(auth_subject): 

23 user = auth_subject.subject 

24 statement = statement.where( 

25 EventType.organization_id.in_( 

26 select(UserOrganization.organization_id).where( 

27 UserOrganization.user_id == user.id, 

28 UserOrganization.deleted_at.is_(None), 

29 ) 

30 ) 

31 ) 

32 elif is_organization(auth_subject): 

33 statement = statement.where( 

34 EventType.organization_id == auth_subject.subject.id 

35 ) 

36 

37 return statement 

38 

39 def get_event_types_with_stats_statement( 1a

40 self, auth_subject: AuthSubject[User | Organization] 

41 ) -> Select[tuple[EventType, EventSource, int, datetime, datetime]]: 

42 return ( 

43 self.get_readable_statement(auth_subject) 

44 .join(Event, EventType.id == Event.event_type_id) 

45 .with_only_columns( 

46 EventType, 

47 Event.source, 

48 func.count(Event.id).label("occurrences"), 

49 func.min(Event.timestamp).label("first_seen"), 

50 func.max(Event.timestamp).label("last_seen"), 

51 ) 

52 .group_by( 

53 EventType.id, 

54 EventType.created_at, 

55 EventType.modified_at, 

56 EventType.deleted_at, 

57 EventType.name, 

58 EventType.label, 

59 EventType.organization_id, 

60 Event.source, 

61 ) 

62 ) 

63 

64 async def get_by_name_and_organization( 1a

65 self, name: str, organization_id: UUID 

66 ) -> EventType | None: 

67 statement = select(EventType).where( 

68 EventType.name == name, 

69 EventType.organization_id == organization_id, 

70 EventType.deleted_at.is_(None), 

71 ) 

72 result = await self.session.execute(statement) 

73 return result.scalar_one_or_none() 

74 

75 async def get_or_create(self, name: str, organization_id: UUID) -> EventType: 1a

76 existing = await self.get_by_name_and_organization(name, organization_id) 

77 if existing: 

78 return existing 

79 

80 event_type = EventType(name=name, label=name, organization_id=organization_id) 

81 self.session.add(event_type) 

82 await self.session.flush() 

83 return event_type