Coverage for src / mcp_server_langgraph / core / container.py: 56%

188 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 00:43 +0000

1""" 

2Dependency Injection Container for MCP Server LangGraph 

3 

4This module provides a clean dependency injection pattern that: 

51. Eliminates global singletons 

62. Makes testing easier (injectable dependencies) 

73. Supports multiple environments (test, dev, production) 

84. Enables per-tenant/per-agent configuration 

9 

10Architecture: 

11- ApplicationContainer: Main DI container 

12- ContainerConfig: Configuration for the container 

13- Provider abstractions: TelemetryProvider, AuthProvider, StorageProvider 

14- Helper functions: create_test_container, create_production_container 

15""" 

16 

17from __future__ import annotations 

18 

19import logging 

20from dataclasses import dataclass, field 

21from functools import cached_property 

22from typing import Any, Protocol, runtime_checkable 

23 

24from mcp_server_langgraph.core.config import Settings 

25 

26# ============================================================================== 

27# Container Configuration 

28# ============================================================================== 

29 

30 

@dataclass
class ContainerConfig:
    """Settings that control how the application container wires providers.

    After construction, ``__post_init__`` overwrites the two feature flags
    for the ``"test"`` and ``"production"`` environments, regardless of the
    values passed in. Any other environment keeps the values as given.
    """

    # Name of the runtime environment ("test", "development", "production", ...).
    environment: str = "development"
    # Whether a real telemetry provider should be used.
    enable_telemetry: bool = field(default=False)
    # Whether authentication should be enforced.
    enable_auth: bool = field(default=False)
    # Log level name handed to Settings when the container builds it.
    log_level: str = "INFO"

    def __post_init__(self) -> None:
        """Force the feature flags to match the selected environment."""
        env = self.environment
        if env == "test":
            # Tests run with telemetry and auth switched off.
            flag = False
        elif env == "production":
            # Production runs with telemetry and auth switched on.
            flag = True
        else:
            # Every other environment keeps whatever the caller supplied.
            return

        self.enable_telemetry = flag
        self.enable_auth = flag

50 

51 

52# ============================================================================== 

53# Provider Protocols (Abstractions) 

54# ============================================================================== 

55 

56 

@runtime_checkable
class TelemetryProvider(Protocol):
    """Structural interface for telemetry backends.

    An implementation exposes three read-only attributes: ``logger``,
    ``metrics`` and ``tracer``. Because the protocol is runtime-checkable,
    ``isinstance`` can be used to test for the presence of those attributes.
    """

    @property
    def logger(self) -> logging.Logger:
        """Return the logger to use for application log output."""
        ...

    @property
    def metrics(self) -> Any:
        """Return the metrics handle."""
        ...

    @property
    def tracer(self) -> Any:
        """Return the tracer handle."""
        ...

75 

76 

@runtime_checkable
class AuthProvider(Protocol):
    """Structural interface for authentication backends."""

    def validate_token(self, token: str) -> bool:
        """Report whether ``token`` is accepted by this provider."""
        ...

    def get_current_user(self, token: str) -> dict[str, Any]:
        """Return the user record associated with ``token``."""
        ...

88 

89 

90@runtime_checkable 

91class StorageProvider(Protocol): 

92 """Protocol for storage providers""" 

93 

94 def get(self, key: str) -> Any | None: 

95 """Get value by key""" 

96 ... 

97 

98 def set(self, key: str, value: Any) -> None: 

99 """Set value by key""" 

100 ... 

101 

102 def delete(self, key: str) -> None: 

103 """Delete value by key""" 

104 ... 

105 

106 

107# ============================================================================== 

108# No-Op Providers (for testing) 

109# ============================================================================== 

110 

111 

class NoOpLogger:
    """Logger stand-in whose level methods accept a message and discard it."""

    def info(self, msg: str, *args: Any, **kwargs: Any) -> None:
        """Discard an info-level message."""
        return None

    def debug(self, msg: str, *args: Any, **kwargs: Any) -> None:
        """Discard a debug-level message."""
        return None

    def warning(self, msg: str, *args: Any, **kwargs: Any) -> None:
        """Discard a warning-level message."""
        return None

    def error(self, msg: str, *args: Any, **kwargs: Any) -> None:
        """Discard an error-level message."""
        return None

    def critical(self, msg: str, *args: Any, **kwargs: Any) -> None:
        """Discard a critical-level message."""
        return None

129 

130 

class NoOpMetrics:
    """Metrics stand-in whose recording methods discard every measurement."""

    def counter(self, name: str, value: int = 1, **kwargs: Any) -> None:
        """Discard a counter increment."""
        return None

    def gauge(self, name: str, value: float, **kwargs: Any) -> None:
        """Discard a gauge reading."""
        return None

    def histogram(self, name: str, value: float, **kwargs: Any) -> None:
        """Discard a histogram observation."""
        return None

142 

143 

class NoOpTracer:
    """Tracer stand-in that never records a span."""

    def start_as_current_span(self, name: str, **kwargs: Any) -> Any:
        """Return a context manager that does nothing on enter or exit."""
        import contextlib

        return contextlib.nullcontext()

152 

153 

class NoOpTelemetryProvider:
    """Telemetry provider made of no-op logger, metrics and tracer objects.

    The three helpers are created once in ``__init__`` and the same objects
    are returned on every property access.
    """

    def __init__(self) -> None:
        self._tracer = NoOpTracer()
        self._metrics = NoOpMetrics()
        self._logger = NoOpLogger()

    @property
    def logger(self) -> NoOpLogger:
        """Return the no-op logger held by this provider."""
        return self._logger

    @property
    def metrics(self) -> NoOpMetrics:
        """Return the no-op metrics object held by this provider."""
        return self._metrics

    @property
    def tracer(self) -> NoOpTracer:
        """Return the no-op tracer held by this provider."""
        return self._tracer

173 

174 

class NoOpAuthProvider:
    """Auth provider that accepts every token and reports one fixed user."""

    def validate_token(self, token: str) -> bool:
        """Return ``True`` for any token."""
        return True

    def get_current_user(self, token: str) -> dict[str, Any]:
        """Return a freshly built record for the fixed test user."""
        mock_user: dict[str, Any] = {
            "user_id": "test-user",
            "username": "testuser",
            "email": "test@example.com",
        }
        return mock_user

185 

186 

187class MemoryStorageProvider: 

188 """In-memory storage provider for testing""" 

189 

190 def __init__(self) -> None: 

191 self._store: dict[str, Any] = {} 

192 

193 def get(self, key: str) -> Any | None: 

194 return self._store.get(key) 

195 

196 def set(self, key: str, value: Any) -> None: 

197 self._store[key] = value 

198 

199 def delete(self, key: str) -> None: 

200 self._store.pop(key, None) 

201 

202 

203# ============================================================================== 

204# Production Providers 

205# ============================================================================== 

206 

207 

class ProductionTelemetryProvider:
    """Telemetry provider that re-exports the project's telemetry objects.

    ``logger``, ``metrics`` and ``tracer`` are imported from
    ``mcp_server_langgraph.observability.telemetry`` the first time each
    property is read, then remembered on the instance.
    """

    def __init__(self, settings: Settings):
        self.settings = settings
        # Filled in on first access by the matching property.
        self._logger: logging.Logger | None = None
        self._metrics: Any | None = None
        self._tracer: Any | None = None

    @property
    def logger(self) -> logging.Logger:
        """Return the telemetry module's logger, importing it on first use."""
        if self._logger is not None:
            return self._logger

        from mcp_server_langgraph.observability.telemetry import logger

        self._logger = logger
        return self._logger

    @property
    def metrics(self) -> Any:
        """Return the telemetry module's metrics, importing it on first use."""
        if self._metrics is not None:
            return self._metrics

        from mcp_server_langgraph.observability.telemetry import metrics

        self._metrics = metrics
        return self._metrics

    @property
    def tracer(self) -> Any:
        """Return the telemetry module's tracer, importing it on first use."""
        if self._tracer is not None:
            return self._tracer

        from mcp_server_langgraph.observability.telemetry import tracer

        self._tracer = tracer
        return self._tracer

243 

244 

class InMemoryAuthProvider:
    """Auth provider that keeps issued tokens in a dictionary on the instance.

    Tokens live only in this object's memory: they are lost when the
    instance is discarded, and nothing in this class expires or revokes
    them.
    """

    def __init__(self) -> None:
        # Maps token string -> user record captured when the token was issued.
        self._tokens: dict[str, dict[str, Any]] = {}

    def create_token(self, user_id: str, username: str, **kwargs: Any) -> str:
        """Issue a new random token for a user and remember it.

        The token is an opaque random string from ``secrets.token_urlsafe``.
        It is not signed and carries no expiry, and it is only valid for this
        provider instance.

        Args:
            user_id: Identifier stored in the user record.
            username: Name stored in the user record.
            **kwargs: Extra fields copied into the user record.

        Returns:
            The newly issued token.
        """
        import secrets

        token = secrets.token_urlsafe(32)
        self._tokens[token] = {"user_id": user_id, "username": username, **kwargs}
        return token

    def validate_token(self, token: str) -> bool:
        """Return ``True`` when ``token`` was issued by this instance."""
        return token in self._tokens

    def get_current_user(self, token: str) -> dict[str, Any]:
        """Return the user record for ``token``, or ``{}`` if it is unknown.

        A shallow copy is returned so that callers who modify the result do
        not alter the record stored for the token.
        """
        record = self._tokens.get(token)
        if record is None:
            return {}
        return dict(record)

264 

265 

class RedisStorageProvider:
    """Storage provider that keeps JSON-encoded values in Redis.

    The client is built from ``settings.redis_host`` and
    ``settings.redis_port`` the first time an operation needs it.
    """

    def __init__(self, settings: Settings) -> None:
        self._client: Any | None = None
        self.settings = settings

    def _get_client(self) -> Any:
        """Return the Redis client, constructing it on the first call."""
        if self._client is not None:
            return self._client

        import redis

        self._client = redis.Redis(host=self.settings.redis_host, port=self.settings.redis_port, decode_responses=True)
        return self._client

    def get(self, key: str) -> Any | None:
        """Return the decoded value for ``key``, or ``None`` if nothing usable is stored."""
        import json

        raw = self._get_client().get(key)
        if not raw:
            # Covers both a missing key and an empty stored payload.
            return None
        return json.loads(raw)

    def set(self, key: str, value: Any) -> None:
        """Serialize ``value`` as JSON and store it under ``key``."""
        import json

        client = self._get_client()
        client.set(key, json.dumps(value))

    def delete(self, key: str) -> None:
        """Delete ``key`` from Redis."""
        client = self._get_client()
        client.delete(key)

294 

295 

296# ============================================================================== 

297# Application Container 

298# ============================================================================== 

299 

300 

class ApplicationContainer:
    """
    Central dependency injection container.

    Hands out:
    - Settings (configuration)
    - Telemetry (logging, metrics, tracing)
    - Authentication
    - Storage

    Every provider is built the first time it is requested and the same
    instance is returned on later calls.

    Usage:
        # Test mode
        container = create_test_container()

        # Development mode
        config = ContainerConfig(environment="development")
        container = ApplicationContainer(config)

        # Production mode
        container = create_production_container()

        # Use dependencies
        settings = container.settings
        telemetry = container.get_telemetry()
        auth = container.get_auth()
    """

    def __init__(self, config: ContainerConfig, settings: Settings | None = None):
        """
        Store the configuration; no provider is created yet.

        Args:
            config: Container configuration
            settings: Optional pre-built settings to use instead of creating them
        """
        # The provider attributes (_telemetry_instance, _auth_instance,
        # _storage_instance) are intentionally NOT declared here: each
        # get_* method uses their absence to detect the first call.
        self._settings = settings
        self.config = config

    @cached_property
    def settings(self) -> Settings:
        """Return the injected settings, or build them from the config once."""
        injected = self._settings
        if injected is None:
            # Nothing injected: derive settings from the container config.
            return Settings(environment=self.config.environment, log_level=self.config.log_level)
        return injected

    def get_telemetry(self) -> TelemetryProvider:
        """Return the telemetry provider, creating it on the first call."""
        if not hasattr(self, "_telemetry_instance"):
            # Real telemetry needs a non-test environment AND the feature flag.
            wants_real = self.config.environment != "test" and self.config.enable_telemetry
            if not wants_real:
                self._telemetry_instance = NoOpTelemetryProvider()
            else:
                self._telemetry_instance = ProductionTelemetryProvider(self.settings)  # type: ignore[assignment]

        return self._telemetry_instance  # type: ignore[return-value]

    def get_auth(self) -> AuthProvider:
        """Return the auth provider, creating it on the first call."""
        if not hasattr(self, "_auth_instance"):
            env = self.config.environment
            if env == "test":
                self._auth_instance = NoOpAuthProvider()
            elif env == "development" or not self.config.enable_auth:
                self._auth_instance = InMemoryAuthProvider()  # type: ignore[assignment]
            else:
                # Production: use real auth (Keycloak, etc.)
                # TODO: Implement production auth provider
                # Until then this branch also yields the in-memory provider.
                self._auth_instance = InMemoryAuthProvider()  # type: ignore[assignment]

        return self._auth_instance

    def get_storage(self) -> StorageProvider:
        """Return the storage provider, creating it on the first call."""
        if not hasattr(self, "_storage_instance"):
            if self.config.environment == "test":
                # Tests never consult settings for a Redis host.
                self._storage_instance = MemoryStorageProvider()
            elif self.settings.redis_host:
                self._storage_instance = RedisStorageProvider(self.settings)  # type: ignore[assignment]
            else:
                # No Redis host configured: fall back to in-memory storage.
                self._storage_instance = MemoryStorageProvider()

        return self._storage_instance

388 

389 

390# ============================================================================== 

391# Helper Functions 

392# ============================================================================== 

393 

394 

def create_test_container(settings: Settings | None = None) -> ApplicationContainer:
    """
    Build a container configured for the "test" environment.

    With this environment the container:
    - Hands out the no-op telemetry and auth providers
    - Hands out in-memory storage
    - Keeps all state on the returned instance, so several containers
      can coexist independently

    Args:
        settings: Optional settings object to inject instead of building one

    Returns:
        ApplicationContainer configured for testing

    Example:
        container = create_test_container()
        agent = create_agent(container.settings, container.get_telemetry())
    """
    return ApplicationContainer(ContainerConfig(environment="test"), settings=settings)

417 

418 

def create_development_container() -> ApplicationContainer:
    """
    Build a container configured for the "development" environment.

    With the default development configuration the container:
    - Hands out no-op telemetry (the telemetry flag defaults to off)
    - Hands out the in-memory auth provider
    - Hands out Redis storage when the settings name a Redis host,
      otherwise in-memory storage

    Returns:
        ApplicationContainer configured for local development
    """
    dev_config = ContainerConfig(environment="development")
    container = ApplicationContainer(dev_config)
    return container

430 

431 

def create_production_container() -> ApplicationContainer:
    """
    Build a container configured for the "production" environment.

    The production configuration switches the telemetry and auth flags on,
    so the container:
    - Hands out the production telemetry provider
    - Hands out Redis storage when the settings name a Redis host,
      otherwise in-memory storage

    NOTE: a dedicated production auth provider (Keycloak, etc.) is not
    implemented yet; the container currently still returns the in-memory
    auth provider in this environment.

    Returns:
        ApplicationContainer configured for production
    """
    prod_config = ContainerConfig(environment="production")
    container = ApplicationContainer(prod_config)
    return container