Coverage for src / mcp_server_langgraph / auth / factory.py: 97%
62 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
1"""
2Authentication middleware factory for configurable auth providers
4Centralizes AuthMiddleware creation based on settings configuration.
5Supports multiple authentication backends:
6- InMemoryUserProvider (development/testing)
7- KeycloakUserProvider (production)
8- Custom providers (extensibility)
9"""
11from mcp_server_langgraph.auth.keycloak import KeycloakConfig
12from mcp_server_langgraph.auth.middleware import AuthMiddleware
13from mcp_server_langgraph.auth.openfga import OpenFGAClient
14from mcp_server_langgraph.auth.session import RedisSessionStore, SessionStore
15from mcp_server_langgraph.auth.user_provider import InMemoryUserProvider, KeycloakUserProvider, UserProvider
16from mcp_server_langgraph.core.config import Settings
17from mcp_server_langgraph.observability.telemetry import logger
20def create_user_provider(settings: Settings, openfga_client: OpenFGAClient | None = None) -> UserProvider:
21 """
22 Create UserProvider based on settings configuration (production use)
24 This is the recommended factory function for production code.
25 It reads from the Settings object to automatically configure the provider.
27 For test code, use `mcp_server_langgraph.auth.user_provider.create_user_provider()`
28 which allows explicit provider_type and config specification.
30 Args:
31 settings: Application settings instance
32 openfga_client: Optional OpenFGA client for role synchronization
34 Returns:
35 Configured UserProvider instance
37 Raises:
38 ValueError: If provider type is unknown or required config is missing
40 See Also:
41 mcp_server_langgraph.auth.user_provider.create_user_provider: Alternative factory for tests
42 """
43 provider_type = settings.auth_provider.lower()
45 if provider_type == "inmemory":
46 # SECURITY: Block InMemoryUserProvider in production/staging environments
47 # Get environment with safe fallback to 'development'
48 environment = getattr(settings, "environment", "development").lower()
50 # Block in production and staging (allow in development/test)
51 if environment in ("production", "staging", "prod", "stg"):
52 msg = (
53 f"SECURITY: InMemoryUserProvider is not allowed in production environments. "
54 f"Current environment: '{environment}'. "
55 f"InMemoryUserProvider is only suitable for development/testing (stores credentials in memory). "
56 f"For production, use AUTH_PROVIDER=keycloak with proper SSO integration."
57 )
58 raise RuntimeError(msg)
60 logger.info("Creating InMemoryUserProvider for authentication")
61 logger.warning(
62 f"InMemoryUserProvider is being used in '{environment}' environment. "
63 f"This provider stores credentials in memory and should NEVER be used in production."
64 )
66 # Validate JWT secret is configured
67 if not settings.jwt_secret_key:
68 msg = "CRITICAL: JWT secret key required for InMemoryUserProvider. Set JWT_SECRET_KEY environment variable."
69 raise ValueError(msg)
71 return InMemoryUserProvider(secret_key=settings.jwt_secret_key, use_password_hashing=settings.use_password_hashing)
73 elif provider_type == "keycloak":
74 logger.info("Creating KeycloakUserProvider for authentication")
76 # Validate Keycloak configuration
77 if not settings.keycloak_client_secret:
78 msg = "CRITICAL: Keycloak client secret required. Set KEYCLOAK_CLIENT_SECRET environment variable."
79 raise ValueError(msg)
81 if not settings.keycloak_admin_password:
82 msg = (
83 "CRITICAL: Keycloak admin password required for admin API operations. "
84 "Set KEYCLOAK_ADMIN_PASSWORD environment variable. "
85 "This is needed for user management, API key operations, and attribute updates."
86 )
87 raise ValueError(msg)
89 # Build Keycloak configuration from settings
90 keycloak_config = KeycloakConfig(
91 server_url=settings.keycloak_server_url,
92 realm=settings.keycloak_realm,
93 client_id=settings.keycloak_client_id,
94 client_secret=settings.keycloak_client_secret,
95 admin_username=settings.keycloak_admin_username,
96 admin_password=settings.keycloak_admin_password,
97 verify_ssl=settings.keycloak_verify_ssl,
98 timeout=settings.keycloak_timeout,
99 )
101 return KeycloakUserProvider(
102 config=keycloak_config,
103 openfga_client=openfga_client,
104 sync_on_login=True, # Auto-sync roles to OpenFGA on login
105 )
107 else:
108 msg = (
109 f"Unknown auth provider: '{provider_type}'. "
110 f"Supported providers: 'inmemory', 'keycloak'. "
111 f"To add custom providers, extend UserProvider and update this factory."
112 )
113 raise ValueError(msg)
116def create_session_store(settings: Settings) -> SessionStore | None:
117 """
118 Create SessionStore based on settings configuration
120 Args:
121 settings: Application settings instance
123 Returns:
124 Configured SessionStore instance or None if sessions disabled
125 """
126 if settings.auth_mode != "session":
127 # Token-based auth doesn't need session store
128 logger.info("Session store not needed for token-based auth")
129 return None
131 backend = settings.session_backend.lower()
133 if backend == "memory":
134 logger.warning(
135 "Using in-memory session store. Sessions will not persist across restarts. For production, use 'redis' backend."
136 )
137 # Import here to avoid circular dependency
138 from mcp_server_langgraph.auth.session import InMemorySessionStore
140 return InMemorySessionStore(
141 default_ttl_seconds=settings.session_ttl_seconds,
142 sliding_window=settings.session_sliding_window,
143 max_concurrent_sessions=settings.session_max_concurrent,
144 )
146 elif backend == "redis":
147 logger.info("Creating Redis session store")
149 # Validate Redis configuration
150 if not settings.redis_url:
151 msg = "CRITICAL: Redis URL required for Redis session store. Set REDIS_URL environment variable."
152 raise ValueError(msg)
154 return RedisSessionStore(
155 redis_url=settings.redis_url,
156 password=settings.redis_password,
157 ssl=settings.redis_ssl,
158 ttl_seconds=settings.session_ttl_seconds,
159 sliding_window=settings.session_sliding_window,
160 max_concurrent_sessions=settings.session_max_concurrent,
161 )
163 else:
164 msg = f"Unknown session backend: '{backend}'. Supported backends: 'memory', 'redis'."
165 raise ValueError(msg)
168def create_auth_middleware(settings: Settings, openfga_client: OpenFGAClient | None = None) -> AuthMiddleware:
169 """
170 Create AuthMiddleware with configured providers
172 This factory reads settings and wires up the appropriate:
173 - UserProvider (InMemory, Keycloak, custom)
174 - SessionStore (Memory, Redis)
175 - OpenFGA client (for fine-grained authorization)
177 Args:
178 settings: Application settings instance
179 openfga_client: Optional OpenFGA client for authorization
181 Returns:
182 Fully configured AuthMiddleware instance
184 Example:
185 >>> from mcp_server_langgraph.core.config import settings
186 >>> from mcp_server_langgraph.auth.openfga import OpenFGAClient
187 >>> openfga = OpenFGAClient(...)
188 >>> auth = create_auth_middleware(settings, openfga)
189 >>> # Now auth uses the configured provider (InMemory or Keycloak)
190 """
191 # Create user provider based on settings
192 user_provider = create_user_provider(settings, openfga_client)
194 # Create session store if using session-based auth
195 session_store = create_session_store(settings)
197 # Register session store globally if created
198 # This ensures get_session_store() returns the configured store (Redis/Memory)
199 # instead of creating a fallback in-memory store (OpenAI Codex Finding #3)
200 if session_store is not None:
201 from mcp_server_langgraph.auth.session import get_session_store, set_session_store
203 set_session_store(session_store)
205 # Validation: Ensure registration succeeded (prevent regression)
206 registered_store = get_session_store()
207 if registered_store is not session_store: 207 ↛ 208line 207 didn't jump to line 208 because the condition on line 207 was never true
208 msg = (
209 "Session store registration failed! "
210 f"Expected {type(session_store).__name__} but got {type(registered_store).__name__}. "
211 "This is a critical bug - GDPR/session APIs will use wrong store."
212 )
213 raise RuntimeError(msg)
215 # Build AuthMiddleware with all components
216 auth = AuthMiddleware(
217 secret_key=settings.jwt_secret_key,
218 openfga_client=openfga_client,
219 user_provider=user_provider,
220 session_store=session_store,
221 settings=settings,
222 )
224 logger.info(
225 "AuthMiddleware created",
226 extra={
227 "auth_provider": settings.auth_provider,
228 "auth_mode": settings.auth_mode,
229 "session_backend": settings.session_backend if settings.auth_mode == "session" else "N/A",
230 "openfga_enabled": openfga_client is not None,
231 },
232 )
234 return auth