Coverage for polar/event_type/endpoints.py: 67%

25 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 17:15 +0000

1from uuid import UUID 1a

2 

3from fastapi import Depends, Query 1a

4 

5from polar.customer.schemas.customer import CustomerID 1a

6from polar.event_type import auth, schemas, sorting 1a

7from polar.event_type.service import event_type_service 1a

8from polar.exceptions import ResourceNotFound 1a

9from polar.kit.pagination import ListResource, PaginationParamsQuery 1a

10from polar.kit.schemas import MultipleQueryFilter 1a

11from polar.models.event import EventSource 1a

12from polar.openapi import APITag 1a

13from polar.organization.schemas import OrganizationID 1a

14from polar.postgres import AsyncSession, get_db_session 1a

15from polar.routing import APIRouter 1a

16 

17router = APIRouter(prefix="/event-types", tags=["event-types", APITag.public]) 1a

18 

19 

20@router.get( 1a

21 "/", 

22 summary="List Event Types", 

23 response_model=ListResource[schemas.EventTypeWithStats], 

24) 

25async def list( 1a

26 auth_subject: auth.EventTypeRead, 

27 pagination: PaginationParamsQuery, 

28 sorting: sorting.EventTypesSorting, 

29 session: AsyncSession = Depends(get_db_session), 

30 organization_id: MultipleQueryFilter[OrganizationID] | None = Query( 

31 None, title="OrganizationID Filter", description="Filter by organization ID." 

32 ), 

33 customer_id: MultipleQueryFilter[CustomerID] | None = Query( 

34 None, title="CustomerID Filter", description="Filter by customer ID." 

35 ), 

36 external_customer_id: MultipleQueryFilter[str] | None = Query( 

37 None, 

38 title="ExternalCustomerID Filter", 

39 description="Filter by external customer ID.", 

40 ), 

41 query: str | None = Query( 

42 None, 

43 title="Query", 

44 description="Query to filter event types by name or label.", 

45 ), 

46 root_events: bool = Query( 

47 False, 

48 title="Root Events Filter", 

49 description="When true, only return event types with root events (parent_id IS NULL).", 

50 ), 

51 parent_id: UUID | None = Query( 

52 None, 

53 title="ParentID Filter", 

54 description="Filter by specific parent event ID.", 

55 ), 

56 source: EventSource | None = Query( 

57 None, 

58 title="EventSource Filter", 

59 description="Filter by event source (system or user).", 

60 ), 

61) -> ListResource[schemas.EventTypeWithStats]: 

62 """List event types with aggregated statistics.""" 

63 results, count = await event_type_service.list_with_stats( 

64 session, 

65 auth_subject, 

66 organization_id=organization_id, 

67 customer_id=customer_id, 

68 external_customer_id=external_customer_id, 

69 query=query, 

70 root_events=root_events, 

71 parent_id=parent_id, 

72 source=source, 

73 pagination=pagination, 

74 sorting=sorting, 

75 ) 

76 return ListResource.from_paginated_results(results, count, pagination) 

77 

78 

79@router.patch( 1a

80 "/{id}", 

81 response_model=schemas.EventType, 

82 summary="Update Event Type", 

83 description="Update an event type's label.", 

84 status_code=200, 

85 responses={404: {}}, 

86) 

87async def update( 1a

88 id: schemas.EventTypeID, 

89 body: schemas.EventTypeUpdate, 

90 auth_subject: auth.EventTypeWrite, 

91 session: AsyncSession = Depends(get_db_session), 

92) -> schemas.EventType: 

93 event_type = await event_type_service.get(session, auth_subject, id) 

94 if event_type is None: 

95 raise ResourceNotFound() 

96 

97 updated_event_type = await event_type_service.update( 

98 session, event_type, body.label, body.label_property_selector 

99 ) 

100 return schemas.EventType.model_validate(updated_event_type)