Coverage for polar/event_type/service.py: 21%
67 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 15:52 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 15:52 +0000
1from collections.abc import Sequence 1a
2from typing import Any 1a
3from uuid import UUID 1a
5from sqlalchemy import UnaryExpression, asc, desc, text 1a
7from polar.auth.models import AuthSubject 1a
8from polar.event.repository import EventRepository 1a
9from polar.event.system import SYSTEM_EVENT_LABELS 1a
10from polar.event_type.repository import EventTypeRepository 1a
11from polar.event_type.schemas import EventTypeWithStats 1a
12from polar.event_type.sorting import EventTypesSortProperty 1a
13from polar.kit.pagination import PaginationParams, paginate 1a
14from polar.kit.sorting import Sorting 1a
15from polar.models import Event, EventType, Organization, User 1a
16from polar.models.event import EventSource 1a
17from polar.postgres import AsyncSession 1a
20class EventTypeService: 1a
21 async def get( 1a
22 self,
23 session: AsyncSession,
24 auth_subject: AuthSubject[User | Organization],
25 id: UUID,
26 ) -> EventType | None:
27 repository = EventTypeRepository.from_session(session)
28 statement = repository.get_readable_statement(auth_subject).where(
29 EventType.id == id
30 )
31 return await repository.get_one_or_none(statement)
33 async def list_with_stats( 1a
34 self,
35 session: AsyncSession,
36 auth_subject: AuthSubject[User | Organization],
37 *,
38 organization_id: Sequence[UUID] | None = None,
39 customer_id: Sequence[UUID] | None = None,
40 external_customer_id: Sequence[str] | None = None,
41 query: str | None = None,
42 root_events: bool = False,
43 parent_id: UUID | None = None,
44 source: EventSource | None = None,
45 pagination: PaginationParams,
46 sorting: Sequence[Sorting[EventTypesSortProperty]] = [
47 (EventTypesSortProperty.last_seen, True)
48 ],
49 ) -> tuple[Sequence[EventTypeWithStats], int]:
50 event_type_repository = EventTypeRepository.from_session(session)
51 event_repository = EventRepository.from_session(session)
52 statement = event_type_repository.get_event_types_with_stats_statement(
53 auth_subject
54 )
56 if organization_id is not None:
57 statement = statement.where(EventType.organization_id.in_(organization_id))
59 if customer_id is not None:
60 statement = statement.where(
61 event_repository.get_customer_id_filter_clause(customer_id)
62 )
64 if external_customer_id is not None:
65 statement = statement.where(
66 event_repository.get_external_customer_id_filter_clause(
67 external_customer_id
68 )
69 )
71 if query is not None:
72 statement = statement.where(
73 EventType.name.ilike(f"%{query}%") | EventType.label.ilike(f"%{query}%")
74 )
76 if root_events:
77 statement = statement.where(Event.parent_id.is_(None))
79 if parent_id is not None:
80 statement = statement.where(Event.parent_id == parent_id)
82 if source is not None:
83 statement = statement.where(Event.source == source)
85 order_by_clauses: list[UnaryExpression[Any]] = []
86 for criterion, is_desc in sorting:
87 clause_function = desc if is_desc else asc
88 if criterion == EventTypesSortProperty.event_type_name:
89 order_by_clauses.append(clause_function(EventType.name))
90 elif criterion == EventTypesSortProperty.event_type_label:
91 order_by_clauses.append(clause_function(EventType.label))
92 elif criterion == EventTypesSortProperty.first_seen:
93 order_by_clauses.append(clause_function(text("first_seen")))
94 elif criterion == EventTypesSortProperty.last_seen:
95 order_by_clauses.append(clause_function(text("last_seen")))
96 elif criterion == EventTypesSortProperty.occurrences:
97 order_by_clauses.append(clause_function(text("occurrences")))
98 statement = statement.order_by(*order_by_clauses)
100 results, count = await paginate(session, statement, pagination=pagination)
102 event_types_with_stats: list[EventTypeWithStats] = []
103 for result in results:
104 event_type, source, occurrences, first_seen, last_seen = result
106 # Use system event label for system events
107 if source == EventSource.system:
108 label = SYSTEM_EVENT_LABELS.get(event_type.name, event_type.label)
109 else:
110 label = event_type.label
112 event_types_with_stats.append(
113 EventTypeWithStats.model_validate(
114 {
115 "id": event_type.id,
116 "created_at": event_type.created_at,
117 "modified_at": event_type.modified_at,
118 "name": event_type.name,
119 "label": label,
120 "label_property_selector": event_type.label_property_selector,
121 "organization_id": event_type.organization_id,
122 "source": source,
123 "occurrences": occurrences,
124 "first_seen": first_seen,
125 "last_seen": last_seen,
126 }
127 )
128 )
130 return event_types_with_stats, count
132 async def update( 1a
133 self,
134 session: AsyncSession,
135 event_type: EventType,
136 label: str,
137 label_property_selector: str | None = None,
138 ) -> EventType:
139 event_type.label = label
140 event_type.label_property_selector = label_property_selector
141 session.add(event_type)
142 return event_type
145event_type_service = EventTypeService() 1a