Coverage for src / mcp_server_langgraph / database / session.py: 100%
42 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
1"""
2Database session management for async PostgreSQL connections.
4This module provides async SQLAlchemy session management with connection pooling
5and automatic cleanup.
6"""
8import logging
9from collections.abc import AsyncGenerator
10from contextlib import asynccontextmanager
12from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine
14from mcp_server_langgraph.database.models import Base
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Global engine instance (lazy-initialized): set by the first get_engine()
# call and reset to None by cleanup_database().
_engine: AsyncEngine | None = None
# Session factory cached by get_session_maker(); cleanup_database() resets it
# to None together with the engine.
_async_session_maker: async_sessionmaker[AsyncSession] | None = None
def get_engine(database_url: str, echo: bool = False) -> AsyncEngine:
    """
    Return the shared async database engine, building it on first use.

    The engine is cached in the module-level ``_engine``. Once it exists,
    later calls return it as-is and their arguments are not consulted.

    Args:
        database_url: PostgreSQL connection URL (must use asyncpg driver)
            Example: postgresql+asyncpg://user:pass@localhost/dbname
        echo: Whether to echo SQL statements (for debugging)

    Returns:
        The module-wide AsyncEngine instance

    Example:
        >>> engine = get_engine("postgresql+asyncpg://postgres:postgres@localhost/cost_tracking")
    """
    global _engine

    # Fast path: an earlier call already built the engine.
    if _engine is not None:
        return _engine

    # Log only the text after the last '@' so credentials stay out of the logs.
    log_target = database_url.rpartition("@")[-1]
    logger.info(f"Creating async database engine: {log_target}")

    pool_options = {
        "pool_size": 10,  # Connection pool size
        "max_overflow": 20,  # Max additional connections beyond pool_size
        "pool_pre_ping": True,  # Verify connections before using
        "pool_recycle": 3600,  # Recycle connections after 1 hour
    }
    _engine = create_async_engine(database_url, echo=echo, **pool_options)
    return _engine
def get_session_maker(database_url: str, echo: bool = False) -> async_sessionmaker[AsyncSession]:
    """
    Return the shared async session maker, building it on first use.

    The factory is cached in the module-level ``_async_session_maker`` and is
    bound to the engine returned by ``get_engine(database_url, echo)``. Once it
    exists, later calls return it as-is and their arguments are not consulted.

    Args:
        database_url: PostgreSQL connection URL
        echo: Whether to echo SQL statements

    Returns:
        The module-wide async_sessionmaker instance
    """
    global _async_session_maker

    # Fast path: an earlier call already built the factory.
    if _async_session_maker is not None:
        return _async_session_maker

    _async_session_maker = async_sessionmaker(
        get_engine(database_url, echo),
        class_=AsyncSession,
        expire_on_commit=False,  # Keep objects usable after commit
        autoflush=False,  # Flushing is left to the caller
        autocommit=False,  # Committing is left to the caller
    )
    return _async_session_maker
@asynccontextmanager
async def get_async_session(database_url: str, echo: bool = False) -> AsyncGenerator[AsyncSession, None]:
    """
    Provide a database session as an async context manager.

    Session lifecycle handled here:
    - A session is opened from the shared session maker
    - The session is committed when the ``async with`` body finishes normally
    - The session is rolled back and the error re-raised if the body
      (or the commit itself) raises an ``Exception``
    - The session is closed in every case

    Args:
        database_url: PostgreSQL connection URL
        echo: Whether to echo SQL statements

    Yields:
        AsyncSession instance

    Example:
        >>> async with get_async_session(db_url) as session:
        ...     result = await session.execute(select(TokenUsageRecord))
        ...     records = result.scalars().all()
    """
    make_session = get_session_maker(database_url, echo)
    async with make_session() as db_session:
        try:
            yield db_session
            # Commit stays inside the try so a failed commit is rolled back too.
            await db_session.commit()
        except Exception:
            await db_session.rollback()
            raise
        finally:
            await db_session.close()
async def init_database(database_url: str, echo: bool = False) -> None:
    """
    Create the database schema.

    Runs ``Base.metadata.create_all`` against the shared engine inside a
    single ``engine.begin()`` transaction. Tables that already exist are left
    alone, so calling this more than once is safe.

    Args:
        database_url: PostgreSQL connection URL
        echo: Whether to echo SQL statements

    Example:
        >>> await init_database("postgresql+asyncpg://postgres:postgres@localhost/cost_tracking")
    """
    db_engine = get_engine(database_url, echo)
    logger.info("Initializing database schema...")

    async with db_engine.begin() as connection:
        # create_all is a synchronous API, so it is dispatched through run_sync.
        await connection.run_sync(Base.metadata.create_all)

    logger.info("Database schema initialized successfully")
async def cleanup_database() -> None:
    """
    Release database resources.

    Disposes the shared engine and clears the cached engine and session maker
    so they are rebuilt on next use. Does nothing when no engine has been
    created. Intended to be called on application shutdown.
    """
    global _engine, _async_session_maker

    # Nothing to release if the engine was never created.
    if _engine is None:
        return

    logger.info("Cleaning up database connections...")
    await _engine.dispose()
    _engine = _async_session_maker = None
    logger.info("Database cleanup complete")