Coverage for polar/custom_field/service.py: 24%

93 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 16:17 +0000

1import uuid 1a

2from collections.abc import Sequence 1a

3from typing import Any 1a

4 

5from sqlalchemy import ( 1a

6 Select, 

7 UnaryExpression, 

8 asc, 

9 delete, 

10 desc, 

11 func, 

12 or_, 

13 select, 

14 update, 

15) 

16from sqlalchemy.orm import contains_eager 1a

17 

18from polar.auth.models import AuthSubject, is_organization, is_user 1a

19from polar.custom_field.sorting import CustomFieldSortProperty 1a

20from polar.exceptions import PolarError, PolarRequestValidationError 1a

21from polar.kit.pagination import PaginationParams, paginate 1a

22from polar.kit.services import ResourceServiceReader 1a

23from polar.kit.sorting import Sorting 1a

24from polar.models import CustomField, Organization, User, UserOrganization 1a

25from polar.models.custom_field import CustomFieldType 1a

26from polar.organization.resolver import get_payload_organization 1a

27from polar.postgres import AsyncReadSession, AsyncSession 1a

28 

29from .attachment import attached_custom_fields_models 1a

30from .data import custom_field_data_models 1a

31from .schemas import CustomFieldCreate, CustomFieldUpdate 1a

32 

33 

class CustomFieldError(PolarError):
    """Custom-field-specific error type deriving from ``PolarError``."""

35 

36 

class CustomFieldService(ResourceServiceReader[CustomField]):
    """Read and write operations on ``CustomField`` records.

    Methods that take an ``auth_subject`` build their query from
    ``_get_readable_custom_field_statement``, which limits results to
    non-deleted custom fields of organizations the subject is tied to.
    """

    async def list(
        self,
        session: AsyncReadSession,
        auth_subject: AuthSubject[User | Organization],
        *,
        organization_id: Sequence[uuid.UUID] | None = None,
        query: str | None = None,
        type: Sequence[CustomFieldType] | None = None,
        pagination: PaginationParams,
        # NOTE: shared default list; it is only iterated below, never mutated.
        sorting: list[Sorting[CustomFieldSortProperty]] = [
            (CustomFieldSortProperty.slug, False)
        ],
    ) -> tuple[Sequence[CustomField], int]:
        """Return one page of custom fields readable by ``auth_subject``.

        Args:
            session: Session used to run the paginated query.
            auth_subject: User or organization the results are scoped to.
            organization_id: When given, keep only fields whose organization
                is in this collection.
            query: When given, keep only fields whose name or slug contains
                this text (case-insensitive ``ILIKE`` match).
            type: When given, keep only fields whose type is in this
                collection.
            pagination: Pagination parameters forwarded to ``paginate``.
            sorting: ``(criterion, is_desc)`` pairs applied in order.
                Criteria not handled below are ignored.

        Returns:
            Whatever ``paginate`` returns for the built statement; annotated
            as a ``(items, count)`` tuple.
        """
        statement = self._get_readable_custom_field_statement(auth_subject)

        if organization_id is not None:
            statement = statement.where(
                CustomField.organization_id.in_(organization_id)
            )

        if query is not None:
            statement = statement.where(
                or_(
                    CustomField.name.ilike(f"%{query}%"),
                    CustomField.slug.ilike(f"%{query}%"),
                )
            )

        if type is not None:
            statement = statement.where(CustomField.type.in_(type))

        # Order by CustomField columns explicitly. The readable statement
        # also joins Organization, so passing the bare sort-enum member to
        # asc()/desc() would not tie the ordering to the CustomField table.
        order_by_clauses: list[UnaryExpression[Any]] = []
        for criterion, is_desc in sorting:
            clause_function = desc if is_desc else asc
            if criterion == CustomFieldSortProperty.created_at:
                # assumes CustomField exposes a created_at column — TODO confirm
                order_by_clauses.append(clause_function(CustomField.created_at))
            elif criterion == CustomFieldSortProperty.slug:
                order_by_clauses.append(clause_function(CustomField.slug))
            elif criterion == CustomFieldSortProperty.custom_field_name:
                # assumes this criterion designates CustomField.name, the
                # same column the text search above matches — TODO confirm
                order_by_clauses.append(clause_function(CustomField.name))
            elif criterion == CustomFieldSortProperty.type:
                order_by_clauses.append(clause_function(CustomField.type))
        statement = statement.order_by(*order_by_clauses)

        return await paginate(session, statement, pagination=pagination)

    async def get_by_id(
        self,
        session: AsyncReadSession,
        auth_subject: AuthSubject[User | Organization],
        id: uuid.UUID,
    ) -> CustomField | None:
        """Return the readable custom field with the given id, or ``None``.

        The lookup goes through ``_get_readable_custom_field_statement``, so
        a field that exists but is not readable by ``auth_subject`` also
        yields ``None``.
        """
        statement = self._get_readable_custom_field_statement(auth_subject).where(
            CustomField.id == id
        )
        result = await session.execute(statement)
        return result.scalar_one_or_none()

    async def create(
        self,
        session: AsyncSession,
        custom_field_create: CustomFieldCreate,
        auth_subject: AuthSubject[User | Organization],
    ) -> CustomField:
        """Create a custom field in the organization resolved from the payload.

        Args:
            session: Session the new record is added to (no flush or commit
                is issued here).
            custom_field_create: Creation payload; dumped by alias, minus
                ``organization_id``, into the model constructor.
            auth_subject: Subject passed to ``get_payload_organization`` to
                resolve the target organization.

        Returns:
            The newly built ``CustomField``, already added to ``session``.

        Raises:
            PolarRequestValidationError: If a custom field with the same slug
                already exists in the organization.
        """
        organization = await get_payload_organization(
            session, auth_subject, custom_field_create
        )

        existing_field = await self._get_by_organization_id_and_slug(
            session, organization.id, custom_field_create.slug
        )
        if existing_field is not None:
            raise self._slug_already_exists_error(custom_field_create.slug)

        custom_field = CustomField(
            **custom_field_create.model_dump(
                exclude={"organization_id"}, by_alias=True
            ),
            organization=organization,
        )
        session.add(custom_field)

        return custom_field

    async def update(
        self,
        session: AsyncSession,
        custom_field: CustomField,
        custom_field_update: CustomFieldUpdate,
    ) -> CustomField:
        """Apply an update payload to an existing custom field.

        Only attributes explicitly set on ``custom_field_update`` are written.
        When the slug changes, the key is renamed in the ``custom_field_data``
        of every model listed in ``custom_field_data_models`` for the same
        organization.

        Args:
            session: Session used for the slug lookup and the data updates.
            custom_field: The record to modify.
            custom_field_update: Update payload.

        Returns:
            The same ``custom_field`` instance, modified and added to
            ``session``.

        Raises:
            PolarRequestValidationError: If the payload's type differs from
                the field's current type, or if the new slug is already used
                by another custom field of the organization.
        """
        if custom_field.type != custom_field_update.type:
            raise PolarRequestValidationError(
                [
                    {
                        "type": "value_error",
                        "loc": ("body", "type"),
                        "msg": "The type of a custom field can't be changed.",
                        "input": custom_field_update.type,
                    }
                ]
            )

        if (
            custom_field_update.slug is not None
            and custom_field.slug != custom_field_update.slug
        ):
            existing_field = await self._get_by_organization_id_and_slug(
                session, custom_field.organization_id, custom_field_update.slug
            )
            if existing_field is not None and existing_field.id != custom_field.id:
                raise self._slug_already_exists_error(custom_field_update.slug)

        # Remember the slug before the payload overwrites it, so stored data
        # keyed by the old slug can be migrated below.
        previous_slug = custom_field.slug
        for attr, value in custom_field_update.model_dump(
            exclude_unset=True, by_alias=True
        ).items():
            setattr(custom_field, attr, value)

        # Update the slug from all custom_field_data JSONB
        if previous_slug != custom_field.slug:
            for model in custom_field_data_models:
                # For rows of the same organization that hold the old key:
                # drop the old key (`-`) and merge in (`||`) a one-entry
                # object mapping the new slug to the value the old key held.
                update_statement = (
                    update(model)
                    .where(
                        model.organization == custom_field.organization,
                        model.custom_field_data.has_key(previous_slug),
                    )
                    .values(
                        custom_field_data=(
                            model.custom_field_data.op("-")(previous_slug)
                        ).op("||")(
                            func.jsonb_build_object(
                                custom_field.slug,
                                model.custom_field_data[previous_slug],
                            )
                        )
                    )
                )
                await session.execute(update_statement)

        session.add(custom_field)
        return custom_field

    async def delete(
        self, session: AsyncSession, custom_field: CustomField
    ) -> CustomField:
        """Mark a custom field as deleted and detach it everywhere.

        The field itself is kept and flagged through ``set_deleted_at``; the
        rows referencing it in every model of
        ``attached_custom_fields_models`` are removed with ``DELETE``
        statements.

        Returns:
            The same ``custom_field`` instance, flagged as deleted.
        """
        custom_field.set_deleted_at()
        session.add(custom_field)

        # Delete row with this custom field from all association tables
        for model in attached_custom_fields_models:
            delete_statement = delete(model).where(
                model.custom_field_id == custom_field.id
            )
            await session.execute(delete_statement)

        return custom_field

    async def get_by_organization_and_id(
        self, session: AsyncSession, id: uuid.UUID, organization_id: uuid.UUID
    ) -> CustomField | None:
        """Return the non-deleted field with this id in this organization.

        No auth-subject scoping is applied here; the caller supplies the
        organization directly. Returns ``None`` when nothing matches.
        """
        statement = select(CustomField).where(
            CustomField.deleted_at.is_(None),
            CustomField.organization_id == organization_id,
            CustomField.id == id,
        )
        result = await session.execute(statement)
        return result.scalar_one_or_none()

    async def _get_by_organization_id_and_slug(
        self, session: AsyncSession, organization_id: uuid.UUID, slug: str
    ) -> CustomField | None:
        """Return the field with this slug in this organization, or ``None``.

        This lookup does not filter on ``deleted_at``, so fields flagged as
        deleted are matched too.
        NOTE(review): presumably intentional, keeping slugs unique across
        deleted rows — confirm against the table constraints.
        """
        statement = select(CustomField).where(
            CustomField.organization_id == organization_id,
            CustomField.slug == slug,
        )
        result = await session.execute(statement)
        return result.scalar_one_or_none()

    @staticmethod
    def _slug_already_exists_error(slug: str) -> PolarRequestValidationError:
        """Build the validation error reported when ``slug`` is already used."""
        return PolarRequestValidationError(
            [
                {
                    "type": "value_error",
                    "loc": ("body", "slug"),
                    "msg": "Custom field with this slug already exists.",
                    "input": slug,
                }
            ]
        )

    def _get_readable_custom_field_statement(
        self, auth_subject: AuthSubject[User | Organization]
    ) -> Select[tuple[CustomField]]:
        """Build the base ``SELECT`` of custom fields visible to a subject.

        The statement excludes fields with a ``deleted_at`` value, joins the
        owning ``Organization`` and populates ``CustomField.organization``
        from that join. It is then narrowed by subject kind:

        * user: organizations the user belongs to through a non-deleted
          ``UserOrganization`` row;
        * organization: that organization only.

        A subject matching neither check gets the un-narrowed statement.
        """
        statement = (
            select(CustomField)
            .where(CustomField.deleted_at.is_(None))
            .join(Organization, Organization.id == CustomField.organization_id)
            .options(contains_eager(CustomField.organization))
        )

        if is_user(auth_subject):
            user = auth_subject.subject
            statement = statement.where(
                CustomField.organization_id.in_(
                    select(UserOrganization.organization_id).where(
                        UserOrganization.user_id == user.id,
                        UserOrganization.deleted_at.is_(None),
                    )
                )
            )
        elif is_organization(auth_subject):
            statement = statement.where(
                CustomField.organization_id == auth_subject.subject.id,
            )

        return statement

265 

266 

# Module-level service instance, constructed with the CustomField model.
custom_field: CustomFieldService = CustomFieldService(CustomField)