Coverage for src / mcp_server_langgraph / auth / factory.py: 97%

62 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 00:43 +0000

1""" 

2Authentication middleware factory for configurable auth providers 

3 

4Centralizes AuthMiddleware creation based on settings configuration. 

5Supports multiple authentication backends: 

6- InMemoryUserProvider (development/testing) 

7- KeycloakUserProvider (production) 

8- Custom providers (extensibility) 

9""" 

10 

11from mcp_server_langgraph.auth.keycloak import KeycloakConfig 

12from mcp_server_langgraph.auth.middleware import AuthMiddleware 

13from mcp_server_langgraph.auth.openfga import OpenFGAClient 

14from mcp_server_langgraph.auth.session import RedisSessionStore, SessionStore 

15from mcp_server_langgraph.auth.user_provider import InMemoryUserProvider, KeycloakUserProvider, UserProvider 

16from mcp_server_langgraph.core.config import Settings 

17from mcp_server_langgraph.observability.telemetry import logger 

18 

19 

def create_user_provider(settings: Settings, openfga_client: OpenFGAClient | None = None) -> UserProvider:
    """
    Create UserProvider based on settings configuration (production use)

    This is the recommended factory function for production code.
    It reads from the Settings object to automatically configure the provider.

    For test code, use `mcp_server_langgraph.auth.user_provider.create_user_provider()`
    which allows explicit provider_type and config specification.

    Args:
        settings: Application settings instance
        openfga_client: Optional OpenFGA client for role synchronization

    Returns:
        Configured UserProvider instance

    Raises:
        RuntimeError: If the in-memory provider is requested while the
            environment is production or staging
        ValueError: If provider type is unknown or required config is missing

    See Also:
        mcp_server_langgraph.auth.user_provider.create_user_provider: Alternative factory for tests
    """
    # Normalise case AND surrounding whitespace so values such as "Keycloak "
    # (e.g. a trailing newline picked up from an env file) still resolve.
    provider_type = settings.auth_provider.strip().lower()

    if provider_type == "inmemory":
        # SECURITY: Block InMemoryUserProvider in production/staging environments.
        # A missing `environment` attribute falls back to 'development'.
        # The value is stripped before comparison: without that, "production "
        # would not match the blocklist below and the guard would fail open.
        environment = getattr(settings, "environment", "development").strip().lower()

        # Block in production and staging (allow in development/test)
        if environment in ("production", "staging", "prod", "stg"):
            msg = (
                "SECURITY: InMemoryUserProvider is not allowed in production environments. "
                f"Current environment: '{environment}'. "
                "InMemoryUserProvider is only suitable for development/testing (stores credentials in memory). "
                "For production, use AUTH_PROVIDER=keycloak with proper SSO integration."
            )
            raise RuntimeError(msg)

        logger.info("Creating InMemoryUserProvider for authentication")
        logger.warning(
            f"InMemoryUserProvider is being used in '{environment}' environment. "
            "This provider stores credentials in memory and should NEVER be used in production."
        )

        # Validate JWT secret is configured
        if not settings.jwt_secret_key:
            msg = "CRITICAL: JWT secret key required for InMemoryUserProvider. Set JWT_SECRET_KEY environment variable."
            raise ValueError(msg)

        return InMemoryUserProvider(secret_key=settings.jwt_secret_key, use_password_hashing=settings.use_password_hashing)

    elif provider_type == "keycloak":
        logger.info("Creating KeycloakUserProvider for authentication")

        # Validate Keycloak configuration before building the config object
        if not settings.keycloak_client_secret:
            msg = "CRITICAL: Keycloak client secret required. Set KEYCLOAK_CLIENT_SECRET environment variable."
            raise ValueError(msg)

        if not settings.keycloak_admin_password:
            msg = (
                "CRITICAL: Keycloak admin password required for admin API operations. "
                "Set KEYCLOAK_ADMIN_PASSWORD environment variable. "
                "This is needed for user management, API key operations, and attribute updates."
            )
            raise ValueError(msg)

        # Build Keycloak configuration from settings
        keycloak_config = KeycloakConfig(
            server_url=settings.keycloak_server_url,
            realm=settings.keycloak_realm,
            client_id=settings.keycloak_client_id,
            client_secret=settings.keycloak_client_secret,
            admin_username=settings.keycloak_admin_username,
            admin_password=settings.keycloak_admin_password,
            verify_ssl=settings.keycloak_verify_ssl,
            timeout=settings.keycloak_timeout,
        )

        return KeycloakUserProvider(
            config=keycloak_config,
            openfga_client=openfga_client,
            sync_on_login=True,  # Auto-sync roles to OpenFGA on login
        )

    else:
        msg = (
            f"Unknown auth provider: '{provider_type}'. "
            "Supported providers: 'inmemory', 'keycloak'. "
            "To add custom providers, extend UserProvider and update this factory."
        )
        raise ValueError(msg)

114 

115 

def create_session_store(settings: Settings) -> SessionStore | None:
    """
    Build the SessionStore selected by the settings, if one is needed

    Args:
        settings: Application settings instance

    Returns:
        An InMemorySessionStore or RedisSessionStore when ``settings.auth_mode``
        is "session"; None for any other auth mode

    Raises:
        ValueError: If the Redis backend is selected without a Redis URL, or
            if the session backend name is not recognised
    """
    uses_sessions = settings.auth_mode == "session"
    if not uses_sessions:
        # Only session-based auth needs a store; every other mode gets None
        logger.info("Session store not needed for token-based auth")
        return None

    # Backend names are matched case-insensitively
    backend = settings.session_backend.lower()

    if backend == "memory":
        logger.warning(
            "Using in-memory session store. Sessions will not persist across restarts. For production, use 'redis' backend."
        )
        # Kept as a function-local import, matching the original code.
        # NOTE(review): auth.session is already imported at module level for
        # RedisSessionStore, so the circular-import concern may be stale - confirm.
        from mcp_server_langgraph.auth.session import InMemorySessionStore

        memory_store = InMemorySessionStore(
            default_ttl_seconds=settings.session_ttl_seconds,
            sliding_window=settings.session_sliding_window,
            max_concurrent_sessions=settings.session_max_concurrent,
        )
        return memory_store

    if backend == "redis":
        logger.info("Creating Redis session store")

        # A Redis store cannot be built without a connection URL
        if not settings.redis_url:
            msg = "CRITICAL: Redis URL required for Redis session store. Set REDIS_URL environment variable."
            raise ValueError(msg)

        redis_store = RedisSessionStore(
            redis_url=settings.redis_url,
            password=settings.redis_password,
            ssl=settings.redis_ssl,
            ttl_seconds=settings.session_ttl_seconds,
            sliding_window=settings.session_sliding_window,
            max_concurrent_sessions=settings.session_max_concurrent,
        )
        return redis_store

    # Anything else is a configuration error
    msg = f"Unknown session backend: '{backend}'. Supported backends: 'memory', 'redis'."
    raise ValueError(msg)

166 

167 

def create_auth_middleware(settings: Settings, openfga_client: OpenFGAClient | None = None) -> AuthMiddleware:
    """
    Assemble an AuthMiddleware from the application settings

    The factory wires together:
    - the UserProvider returned by create_user_provider()
    - the SessionStore returned by create_session_store() (may be None)
    - the OpenFGA client passed in by the caller, if any

    When a session store is created it is also registered through
    set_session_store() and the registration is verified.

    Args:
        settings: Application settings instance
        openfga_client: Optional OpenFGA client for authorization

    Returns:
        Fully configured AuthMiddleware instance

    Raises:
        RuntimeError: If get_session_store() does not return the store that
            was just registered

    Example:
        >>> from mcp_server_langgraph.core.config import settings
        >>> from mcp_server_langgraph.auth.openfga import OpenFGAClient
        >>> openfga = OpenFGAClient(...)
        >>> auth = create_auth_middleware(settings, openfga)
        >>> # Now auth uses the configured provider (InMemory or Keycloak)
    """
    # Provider first, then the (optional) session store
    provider = create_user_provider(settings, openfga_client)
    store = create_session_store(settings)

    if store is not None:
        from mcp_server_langgraph.auth.session import get_session_store, set_session_store

        # Register the store globally so that get_session_store() hands back
        # this configured instance rather than a fallback one.
        set_session_store(store)

        # Read it back: a mismatch means registration did not take effect
        active_store = get_session_store()
        if active_store is not store:
            expected_name = type(store).__name__
            actual_name = type(active_store).__name__
            msg = (
                "Session store registration failed! "
                f"Expected {expected_name} but got {actual_name}. "
                "This is a critical bug - GDPR/session APIs will use wrong store."
            )
            raise RuntimeError(msg)

    # Build AuthMiddleware with all components
    middleware = AuthMiddleware(
        secret_key=settings.jwt_secret_key,
        openfga_client=openfga_client,
        user_provider=provider,
        session_store=store,
        settings=settings,
    )

    # Structured log payload; the backend is only reported in session mode
    log_fields = {
        "auth_provider": settings.auth_provider,
        "auth_mode": settings.auth_mode,
    }
    if settings.auth_mode == "session":
        log_fields["session_backend"] = settings.session_backend
    else:
        log_fields["session_backend"] = "N/A"
    log_fields["openfga_enabled"] = openfga_client is not None

    logger.info("AuthMiddleware created", extra=log_fields)

    return middleware