Coverage for src / mcp_server_langgraph / core / container.py: 56%
188 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
1"""
2Dependency Injection Container for MCP Server LangGraph
4This module provides a clean dependency injection pattern that:
51. Eliminates global singletons
62. Makes testing easier (injectable dependencies)
73. Supports multiple environments (test, dev, production)
84. Enables per-tenant/per-agent configuration
10Architecture:
11- ApplicationContainer: Main DI container
12- ContainerConfig: Configuration for the container
13- Provider abstractions: TelemetryProvider, AuthProvider, StorageProvider
14- Helper functions: create_test_container, create_development_container, create_production_container
15"""
17from __future__ import annotations
19import logging
20from dataclasses import dataclass, field
21from functools import cached_property
22from typing import Any, Protocol, runtime_checkable
24from mcp_server_langgraph.core.config import Settings
26# ==============================================================================
27# Container Configuration
28# ==============================================================================
@dataclass
class ContainerConfig:
    """Settings that decide which providers the application container wires up.

    The ``"test"`` and ``"production"`` environments force both feature
    switches (off and on respectively), overriding any value passed to the
    constructor. Every other environment keeps the supplied values.
    """

    environment: str = "development"
    enable_telemetry: bool = False
    enable_auth: bool = False
    log_level: str = "INFO"

    def __post_init__(self) -> None:
        """Force the telemetry/auth switches for test and production."""
        env = self.environment
        if env not in ("test", "production"):
            # Any other environment keeps whatever the caller supplied.
            return

        # "production" turns both features on; "test" turns both off.
        enabled = env == "production"
        self.enable_telemetry = enabled
        self.enable_auth = enabled
52# ==============================================================================
53# Provider Protocols (Abstractions)
54# ==============================================================================
@runtime_checkable
class TelemetryProvider(Protocol):
    """Structural interface for objects bundling logging, metrics and tracing.

    Any object exposing ``logger``, ``metrics`` and ``tracer`` attributes
    satisfies this protocol; inheriting from it is not required.
    """

    @property
    def logger(self) -> logging.Logger:
        """Logger used for application log records."""

    @property
    def metrics(self) -> Any:
        """Metrics facade; the concrete type is left to the implementation."""

    @property
    def tracer(self) -> Any:
        """Tracer facade; the concrete type is left to the implementation."""
@runtime_checkable
class AuthProvider(Protocol):
    """Structural interface for token-based authentication backends."""

    def validate_token(self, token: str) -> bool:
        """Report whether ``token`` is accepted by this backend."""

    def get_current_user(self, token: str) -> dict[str, Any]:
        """Return the user record associated with ``token`` as a dictionary."""
@runtime_checkable
class StorageProvider(Protocol):
    """Structural interface for simple key/value stores keyed by strings."""

    def get(self, key: str) -> Any | None:
        """Look up the value stored under ``key``."""

    def set(self, key: str, value: Any) -> None:
        """Store ``value`` under ``key``."""

    def delete(self, key: str) -> None:
        """Remove whatever is stored under ``key``."""
107# ==============================================================================
108# No-Op Providers (for testing)
109# ==============================================================================
class NoOpLogger:
    """Logger stand-in that silently discards every record.

    It mirrors the commonly used record-emitting methods of
    ``logging.Logger`` (``debug``, ``info``, ``warning``, ``error``,
    ``critical``, ``exception`` and ``log``) so code written against a real
    logger keeps working when this object is injected instead. Every method
    accepts the usual ``msg, *args, **kwargs`` call shape and returns ``None``.
    """

    def debug(self, msg: str, *args: Any, **kwargs: Any) -> None:
        """Discard a DEBUG-level record."""

    def info(self, msg: str, *args: Any, **kwargs: Any) -> None:
        """Discard an INFO-level record."""

    def warning(self, msg: str, *args: Any, **kwargs: Any) -> None:
        """Discard a WARNING-level record."""

    def error(self, msg: str, *args: Any, **kwargs: Any) -> None:
        """Discard an ERROR-level record."""

    def critical(self, msg: str, *args: Any, **kwargs: Any) -> None:
        """Discard a CRITICAL-level record."""

    def exception(self, msg: str, *args: Any, **kwargs: Any) -> None:
        """Discard an ERROR-level record that would carry exception info.

        Present so that ``logger.exception(...)`` inside ``except`` blocks
        does not raise ``AttributeError`` when this logger is in use.
        """

    def log(self, level: int, msg: str, *args: Any, **kwargs: Any) -> None:
        """Discard a record emitted at an explicit numeric ``level``."""
class NoOpMetrics:
    """Metrics stand-in whose recording methods accept a sample and drop it."""

    def counter(self, name: str, value: int = 1, **kwargs: Any) -> None:
        """Ignore a counter increment of ``value`` for ``name``."""
        return None

    def gauge(self, name: str, value: float, **kwargs: Any) -> None:
        """Ignore a gauge reading of ``value`` for ``name``."""
        return None

    def histogram(self, name: str, value: float, **kwargs: Any) -> None:
        """Ignore a histogram observation of ``value`` for ``name``."""
        return None
class _NoOpSpan:
    """Span stand-in whose recording methods accept anything and do nothing."""

    def set_attribute(self, key: str, value: Any) -> None:
        """Ignore a single span attribute."""

    def set_attributes(self, attributes: Any) -> None:
        """Ignore a mapping of span attributes."""

    def add_event(self, name: str, *args: Any, **kwargs: Any) -> None:
        """Ignore a span event."""

    def record_exception(self, exception: BaseException, *args: Any, **kwargs: Any) -> None:
        """Ignore an exception that would be attached to the span."""

    def set_status(self, *args: Any, **kwargs: Any) -> None:
        """Ignore a status update."""

    def update_name(self, name: str) -> None:
        """Ignore a span rename."""

    def is_recording(self) -> bool:
        """Always report that nothing is being recorded."""
        return False

    def end(self, *args: Any, **kwargs: Any) -> None:
        """Ignore the end-of-span signal."""


class NoOpTracer:
    """Tracer stand-in that opens no real spans."""

    def start_as_current_span(self, name: str, **kwargs: Any) -> Any:
        """Return a context manager that traces nothing.

        The context manager yields a no-op span object rather than ``None``,
        so the common pattern
        ``with tracer.start_as_current_span(...) as span: span.set_attribute(...)``
        works unchanged when this tracer is injected.

        Args:
            name: Span name; ignored.
            **kwargs: Any span options; ignored.

        Returns:
            A ``contextlib.nullcontext`` wrapping a no-op span.
        """
        from contextlib import nullcontext

        return nullcontext(_NoOpSpan())
class NoOpTelemetryProvider:
    """Telemetry provider whose logger, metrics and tracer all do nothing."""

    def __init__(self) -> None:
        """Create one no-op logger, metrics object and tracer for this provider."""
        self._tracer = NoOpTracer()
        self._metrics = NoOpMetrics()
        self._logger = NoOpLogger()

    @property
    def logger(self) -> NoOpLogger:
        """The provider's no-op logger (same object on every access)."""
        return self._logger

    @property
    def metrics(self) -> NoOpMetrics:
        """The provider's no-op metrics object (same object on every access)."""
        return self._metrics

    @property
    def tracer(self) -> NoOpTracer:
        """The provider's no-op tracer (same object on every access)."""
        return self._tracer
class NoOpAuthProvider:
    """Auth provider that accepts every token and always reports one mock user."""

    def validate_token(self, token: str) -> bool:
        """Accept ``token`` unconditionally."""
        return True

    def get_current_user(self, token: str) -> dict[str, Any]:
        """Return a freshly built mock user record, regardless of ``token``."""
        mock_user: dict[str, Any] = {
            "user_id": "test-user",
            "username": "testuser",
            "email": "test@example.com",
        }
        return mock_user
class MemoryStorageProvider:
    """Key/value store backed by a plain dictionary held on the instance."""

    def __init__(self) -> None:
        """Start with an empty store."""
        self._store: dict[str, Any] = {}

    def get(self, key: str) -> Any | None:
        """Return the value stored under ``key``, or ``None`` when absent."""
        try:
            return self._store[key]
        except KeyError:
            return None

    def set(self, key: str, value: Any) -> None:
        """Store ``value`` under ``key``, replacing any previous value."""
        self._store[key] = value

    def delete(self, key: str) -> None:
        """Remove ``key`` if present; a missing key is not an error."""
        try:
            del self._store[key]
        except KeyError:
            pass
203# ==============================================================================
204# Production Providers
205# ==============================================================================
class ProductionTelemetryProvider:
    """Telemetry provider backed by the project's observability module.

    ``logger``, ``metrics`` and ``tracer`` are each imported from
    ``mcp_server_langgraph.observability.telemetry`` on first access and
    remembered on the instance afterwards.
    """

    def __init__(self, settings: Settings):
        """Keep ``settings`` and start with nothing resolved yet."""
        self.settings = settings
        self._logger: logging.Logger | None = None
        self._metrics: Any | None = None
        self._tracer: Any | None = None

    @property
    def logger(self) -> logging.Logger:
        """Logger object exported by the telemetry module."""
        resolved = self._logger
        if resolved is not None:
            return resolved

        # Deferred import: the telemetry module is only loaded when needed.
        from mcp_server_langgraph.observability.telemetry import logger as shared_logger

        self._logger = shared_logger
        return shared_logger

    @property
    def metrics(self) -> Any:
        """Metrics object exported by the telemetry module."""
        resolved = self._metrics
        if resolved is not None:
            return resolved

        # Deferred import: the telemetry module is only loaded when needed.
        from mcp_server_langgraph.observability.telemetry import metrics as shared_metrics

        self._metrics = shared_metrics
        return shared_metrics

    @property
    def tracer(self) -> Any:
        """Tracer object exported by the telemetry module."""
        resolved = self._tracer
        if resolved is not None:
            return resolved

        # Deferred import: the telemetry module is only loaded when needed.
        from mcp_server_langgraph.observability.telemetry import tracer as shared_tracer

        self._tracer = shared_tracer
        return shared_tracer
class InMemoryAuthProvider:
    """Auth provider for development that keeps issued tokens in a dictionary.

    Tokens are random opaque strings mapped to a user record held only on
    this instance. Nothing in this class signs, expires or persists them,
    so it is not suitable as a production authentication mechanism.
    """

    def __init__(self) -> None:
        """Start with no issued tokens."""
        self._tokens: dict[str, dict[str, Any]] = {}

    def create_token(self, user_id: str, username: str, **kwargs: Any) -> str:
        """Issue a new token for a user and remember the user record.

        Args:
            user_id: Identifier stored under ``"user_id"``.
            username: Name stored under ``"username"``.
            **kwargs: Extra fields merged into the stored user record.

        Returns:
            The newly generated token string (from ``secrets.token_urlsafe``).
        """
        import secrets

        token = secrets.token_urlsafe(32)
        self._tokens[token] = {"user_id": user_id, "username": username, **kwargs}
        return token

    def validate_token(self, token: str) -> bool:
        """Return ``True`` only for tokens issued by this instance."""
        return token in self._tokens

    def get_current_user(self, token: str) -> dict[str, Any]:
        """Return the user record for ``token``, or ``{}`` if it is unknown.

        A shallow copy is returned so that callers mutating the result
        cannot alter the record stored for the token.
        """
        return dict(self._tokens.get(token, {}))
class RedisStorageProvider:
    """Key/value store that keeps JSON-encoded values in Redis."""

    def __init__(self, settings: Settings) -> None:
        """Keep ``settings``; the Redis client is not created yet."""
        self.settings = settings
        self._client: Any | None = None

    def _get_client(self) -> Any:
        """Return the Redis client, creating it on the first call."""
        client = self._client
        if client is None:
            import redis

            cfg = self.settings
            client = redis.Redis(host=cfg.redis_host, port=cfg.redis_port, decode_responses=True)
            self._client = client
        return client

    def get(self, key: str) -> Any | None:
        """Fetch ``key`` and JSON-decode it; an empty or missing value yields ``None``."""
        import json

        raw = self._get_client().get(key)
        if not raw:
            return None
        return json.loads(raw)

    def set(self, key: str, value: Any) -> None:
        """JSON-encode ``value`` and store it under ``key``."""
        import json

        encoded = json.dumps(value)
        self._get_client().set(key, encoded)

    def delete(self, key: str) -> None:
        """Delete ``key`` through the Redis client."""
        client = self._get_client()
        client.delete(key)
296# ==============================================================================
297# Application Container
298# ==============================================================================
class ApplicationContainer:
    """
    Main dependency injection container

    Provides:
    - Settings (configuration)
    - Telemetry (logging, metrics, tracing)
    - Authentication
    - Storage

    Each provider is created on the first call to its ``get_*`` method and
    the same instance is returned on later calls.

    Usage:
        # Test mode
        container = create_test_container()

        # Development mode
        config = ContainerConfig(environment="development")
        container = ApplicationContainer(config)

        # Production mode
        container = create_production_container()

        # Use dependencies
        settings = container.settings
        telemetry = container.get_telemetry()
        auth = container.get_auth()
    """

    def __init__(self, config: ContainerConfig, settings: Settings | None = None):
        """
        Initialize the container

        Args:
            config: Container configuration
            settings: Optional settings override (for testing)
        """
        self.config = config
        self._settings = settings
        # Provider instances are deliberately NOT pre-created here: the
        # get_*() methods detect "not built yet" with hasattr(), so the
        # _*_instance attributes must not exist until first use.

    @cached_property
    def settings(self) -> Settings:
        """Return the injected settings, or build them from the config once."""
        if self._settings is not None:
            return self._settings

        # No override supplied: derive settings from the container config.
        return Settings(environment=self.config.environment, log_level=self.config.log_level)

    def get_telemetry(self) -> TelemetryProvider:
        """Return the telemetry provider, creating it on first use.

        A no-op provider is used in the ``"test"`` environment or whenever
        telemetry is disabled in the config; otherwise the production
        provider is built from ``self.settings``.
        """
        if not hasattr(self, "_telemetry_instance"):
            if self.config.environment == "test" or not self.config.enable_telemetry:
                self._telemetry_instance = NoOpTelemetryProvider()
            else:
                self._telemetry_instance = ProductionTelemetryProvider(self.settings)  # type: ignore[assignment]

        return self._telemetry_instance  # type: ignore[return-value]

    def get_auth(self) -> AuthProvider:
        """Return the auth provider, creating it on first use.

        ``"test"`` gets the no-op provider. ``"development"``, or any
        environment with auth disabled, gets the in-memory provider. Every
        other case should use a real provider, but none is implemented
        yet, so the in-memory provider is used and a warning is logged.
        """
        if not hasattr(self, "_auth_instance"):
            if self.config.environment == "test":
                self._auth_instance = NoOpAuthProvider()
            elif self.config.environment == "development" or not self.config.enable_auth:
                self._auth_instance = InMemoryAuthProvider()  # type: ignore[assignment]
            else:
                # Production: use real auth (Keycloak, etc.)
                # TODO: Implement production auth provider
                # Until then, make the downgrade visible instead of silent.
                logging.getLogger(__name__).warning(
                    "No production auth provider is implemented; falling back to "
                    "InMemoryAuthProvider (development-only) for environment %r",
                    self.config.environment,
                )
                self._auth_instance = InMemoryAuthProvider()  # type: ignore[assignment]

        return self._auth_instance

    def get_storage(self) -> StorageProvider:
        """Return the storage provider, creating it on first use.

        ``"test"`` always gets in-memory storage. Otherwise Redis is used
        when ``settings.redis_host`` is truthy, with in-memory storage as
        the fallback.
        """
        if not hasattr(self, "_storage_instance"):
            if self.config.environment == "test":
                self._storage_instance = MemoryStorageProvider()
            elif self.settings.redis_host:
                self._storage_instance = RedisStorageProvider(self.settings)  # type: ignore[assignment]
            else:
                # Fallback to in-memory for development
                self._storage_instance = MemoryStorageProvider()

        return self._storage_instance
390# ==============================================================================
391# Helper Functions
392# ==============================================================================
def create_test_container(settings: Settings | None = None) -> ApplicationContainer:
    """
    Build an independent container configured for the "test" environment.

    With this configuration the container hands out the no-op telemetry
    provider, the no-op auth provider and in-memory storage. Each call
    returns a new container.

    Args:
        settings: Optional settings object to inject instead of building one

    Returns:
        ApplicationContainer configured for testing

    Example:
        container = create_test_container()
        telemetry = container.get_telemetry()
    """
    return ApplicationContainer(ContainerConfig(environment="test"), settings=settings)
def create_development_container() -> ApplicationContainer:
    """
    Build a container configured for the "development" environment.

    With the default config used here the container:
    - Leaves telemetry disabled (no-op provider)
    - Uses in-memory auth
    - Uses Redis when ``settings.redis_host`` is set, otherwise in-memory storage

    Returns:
        ApplicationContainer configured for local development
    """
    dev_config = ContainerConfig(environment="development")
    container = ApplicationContainer(dev_config)
    return container
def create_production_container() -> ApplicationContainer:
    """
    Build a container configured for the "production" environment.

    The production config forces telemetry and auth on, so the container:
    - Uses the production telemetry provider
    - Uses Redis when ``settings.redis_host`` is set, otherwise in-memory storage
    - Currently still hands out the in-memory auth provider, because no
      production auth provider is implemented yet

    Returns:
        ApplicationContainer configured for production
    """
    prod_config = ContainerConfig(environment="production")
    container = ApplicationContainer(prod_config)
    return container