Coverage for src / mcp_server_langgraph / core / dependencies.py: 70%
66 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
1"""
2FastAPI Dependencies
4Provides dependency injection for commonly used services.
5"""
7from typing import Any
9from fastapi import Depends
11from mcp_server_langgraph.auth.api_keys import APIKeyManager
12from mcp_server_langgraph.auth.keycloak import KeycloakClient
13from mcp_server_langgraph.auth.openfga import OpenFGAClient
14from mcp_server_langgraph.auth.service_principal import ServicePrincipalManager
15from mcp_server_langgraph.core.config import Settings, settings
# Process-wide singleton slots.
# Each slot stays None until the matching get_* accessor in this module builds
# the instance on first use; reset_singleton_dependencies() sets them back to None.
_keycloak_client: KeycloakClient | None = None
_openfga_client: OpenFGAClient | None = None
_service_principal_manager: ServicePrincipalManager | None = None
_api_key_manager: APIKeyManager | None = None
def get_keycloak_client() -> KeycloakClient:
    """Return the shared Keycloak client, creating it on the first call.

    The client is configured from the ``keycloak_*`` values on the module-level
    ``settings`` object and stored in ``_keycloak_client``; later calls return
    that stored instance unchanged.

    Returns:
        The module-wide KeycloakClient instance.
    """
    global _keycloak_client

    # Fast path: an earlier call already built the client.
    if _keycloak_client is not None:
        return _keycloak_client

    # Function-scope import: KeycloakConfig is only needed while building.
    from mcp_server_langgraph.auth.keycloak import KeycloakConfig

    _keycloak_client = KeycloakClient(
        config=KeycloakConfig(
            server_url=settings.keycloak_server_url,
            realm=settings.keycloak_realm,
            client_id=settings.keycloak_client_id,
            client_secret=settings.keycloak_client_secret,
            admin_username=settings.keycloak_admin_username,
            admin_password=settings.keycloak_admin_password,
        )
    )
    return _keycloak_client
def get_openfga_client() -> OpenFGAClient | None:
    """Return the shared OpenFGA client, or None when OpenFGA is not configured.

    On the first successful call the client is built from
    ``settings.openfga_api_url``, ``settings.openfga_store_id`` and
    ``settings.openfga_model_id`` and stored in ``_openfga_client``.

    If either the store id or the model id is missing, a warning is logged and
    None is returned so callers can degrade gracefully. That None result is not
    cached: every call re-checks the settings (and logs again) until both
    values are present.

    Returns:
        The module-wide OpenFGAClient, or None if the configuration is incomplete.
    """
    global _openfga_client

    # Fast path: an earlier call already built the client.
    if _openfga_client is not None:
        return _openfga_client

    from mcp_server_langgraph.auth.openfga import OpenFGAConfig
    from mcp_server_langgraph.observability.telemetry import logger

    # Both identifiers are required; bail out without caching if one is missing.
    if not (settings.openfga_store_id and settings.openfga_model_id):
        logger.warning(
            "OpenFGA configuration incomplete - authorization will be degraded. "
            f"store_id: {settings.openfga_store_id}, model_id: {settings.openfga_model_id}. "
            "Set OPENFGA_STORE_ID and OPENFGA_MODEL_ID environment variables to enable OpenFGA."
        )
        return None

    _openfga_client = OpenFGAClient(
        config=OpenFGAConfig(
            api_url=settings.openfga_api_url,
            store_id=settings.openfga_store_id,
            model_id=settings.openfga_model_id,
        )
    )
    return _openfga_client
def validate_production_auth_config(settings_obj: Settings) -> None:
    """Reject production configurations that would run with degraded authorization.

    SECURITY: Nothing is enforced unless ``settings_obj.environment`` is
    exactly ``"production"``. In production the outcome depends on whether
    OpenFGA is configured (both ``openfga_store_id`` and ``openfga_model_id``
    are set) and on ``allow_auth_fallback`` (treated as False when the
    attribute is absent):

    - OpenFGA configured: accepted silently.
    - OpenFGA missing, fallback disabled: accepted with a logged warning. This
      is treated as a valid fail-closed setup (per the logged message,
      authorization requests are then denied).
    - OpenFGA missing, fallback enabled: rejected with RuntimeError, because
      it would permit degraded role-based authorization in production.

    Implements remediation for OpenAI Codex Finding #1: Authorization Degradation

    Args:
        settings_obj: Settings object to validate.

    Raises:
        RuntimeError: If production has no OpenFGA configuration while
            ``allow_auth_fallback`` is enabled.

    References:
        - tests/security/test_authorization_fallback_controls.py
        - CWE-862: Missing Authorization
    """
    # Only production deployments are subject to these checks.
    if settings_obj.environment != "production":
        return

    has_openfga = bool(settings_obj.openfga_store_id and settings_obj.openfga_model_id)
    fallback_allowed = getattr(settings_obj, "allow_auth_fallback", False)

    # A fully configured OpenFGA satisfies the requirement on its own.
    if has_openfga:
        return

    if fallback_allowed:
        # Insecure combination: no OpenFGA, yet fallback authorization is enabled.
        raise RuntimeError(
            "SECURITY ERROR: Production deployment requires OpenFGA authorization infrastructure. "
            f"OpenFGA is not configured (store_id: {settings_obj.openfga_store_id}, "
            f"model_id: {settings_obj.openfga_model_id}) but ALLOW_AUTH_FALLBACK=true. "
            "This configuration would allow degraded role-based authorization in production. "
            "Either: (1) Configure OpenFGA properly, or (2) Set ALLOW_AUTH_FALLBACK=false to fail-closed."
        )

    # No OpenFGA and no fallback: allowed as fail-closed, but make it visible.
    from mcp_server_langgraph.observability.telemetry import logger

    logger.warning(
        "Production deployment without OpenFGA will deny all authorization requests. "
        "This is secure fail-closed behavior. Configure OpenFGA for production use.",
        extra={
            "environment": settings_obj.environment,
            "openfga_configured": has_openfga,
            "allow_auth_fallback": fallback_allowed,
        },
    )
def get_service_principal_manager(
    keycloak: KeycloakClient = Depends(get_keycloak_client),
    openfga: OpenFGAClient = Depends(get_openfga_client),
) -> ServicePrincipalManager:
    """Return the shared ServicePrincipalManager, creating it on the first call.

    The injected clients are only used when the manager is first built; once
    ``_service_principal_manager`` is set, later calls return it and ignore
    their arguments.

    Args:
        keycloak: Keycloak client (injected).
        openfga: OpenFGA client (injected).
            NOTE(review): get_openfga_client() returns None when OpenFGA is
            not configured, so this may be None despite the annotation.
            Confirm ServicePrincipalManager accepts that.

    Returns:
        The module-wide ServicePrincipalManager instance.
    """
    global _service_principal_manager

    # Fast path: an earlier call already built the manager.
    if _service_principal_manager is not None:
        return _service_principal_manager

    _service_principal_manager = ServicePrincipalManager(
        keycloak_client=keycloak,
        openfga_client=openfga,
    )
    return _service_principal_manager
def _build_api_key_redis_client() -> Any | None:
    """Create the async Redis client used for API key caching, if possible.

    Settings used:
        - api_key_cache_enabled: Enable/disable caching
        - api_key_cache_db: Redis database number placed in the URL path
        - redis_url: Redis connection URL
        - redis_password: Redis password (optional)
        - redis_ssl: Use SSL for the Redis connection

    Returns:
        A client from ``redis.asyncio.from_url``, or None when caching is
        disabled, no Redis URL is configured, or ``redis`` cannot be imported.
    """
    if not (settings.api_key_cache_enabled and settings.redis_url):
        return None

    try:
        from urllib.parse import urlparse, urlunparse

        import redis.asyncio as redis

        # Point the URL at the API-key cache database: whatever path
        # (existing db number, trailing slash) the configured URL carries is
        # replaced, while scheme, netloc, params, query and fragment are kept.
        parsed = urlparse(settings.redis_url)
        redis_url_with_db = urlunparse(parsed._replace(path=f"/{settings.api_key_cache_db}"))

        return redis.from_url(  # type: ignore[no-untyped-call]
            redis_url_with_db,
            password=settings.redis_password,
            ssl=settings.redis_ssl,
            decode_responses=True,
        )
    except ImportError:
        # Redis not installed: fall back to "no client".
        return None


def get_api_key_manager(
    keycloak: KeycloakClient = Depends(get_keycloak_client),
) -> APIKeyManager:
    """Return the shared APIKeyManager, wired with Redis caching if enabled.

    On the first call the manager is built with the injected Keycloak client,
    an optional Redis client (OpenAI Codex Finding #5: the client must be
    passed to APIKeyManager or caching will be disabled),
    ``settings.api_key_cache_ttl`` and ``settings.api_key_cache_enabled``.

    The manager is stored in ``_api_key_manager`` only after the wiring
    validation passes. A failed validation therefore raises on every call
    instead of raising once and then handing out the mis-wired instance that
    was already cached.

    Args:
        keycloak: Keycloak client (injected). Only used when the manager is
            first built.

    Returns:
        The module-wide APIKeyManager instance.

    Raises:
        RuntimeError: If caching is enabled and a Redis URL is configured, but
            the constructed manager reports ``cache_enabled`` as false.
    """
    global _api_key_manager

    # Fast path: a validated manager already exists.
    if _api_key_manager is not None:
        return _api_key_manager

    redis_client = _build_api_key_redis_client()

    # Build into a local first so a failed validation never leaves a
    # half-validated singleton behind.
    manager = APIKeyManager(
        keycloak_client=keycloak,
        redis_client=redis_client,
        cache_ttl=settings.api_key_cache_ttl,
        cache_enabled=settings.api_key_cache_enabled,
    )

    # Validation: ensure cache is actually enabled if requested (prevent regression).
    # NOTE(review): if the redis package is missing and APIKeyManager turns
    # caching off without a client, this raises even though the import
    # failure is treated as a soft fallback above - confirm that is intended.
    if settings.api_key_cache_enabled and settings.redis_url and not manager.cache_enabled:
        msg = (
            "API key caching configuration error! "
            "settings.api_key_cache_enabled=True but APIKeyManager.cache_enabled=False. "
            f"Redis client: {redis_client}. "
            "This indicates Redis client was not properly wired."
        )
        raise RuntimeError(msg)

    _api_key_manager = manager
    return _api_key_manager
246# ==============================================================================
247# Testing Utilities (CODEX Finding #6)
248# ==============================================================================
def reset_singleton_dependencies() -> None:
    """Clear every cached singleton so the next accessor call rebuilds it.

    CODEX FINDING #6: a test was permanently skipped because of the singleton
    cache. Resetting the module-level slots lets tests exercise the dependency
    wiring logic from a clean state between runs.

    Usage in tests:
        from mcp_server_langgraph.core.dependencies import reset_singleton_dependencies

        def test_dependency_wiring():
            reset_singleton_dependencies()
            # Now test dependency initialization with mocked settings
            ...

    WARNING: Test-only helper. Never call it from production code.
    """
    global _keycloak_client, _openfga_client, _service_principal_manager, _api_key_manager

    # One chained assignment puts all four slots back to their initial state.
    _keycloak_client = _openfga_client = _service_principal_manager = _api_key_manager = None