Coverage for polar/kit/db/postgres.py: 81%

42 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 17:15 +0000

1import json 1ab

2from decimal import Decimal 1ab

3from typing import Any, NewType, TypeAlias 1ab

4 

5from sqlalchemy import Engine 1ab

6from sqlalchemy import create_engine as _create_engine 1ab

7from sqlalchemy.ext.asyncio import AsyncEngine, async_sessionmaker 1ab

8from sqlalchemy.ext.asyncio import AsyncSession as _AsyncSession 1ab

9from sqlalchemy.ext.asyncio import ( 1ab

10 create_async_engine as _create_async_engine, 

11) 

12from sqlalchemy.orm import Session, sessionmaker 1ab

13 

14from ..extensions.sqlalchemy import sql 1ab

15 

16AsyncReadSession = NewType("AsyncReadSession", _AsyncSession) 1ab

17""" 1ab

18A type alias for read-only database sessions. 

19 

20This creates a distinct type from AsyncSession that can be used to enforce 

21read-only operations in the type system. While functionally identical to 

22AsyncSession, using AsyncReadSession signals intent for read-only database 

23access and helps prevent accidental writes in read-only contexts. 

24""" 

25 

26AsyncSession = NewType("AsyncSession", AsyncReadSession) 1ab

27""" 1ab

28A type alias for read-write database sessions. 

29 

30This creates a distinct type from AsyncReadSession that can be used to enforce 

31read-write operations in the type system. While functionally identical to 

32AsyncReadSession, using AsyncSession signals intent for read-write database 

33access and helps prevent accidental reads in write-only contexts. 

34""" 

35 

36 

37def _json_obj_serializer(obj: Any) -> Any: 1ab

38 if isinstance(obj, Decimal): 

39 return float(obj) 

40 raise TypeError(f"Object of type {type(obj)} is not JSON serializable") 

41 

42 

43def json_serializer(obj: Any) -> str: 1ab

44 return json.dumps(obj, default=_json_obj_serializer) 

45 

46 

47def create_async_engine( 1ab

48 *, 

49 dsn: str, 

50 application_name: str | None = None, 

51 pool_size: int | None = None, 

52 pool_recycle: int | None = None, 

53 command_timeout: float | None = None, 

54 debug: bool = False, 

55) -> AsyncEngine: 

56 connect_args: dict[str, Any] = {} 

57 if application_name is not None: 57 ↛ 59line 57 didn't jump to line 59 because the condition on line 57 was always true

58 connect_args["server_settings"] = {"application_name": application_name} 

59 if command_timeout is not None: 59 ↛ 62line 59 didn't jump to line 62 because the condition on line 59 was always true

60 connect_args["command_timeout"] = command_timeout 

61 

62 return _create_async_engine( 

63 dsn, 

64 echo=debug, 

65 connect_args=connect_args, 

66 pool_size=pool_size, 

67 pool_recycle=pool_recycle, 

68 json_serializer=json_serializer, 

69 ) 

70 

71 

72def create_sync_engine( 1ab

73 *, 

74 dsn: str, 

75 application_name: str | None = None, 

76 pool_size: int | None = None, 

77 pool_recycle: int | None = None, 

78 command_timeout: float | None = None, 

79 debug: bool = False, 

80) -> Engine: 

81 connect_args: dict[str, Any] = {} 

82 if application_name is not None: 82 ↛ 84line 82 didn't jump to line 84 because the condition on line 82 was always true

83 connect_args["application_name"] = application_name 

84 if command_timeout is not None: 84 ↛ 86line 84 didn't jump to line 86 because the condition on line 84 was always true

85 connect_args["options"] = f"-c statement_timeout={int(command_timeout * 1000)}" 

86 return _create_engine( 

87 dsn, 

88 echo=debug, 

89 connect_args=connect_args, 

90 pool_size=pool_size, 

91 pool_recycle=pool_recycle, 

92 ) 

93 

94 

95AsyncSessionMaker: TypeAlias = async_sessionmaker[AsyncSession] 1ab

96AsyncReadSessionMaker: TypeAlias = async_sessionmaker[AsyncReadSession] 1ab

97 

98 

99def create_async_sessionmaker(engine: AsyncEngine) -> async_sessionmaker[AsyncSession]: 1ab

100 return async_sessionmaker(engine, expire_on_commit=False, class_=_AsyncSession) # type: ignore[return-value] 

101 

102 

103SyncSessionMaker: TypeAlias = sessionmaker[Session] 1ab

104 

105 

106def create_sync_sessionmaker(engine: Engine) -> sessionmaker[Session]: 1ab

107 return sessionmaker(engine, expire_on_commit=False) 

108 

109 

110__all__ = [ 1ab

111 "AsyncSession", 

112 "AsyncEngine", 

113 "AsyncReadSession", 

114 "Session", 

115 "Engine", 

116 "AsyncSessionMaker", 

117 "AsyncReadSessionMaker", 

118 "SyncSessionMaker", 

119 "create_async_engine", 

120 "create_sync_engine", 

121 "create_async_sessionmaker", 

122 "create_sync_sessionmaker", 

123 "sql", 

124]