Coverage for src / mcp_server_langgraph / health / database_checks.py: 98%

111 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 00:43 +0000

1""" 

2Database Structure Validation Module 

3 

4Validates that all required databases exist with correct schema according to ADR-0056. 

5 

6This module provides runtime validation of the database architecture to ensure: 

71. All required databases exist (gdpr, openfga, keycloak) 

82. Environment-specific naming is correct (dev vs test) 

93. Required tables exist in each database 

104. Schema migrations have been applied correctly 

11 

12References: 

13 - ADR-0056: Database Architecture and Naming Convention 

14 - migrations/000_init_databases.sh: Database initialization script 

15 - migrations/001_gdpr_schema.sql: GDPR schema definition 

16""" 

17 

18import os 

19from dataclasses import dataclass 

20from enum import Enum 

21from typing import Any 

22 

23import asyncpg 

24 

25from mcp_server_langgraph.observability.telemetry import logger 

26 

27 

class Environment(str, Enum):
    """Deployment environment whose database layout is being validated.

    Members subclass ``str``, so each compares equal to its lowercase value
    (e.g. ``Environment.TEST == "test"``).
    """

    # Local development (unsuffixed database names).
    DEV = "dev"
    # Automated test runs (database names carry a "_test" suffix).
    TEST = "test"
    STAGING = "staging"
    PRODUCTION = "production"

35 

36 

@dataclass
class DatabaseInfo:
    """Description of one database the architecture expects to find.

    Attributes:
        name: Database name, including any environment suffix (e.g. ``gdpr_test``).
        purpose: Human-readable summary of what the database stores.
        required_tables: Table names whose presence is checked during validation.
        managed_by: Who owns the schema: ``"migrations"``, or the name of the
            service that creates the tables (e.g. ``"openfga"``, ``"keycloak"``).
    """

    name: str
    purpose: str
    required_tables: list[str]
    managed_by: str

45 

46 

class DatabaseValidator:
    """
    Database architecture validator.

    Validates that the PostgreSQL instance has the correct database structure
    according to ADR-0056: every database returned by ``get_expected_databases``
    must exist and contain its required tables.

    Usage:
        validator = DatabaseValidator(
            host="localhost",
            port=5432,
            user="postgres",
            password="postgres",
        )

        result = await validator.validate()
        if result.is_valid:
            print("✅ Database structure is valid")
        else:
            print(f"❌ Validation failed: {result.errors}")
    """

    def __init__(
        self,
        host: str,
        port: int,
        user: str,
        password: str,
        environment: Environment | None = None,
    ):
        """
        Initialize database validator.

        Args:
            host: PostgreSQL host
            port: PostgreSQL port
            user: PostgreSQL username
            password: PostgreSQL password
            environment: Environment type (auto-detected from environment
                variables if None)
        """
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.environment = environment or self._detect_environment()

    def _detect_environment(self) -> Environment:
        """
        Detect environment from environment variables.

        Detection order (first match wins):
        1. ENVIRONMENT env var, if it names an Environment value (case-insensitive)
        2. TESTING == "true", or PYTEST_CURRENT_TEST is set -> TEST
        3. POSTGRES_DB ends with "_test" -> TEST
        4. Otherwise -> DEV

        Returns:
            Detected environment type
        """
        env = os.getenv("ENVIRONMENT", "").lower()
        if env in {member.value for member in Environment}:
            return Environment(env)

        if os.getenv("TESTING") == "true" or os.getenv("PYTEST_CURRENT_TEST"):
            return Environment.TEST

        # Any "_test" suffix (which includes "openfga_test") marks a test database.
        if os.getenv("POSTGRES_DB", "").endswith("_test"):
            return Environment.TEST

        return Environment.DEV

    def get_expected_databases(self) -> dict[str, DatabaseInfo]:
        """
        Get expected databases based on environment.

        Returns:
            Dictionary mapping database names to DatabaseInfo objects
        """
        # Only TEST uses suffixed names (e.g. "gdpr_test"); every other
        # environment uses the unsuffixed names.
        suffix = "_test" if self.environment == Environment.TEST else ""

        return {
            f"gdpr{suffix}": DatabaseInfo(
                name=f"gdpr{suffix}",
                purpose="GDPR compliance storage (user profiles, consents, audit logs)",
                required_tables=["user_profiles", "user_preferences", "consent_records", "conversations", "audit_logs"],
                managed_by="migrations",
            ),
            f"openfga{suffix}": DatabaseInfo(
                name=f"openfga{suffix}",
                purpose="OpenFGA authorization (relationship tuples, policies)",
                required_tables=["tuple", "authorization_model", "store"],
                managed_by="openfga",
            ),
            f"keycloak{suffix}": DatabaseInfo(
                name=f"keycloak{suffix}",
                purpose="Keycloak authentication (users, realms, clients)",
                required_tables=["user_entity", "realm", "client"],
                managed_by="keycloak",
            ),
        }

    async def _connect(self, database: str) -> asyncpg.Connection:
        """
        Open a new connection to ``database`` using this validator's credentials.

        The caller is responsible for closing the returned connection.
        """
        return await asyncpg.connect(
            host=self.host,
            port=self.port,
            user=self.user,
            password=self.password,
            database=database,
        )

    async def _check_database_exists(self, conn: asyncpg.Connection, db_name: str) -> bool:
        """
        Check if a database exists.

        Args:
            conn: PostgreSQL connection (any database on the server)
            db_name: Database name to check

        Returns:
            True if database exists, False otherwise
        """
        result = await conn.fetchval("SELECT 1 FROM pg_database WHERE datname = $1", db_name)
        return result == 1

    async def _check_table_exists(self, conn: asyncpg.Connection, db_name: str, table_name: str) -> bool:
        """
        Check if a table exists in the database ``conn`` is connected to.

        The system schemas (pg_catalog, information_schema) are excluded, so a
        required table name can never be satisfied by a catalog object.

        Args:
            conn: PostgreSQL connection (connected to the database)
            db_name: Database name (currently unused; kept for call-site compatibility)
            table_name: Table name to check

        Returns:
            True if table exists, False otherwise
        """
        result = await conn.fetchval(
            """
            SELECT 1 FROM information_schema.tables
            WHERE table_name = $1
              AND table_schema NOT IN ('pg_catalog', 'information_schema')
            LIMIT 1
            """,
            table_name,
        )
        return result == 1

    async def validate_database(self, db_info: DatabaseInfo) -> "DatabaseValidationResult":
        """
        Validate a single database.

        The check runs in two steps:
        1. Connect to the ``postgres`` database and check that ``db_info.name``
           exists.
        2. Connect to the target database and check each required table.

        Both connections are closed in ``finally`` blocks. Failures are recorded
        in the returned result rather than raised.

        Missing tables are errors for databases managed by migrations. For
        service-managed databases they are only warnings, because the service
        may not have created its tables yet.

        Args:
            db_info: Database information to validate

        Returns:
            Validation result for this database. ``exists`` is True whenever the
            existence check succeeded, even if a later step failed.
        """
        errors: list[str] = []
        warnings: list[str] = []
        # Set once the existence check passes, so that a later failure (e.g.
        # being refused access to the target database) is not misreported as
        # the database being missing.
        db_exists = False

        try:
            server_conn = await self._connect("postgres")
            try:
                db_exists = await self._check_database_exists(server_conn, db_info.name)
            finally:
                await server_conn.close()

            if not db_exists:
                errors.append(f"Database '{db_info.name}' does not exist")
                return DatabaseValidationResult(
                    database_name=db_info.name,
                    exists=False,
                    tables_valid=False,
                    errors=errors,
                    warnings=warnings,
                )

            db_conn = await self._connect(db_info.name)
            try:
                missing_tables = [
                    table
                    for table in db_info.required_tables
                    if not await self._check_table_exists(db_conn, db_info.name, table)
                ]
            finally:
                await db_conn.close()

            if missing_tables:
                missing = ", ".join(missing_tables)
                if db_info.managed_by == "migrations":
                    errors.append(
                        f"Missing tables in '{db_info.name}': {missing}. "
                        "Schema migration may not have run correctly."
                    )
                else:
                    warnings.append(
                        f"Missing tables in '{db_info.name}': {missing}. "
                        f"This database is managed by {db_info.managed_by} - "
                        "tables may not exist yet if service hasn't started."
                    )

            return DatabaseValidationResult(
                database_name=db_info.name,
                exists=True,
                tables_valid=not missing_tables,
                errors=errors,
                warnings=warnings,
            )

        except (asyncpg.PostgresConnectionError, OSError) as e:
            # Socket-level connect failures (refused connection, DNS lookup
            # error, ...) surface as OSError subclasses rather than as
            # asyncpg.PostgresConnectionError; both mean the server is unreachable.
            errors.append(f"Failed to connect to PostgreSQL: {e}")
        except Exception as e:
            errors.append(f"Unexpected error validating '{db_info.name}': {e}")

        return DatabaseValidationResult(
            database_name=db_info.name,
            exists=db_exists,
            tables_valid=False,
            errors=errors,
            warnings=warnings,
        )

    async def validate(self) -> "ValidationResult":
        """
        Validate all required databases.

        Databases are validated sequentially, in the order returned by
        ``get_expected_databases``. The overall result is valid only if every
        per-database result is valid. Errors and warnings from all databases
        are concatenated in that same order.

        Returns:
            Overall validation result
        """
        logger.info(f"Validating database architecture for {self.environment.value} environment")

        database_results: dict[str, DatabaseValidationResult] = {}
        for db_name, db_info in self.get_expected_databases().items():
            database_results[db_name] = await self.validate_database(db_info)

        results = list(database_results.values())
        return ValidationResult(
            environment=self.environment,
            databases=database_results,
            is_valid=all(result.is_valid for result in results),
            errors=[error for result in results for error in result.errors],
            warnings=[warning for result in results for warning in result.warnings],
        )

322 

323 

@dataclass
class DatabaseValidationResult:
    """Outcome of validating one database.

    Attributes:
        database_name: Name of the database that was checked.
        exists: Whether the database was found on the server.
        tables_valid: Whether every required table was found.
        errors: Problems that make this database invalid.
        warnings: Non-fatal findings; they do not affect ``is_valid``.
    """

    database_name: str
    exists: bool
    tables_valid: bool
    errors: list[str]
    warnings: list[str]

    @property
    def is_valid(self) -> bool:
        """True when the database exists and no errors were recorded (warnings are ignored)."""
        if not self.exists:
            return False
        return not self.errors

338 

339 

@dataclass
class ValidationResult:
    """Aggregated outcome of validating every expected database.

    Attributes:
        environment: Environment the validation ran against.
        databases: Per-database results, keyed by database name.
        is_valid: Overall verdict across all databases.
        errors: Errors collected from all databases.
        warnings: Warnings collected from all databases.
    """

    environment: Environment
    databases: dict[str, DatabaseValidationResult]
    is_valid: bool
    errors: list[str]
    warnings: list[str]

    def to_dict(self) -> dict[str, Any]:
        """Return a plain-dict view of the result built from str, bool and list values.

        The error and warning lists are the result's own list objects, not copies.
        """
        per_database: dict[str, dict[str, Any]] = {}
        for db_name, db_result in self.databases.items():
            per_database[db_name] = {
                "exists": db_result.exists,
                "tables_valid": db_result.tables_valid,
                "errors": db_result.errors,
                "warnings": db_result.warnings,
            }

        return {
            "environment": self.environment.value,
            "is_valid": self.is_valid,
            "databases": per_database,
            "errors": self.errors,
            "warnings": self.warnings,
        }

367 

368 

async def validate_database_architecture(
    host: str = "localhost",
    port: int = 5432,
    user: str = "postgres",
    password: str = "postgres",
    environment: Environment | None = None,
) -> ValidationResult:
    """
    Convenience function to validate database architecture.

    Args:
        host: PostgreSQL host
        port: PostgreSQL port
        user: PostgreSQL username
        password: PostgreSQL password
        environment: Environment to validate against. When None (the default),
            DatabaseValidator auto-detects it from environment variables.

    Returns:
        Validation result

    Example:
        result = await validate_database_architecture()
        if result.is_valid:
            print("✅ All databases valid")
        else:
            for error in result.errors:
                print(f"❌ {error}")
    """
    validator = DatabaseValidator(
        host=host,
        port=port,
        user=user,
        password=password,
        environment=environment,
    )
    return await validator.validate()