Coverage for src / mcp_server_langgraph / secrets / manager.py: 52%
179 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
1"""
2Infisical integration for secure secrets management
4This module provides:
5- SecretString: A wrapper for secret values that prevents accidental exposure
6- SecretsManager: Integration with Infisical for secure secrets retrieval
7"""
9import hashlib
10import logging
11import os
12from typing import TYPE_CHECKING, Any, Optional
14if TYPE_CHECKING:
15 from opentelemetry.trace import Tracer
class SecretString:
    """Wrapper for secret values that prevents accidental exposure in logs and exceptions.

    Implements the same pattern as Pydantic's SecretStr - the actual secret value is
    hidden in __str__ and __repr__ to prevent accidental logging or exposure in
    exception tracebacks.

    Security Context:
        - CWE-200: Information Exposure
        - CWE-532: Information Exposure Through Log Files

    Example:
        >>> secret = SecretString("my-api-key-123")
        >>> print(secret)
        ***REDACTED***
        >>> f"Using key: {secret}"
        'Using key: ***REDACTED***'
        >>> secret.get_secret_value()
        'my-api-key-123'
    """

    # A single slot: no per-instance __dict__, and no way to attach extra attributes.
    __slots__ = ("_value",)

    def __init__(self, value: str) -> None:
        """Initialize with a secret value.

        Args:
            value: The secret value to wrap
        """
        self._value = value

    def __str__(self) -> str:
        """Return redacted placeholder instead of actual value."""
        return "***REDACTED***"

    def __repr__(self) -> str:
        """Return redacted representation."""
        return "SecretString('***REDACTED***')"

    def __eq__(self, other: object) -> bool:
        """Compare secret values for equality.

        Two SecretString instances are equal when their wrapped values are equal.
        For any other type ``NotImplemented`` is returned so Python can try the
        other operand's reflected comparison; if that also declines, ``==``
        evaluates to False (and ``!=`` to True), exactly as before.
        """
        if not isinstance(other, SecretString):
            return NotImplemented
        return self._value == other._value

    def __hash__(self) -> int:
        """Hash based on actual value for use in sets/dicts.

        Kept consistent with __eq__: equal secrets hash equally.
        """
        return hash(self._value)

    def get_secret_value(self) -> str:
        """Get the actual secret value.

        Use this method explicitly when you need to access the actual secret,
        such as when making API calls or database connections.

        Returns:
            The actual secret value
        """
        return self._value
# infisical-python is an optional dependency: probe for it once at import time
# and record the outcome in INFISICAL_AVAILABLE.
try:
    from infisical_client import (
        AuthenticationOptions,
        ClientSettings,
        InfisicalClient,
        UniversalAuthMethod,
    )
except ImportError:
    INFISICAL_AVAILABLE = False
else:
    INFISICAL_AVAILABLE = True

# Standard library logger, used until observability is initialized.
# This breaks the circular import: config -> secrets.manager -> telemetry -> config
_stdlib_logger = logging.getLogger(__name__)
def _get_logger() -> logging.Logger:
    """
    Return the logger this module should use right now.

    The observability logger is chosen when the telemetry module can be imported
    and reports itself initialized; in every other case (import failure,
    RuntimeError, not yet initialized) the module-level stdlib logger is used.
    This allows the secrets manager to work before observability initialization.
    """
    active_logger = _stdlib_logger
    try:
        # Imported lazily so that importing this module never pulls in telemetry.
        from mcp_server_langgraph.observability.telemetry import get_logger, is_initialized

        if is_initialized():
            active_logger = get_logger()
    except (ImportError, RuntimeError):
        # Telemetry missing or unusable: keep the stdlib logger chosen above.
        pass
    return active_logger
def _get_tracer() -> Optional["Tracer"]:
    """
    Return the observability tracer, or None when tracing cannot be used.

    None is returned when the telemetry module cannot be imported, raises
    RuntimeError, or reports that it has not been initialized yet.
    """
    active_tracer = None
    try:
        # Imported lazily so that importing this module never pulls in telemetry.
        from mcp_server_langgraph.observability.telemetry import get_tracer, is_initialized

        if is_initialized():
            active_tracer = get_tracer()
    except (ImportError, RuntimeError):
        # Telemetry missing or unusable: callers proceed without a span.
        pass
    return active_tracer
# Emit a one-time, import-time notice when the optional Infisical SDK is missing.
if not INFISICAL_AVAILABLE:
    _stdlib_logger.warning(
        "infisical-python not installed - "
        "secrets will fall back to environment variables. "
        "Install with: pip install 'mcp-server-langgraph[secrets]'"
    )
130class SecretsManager:
131 """
132 Infisical-based secrets manager
134 Provides secure retrieval and caching of secrets with automatic rotation support.
135 """
137 def __init__(
138 self,
139 site_url: str = "https://app.infisical.com",
140 client_id: str | None = None,
141 client_secret: str | None = None,
142 project_id: str | None = None,
143 environment: str = "dev",
144 ):
145 """
146 Initialize Infisical secrets manager
148 Args:
149 site_url: Infisical server URL
150 client_id: Universal auth client ID
151 client_secret: Universal auth client secret
152 project_id: Project ID
153 environment: Environment slug (dev, staging, prod)
154 """
155 self.site_url = site_url
156 self.project_id = project_id or os.getenv("INFISICAL_PROJECT_ID")
157 self.environment = environment
158 self._cache: dict[str, Any] = {}
160 # Use environment variables if not provided
161 client_id = client_id or os.getenv("INFISICAL_CLIENT_ID")
162 client_secret = client_secret or os.getenv("INFISICAL_CLIENT_SECRET")
164 # Check if infisical-python is available
165 if not INFISICAL_AVAILABLE: 165 ↛ 170line 165 didn't jump to line 170 because the condition on line 165 was always true
166 _get_logger().warning("infisical-python not installed, using fallback mode (environment variables)")
167 self.client = None
168 return
170 if not client_id or not client_secret:
171 _get_logger().warning("Infisical credentials not provided, using fallback mode")
172 self.client = None
173 return
175 # Configure Infisical client
176 try:
177 self.client = InfisicalClient(
178 ClientSettings(
179 site_url=site_url,
180 auth=AuthenticationOptions(
181 universal_auth=UniversalAuthMethod(client_id=client_id, client_secret=client_secret)
182 ),
183 )
184 )
186 _get_logger().info(
187 "Infisical secrets manager initialized", extra={"site_url": site_url, "environment": environment}
188 )
189 except Exception:
190 # SECURITY: Do not log exception details - may contain credentials (CWE-532)
191 _get_logger().error("Failed to initialize Infisical client")
192 self.client = None
194 def __str__(self) -> str:
195 """Return safe string representation without exposing secrets."""
196 return f"SecretsManager(site_url={self.site_url!r}, project_id={self.project_id!r}, environment={self.environment!r})"
198 def __repr__(self) -> str:
199 """Return safe repr without exposing secrets."""
200 client_status = "connected" if self.client else "disconnected"
201 return (
202 f"SecretsManager(site_url={self.site_url!r}, "
203 f"project_id={self.project_id!r}, "
204 f"environment={self.environment!r}, "
205 f"client={client_status!r})"
206 )
208 def get_secret(self, key: str, path: str = "/", use_cache: bool = True, fallback: str | None = None) -> str | None:
209 """
210 Get a secret from Infisical
212 Args:
213 key: Secret key name
214 path: Secret path (default: root)
215 use_cache: Whether to use cached value
216 fallback: Fallback value if secret not found
218 Returns:
219 Secret value or fallback
220 """
221 tracer = _get_tracer()
222 # Use tracing if available, otherwise proceed without it
223 if tracer:
224 with tracer.start_as_current_span("secrets.get_secret") as span:
225 return self._get_secret_impl(key, path, use_cache, fallback, span)
226 else:
227 return self._get_secret_impl(key, path, use_cache, fallback, None)
229 def _get_secret_impl(self, key: str, path: str, use_cache: bool, fallback: str | None, span: Any) -> str | None:
230 """Implementation of get_secret with optional tracing span."""
231 if span:
232 span.set_attribute("secret.key", key)
233 span.set_attribute("secret.path", path)
235 cache_key = f"{path}:{key}"
237 # Check cache first
238 if use_cache and cache_key in self._cache: 238 ↛ 239line 238 didn't jump to line 239 because the condition on line 238 was never true
239 _get_logger().debug("Secret retrieved from cache", extra={"key": key})
240 return self._cache[cache_key] # type: ignore[no-any-return]
242 # Fallback if client not initialized
243 if not self.client: 243 ↛ 254line 243 didn't jump to line 254 because the condition on line 243 was always true
244 _get_logger().warning(
245 "Infisical client not available, using fallback",
246 extra={"key_hash": hashlib.sha256(key.encode()).hexdigest()[:8]},
247 )
248 # Try environment variable first
249 env_value = os.getenv(key)
250 if env_value:
251 return env_value
252 return fallback
254 try:
255 secret = self.client.get_secret(
256 secret_name=key, project_id=self.project_id, environment=self.environment, path=path
257 )
259 value = secret.secret_value
261 # Cache the value
262 if use_cache:
263 self._cache[cache_key] = value
265 _get_logger().info("Secret retrieved from Infisical", extra={"key": key, "path": path})
267 if span:
268 span.set_attribute("secret.found", True)
269 return value # type: ignore[no-any-return]
271 except Exception as e:
272 _get_logger().error(
273 "Failed to retrieve secret",
274 extra={"key_hash": hashlib.sha256(key.encode()).hexdigest()[:8], "path": path, "error_type": type(e).__name__},
275 exc_info=True,
276 )
277 if span:
278 span.record_exception(e)
279 span.set_attribute("secret.found", False)
281 # Try environment variable fallback
282 env_value = os.getenv(key)
283 if env_value:
284 _get_logger().info(
285 "Using environment variable fallback", extra={"key_hash": hashlib.sha256(key.encode()).hexdigest()[:8]}
286 )
287 return env_value
289 return fallback
291 def get_all_secrets(self, path: str = "/", use_cache: bool = True) -> dict[str, str]:
292 """
293 Get all secrets from a path
295 Args:
296 path: Secret path
297 use_cache: Whether to use cached values
299 Returns:
300 Dictionary of all secrets
301 """
302 tracer = _get_tracer()
303 if tracer: 303 ↛ 307line 303 didn't jump to line 307 because the condition on line 303 was always true
304 with tracer.start_as_current_span("secrets.get_all_secrets"):
305 return self._get_all_secrets_impl(path, use_cache)
306 else:
307 return self._get_all_secrets_impl(path, use_cache)
309 def _get_all_secrets_impl(self, path: str, use_cache: bool) -> dict[str, str]:
310 """Implementation of get_all_secrets."""
311 if not self.client: 311 ↛ 315line 311 didn't jump to line 315 because the condition on line 311 was always true
312 _get_logger().warning("Infisical client not available")
313 return {}
315 try:
316 secrets = self.client.list_secrets(project_id=self.project_id, environment=self.environment, path=path)
318 result = {secret.secret_key: secret.secret_value for secret in secrets}
320 # Cache all secrets
321 if use_cache:
322 for key, value in result.items():
323 cache_key = f"{path}:{key}"
324 self._cache[cache_key] = value
326 _get_logger().info("Retrieved all secrets from path", extra={"path": path, "count": len(result)})
328 return result
330 except Exception as e:
331 _get_logger().error(f"Failed to retrieve secrets from path '{path}': {e}", exc_info=True)
332 return {}
334 def create_secret(self, key: str, value: str, path: str = "/", secret_comment: str | None = None) -> bool:
335 """
336 Create a new secret in Infisical
338 Args:
339 key: Secret key name
340 value: Secret value
341 path: Secret path
342 secret_comment: Optional comment
344 Returns:
345 True if successful
346 """
347 if not self.client: 347 ↛ 351line 347 didn't jump to line 351 because the condition on line 347 was always true
348 _get_logger().error("Infisical client not available")
349 return False
351 try:
352 self.client.create_secret(
353 secret_name=key,
354 secret_value=value,
355 project_id=self.project_id,
356 environment=self.environment,
357 path=path,
358 secret_comment=secret_comment,
359 )
361 # Update cache
362 cache_key = f"{path}:{key}"
363 self._cache[cache_key] = value
365 _get_logger().info("Secret created in Infisical", extra={"key": key, "path": path})
367 return True
369 except Exception as e:
370 _get_logger().error(f"Failed to create secret '{key}': {e}", exc_info=True)
371 return False
373 def update_secret(self, key: str, value: str, path: str = "/") -> bool:
374 """
375 Update an existing secret
377 Args:
378 key: Secret key name
379 value: New secret value
380 path: Secret path
382 Returns:
383 True if successful
384 """
385 if not self.client: 385 ↛ 389line 385 didn't jump to line 389 because the condition on line 385 was always true
386 _get_logger().error("Infisical client not available")
387 return False
389 try:
390 self.client.update_secret(
391 secret_name=key, secret_value=value, project_id=self.project_id, environment=self.environment, path=path
392 )
394 # Update cache
395 cache_key = f"{path}:{key}"
396 self._cache[cache_key] = value
398 _get_logger().info("Secret updated in Infisical", extra={"key": key, "path": path})
400 return True
402 except Exception as e:
403 _get_logger().error(f"Failed to update secret '{key}': {e}", exc_info=True)
404 return False
406 def delete_secret(self, key: str, path: str = "/") -> bool:
407 """
408 Delete a secret
410 Args:
411 key: Secret key name
412 path: Secret path
414 Returns:
415 True if successful
416 """
417 if not self.client: 417 ↛ 421line 417 didn't jump to line 421 because the condition on line 417 was always true
418 _get_logger().error("Infisical client not available")
419 return False
421 try:
422 self.client.delete_secret(secret_name=key, project_id=self.project_id, environment=self.environment, path=path)
424 # Remove from cache
425 cache_key = f"{path}:{key}"
426 self._cache.pop(cache_key, None)
428 _get_logger().info("Secret deleted from Infisical", extra={"key": key, "path": path})
430 return True
432 except Exception as e:
433 _get_logger().error(f"Failed to delete secret '{key}': {e}", exc_info=True)
434 return False
436 def invalidate_cache(self, key: str | None = None) -> None:
437 """
438 Invalidate secret cache
440 Args:
441 key: Specific key to invalidate, or None for all
442 """
443 if key:
444 # Remove all cache entries for this key
445 keys_to_remove = [k for k in self._cache if k.endswith(f":{key}")]
446 for k in keys_to_remove:
447 self._cache.pop(k, None)
448 _get_logger().info(f"Cache invalidated for secret: {key}")
449 else:
450 self._cache.clear()
451 _get_logger().info("All secret cache invalidated")
# Process-wide SecretsManager, built lazily on first request
_secrets_manager: SecretsManager | None = None


def get_secrets_manager() -> SecretsManager:
    """Return the shared SecretsManager, constructing it on the first call.

    The instance is configured from the INFISICAL_SITE_URL and ENVIRONMENT
    environment variables (defaults: "https://app.infisical.com" and "dev").
    """
    global _secrets_manager

    # Guard clause: reuse the instance once it exists.
    if _secrets_manager is not None:
        return _secrets_manager

    infisical_url = os.getenv("INFISICAL_SITE_URL", "https://app.infisical.com")
    env_slug = os.getenv("ENVIRONMENT", "dev")
    _secrets_manager = SecretsManager(site_url=infisical_url, environment=env_slug)
    return _secrets_manager