Coverage for src / mcp_server_langgraph / health / database_checks.py: 98%
111 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
1"""
2Database Structure Validation Module
4Validates that all required databases exist with correct schema according to ADR-0056.
6This module provides runtime validation of the database architecture to ensure:
71. All required databases exist (gdpr, openfga, keycloak)
82. Environment-specific naming is correct (dev vs test)
93. Required tables exist in each database
104. Schema migrations have been applied correctly
12References:
13 - ADR-0056: Database Architecture and Naming Convention
14 - migrations/000_init_databases.sh: Database initialization script
15 - migrations/001_gdpr_schema.sql: GDPR schema definition
16"""
18import os
19from dataclasses import dataclass
20from enum import Enum
21from typing import Any
23import asyncpg
25from mcp_server_langgraph.observability.telemetry import logger
class Environment(str, Enum):
    """Deployment environment that database validation targets.

    Because the enum also subclasses ``str``, each member compares equal to
    its plain string value (e.g. ``Environment.TEST == "test"``).
    """

    DEV = "dev"
    TEST = "test"
    STAGING = "staging"
    PRODUCTION = "production"
@dataclass
class DatabaseInfo:
    """Describes one database the architecture requires and who provisions its schema."""

    # Database name as it appears in pg_database (may carry an environment suffix such as "_test")
    name: str
    # Human-readable description of what the database stores
    purpose: str
    # Table names whose presence is checked during validation
    required_tables: list[str]
    # Schema owner: "migrations", or the name of the service that creates the tables
    # (e.g. "openfga", "keycloak")
    managed_by: str
class DatabaseValidator:
    """
    Database architecture validator.

    Validates that the PostgreSQL instance has the correct database structure
    according to ADR-0056.

    Usage:
        validator = DatabaseValidator(
            host="localhost",
            port=5432,
            user="postgres",
            password="postgres"
        )

        result = await validator.validate()
        if result.is_valid:
            print("✅ Database structure is valid")
        else:
            print(f"❌ Validation failed: {result.errors}")
    """

    def __init__(
        self,
        host: str,
        port: int,
        user: str,
        password: str,
        environment: Environment | None = None,
    ):
        """
        Initialize database validator.

        Args:
            host: PostgreSQL host
            port: PostgreSQL port
            user: PostgreSQL username
            password: PostgreSQL password
            environment: Environment type (auto-detected from env vars if None)
        """
        self.host = host
        self.port = port
        self.user = user
        self.password = password
        self.environment = environment or self._detect_environment()

    def _detect_environment(self) -> Environment:
        """
        Detect environment from environment variables.

        Detection logic (first match wins):
        1. ENVIRONMENT env var, if it (case-insensitively) names an Environment value
        2. TESTING == "true" (case-insensitive) or PYTEST_CURRENT_TEST set -> TEST
        3. POSTGRES_DB ending in "_test" -> TEST
        4. Default to DEV

        Returns:
            Detected environment type
        """
        # Check explicit environment variable
        env = os.getenv("ENVIRONMENT", "").lower()
        if env in {e.value for e in Environment}:
            return Environment(env)

        # Check if running in test mode; accept "true"/"True"/"TRUE" alike
        if os.getenv("TESTING", "").lower() == "true" or os.getenv("PYTEST_CURRENT_TEST"):
            return Environment.TEST

        # Test databases are named with a "_test" suffix (this also covers "openfga_test")
        if os.getenv("POSTGRES_DB", "").endswith("_test"):
            return Environment.TEST

        # Default to development
        return Environment.DEV

    def get_expected_databases(self) -> dict[str, DatabaseInfo]:
        """
        Get expected databases based on environment.

        Only the TEST environment uses the "_test" name suffix; DEV, STAGING
        and PRODUCTION all use the bare names.

        Returns:
            Dictionary mapping database names to DatabaseInfo objects
        """
        suffix = "_test" if self.environment == Environment.TEST else ""

        return {
            f"gdpr{suffix}": DatabaseInfo(
                name=f"gdpr{suffix}",
                purpose="GDPR compliance storage (user profiles, consents, audit logs)",
                required_tables=["user_profiles", "user_preferences", "consent_records", "conversations", "audit_logs"],
                managed_by="migrations",
            ),
            f"openfga{suffix}": DatabaseInfo(
                name=f"openfga{suffix}",
                purpose="OpenFGA authorization (relationship tuples, policies)",
                required_tables=["tuple", "authorization_model", "store"],
                managed_by="openfga",
            ),
            f"keycloak{suffix}": DatabaseInfo(
                name=f"keycloak{suffix}",
                purpose="Keycloak authentication (users, realms, clients)",
                required_tables=["user_entity", "realm", "client"],
                managed_by="keycloak",
            ),
        }

    async def _check_database_exists(self, conn: asyncpg.Connection, db_name: str) -> bool:
        """
        Check if a database exists.

        Args:
            conn: PostgreSQL connection (any database on the server)
            db_name: Database name to check

        Returns:
            True if database exists, False otherwise
        """
        result = await conn.fetchval("SELECT 1 FROM pg_database WHERE datname = $1", db_name)
        return bool(result == 1)

    async def _check_table_exists(self, conn: asyncpg.Connection, db_name: str, table_name: str) -> bool:
        """
        Check if a table exists in a database.

        The lookup is not restricted to a schema, so a match in any schema counts.

        NOTE(review): information_schema.tables only lists relations the connecting
        role has some privilege on, so a low-privilege role may see tables as missing.

        Args:
            conn: PostgreSQL connection (connected to the database)
            db_name: Database name (currently unused; kept for call-site clarity)
            table_name: Table name to check

        Returns:
            True if table exists, False otherwise
        """
        result = await conn.fetchval(
            """
            SELECT 1 FROM information_schema.tables
            WHERE table_name = $1
            """,
            table_name,
        )
        return bool(result == 1)

    def _failure_result(
        self, db_info: DatabaseInfo, errors: list[str], warnings: list[str]
    ) -> "DatabaseValidationResult":
        """Build the result for a database that is missing or could not be validated."""
        return DatabaseValidationResult(
            database_name=db_info.name,
            exists=False,
            tables_valid=False,
            errors=errors,
            warnings=warnings,
        )

    async def validate_database(self, db_info: DatabaseInfo) -> "DatabaseValidationResult":
        """
        Validate a single database.

        Connects to the "postgres" maintenance database to confirm the target
        exists, then connects to the target itself to look for its required
        tables. Missing tables are errors for migration-managed databases and
        warnings for service-managed ones (the service may not have created
        them yet). This method does not raise for ordinary failures; they are
        reported in the returned result.

        Args:
            db_info: Database information to validate

        Returns:
            Validation result for this database
        """
        errors: list[str] = []
        warnings: list[str] = []

        try:
            # Step 1: check existence from the "postgres" database, since the
            # target database may not exist and so cannot be connected to.
            conn = await asyncpg.connect(
                host=self.host,
                port=self.port,
                user=self.user,
                password=self.password,
                database="postgres",
            )
            try:
                exists = await self._check_database_exists(conn, db_info.name)
            finally:
                await conn.close()

            if not exists:
                errors.append(f"Database '{db_info.name}' does not exist")
                return self._failure_result(db_info, errors, warnings)

            # Step 2: connect to the target database to inspect its tables.
            db_conn = await asyncpg.connect(
                host=self.host,
                port=self.port,
                user=self.user,
                password=self.password,
                database=db_info.name,
            )
            try:
                missing_tables = [
                    table
                    for table in db_info.required_tables
                    if not await self._check_table_exists(db_conn, db_info.name, table)
                ]
            finally:
                await db_conn.close()

            if missing_tables:
                missing = ", ".join(missing_tables)
                if db_info.managed_by == "migrations":
                    errors.append(
                        f"Missing tables in '{db_info.name}': {missing}. "
                        f"Schema migration may not have run correctly."
                    )
                else:
                    warnings.append(
                        f"Missing tables in '{db_info.name}': {missing}. "
                        f"This database is managed by {db_info.managed_by} - "
                        f"tables may not exist yet if service hasn't started."
                    )

            return DatabaseValidationResult(
                database_name=db_info.name,
                exists=True,
                tables_valid=not missing_tables,
                errors=errors,
                warnings=warnings,
            )

        # Network-level failures (refused connection, unreachable host) surface
        # from asyncpg as OSError subclasses rather than PostgresConnectionError,
        # so both are reported as connection failures.
        except (asyncpg.PostgresConnectionError, OSError) as e:
            errors.append(f"Failed to connect to PostgreSQL: {e}")
            return self._failure_result(db_info, errors, warnings)
        except Exception as e:
            errors.append(f"Unexpected error validating '{db_info.name}': {e}")
            return self._failure_result(db_info, errors, warnings)

    async def validate(self) -> "ValidationResult":
        """
        Validate all required databases, one after another.

        Returns:
            Overall validation result. It is valid only if every database is
            valid; errors and warnings are concatenated in database order.
        """
        logger.info(f"Validating database architecture for {self.environment.value} environment")

        database_results = {}
        for db_name, db_info in self.get_expected_databases().items():
            database_results[db_name] = await self.validate_database(db_info)

        results = list(database_results.values())
        return ValidationResult(
            environment=self.environment,
            databases=database_results,
            is_valid=all(result.is_valid for result in results),
            errors=[error for result in results for error in result.errors],
            warnings=[warning for result in results for warning in result.warnings],
        )
@dataclass
class DatabaseValidationResult:
    """Outcome of checking one database: existence, required tables, and messages."""

    database_name: str
    exists: bool
    # False when required tables are missing or the check could not complete
    tables_valid: bool
    # Problems that make this database invalid
    errors: list[str]
    # Non-fatal observations; these never affect is_valid
    warnings: list[str]

    @property
    def is_valid(self) -> bool:
        """Whether the database exists and no errors were recorded (warnings are ignored)."""
        return self.exists and not self.errors
@dataclass
class ValidationResult:
    """Aggregate outcome of validating every required database."""

    environment: Environment
    # Per-database results keyed by database name
    databases: dict[str, DatabaseValidationResult]
    is_valid: bool
    errors: list[str]
    warnings: list[str]

    def to_dict(self) -> dict[str, Any]:
        """Return a plain-dict view suitable for JSON serialization.

        The environment is emitted as its string value. Per-database entries
        include existence, table validity, errors and warnings, but not the
        database name, which is the mapping key.
        """
        per_database: dict[str, dict[str, Any]] = {}
        for db_name, db_result in self.databases.items():
            per_database[db_name] = {
                "exists": db_result.exists,
                "tables_valid": db_result.tables_valid,
                "errors": db_result.errors,
                "warnings": db_result.warnings,
            }

        return {
            "environment": self.environment.value,
            "is_valid": self.is_valid,
            "databases": per_database,
            "errors": self.errors,
            "warnings": self.warnings,
        }
async def validate_database_architecture(
    host: str = "localhost",
    port: int = 5432,
    user: str = "postgres",
    password: str = "postgres",
    environment: Environment | None = None,
) -> ValidationResult:
    """
    Convenience function to validate database architecture.

    Args:
        host: PostgreSQL host
        port: PostgreSQL port
        user: PostgreSQL username
        password: PostgreSQL password
        environment: Environment whose database names to expect; when None it
            is auto-detected from environment variables by DatabaseValidator

    Returns:
        Validation result

    Example:
        result = await validate_database_architecture()
        if result.is_valid:
            print("✅ All databases valid")
        else:
            for error in result.errors:
                print(f"❌ {error}")
    """
    validator = DatabaseValidator(
        host=host,
        port=port,
        user=user,
        password=password,
        environment=environment,
    )
    return await validator.validate()