Coverage for polar/kit/services.py: 37%
43 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 16:17 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 16:17 +0000
1from __future__ import annotations 1a
3from collections.abc import Sequence 1a
4from typing import Any 1a
5from uuid import UUID 1a
7from sqlalchemy.orm import InstrumentedAttribute 1a
8from sqlalchemy.sql.base import ExecutableOption 1a
10from polar.kit.utils import utc_now 1a
12from .db.models import RecordModel 1a
13from .db.postgres import AsyncSession, sql 1a
14from .schemas import Schema 1a
17class ResourceServiceReader[ModelType: RecordModel]: 1a
18 def __init__(self, model: type[ModelType]) -> None: 1a
19 self.model = model 1a
21 async def get( 1a
22 self,
23 session: AsyncSession,
24 id: UUID,
25 allow_deleted: bool = False,
26 *,
27 options: Sequence[ExecutableOption] | None = None,
28 ) -> ModelType | None:
29 query = sql.select(self.model).where(self.model.id == id)
30 if not allow_deleted:
31 query = query.where(self.model.deleted_at.is_(None))
32 if options is not None:
33 query = query.options(*options)
34 res = await session.execute(query)
35 return res.scalars().unique().one_or_none()
37 async def get_by(self, session: AsyncSession, **clauses: Any) -> ModelType | None: 1a
38 query = sql.select(self.model).filter_by(**clauses)
39 res = await session.execute(query)
40 return res.scalars().unique().one_or_none()
42 async def soft_delete(self, session: AsyncSession, id: UUID) -> None: 1a
43 stmt = (
44 sql.update(self.model)
45 .where(self.model.id == id, self.model.deleted_at.is_(None))
46 .values(
47 deleted_at=utc_now(),
48 )
49 )
50 await session.execute(stmt)
51 await session.flush()
54class ResourceService[ 1a
55 ModelType: RecordModel,
56 CreateSchemaType: Schema,
57 UpdateSchemaType: Schema,
58](
59 ResourceServiceReader[ModelType],
60):
61 # Ideally, actions would only contain class methods since there is
62 # no state to retain. Unable to achieve this with mapping the model
63 # and schema as class attributes though without breaking typing.
65 # TODO: Investigate new bulk methods in SQLALchemy 2.0 for upsert_many
66 async def upsert_many( 1a
67 self,
68 session: AsyncSession,
69 create_schemas: list[CreateSchemaType],
70 constraints: list[InstrumentedAttribute[Any]],
71 mutable_keys: set[str],
72 autocommit: bool = True,
73 ) -> Sequence[ModelType]:
74 return await self._db_upsert_many(
75 session,
76 create_schemas,
77 constraints=constraints,
78 mutable_keys=mutable_keys,
79 autocommit=autocommit,
80 )
82 async def _db_upsert_many( 1a
83 self,
84 session: AsyncSession,
85 objects: list[CreateSchemaType],
86 constraints: list[InstrumentedAttribute[Any]],
87 mutable_keys: set[str],
88 autocommit: bool = True,
89 ) -> Sequence[ModelType]:
90 values = [obj.model_dump(by_alias=True) for obj in objects]
91 if not values:
92 raise ValueError("Zero values provided")
94 insert_stmt = sql.insert(self.model).values(values)
96 # Update the insert statement with what to update on conflict, i.e mutable keys.
97 upsert_stmt = (
98 insert_stmt.on_conflict_do_update(
99 index_elements=constraints,
100 set_={k: getattr(insert_stmt.excluded, k) for k in mutable_keys},
101 )
102 .returning(self.model)
103 .execution_options(populate_existing=True)
104 )
106 res = await session.execute(upsert_stmt)
107 instances = res.scalars().all()
108 if autocommit:
109 await session.commit()
110 return instances