Coverage for polar/customer/tasks.py: 38%

40 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 17:15 +0000

1import uuid 1a

2from typing import Literal 1a

3 

4from sqlalchemy.orm import joinedload 1a

5 

6from polar.event.service import event as event_service 1a

7from polar.event.system import CustomerUpdatedFields, SystemEvent, build_system_event 1a

8from polar.exceptions import PolarTaskError 1a

9from polar.models import Customer 1a

10from polar.models.webhook_endpoint import CustomerWebhookEventType 1a

11from polar.worker import AsyncSessionMaker, RedisMiddleware, TaskPriority, actor 1a

12 

13from .repository import CustomerRepository 1a

14from .service import customer as customer_service 1a

15 

16 

class CustomerTaskError(PolarTaskError):
    """Base class for the errors raised by the tasks in this module."""

18 

19 

class CustomerDoesNotExist(CustomerTaskError):
    """Raised when a task cannot find the customer it was asked to process.

    Attributes:
        customer_id: Identifier of the customer that could not be found.
    """

    def __init__(self, customer_id: uuid.UUID) -> None:
        # Keep the id on the exception so handlers can inspect it without
        # parsing the message.
        self.customer_id = customer_id
        super().__init__(f"The customer with id {customer_id} does not exist.")

25 

26 

@actor(actor_name="customer.webhook", priority=TaskPriority.MEDIUM)
async def customer_webhook(
    event_type: CustomerWebhookEventType, customer_id: uuid.UUID
) -> None:
    """Load a customer and pass it to the customer service's webhook method.

    Args:
        event_type: Webhook event type forwarded to ``customer_service.webhook``.
        customer_id: Identifier of the customer to look up.

    Raises:
        CustomerDoesNotExist: If the repository returns no customer for
            ``customer_id``.
    """
    async with AsyncSessionMaker() as session:
        # The organization relationship is loaded together with the customer.
        eager_load = (joinedload(Customer.organization),)
        customers = CustomerRepository.from_session(session)
        found = await customers.get_by_id(
            customer_id, include_deleted=True, options=eager_load
        )

        if found is None:
            raise CustomerDoesNotExist(customer_id)

        redis = RedisMiddleware.get()
        await customer_service.webhook(session, redis, event_type, found)

45 

46 

47@actor(actor_name="customer.event", priority=TaskPriority.LOW) 1a

48async def customer_event( 1a

49 customer_id: uuid.UUID, 

50 event_name: Literal[ 

51 SystemEvent.customer_created, 

52 SystemEvent.customer_updated, 

53 SystemEvent.customer_deleted, 

54 ], 

55 updated_fields: CustomerUpdatedFields | None = None, 

56) -> None: 

57 async with AsyncSessionMaker() as session: 

58 repository = CustomerRepository.from_session(session) 

59 customer = await repository.get_by_id( 

60 customer_id, 

61 include_deleted=True, 

62 options=(joinedload(Customer.organization),), 

63 ) 

64 

65 if customer is None: 

66 raise CustomerDoesNotExist(customer_id) 

67 

68 match event_name: 

69 case SystemEvent.customer_created: 

70 event = build_system_event( 

71 event_name, 

72 customer=customer, 

73 organization=customer.organization, 

74 metadata={ 

75 "customer_id": str(customer.id), 

76 "customer_email": customer.email, 

77 "customer_name": customer.name, 

78 "customer_external_id": customer.external_id, 

79 }, 

80 ) 

81 case SystemEvent.customer_deleted: 

82 event = build_system_event( 

83 event_name, 

84 customer=customer, 

85 organization=customer.organization, 

86 metadata={ 

87 "customer_id": str(customer.id), 

88 "customer_email": customer.email, 

89 "customer_name": customer.name, 

90 "customer_external_id": customer.external_id, 

91 }, 

92 ) 

93 case SystemEvent.customer_updated: 

94 event = build_system_event( 

95 event_name, 

96 customer=customer, 

97 organization=customer.organization, 

98 metadata={ 

99 "customer_id": str(customer.id), 

100 "customer_email": customer.email, 

101 "customer_name": customer.name, 

102 "customer_external_id": customer.external_id, 

103 "updated_fields": updated_fields or {}, 

104 }, 

105 ) 

106 

107 await event_service.create_event(session, event)