Coverage for src / mcp_server_langgraph / database / session.py: 100%

42 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 00:43 +0000

1""" 

2Database session management for async PostgreSQL connections. 

3 

4This module provides async SQLAlchemy session management with connection pooling 

5and automatic cleanup. 

6""" 

7 

8import logging 

9from collections.abc import AsyncGenerator 

10from contextlib import asynccontextmanager 

11 

12from sqlalchemy.ext.asyncio import AsyncEngine, AsyncSession, async_sessionmaker, create_async_engine 

13 

14from mcp_server_langgraph.database.models import Base 

15 

# Logger for this module, named after the module itself.
logger = logging.getLogger(__name__)

# Process-wide singletons. Both start out unset: get_engine() and
# get_session_maker() populate them on first use, and cleanup_database()
# resets them to None.
_engine: AsyncEngine | None = None
_async_session_maker: async_sessionmaker[AsyncSession] | None = None

21 

22 

def get_engine(database_url: str, echo: bool = False) -> AsyncEngine:
    """
    Return the shared async engine, building it on the first call.

    The engine is cached in the module-level ``_engine``. Once it exists,
    every later call returns that same instance and the arguments passed to
    the later call are ignored.

    Args:
        database_url: PostgreSQL connection URL (must use asyncpg driver)
            Example: postgresql+asyncpg://user:pass@localhost/dbname
        echo: Whether to echo SQL statements (for debugging)

    Returns:
        The cached AsyncEngine instance.

    Example:
        >>> engine = get_engine("postgresql+asyncpg://postgres:postgres@localhost/cost_tracking")
    """
    global _engine

    # Fast path: the engine was already built by an earlier call.
    if _engine is not None:
        return _engine

    # Only the text after the last '@' is logged, which leaves out the
    # user:password portion of a URL that carries credentials.
    logger.info(f"Creating async database engine: {database_url.split('@')[-1]}")

    pool_options = {
        "pool_size": 10,  # Connection pool size
        "max_overflow": 20,  # Max additional connections beyond pool_size
        "pool_pre_ping": True,  # Verify connections before using
        "pool_recycle": 3600,  # Recycle connections after 1 hour (seconds)
    }
    _engine = create_async_engine(database_url, echo=echo, **pool_options)
    return _engine

53 

54 

def get_session_maker(database_url: str, echo: bool = False) -> async_sessionmaker[AsyncSession]:
    """
    Return the shared async session factory, building it on the first call.

    The factory is cached in the module-level ``_async_session_maker`` and is
    bound to the engine returned by ``get_engine``. Once the factory exists,
    later calls return it unchanged and their arguments are not used.

    Args:
        database_url: PostgreSQL connection URL
        echo: Whether to echo SQL statements

    Returns:
        The cached async_sessionmaker instance.
    """
    global _async_session_maker

    # Fast path: reuse the factory created by an earlier call.
    if _async_session_maker is not None:
        return _async_session_maker

    _async_session_maker = async_sessionmaker(
        get_engine(database_url, echo),
        class_=AsyncSession,
        expire_on_commit=False,  # Keep loaded objects usable after commit
        autoflush=False,  # Flushing is left to the caller
        autocommit=False,  # Commits are always explicit
    )
    return _async_session_maker

79 

80 

@asynccontextmanager
async def get_async_session(database_url: str, echo: bool = False) -> AsyncGenerator[AsyncSession, None]:
    """
    Provide a database session whose lifecycle is managed for the caller.

    The session comes from the shared factory returned by
    ``get_session_maker``. For each use of the context manager:
    - a new session is opened
    - it is committed when the ``async with`` body finishes without raising
    - it is rolled back, and the error re-raised, when the body or the
      commit raises an ``Exception``
    - it is closed in every case

    Args:
        database_url: PostgreSQL connection URL
        echo: Whether to echo SQL statements

    Yields:
        AsyncSession instance

    Example:
        >>> async with get_async_session(db_url) as session:
        ...     result = await session.execute(select(TokenUsageRecord))
        ...     records = result.scalars().all()
    """
    factory = get_session_maker(database_url, echo)
    async with factory() as db_session:
        try:
            yield db_session
            # Reached only when the caller's block completed normally. The
            # commit stays inside the try so a failing commit is rolled back.
            await db_session.commit()
        except Exception:
            await db_session.rollback()
            raise
        finally:
            await db_session.close()

114 

115 

async def init_database(database_url: str, echo: bool = False) -> None:
    """
    Create the database schema for the models registered on ``Base``.

    Obtains the shared engine via ``get_engine``, opens a transaction with
    ``engine.begin()`` and runs ``Base.metadata.create_all`` on that
    connection. ``create_all`` checks for existing tables by default, so
    calling this function repeatedly is safe.

    Args:
        database_url: PostgreSQL connection URL
        echo: Whether to echo SQL statements

    Example:
        >>> await init_database("postgresql+asyncpg://postgres:postgres@localhost/cost_tracking")
    """
    db_engine = get_engine(database_url, echo)
    logger.info("Initializing database schema...")

    async with db_engine.begin() as connection:
        # create_all is a synchronous API, so it goes through run_sync.
        await connection.run_sync(Base.metadata.create_all)

    logger.info("Database schema initialized successfully")

139 

140 

async def cleanup_database() -> None:
    """
    Cleanup database resources.

    Disposes the shared engine (closing its pooled connections) and resets
    the module-level engine and session-maker singletons so the next call to
    ``get_engine`` / ``get_session_maker`` builds fresh ones.
    Should be called on application shutdown.

    Does nothing when no engine has been created.

    Raises:
        Whatever ``AsyncEngine.dispose()`` raises. The module-level
        singletons are already cleared by the time it propagates.
    """
    global _engine, _async_session_maker

    if _engine is None:
        return

    logger.info("Cleaning up database connections...")

    # Detach the singletons *before* awaiting dispose(). If dispose() raises,
    # the module must not keep handing out an engine whose disposal failed,
    # and code that runs during the await must not pick up the engine that is
    # being torn down.
    engine = _engine
    _engine = None
    _async_session_maker = None

    await engine.dispose()
    logger.info("Database cleanup complete")