Coverage for polar/customer/tasks.py: 38%
40 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 16:17 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 16:17 +0000
1import uuid 1a
2from typing import Literal 1a
4from sqlalchemy.orm import joinedload 1a
6from polar.event.service import event as event_service 1a
7from polar.event.system import CustomerUpdatedFields, SystemEvent, build_system_event 1a
8from polar.exceptions import PolarTaskError 1a
9from polar.models import Customer 1a
10from polar.models.webhook_endpoint import CustomerWebhookEventType 1a
11from polar.worker import AsyncSessionMaker, RedisMiddleware, TaskPriority, actor 1a
13from .repository import CustomerRepository 1a
14from .service import customer as customer_service 1a
17class CustomerTaskError(PolarTaskError): ... 1a
20class CustomerDoesNotExist(CustomerTaskError): 1a
21 def __init__(self, customer_id: uuid.UUID) -> None: 1a
22 self.customer_id = customer_id
23 message = f"The customer with id {customer_id} does not exist."
24 super().__init__(message)
27@actor(actor_name="customer.webhook", priority=TaskPriority.MEDIUM) 1a
28async def customer_webhook( 1a
29 event_type: CustomerWebhookEventType, customer_id: uuid.UUID
30) -> None:
31 async with AsyncSessionMaker() as session:
32 repository = CustomerRepository.from_session(session)
33 customer = await repository.get_by_id(
34 customer_id,
35 include_deleted=True,
36 options=(joinedload(Customer.organization),),
37 )
39 if customer is None:
40 raise CustomerDoesNotExist(customer_id)
42 await customer_service.webhook(
43 session, RedisMiddleware.get(), event_type, customer
44 )
47@actor(actor_name="customer.event", priority=TaskPriority.LOW) 1a
48async def customer_event( 1a
49 customer_id: uuid.UUID,
50 event_name: Literal[
51 SystemEvent.customer_created,
52 SystemEvent.customer_updated,
53 SystemEvent.customer_deleted,
54 ],
55 updated_fields: CustomerUpdatedFields | None = None,
56) -> None:
57 async with AsyncSessionMaker() as session:
58 repository = CustomerRepository.from_session(session)
59 customer = await repository.get_by_id(
60 customer_id,
61 include_deleted=True,
62 options=(joinedload(Customer.organization),),
63 )
65 if customer is None:
66 raise CustomerDoesNotExist(customer_id)
68 match event_name:
69 case SystemEvent.customer_created:
70 event = build_system_event(
71 event_name,
72 customer=customer,
73 organization=customer.organization,
74 metadata={
75 "customer_id": str(customer.id),
76 "customer_email": customer.email,
77 "customer_name": customer.name,
78 "customer_external_id": customer.external_id,
79 },
80 )
81 case SystemEvent.customer_deleted:
82 event = build_system_event(
83 event_name,
84 customer=customer,
85 organization=customer.organization,
86 metadata={
87 "customer_id": str(customer.id),
88 "customer_email": customer.email,
89 "customer_name": customer.name,
90 "customer_external_id": customer.external_id,
91 },
92 )
93 case SystemEvent.customer_updated:
94 event = build_system_event(
95 event_name,
96 customer=customer,
97 organization=customer.organization,
98 metadata={
99 "customer_id": str(customer.id),
100 "customer_email": customer.email,
101 "customer_name": customer.name,
102 "customer_external_id": customer.external_id,
103 "updated_fields": updated_fields or {},
104 },
105 )
107 await event_service.create_event(session, event)