Coverage for polar/event_type/service.py: 21%

67 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 15:52 +0000

1from collections.abc import Sequence 1a

2from typing import Any 1a

3from uuid import UUID 1a

4 

5from sqlalchemy import UnaryExpression, asc, desc, text 1a

6 

7from polar.auth.models import AuthSubject 1a

8from polar.event.repository import EventRepository 1a

9from polar.event.system import SYSTEM_EVENT_LABELS 1a

10from polar.event_type.repository import EventTypeRepository 1a

11from polar.event_type.schemas import EventTypeWithStats 1a

12from polar.event_type.sorting import EventTypesSortProperty 1a

13from polar.kit.pagination import PaginationParams, paginate 1a

14from polar.kit.sorting import Sorting 1a

15from polar.models import Event, EventType, Organization, User 1a

16from polar.models.event import EventSource 1a

17from polar.postgres import AsyncSession 1a

18 

19 

class EventTypeService:
    """Service-layer operations on event types: lookup, stats listing, update."""

    async def get(
        self,
        session: AsyncSession,
        auth_subject: AuthSubject[User | Organization],
        id: UUID,
    ) -> EventType | None:
        """Return the event type with the given ID, or ``None`` if no row matches.

        The query starts from the repository's readable statement for
        ``auth_subject`` and is narrowed to ``EventType.id == id``.
        """
        repository = EventTypeRepository.from_session(session)
        statement = repository.get_readable_statement(auth_subject).where(
            EventType.id == id
        )
        return await repository.get_one_or_none(statement)

    async def list_with_stats(
        self,
        session: AsyncSession,
        auth_subject: AuthSubject[User | Organization],
        *,
        organization_id: Sequence[UUID] | None = None,
        customer_id: Sequence[UUID] | None = None,
        external_customer_id: Sequence[str] | None = None,
        query: str | None = None,
        root_events: bool = False,
        parent_id: UUID | None = None,
        source: EventSource | None = None,
        pagination: PaginationParams,
        # Immutable default: a list literal here would be one shared object
        # reused by every call that omits ``sorting``.
        sorting: Sequence[Sorting[EventTypesSortProperty]] = (
            (EventTypesSortProperty.last_seen, True),
        ),
    ) -> tuple[Sequence[EventTypeWithStats], int]:
        """List event types together with their usage statistics.

        Each optional filter is applied only when it is provided (or, for
        ``root_events``, when it is true); all applied filters are combined
        with AND.

        Args:
            session: Database session used to build repositories and paginate.
            auth_subject: Subject passed to the repository when building the
                base statement.
            organization_id: Keep event types whose organization is in this list.
            customer_id: Filter through the event repository's customer ID
                clause.
            external_customer_id: Filter through the event repository's
                external customer ID clause.
            query: Case-insensitive substring matched against the event type
                name or label.
            root_events: When true, keep only events without a parent.
            parent_id: Keep only events whose parent is this ID.
            source: Keep only events with this source.
            pagination: Pagination parameters forwarded to ``paginate``.
            sorting: ``(criterion, is_desc)`` pairs applied in order. Criteria
                not handled below are silently skipped.

        Returns:
            A tuple of the validated ``EventTypeWithStats`` items for the
            requested page and the count returned by ``paginate``.
        """
        event_type_repository = EventTypeRepository.from_session(session)
        event_repository = EventRepository.from_session(session)
        statement = event_type_repository.get_event_types_with_stats_statement(
            auth_subject
        )

        if organization_id is not None:
            statement = statement.where(EventType.organization_id.in_(organization_id))

        if customer_id is not None:
            statement = statement.where(
                event_repository.get_customer_id_filter_clause(customer_id)
            )

        if external_customer_id is not None:
            statement = statement.where(
                event_repository.get_external_customer_id_filter_clause(
                    external_customer_id
                )
            )

        if query is not None:
            pattern = f"%{query}%"
            statement = statement.where(
                EventType.name.ilike(pattern) | EventType.label.ilike(pattern)
            )

        if root_events:
            statement = statement.where(Event.parent_id.is_(None))

        if parent_id is not None:
            statement = statement.where(Event.parent_id == parent_id)

        if source is not None:
            statement = statement.where(Event.source == source)

        order_by_clauses: list[UnaryExpression[Any]] = []
        for criterion, is_desc in sorting:
            clause_function = desc if is_desc else asc
            if criterion == EventTypesSortProperty.event_type_name:
                order_by_clauses.append(clause_function(EventType.name))
            elif criterion == EventTypesSortProperty.event_type_label:
                order_by_clauses.append(clause_function(EventType.label))
            # The three criteria below are ordered by bare column name.
            # NOTE(review): presumably these are labels defined by
            # get_event_types_with_stats_statement -- confirm in the repository.
            elif criterion == EventTypesSortProperty.first_seen:
                order_by_clauses.append(clause_function(text("first_seen")))
            elif criterion == EventTypesSortProperty.last_seen:
                order_by_clauses.append(clause_function(text("last_seen")))
            elif criterion == EventTypesSortProperty.occurrences:
                order_by_clauses.append(clause_function(text("occurrences")))
        statement = statement.order_by(*order_by_clauses)

        results, count = await paginate(session, statement, pagination=pagination)

        event_types_with_stats: list[EventTypeWithStats] = []
        for result in results:
            # Named ``event_source`` so the row value does not shadow the
            # ``source`` filter parameter.
            event_type, event_source, occurrences, first_seen, last_seen = result

            # Use system event label for system events, falling back to the
            # stored label when the name has no entry in SYSTEM_EVENT_LABELS.
            if event_source == EventSource.system:
                label = SYSTEM_EVENT_LABELS.get(event_type.name, event_type.label)
            else:
                label = event_type.label

            event_types_with_stats.append(
                EventTypeWithStats.model_validate(
                    {
                        "id": event_type.id,
                        "created_at": event_type.created_at,
                        "modified_at": event_type.modified_at,
                        "name": event_type.name,
                        "label": label,
                        "label_property_selector": event_type.label_property_selector,
                        "organization_id": event_type.organization_id,
                        "source": event_source,
                        "occurrences": occurrences,
                        "first_seen": first_seen,
                        "last_seen": last_seen,
                    }
                )
            )

        return event_types_with_stats, count

    async def update(
        self,
        session: AsyncSession,
        event_type: EventType,
        label: str,
        label_property_selector: str | None = None,
    ) -> EventType:
        """Set the label and label property selector on ``event_type``.

        ``label_property_selector`` is always assigned, so omitting it resets
        the stored selector to ``None``. The object is added to ``session``;
        this method itself does not flush or commit.

        Returns:
            The same ``event_type`` instance, with the new values assigned.
        """
        event_type.label = label
        event_type.label_property_selector = label_property_selector
        session.add(event_type)
        return event_type

143 

144 

# Module-level EventTypeService instance, created once at import time.
event_type_service = EventTypeService()