Coverage for polar/event_type/repository.py: 35%
31 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 17:15 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 17:15 +0000
1from datetime import datetime 1a
2from uuid import UUID 1a
4from sqlalchemy import Select, func, select 1a
6from polar.auth.models import AuthSubject, Organization, User, is_organization, is_user 1a
7from polar.kit.repository import RepositoryBase, RepositoryIDMixin 1a
8from polar.models import Event, EventType, UserOrganization 1a
9from polar.models.event import EventSource 1a
class EventTypeRepository(
    RepositoryBase[EventType], RepositoryIDMixin[EventType, UUID]
):
    """Repository for ``EventType`` rows.

    Builds access-scoped ``SELECT`` statements, a grouped statistics
    statement over the related ``Event`` rows, and offers lookup / creation
    of an event type by name inside one organization.
    """

    model = EventType

    def get_readable_statement(
        self, auth_subject: AuthSubject[User | Organization]
    ) -> Select[tuple[EventType]]:
        """Return the base statement restricted to what the subject may read.

        * User subject: event types whose organization appears in a
          ``UserOrganization`` row for that user with ``deleted_at`` unset.
        * Organization subject: event types owned by that organization.
        * Any other subject: the base statement is returned without an
          additional filter.
        """
        base = self.get_base_statement()

        if is_user(auth_subject):
            # Sub-select of the organizations the user is still a member of.
            member_org_ids = select(UserOrganization.organization_id).where(
                UserOrganization.user_id == auth_subject.subject.id,
                UserOrganization.deleted_at.is_(None),
            )
            return base.where(EventType.organization_id.in_(member_org_ids))

        if is_organization(auth_subject):
            owner_id = auth_subject.subject.id
            return base.where(EventType.organization_id == owner_id)

        return base

    def get_event_types_with_stats_statement(
        self, auth_subject: AuthSubject[User | Organization]
    ) -> Select[tuple[EventType, EventSource, int, datetime, datetime]]:
        """Return a statement yielding usage statistics per event type/source.

        Each row carries the ``EventType``, the ``Event.source``, the number
        of matching events (``occurrences``) and the earliest / latest event
        timestamp (``first_seen`` / ``last_seen``). Only event types readable
        by ``auth_subject`` are considered.
        """
        occurrences = func.count(Event.id).label("occurrences")
        first_seen = func.min(Event.timestamp).label("first_seen")
        last_seen = func.max(Event.timestamp).label("last_seen")

        # Every selected non-aggregate column has to appear in GROUP BY.
        grouping = (
            EventType.id,
            EventType.created_at,
            EventType.modified_at,
            EventType.deleted_at,
            EventType.name,
            EventType.label,
            EventType.organization_id,
            Event.source,
        )

        # ``join`` emits an INNER JOIN: event types without events give no row.
        with_events = self.get_readable_statement(auth_subject).join(
            Event, EventType.id == Event.event_type_id
        )
        selected = with_events.with_only_columns(
            EventType,
            Event.source,
            occurrences,
            first_seen,
            last_seen,
        )
        return selected.group_by(*grouping)

    async def get_by_name_and_organization(
        self, name: str, organization_id: UUID
    ) -> EventType | None:
        """Fetch the non-deleted event type with this name in the organization.

        Returns ``None`` when nothing matches. ``scalar_one_or_none`` raises
        if more than one row matches.
        """
        criteria = (
            EventType.name == name,
            EventType.organization_id == organization_id,
            EventType.deleted_at.is_(None),
        )
        rows = await self.session.execute(select(EventType).where(*criteria))
        return rows.scalar_one_or_none()

    async def get_or_create(self, name: str, organization_id: UUID) -> EventType:
        """Return the matching event type, creating and flushing one if absent.

        A newly created event type uses ``name`` for both its ``name`` and
        its ``label``. The session is flushed, not committed.
        """
        # NOTE(review): lookup and insert are separate steps; whether
        # concurrent callers can produce duplicates depends on constraints
        # not visible here - confirm.
        found = await self.get_by_name_and_organization(name, organization_id)
        if not found:
            created = EventType(
                name=name, label=name, organization_id=organization_id
            )
            self.session.add(created)
            await self.session.flush()
            return created

        return found