Coverage for src / mcp_server_langgraph / core / dependencies.py: 70%

66 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 00:43 +0000

1""" 

2FastAPI Dependencies 

3 

4Provides dependency injection for commonly used services. 

5""" 

6 

7from typing import Any 

8 

9from fastapi import Depends 

10 

11from mcp_server_langgraph.auth.api_keys import APIKeyManager 

12from mcp_server_langgraph.auth.keycloak import KeycloakClient 

13from mcp_server_langgraph.auth.openfga import OpenFGAClient 

14from mcp_server_langgraph.auth.service_principal import ServicePrincipalManager 

15from mcp_server_langgraph.core.config import Settings, settings 

16 

# Module-level singleton slots. Each one starts out as None and is filled in
# by the corresponding get_* accessor in this module the first time that
# accessor has to build an instance; reset_singleton_dependencies() sets all
# of them back to None.
_keycloak_client: KeycloakClient | None = None
_openfga_client: OpenFGAClient | None = None
_service_principal_manager: ServicePrincipalManager | None = None
_api_key_manager: APIKeyManager | None = None

22 

23 

def get_keycloak_client() -> KeycloakClient:
    """
    Return the shared Keycloak client, building it on first use.

    The client is configured from the Keycloak fields of the module-level
    ``settings`` object and cached in ``_keycloak_client``; every later call
    returns that cached instance.

    Returns:
        The shared KeycloakClient instance.
    """
    global _keycloak_client

    # Fast path: an earlier call already built the client.
    if _keycloak_client is not None:
        return _keycloak_client

    # Imported lazily, only when a client actually has to be constructed.
    from mcp_server_langgraph.auth.keycloak import KeycloakConfig

    _keycloak_client = KeycloakClient(
        config=KeycloakConfig(
            server_url=settings.keycloak_server_url,
            realm=settings.keycloak_realm,
            client_id=settings.keycloak_client_id,
            client_secret=settings.keycloak_client_secret,
            admin_username=settings.keycloak_admin_username,
            admin_password=settings.keycloak_admin_password,
        )
    )
    return _keycloak_client

42 

43 

def get_openfga_client() -> OpenFGAClient | None:
    """
    Return the shared OpenFGA client, building it on first use.

    When ``settings.openfga_store_id`` or ``settings.openfga_model_id`` is
    empty, a warning is logged and None is returned instead of a client.
    Nothing is cached in that case, so the check (and the warning) repeats on
    each call until both values are set.

    Returns:
        The shared OpenFGAClient, or None when OpenFGA is not fully configured.
    """
    global _openfga_client

    # Fast path: an earlier call already built the client.
    if _openfga_client is not None:
        return _openfga_client

    from mcp_server_langgraph.auth.openfga import OpenFGAConfig
    from mcp_server_langgraph.observability.telemetry import logger

    # Both identifiers are required before a client can be constructed.
    if not (settings.openfga_store_id and settings.openfga_model_id):
        warning_text = (
            "OpenFGA configuration incomplete - authorization will be degraded. "
            f"store_id: {settings.openfga_store_id}, model_id: {settings.openfga_model_id}. "
            "Set OPENFGA_STORE_ID and OPENFGA_MODEL_ID environment variables to enable OpenFGA."
        )
        logger.warning(warning_text)
        return None

    _openfga_client = OpenFGAClient(
        config=OpenFGAConfig(
            api_url=settings.openfga_api_url,
            store_id=settings.openfga_store_id,
            model_id=settings.openfga_model_id,
        )
    )
    return _openfga_client

74 

75 

def validate_production_auth_config(settings_obj: Settings) -> None:
    """
    Reject production configurations that would run with degraded authorization.

    SECURITY: Implements remediation for OpenAI Codex Finding #1
    (Authorization Degradation).

    Outcomes, in the order they are checked:
      * ``environment`` is not "production": nothing is enforced.
      * OpenFGA is configured (store_id and model_id both set): accepted.
      * OpenFGA is missing and ``allow_auth_fallback`` is truthy: RuntimeError.
      * OpenFGA is missing and fallback is off: accepted with a logged
        warning; this is treated as a secure fail-closed setup.

    Args:
        settings_obj: Settings object to validate. ``allow_auth_fallback`` is
            read with a default of False when the attribute is absent.

    Raises:
        RuntimeError: If production has no OpenFGA configuration while
            authorization fallback is enabled.

    References:
        - tests/security/test_authorization_fallback_controls.py
        - CWE-862: Missing Authorization
    """
    # Enforcement applies to the production environment only.
    if settings_obj.environment != "production":
        return

    openfga_configured = bool(settings_obj.openfga_store_id and settings_obj.openfga_model_id)
    allow_fallback = getattr(settings_obj, "allow_auth_fallback", False)

    # A fully configured OpenFGA satisfies the requirement outright.
    if openfga_configured:
        return

    if allow_fallback:
        # Insecure combination: no OpenFGA, yet fallback authorization is on.
        raise RuntimeError(
            "SECURITY ERROR: Production deployment requires OpenFGA authorization infrastructure. "
            f"OpenFGA is not configured (store_id: {settings_obj.openfga_store_id}, "
            f"model_id: {settings_obj.openfga_model_id}) but ALLOW_AUTH_FALLBACK=true. "
            "This configuration would allow degraded role-based authorization in production. "
            "Either: (1) Configure OpenFGA properly, or (2) Set ALLOW_AUTH_FALLBACK=false to fail-closed."
        )

    # Remaining case: no OpenFGA and no fallback. Accepted as fail-closed, but
    # surfaced in the logs so the missing configuration is visible.
    from mcp_server_langgraph.observability.telemetry import logger

    logger.warning(
        "Production deployment without OpenFGA will deny all authorization requests. "
        "This is secure fail-closed behavior. Configure OpenFGA for production use.",
        extra={
            "environment": settings_obj.environment,
            "openfga_configured": openfga_configured,
            "allow_auth_fallback": allow_fallback,
        },
    )

136 

137 

def get_service_principal_manager(
    keycloak: KeycloakClient = Depends(get_keycloak_client),
    openfga: OpenFGAClient = Depends(get_openfga_client),
) -> ServicePrincipalManager:
    """
    Return the shared ServicePrincipalManager, building it on first use.

    The injected clients are only consumed by the call that constructs the
    manager; once an instance is cached, later calls return it and ignore
    their arguments.

    Args:
        keycloak: Keycloak client (injected)
        openfga: OpenFGA client (injected). May be None at runtime, because
            get_openfga_client returns None when OpenFGA is not configured.

    Returns:
        The shared ServicePrincipalManager instance.
    """
    global _service_principal_manager

    # Fast path: reuse the manager built by an earlier call.
    if _service_principal_manager is not None:
        return _service_principal_manager

    _service_principal_manager = ServicePrincipalManager(keycloak_client=keycloak, openfga_client=openfga)
    return _service_principal_manager

161 

162 

def get_api_key_manager(
    keycloak: KeycloakClient = Depends(get_keycloak_client),
) -> APIKeyManager:
    """
    Return the shared APIKeyManager, wiring a Redis cache client if enabled.

    On the first call, a ``redis.asyncio`` client is created when both
    ``settings.api_key_cache_enabled`` and ``settings.redis_url`` are set, and
    is handed to the new APIKeyManager (OpenAI Codex Finding #5). Settings
    read here: api_key_cache_enabled, api_key_cache_db, api_key_cache_ttl,
    redis_url, redis_password, redis_ssl.

    Args:
        keycloak: Keycloak client (injected)

    Returns:
        The shared APIKeyManager instance.

    Raises:
        RuntimeError: If caching was requested (cache enabled and a Redis URL
            set) but the constructed manager reports ``cache_enabled`` False.
    """
    global _api_key_manager

    # Fast path: reuse the manager built by an earlier call.
    if _api_key_manager is not None:
        return _api_key_manager

    cache_backend: Any | None = None
    if settings.api_key_cache_enabled and settings.redis_url:
        try:
            from urllib.parse import urlparse, urlunparse

            import redis.asyncio as redis

            # Point the URL at the API-key cache database: whatever path the
            # configured URL carried is replaced by "/<api_key_cache_db>",
            # while scheme, host, query and fragment are kept as they are.
            url_parts = urlparse(settings.redis_url)
            db_scoped_url = urlunparse(url_parts._replace(path=f"/{settings.api_key_cache_db}"))

            cache_backend = redis.from_url(  # type: ignore[no-untyped-call]
                db_scoped_url,
                password=settings.redis_password,
                ssl=settings.redis_ssl,
                decode_responses=True,
            )
        except ImportError:
            # Redis not installed: continue without a cache client.
            cache_backend = None

    # NOTE(review): the singleton is stored before the wiring check below, so
    # if that check raises, the next call returns this manager without
    # raising again - confirm whether that is intended.
    _api_key_manager = APIKeyManager(
        keycloak_client=keycloak,
        redis_client=cache_backend,
        cache_ttl=settings.api_key_cache_ttl,
        cache_enabled=settings.api_key_cache_enabled,
    )

    # Regression guard: caching was requested, so the manager must report it
    # as active.
    cache_not_wired = settings.api_key_cache_enabled and settings.redis_url and not _api_key_manager.cache_enabled
    if cache_not_wired:
        raise RuntimeError(
            "API key caching configuration error! "
            f"settings.api_key_cache_enabled=True but APIKeyManager.cache_enabled=False. "
            f"Redis client: {cache_backend}. "
            "This indicates Redis client was not properly wired."
        )

    return _api_key_manager

244 

245 

246# ============================================================================== 

247# Testing Utilities (CODEX Finding #6) 

248# ============================================================================== 

249 

250 

def reset_singleton_dependencies() -> None:
    """
    Clear every cached singleton dependency in this module.

    CODEX FINDING #6: a test was permanently skipped because of the singleton
    cache. Resetting the cache lets tests exercise the dependency wiring
    logic from a clean state.

    Usage in tests:
        from mcp_server_langgraph.core.dependencies import reset_singleton_dependencies

        def test_dependency_wiring():
            reset_singleton_dependencies()
            # Now test dependency initialization with mocked settings
            ...

    WARNING: This should ONLY be used in tests. Never call in production code.
    """
    global _keycloak_client, _openfga_client, _service_principal_manager, _api_key_manager

    # Drop all four cached instances so the next accessor call rebuilds them.
    _keycloak_client = _openfga_client = _service_principal_manager = _api_key_manager = None