Coverage for polar/postgres.py: 84%
43 statements
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 16:17 +0000
« prev ^ index » next coverage.py v7.10.6, created at 2025-12-05 16:17 +0000
1from collections.abc import AsyncGenerator 1ab
2from typing import Literal, TypeAlias 1ab
4from fastapi import Request 1ab
5from starlette.types import ASGIApp, Receive, Scope, Send 1ab
7from polar.config import settings 1ab
8from polar.kit.db.postgres import ( 1ab
9 AsyncEngine,
10 AsyncReadSession,
11 AsyncReadSessionMaker,
12 AsyncSession,
13 AsyncSessionMaker,
14 Engine,
15 sql,
16)
17from polar.kit.db.postgres import create_async_engine as _create_async_engine 1ab
18from polar.kit.db.postgres import create_sync_engine as _create_sync_engine 1ab
20ProcessName: TypeAlias = Literal["app", "worker", "scheduler", "script"] 1ab
23def create_async_engine(process_name: ProcessName) -> AsyncEngine: 1ab
24 return _create_async_engine(
25 dsn=str(settings.get_postgres_dsn("asyncpg")),
26 application_name=f"{settings.ENV.value}.{process_name}",
27 debug=settings.SQLALCHEMY_DEBUG,
28 pool_size=settings.DATABASE_POOL_SIZE,
29 pool_recycle=settings.DATABASE_POOL_RECYCLE_SECONDS,
30 command_timeout=settings.DATABASE_COMMAND_TIMEOUT_SECONDS,
31 )
34def create_async_read_engine(process_name: ProcessName) -> AsyncEngine: 1ab
35 return _create_async_engine(
36 dsn=str(settings.get_postgres_read_dsn("asyncpg")),
37 application_name=f"{settings.ENV.value}.{process_name}",
38 debug=settings.SQLALCHEMY_DEBUG,
39 pool_size=settings.DATABASE_POOL_SIZE,
40 pool_recycle=settings.DATABASE_POOL_RECYCLE_SECONDS,
41 command_timeout=settings.DATABASE_COMMAND_TIMEOUT_SECONDS,
42 )
45def create_sync_engine(process_name: ProcessName) -> Engine: 1ab
46 return _create_sync_engine(
47 dsn=str(settings.get_postgres_dsn("psycopg2")),
48 application_name=f"{settings.ENV.value}.{process_name}",
49 debug=settings.SQLALCHEMY_DEBUG,
50 pool_size=settings.DATABASE_SYNC_POOL_SIZE,
51 pool_recycle=settings.DATABASE_POOL_RECYCLE_SECONDS,
52 command_timeout=settings.DATABASE_COMMAND_TIMEOUT_SECONDS,
53 )
56class AsyncSessionMiddleware: 1ab
57 def __init__(self, app: ASGIApp) -> None: 1ab
58 self.app = app
60 async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None: 1ab
61 if scope["type"] not in ("http", "websocket"): 1dc
62 return await self.app(scope, receive, send)
64 sessionmaker: AsyncSessionMaker = scope["state"]["async_sessionmaker"] 1dc
65 async with sessionmaker() as session: 1dc
66 scope["state"]["async_session"] = session 1dc
67 await self.app(scope, receive, send) 1dc
70async def get_db_sessionmaker(request: Request) -> AsyncSessionMaker: 1ab
71 return request.state.async_sessionmaker
74async def get_db_session(request: Request) -> AsyncGenerator[AsyncSession]: 1ab
75 try: 1dc
76 session = request.state.async_session 1dc
77 except AttributeError as e:
78 raise RuntimeError(
79 "Session is not present in the request state. "
80 "Did you forget to add AsyncSessionMiddleware?"
81 ) from e
83 try: 1dc
84 yield session 1dc
85 except: 1d
86 await session.rollback() 1d
87 raise
88 else:
89 await session.commit() 1dc
92async def get_db_read_session(request: Request) -> AsyncGenerator[AsyncReadSession]: 1ab
93 sessionmaker: AsyncReadSessionMaker = request.state.async_read_sessionmaker
94 async with sessionmaker() as session:
95 yield session
98__all__ = [ 1ab
99 "AsyncEngine",
100 "AsyncSession",
101 "AsyncReadSession",
102 "sql",
103 "create_async_engine",
104 "create_async_read_engine",
105 "create_sync_engine",
106 "get_db_session",
107 "get_db_read_session",
108 "get_db_sessionmaker",
109]