Coverage for polar/kit/services.py: 37%

43 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 16:17 +0000

1from __future__ import annotations 1a

2 

3from collections.abc import Sequence 1a

4from typing import Any 1a

5from uuid import UUID 1a

6 

7from sqlalchemy.orm import InstrumentedAttribute 1a

8from sqlalchemy.sql.base import ExecutableOption 1a

9 

10from polar.kit.utils import utc_now 1a

11 

12from .db.models import RecordModel 1a

13from .db.postgres import AsyncSession, sql 1a

14from .schemas import Schema 1a

15 

16 

17class ResourceServiceReader[ModelType: RecordModel]: 1a

18 def __init__(self, model: type[ModelType]) -> None: 1a

19 self.model = model 1a

20 

21 async def get( 1a

22 self, 

23 session: AsyncSession, 

24 id: UUID, 

25 allow_deleted: bool = False, 

26 *, 

27 options: Sequence[ExecutableOption] | None = None, 

28 ) -> ModelType | None: 

29 query = sql.select(self.model).where(self.model.id == id) 

30 if not allow_deleted: 

31 query = query.where(self.model.deleted_at.is_(None)) 

32 if options is not None: 

33 query = query.options(*options) 

34 res = await session.execute(query) 

35 return res.scalars().unique().one_or_none() 

36 

37 async def get_by(self, session: AsyncSession, **clauses: Any) -> ModelType | None: 1a

38 query = sql.select(self.model).filter_by(**clauses) 

39 res = await session.execute(query) 

40 return res.scalars().unique().one_or_none() 

41 

42 async def soft_delete(self, session: AsyncSession, id: UUID) -> None: 1a

43 stmt = ( 

44 sql.update(self.model) 

45 .where(self.model.id == id, self.model.deleted_at.is_(None)) 

46 .values( 

47 deleted_at=utc_now(), 

48 ) 

49 ) 

50 await session.execute(stmt) 

51 await session.flush() 

52 

53 

54class ResourceService[ 1a

55 ModelType: RecordModel, 

56 CreateSchemaType: Schema, 

57 UpdateSchemaType: Schema, 

58]( 

59 ResourceServiceReader[ModelType], 

60): 

61 # Ideally, actions would only contain class methods since there is 

62 # no state to retain. Unable to achieve this with mapping the model 

63 # and schema as class attributes though without breaking typing. 

64 

65 # TODO: Investigate new bulk methods in SQLALchemy 2.0 for upsert_many 

66 async def upsert_many( 1a

67 self, 

68 session: AsyncSession, 

69 create_schemas: list[CreateSchemaType], 

70 constraints: list[InstrumentedAttribute[Any]], 

71 mutable_keys: set[str], 

72 autocommit: bool = True, 

73 ) -> Sequence[ModelType]: 

74 return await self._db_upsert_many( 

75 session, 

76 create_schemas, 

77 constraints=constraints, 

78 mutable_keys=mutable_keys, 

79 autocommit=autocommit, 

80 ) 

81 

82 async def _db_upsert_many( 1a

83 self, 

84 session: AsyncSession, 

85 objects: list[CreateSchemaType], 

86 constraints: list[InstrumentedAttribute[Any]], 

87 mutable_keys: set[str], 

88 autocommit: bool = True, 

89 ) -> Sequence[ModelType]: 

90 values = [obj.model_dump(by_alias=True) for obj in objects] 

91 if not values: 

92 raise ValueError("Zero values provided") 

93 

94 insert_stmt = sql.insert(self.model).values(values) 

95 

96 # Update the insert statement with what to update on conflict, i.e mutable keys. 

97 upsert_stmt = ( 

98 insert_stmt.on_conflict_do_update( 

99 index_elements=constraints, 

100 set_={k: getattr(insert_stmt.excluded, k) for k in mutable_keys}, 

101 ) 

102 .returning(self.model) 

103 .execution_options(populate_existing=True) 

104 ) 

105 

106 res = await session.execute(upsert_stmt) 

107 instances = res.scalars().all() 

108 if autocommit: 

109 await session.commit() 

110 return instances