Coverage for polar/postgres.py: 84%

43 statements  

« prev     ^ index     » next       coverage.py v7.10.6, created at 2025-12-05 15:52 +0000

1from collections.abc import AsyncGenerator 1ab

2from typing import Literal, TypeAlias 1ab

3 

4from fastapi import Request 1ab

5from starlette.types import ASGIApp, Receive, Scope, Send 1ab

6 

7from polar.config import settings 1ab

8from polar.kit.db.postgres import ( 1ab

9 AsyncEngine, 

10 AsyncReadSession, 

11 AsyncReadSessionMaker, 

12 AsyncSession, 

13 AsyncSessionMaker, 

14 Engine, 

15 sql, 

16) 

17from polar.kit.db.postgres import create_async_engine as _create_async_engine 1ab

18from polar.kit.db.postgres import create_sync_engine as _create_sync_engine 1ab

19 

20ProcessName: TypeAlias = Literal["app", "worker", "scheduler", "script"] 1ab

21 

22 

def create_async_engine(process_name: ProcessName) -> AsyncEngine:
    """Build an async engine for the DSN from `settings.get_postgres_dsn`.

    Args:
        process_name: Kind of process opening the connections; combined with
            `settings.ENV.value` to form the connection's `application_name`.

    Returns:
        The engine produced by `polar.kit.db.postgres.create_async_engine`,
        configured with the asyncpg DSN and the debug, pool and timeout
        values read from `settings`.
    """
    return _create_async_engine(
        dsn=str(settings.get_postgres_dsn("asyncpg")),
        application_name=f"{settings.ENV.value}.{process_name}",
        debug=settings.SQLALCHEMY_DEBUG,
        pool_size=settings.DATABASE_POOL_SIZE,
        pool_recycle=settings.DATABASE_POOL_RECYCLE_SECONDS,
        command_timeout=settings.DATABASE_COMMAND_TIMEOUT_SECONDS,
    )

32 

33 

def create_async_read_engine(process_name: ProcessName) -> AsyncEngine:
    """Build an async engine for the DSN from `settings.get_postgres_read_dsn`.

    Apart from the DSN, the engine is configured with the same `settings`
    values as `create_async_engine`.

    Args:
        process_name: Kind of process opening the connections; combined with
            `settings.ENV.value` to form the connection's `application_name`.

    Returns:
        The engine produced by `polar.kit.db.postgres.create_async_engine`
        for the read DSN.
    """
    return _create_async_engine(
        dsn=str(settings.get_postgres_read_dsn("asyncpg")),
        application_name=f"{settings.ENV.value}.{process_name}",
        debug=settings.SQLALCHEMY_DEBUG,
        pool_size=settings.DATABASE_POOL_SIZE,
        pool_recycle=settings.DATABASE_POOL_RECYCLE_SECONDS,
        command_timeout=settings.DATABASE_COMMAND_TIMEOUT_SECONDS,
    )

43 

44 

def create_sync_engine(process_name: ProcessName) -> Engine:
    """Build a synchronous engine for the DSN from `settings.get_postgres_dsn`.

    Uses the psycopg2 DSN and `settings.DATABASE_SYNC_POOL_SIZE`; the other
    options are the same `settings` values the async factories use.

    Args:
        process_name: Kind of process opening the connections; combined with
            `settings.ENV.value` to form the connection's `application_name`.

    Returns:
        The engine produced by `polar.kit.db.postgres.create_sync_engine`.
    """
    return _create_sync_engine(
        dsn=str(settings.get_postgres_dsn("psycopg2")),
        application_name=f"{settings.ENV.value}.{process_name}",
        debug=settings.SQLALCHEMY_DEBUG,
        pool_size=settings.DATABASE_SYNC_POOL_SIZE,
        pool_recycle=settings.DATABASE_POOL_RECYCLE_SECONDS,
        command_timeout=settings.DATABASE_COMMAND_TIMEOUT_SECONDS,
    )

54 

55 

class AsyncSessionMiddleware:
    """ASGI middleware that opens one async session per HTTP/WebSocket scope.

    The session is created from the sessionmaker stored under
    ``scope["state"]["async_sessionmaker"]`` and published as
    ``scope["state"]["async_session"]`` while the wrapped application runs.
    """

    def __init__(self, app: ASGIApp) -> None:
        """Store the wrapped ASGI application."""
        self.app = app

    async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
        """Run the wrapped app, with a session attached for http/websocket."""
        # Any other scope type is forwarded untouched, without a session.
        if scope["type"] not in ("http", "websocket"):
            return await self.app(scope, receive, send)

        sessionmaker: AsyncSessionMaker = scope["state"]["async_sessionmaker"]
        # The app is awaited inside the context manager so the session stays
        # open for the whole request and is released when the block exits.
        async with sessionmaker() as session:
            scope["state"]["async_session"] = session
            await self.app(scope, receive, send)

68 

69 

async def get_db_sessionmaker(request: Request) -> AsyncSessionMaker:
    """Return the sessionmaker stored on `request.state.async_sessionmaker`."""
    return request.state.async_sessionmaker

72 

73 

async def get_db_session(request: Request) -> AsyncGenerator[AsyncSession]:
    """Yield the request's session, committing on success.

    The session is read from `request.state.async_session`. After the
    consumer finishes, the session is committed; if an exception is thrown
    into the generator instead, the session is rolled back and the exception
    is re-raised.

    Args:
        request: Current request whose state carries the session.

    Yields:
        The session stored on the request state.

    Raises:
        RuntimeError: If `request.state` has no `async_session` attribute.
    """
    try:
        session = request.state.async_session
    except AttributeError as e:
        raise RuntimeError(
            "Session is not present in the request state. "
            "Did you forget to add AsyncSessionMiddleware?"
        ) from e

    try:
        yield session
    except BaseException:
        # Deliberately as wide as a bare `except:` so that cancellation and
        # generator close also roll back; the exception is always re-raised.
        await session.rollback()
        raise
    else:
        await session.commit()

90 

91 

async def get_db_read_session(request: Request) -> AsyncGenerator[AsyncReadSession]:
    """Yield a fresh read session for the duration of the request.

    The session is created from `request.state.async_read_sessionmaker` and
    is released when the surrounding `async with` block exits. Unlike
    `get_db_session`, no explicit commit or rollback is issued here.

    Args:
        request: Current request whose state carries the read sessionmaker.

    Yields:
        A session opened from the read sessionmaker.
    """
    sessionmaker: AsyncReadSessionMaker = request.state.async_read_sessionmaker
    async with sessionmaker() as session:
        yield session

96 

97 

# Names exported by `from polar.postgres import *`: re-exported database
# types and the `sql` helper first, then the engine factories and the
# request-scoped accessors defined in this module.
__all__ = [
    "AsyncEngine",
    "AsyncSession",
    "AsyncReadSession",
    "sql",
    "create_async_engine",
    "create_async_read_engine",
    "create_sync_engine",
    "get_db_session",
    "get_db_read_session",
    "get_db_sessionmaker",
]