Coverage for src/mcp_server_langgraph/api/health.py: 90%

102 statements  

coverage.py v7.12.0, created at 2025-12-03 00:43 +0000

1""" 

2Health Check and Startup Validation 

3 

4Provides health check endpoint and startup validation to ensure all critical 

5systems are properly initialized before the app accepts requests. 

6 

7This module prevents the classes of issues found in OpenAI Codex audit from recurring. 

8""" 

9 

10from fastapi import APIRouter, status 

11from pydantic import BaseModel 

12 

13from mcp_server_langgraph.core.config import settings 

14from mcp_server_langgraph.observability.telemetry import logger 

15 

16router = APIRouter(prefix="/api/v1/health", tags=["health"]) 

17 

18 

19class HealthCheckResult(BaseModel): 

20 """Health check result model""" 

21 

22 status: str 

23 checks: dict[str, bool] 

24 errors: list[str] 

25 warnings: list[str] 

26 

27 

28class SystemValidationError(Exception): 

29 """Raised when critical system validation fails at startup""" 

30 

31 

32def validate_observability_initialized() -> tuple[bool, str]: 

33 """ 

34 Validate that observability system is properly initialized. 

35 

36 Returns: 

37 Tuple of (is_healthy, message) 

38 

39 Related to: OpenAI Codex Finding #2 - Observability not initialized 

40 """ 

41 try: 

42 # Test that logger is usable 

43 logger.debug("Observability health check") 

44 return True, "Observability initialized and functional" 

45 except RuntimeError as e: 

46 return False, f"Observability not initialized: {e}" 

47 

48 
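# Editorial note (not part of the original module): every check in this module
# follows the same contract - return an (is_healthy, message) tuple - so that
# _process_validation_results() and health_check() below can aggregate results
# uniformly. The function below is a minimal, hypothetical sketch of an
# additional check in that style; the `feature_flags_url` setting name is an
# illustrative assumption and does not exist in this project.
def _example_feature_flags_check() -> tuple[bool, str]:
    url = getattr(settings, "feature_flags_url", None)  # hypothetical setting
    if not url:
        return True, "Feature flag service not configured (check skipped)"
    return True, f"Feature flag service configured: {url}"
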

def validate_session_store_registered() -> tuple[bool, str]:
    """
    Validate that session store is properly registered globally.

    Returns:
        Tuple of (is_healthy, message)

    Related to: OpenAI Codex Finding #3 - Session storage miswired
    """
    if settings.auth_mode != "session":
        return True, "Session auth not enabled (token mode)"

    try:
        from mcp_server_langgraph.auth.session import get_session_store

        session_store = get_session_store()

        # Note: get_session_store() always returns SessionStore (never None per type signature)
        # If it were to return None, it would have raised an error already in dependency injection

        # Check if we're using the fallback (warning in logs indicates this)
        store_type = type(session_store).__name__

        # Expected: RedisSessionStore if redis configured, InMemorySessionStore if memory configured
        if settings.session_backend == "redis" and store_type != "RedisSessionStore":
            return False, f"Expected RedisSessionStore, got {store_type} (fallback detected)"

        return True, f"Session store registered: {store_type}"
    except Exception as e:
        return False, f"Session store validation failed: {e}"


def validate_api_key_cache_configured() -> tuple[bool, str]:
    """
    Validate that API key cache is properly configured if enabled.

    Returns:
        Tuple of (is_healthy, message)

    Related to: OpenAI Codex Finding #5 - Redis API key caching not used
    """
    if not settings.api_key_cache_enabled:
        return True, "API key caching disabled by configuration"

    if not settings.redis_url:
        return True, "API key caching disabled (no redis_url configured)"

    # We can't easily check the singleton without triggering initialization
    # Instead, we validate the configuration is consistent
    warnings = []

    if settings.api_key_cache_ttl <= 0:
        warnings.append(f"Cache TTL is {settings.api_key_cache_ttl}, should be > 0")

    if warnings:
        return False, f"API key cache configuration issues: {', '.join(warnings)}"

    return True, "API key caching properly configured"


def validate_docker_sandbox_security() -> tuple[bool, str]:
    """
    Validate that Docker sandbox has proper security configuration.

    Returns:
        Tuple of (is_healthy, message)

    Related to: OpenAI Codex Finding #4 - Docker sandbox security
    """
    # This is informational - we can't check Docker runtime config without creating a container
    warnings = []

    # Check if network allowlist is being used (not fully implemented)
    if hasattr(settings, "sandbox_network_mode") and settings.sandbox_network_mode == "allowlist":  # coverage: condition never true in this run
        warnings.append("Network allowlist mode is not fully implemented - using unrestricted bridge network")

    if warnings:  # coverage: branch never taken in this run
        return True, f"Docker sandbox warnings: {', '.join(warnings)}"

    return True, "Docker sandbox security checks not applicable (runtime validation required)"


def validate_database_connectivity() -> tuple[bool, str]:
    """
    Validate that PostgreSQL database is accessible.

    Returns:
        Tuple of (is_healthy, message)

    Related to: PostgreSQL dependency chain validation
    """
    import asyncio

    from mcp_server_langgraph.infrastructure.database import check_database_connectivity

    # Parse the postgres URL from settings
    postgres_url = settings.gdpr_postgres_url

    logger.debug(f"Validating database connectivity to {postgres_url.split('@')[-1]}")

    # Run the async check synchronously
    try:
        return asyncio.run(check_database_connectivity(postgres_url, timeout=5.0))
    except RuntimeError as e:
        # If we're already in an event loop (shouldn't happen in startup)
        if "cannot be called from a running event loop" in str(e):
            return False, "Database validation failed: already in event loop"
        raise


async def validate_database_connectivity_async() -> tuple[bool, str]:
    """
    Validate that PostgreSQL database is accessible (async version).

    Returns:
        Tuple of (is_healthy, message)
    """
    from mcp_server_langgraph.infrastructure.database import check_database_connectivity

    # Parse the postgres URL from settings
    postgres_url = settings.gdpr_postgres_url

    logger.debug(f"Validating database connectivity to {postgres_url.split('@')[-1]}")

    return await check_database_connectivity(postgres_url, timeout=5.0)


def run_startup_validation() -> None:
    """
    Run all startup validations and raise SystemValidationError if critical checks fail.
    ...
    """
    checks = {
        "observability": validate_observability_initialized(),
        "session_store": validate_session_store_registered(),
        "api_key_cache": validate_api_key_cache_configured(),
        "docker_sandbox": validate_docker_sandbox_security(),
        "database_connectivity": validate_database_connectivity(),
    }
    _process_validation_results(checks)


async def run_startup_validation_async() -> None:
    """
    Run all startup validations asynchronously.
    """
    checks = {
        "observability": validate_observability_initialized(),
        "session_store": validate_session_store_registered(),
        "api_key_cache": validate_api_key_cache_configured(),
        "docker_sandbox": validate_docker_sandbox_security(),
        "database_connectivity": await validate_database_connectivity_async(),
    }
    _process_validation_results(checks)


def _process_validation_results(checks: dict[str, tuple[bool, str]]) -> None:
    """Process validation results and raise error if needed."""
    errors = []
    warnings = []

    for check_name, (is_healthy, message) in checks.items():
        if is_healthy:
            logger.info(f"{check_name}: {message}")
            if "warning" in message.lower():
                warnings.append(f"{check_name}: {message}")
        else:
            logger.error(f"{check_name}: {message}")
            errors.append(f"{check_name}: {message}")

    if errors:
        error_msg = f"Startup validation failed: {', '.join(errors)}"
        logger.critical(error_msg)
        raise SystemValidationError(error_msg)

    if warnings:
        logger.warning(f"Startup validation warnings: {', '.join(warnings)}")

    logger.info("All startup validations passed")

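# Editorial note (not part of the original module): how this validation is wired
# into application startup is not shown in this file. The helper below is a
# minimal, hypothetical sketch - assuming a FastAPI app with a lifespan handler -
# of running run_startup_validation_async() before traffic is accepted, so that a
# SystemValidationError aborts startup. The project's real entry point may differ.
def _example_create_app_with_startup_validation():
    """Illustrative sketch only: build a FastAPI app that validates at startup."""
    from contextlib import asynccontextmanager

    from fastapi import FastAPI

    @asynccontextmanager
    async def lifespan(app: FastAPI):
        # If a critical check fails, the raised SystemValidationError aborts startup
        await run_startup_validation_async()
        yield

    app = FastAPI(lifespan=lifespan)
    app.include_router(router)
    return app
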

@router.get(
    "",
    status_code=status.HTTP_200_OK,
    summary="Health Check",
    description="Check the health status of all critical systems",
)
async def health_check() -> HealthCheckResult:
    """
    Health check endpoint that validates all critical systems.

    Returns:
        HealthCheckResult with status and detailed check results

    Example:
        ```
        GET /api/v1/health
        {
            "status": "healthy",
            "checks": {
                "observability": true,
                "session_store": true,
                "api_key_cache": true,
                "docker_sandbox": true,
                "database_connectivity": true
            },
            "errors": [],
            "warnings": []
        }
        ```
    """
    checks_dict = {
        "observability": validate_observability_initialized(),
        "session_store": validate_session_store_registered(),
        "api_key_cache": validate_api_key_cache_configured(),
        "docker_sandbox": validate_docker_sandbox_security(),
        "database_connectivity": await validate_database_connectivity_async(),
    }

    # Convert to bool dict and collect errors/warnings
    checks = {}
    errors = []
    warnings = []

    for check_name, (is_healthy, message) in checks_dict.items():
        checks[check_name] = is_healthy
        if not is_healthy:
            errors.append(f"{check_name}: {message}")
        elif "warning" in message.lower():
            warnings.append(f"{check_name}: {message}")

    # Overall status
    overall_status = "healthy" if not errors else "unhealthy"
    if warnings and not errors:
        overall_status = "degraded"

    return HealthCheckResult(
        status=overall_status,
        checks=checks,
        errors=errors,
        warnings=warnings,
    )
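

# Editorial note (not part of the original module): a hypothetical, self-contained
# way to exercise the endpoint with FastAPI's TestClient. Mounting the router on a
# throwaway app here is an assumption for illustration; in the real service the
# router is included by the main application.
if __name__ == "__main__":
    from fastapi import FastAPI
    from fastapi.testclient import TestClient

    _app = FastAPI()
    _app.include_router(router)

    with TestClient(_app) as client:
        response = client.get("/api/v1/health")
        # Expected shape: {"status": ..., "checks": {...}, "errors": [...], "warnings": [...]}
        print(response.status_code, response.json())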