Coverage for polar/custom_field/service.py: 24%
93 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 15:52 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 15:52 +0000
1import uuid 1a
2from collections.abc import Sequence 1a
3from typing import Any 1a
5from sqlalchemy import ( 1a
6 Select,
7 UnaryExpression,
8 asc,
9 delete,
10 desc,
11 func,
12 or_,
13 select,
14 update,
15)
16from sqlalchemy.orm import contains_eager 1a
18from polar.auth.models import AuthSubject, is_organization, is_user 1a
19from polar.custom_field.sorting import CustomFieldSortProperty 1a
20from polar.exceptions import PolarError, PolarRequestValidationError 1a
21from polar.kit.pagination import PaginationParams, paginate 1a
22from polar.kit.services import ResourceServiceReader 1a
23from polar.kit.sorting import Sorting 1a
24from polar.models import CustomField, Organization, User, UserOrganization 1a
25from polar.models.custom_field import CustomFieldType 1a
26from polar.organization.resolver import get_payload_organization 1a
27from polar.postgres import AsyncReadSession, AsyncSession 1a
29from .attachment import attached_custom_fields_models 1a
30from .data import custom_field_data_models 1a
31from .schemas import CustomFieldCreate, CustomFieldUpdate 1a
34class CustomFieldError(PolarError): ... 1a
37class CustomFieldService(ResourceServiceReader[CustomField]): 1a
38 async def list( 1a
39 self,
40 session: AsyncReadSession,
41 auth_subject: AuthSubject[User | Organization],
42 *,
43 organization_id: Sequence[uuid.UUID] | None = None,
44 query: str | None = None,
45 type: Sequence[CustomFieldType] | None = None,
46 pagination: PaginationParams,
47 sorting: list[Sorting[CustomFieldSortProperty]] = [
48 (CustomFieldSortProperty.slug, False)
49 ],
50 ) -> tuple[Sequence[CustomField], int]:
51 statement = self._get_readable_custom_field_statement(auth_subject)
53 if organization_id is not None:
54 statement = statement.where(
55 CustomField.organization_id.in_(organization_id)
56 )
58 if query is not None:
59 statement = statement.where(
60 or_(
61 CustomField.name.ilike(f"%{query}%"),
62 CustomField.slug.ilike(f"%{query}%"),
63 )
64 )
66 if type is not None:
67 statement = statement.where(CustomField.type.in_(type))
69 order_by_clauses: list[UnaryExpression[Any]] = []
70 for criterion, is_desc in sorting:
71 clause_function = desc if is_desc else asc
72 if criterion == CustomFieldSortProperty.created_at:
73 order_by_clauses.append(
74 clause_function(CustomFieldSortProperty.created_at)
75 )
76 elif criterion == CustomFieldSortProperty.slug:
77 order_by_clauses.append(clause_function(CustomFieldSortProperty.slug))
78 elif criterion == CustomFieldSortProperty.custom_field_name:
79 order_by_clauses.append(
80 clause_function(CustomFieldSortProperty.custom_field_name)
81 )
82 elif criterion == CustomFieldSortProperty.type:
83 order_by_clauses.append(clause_function(CustomFieldSortProperty.type))
84 statement = statement.order_by(*order_by_clauses)
86 return await paginate(session, statement, pagination=pagination)
88 async def get_by_id( 1a
89 self,
90 session: AsyncReadSession,
91 auth_subject: AuthSubject[User | Organization],
92 id: uuid.UUID,
93 ) -> CustomField | None:
94 statement = self._get_readable_custom_field_statement(auth_subject).where(
95 CustomField.id == id
96 )
97 result = await session.execute(statement)
98 return result.scalar_one_or_none()
100 async def create( 1a
101 self,
102 session: AsyncSession,
103 custom_field_create: CustomFieldCreate,
104 auth_subject: AuthSubject[User | Organization],
105 ) -> CustomField:
106 organization = await get_payload_organization(
107 session, auth_subject, custom_field_create
108 )
110 existing_field = await self._get_by_organization_id_and_slug(
111 session, organization.id, custom_field_create.slug
112 )
113 if existing_field is not None:
114 raise PolarRequestValidationError(
115 [
116 {
117 "type": "value_error",
118 "loc": ("body", "slug"),
119 "msg": "Custom field with this slug already exists.",
120 "input": custom_field_create.slug,
121 }
122 ]
123 )
125 custom_field = CustomField(
126 **custom_field_create.model_dump(
127 exclude={"organization_id"}, by_alias=True
128 ),
129 organization=organization,
130 )
131 session.add(custom_field)
133 return custom_field
135 async def update( 1a
136 self,
137 session: AsyncSession,
138 custom_field: CustomField,
139 custom_field_update: CustomFieldUpdate,
140 ) -> CustomField:
141 if custom_field.type != custom_field_update.type:
142 raise PolarRequestValidationError(
143 [
144 {
145 "type": "value_error",
146 "loc": ("body", "type"),
147 "msg": "The type of a custom field can't be changed.",
148 "input": custom_field_update.type,
149 }
150 ]
151 )
153 if (
154 custom_field_update.slug is not None
155 and custom_field.slug != custom_field_update.slug
156 ):
157 existing_field = await self._get_by_organization_id_and_slug(
158 session, custom_field.organization_id, custom_field_update.slug
159 )
160 if existing_field is not None and existing_field.id != custom_field.id:
161 raise PolarRequestValidationError(
162 [
163 {
164 "type": "value_error",
165 "loc": ("body", "slug"),
166 "msg": "Custom field with this slug already exists.",
167 "input": custom_field_update.slug,
168 }
169 ]
170 )
172 previous_slug = custom_field.slug
173 for attr, value in custom_field_update.model_dump(
174 exclude_unset=True, by_alias=True
175 ).items():
176 setattr(custom_field, attr, value)
178 # Update the slug from all custom_field_data JSONB
179 if previous_slug != custom_field.slug:
180 for model in custom_field_data_models:
181 update_statement = (
182 update(model)
183 .where(
184 model.organization == custom_field.organization,
185 model.custom_field_data.has_key(previous_slug),
186 )
187 .values(
188 custom_field_data=(
189 model.custom_field_data.op("-")(previous_slug)
190 ).op("||")(
191 func.jsonb_build_object(
192 custom_field.slug,
193 model.custom_field_data[previous_slug],
194 )
195 )
196 )
197 )
198 await session.execute(update_statement)
200 session.add(custom_field)
201 return custom_field
203 async def delete( 1a
204 self, session: AsyncSession, custom_field: CustomField
205 ) -> CustomField:
206 custom_field.set_deleted_at()
207 session.add(custom_field)
209 # Delete row with this custom field from all association tables
210 for model in attached_custom_fields_models:
211 delete_statement = delete(model).where(
212 model.custom_field_id == custom_field.id
213 )
214 await session.execute(delete_statement)
216 return custom_field
218 async def get_by_organization_and_id( 1a
219 self, session: AsyncSession, id: uuid.UUID, organization_id: uuid.UUID
220 ) -> CustomField | None:
221 statement = select(CustomField).where(
222 CustomField.deleted_at.is_(None),
223 CustomField.organization_id == organization_id,
224 CustomField.id == id,
225 )
226 result = await session.execute(statement)
227 return result.scalar_one_or_none()
229 async def _get_by_organization_id_and_slug( 1a
230 self, session: AsyncSession, organization_id: uuid.UUID, slug: str
231 ) -> CustomField | None:
232 statement = select(CustomField).where(
233 CustomField.organization_id == organization_id,
234 CustomField.slug == slug,
235 )
236 result = await session.execute(statement)
237 return result.scalar_one_or_none()
239 def _get_readable_custom_field_statement( 1a
240 self, auth_subject: AuthSubject[User | Organization]
241 ) -> Select[tuple[CustomField]]:
242 statement = (
243 select(CustomField)
244 .where(CustomField.deleted_at.is_(None))
245 .join(Organization, Organization.id == CustomField.organization_id)
246 .options(contains_eager(CustomField.organization))
247 )
249 if is_user(auth_subject):
250 user = auth_subject.subject
251 statement = statement.where(
252 CustomField.organization_id.in_(
253 select(UserOrganization.organization_id).where(
254 UserOrganization.user_id == user.id,
255 UserOrganization.deleted_at.is_(None),
256 )
257 )
258 )
259 elif is_organization(auth_subject):
260 statement = statement.where(
261 CustomField.organization_id == auth_subject.subject.id,
262 )
264 return statement
267custom_field = CustomFieldService(CustomField) 1a