Coverage for src / mcp_server_langgraph / secrets / manager.py: 52%

179 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 00:43 +0000

1""" 

2Infisical integration for secure secrets management 

3 

4This module provides: 

5- SecretString: A wrapper for secret values that prevents accidental exposure 

6- SecretsManager: Integration with Infisical for secure secrets retrieval 

7""" 

8 

9import hashlib 

10import logging 

11import os 

12from typing import TYPE_CHECKING, Any, Optional 

13 

14if TYPE_CHECKING: 

15 from opentelemetry.trace import Tracer 

16 

17 

18class SecretString: 

19 """Wrapper for secret values that prevents accidental exposure in logs and exceptions. 

20 

21 Implements the same pattern as Pydantic's SecretStr - the actual secret value is 

22 hidden in __str__ and __repr__ to prevent accidental logging or exposure in 

23 exception tracebacks. 

24 

25 Security Context: 

26 - CWE-200: Information Exposure 

27 - CWE-532: Information Exposure Through Log Files 

28 

29 Example: 

30 >>> secret = SecretString("my-api-key-123") 

31 >>> print(secret) 

32 ***REDACTED*** 

33 >>> f"Using key: {secret}" 

34 'Using key: ***REDACTED***' 

35 >>> secret.get_secret_value() 

36 'my-api-key-123' 

37 """ 

38 

39 __slots__ = ("_value",) 

40 

41 def __init__(self, value: str) -> None: 

42 """Initialize with a secret value. 

43 

44 Args: 

45 value: The secret value to wrap 

46 """ 

47 self._value = value 

48 

49 def __str__(self) -> str: 

50 """Return redacted placeholder instead of actual value.""" 

51 return "***REDACTED***" 

52 

53 def __repr__(self) -> str: 

54 """Return redacted representation.""" 

55 return "SecretString('***REDACTED***')" 

56 

57 def __eq__(self, other: object) -> bool: 

58 """Compare secret values for equality.""" 

59 if isinstance(other, SecretString): 

60 return self._value == other._value 

61 return False 

62 

63 def __hash__(self) -> int: 

64 """Hash based on actual value for use in sets/dicts.""" 

65 return hash(self._value) 

66 

67 def get_secret_value(self) -> str: 

68 """Get the actual secret value. 

69 

70 Use this method explicitly when you need to access the actual secret, 

71 such as when making API calls or database connections. 

72 

73 Returns: 

74 The actual secret value 

75 """ 

76 return self._value 

77 

78 

79# Conditional import - infisical-python is an optional dependency 

80try: 

81 from infisical_client import AuthenticationOptions, ClientSettings, InfisicalClient, UniversalAuthMethod 

82 

83 INFISICAL_AVAILABLE = True 

84except ImportError: 

85 INFISICAL_AVAILABLE = False 

86 

87# Use standard library logging until observability is initialized 

88# This breaks the circular import: config -> secrets.manager -> telemetry -> config 

89_stdlib_logger = logging.getLogger(__name__) 

90 

91 

92def _get_logger() -> logging.Logger: 

93 """ 

94 Get logger instance, preferring observability logger if initialized. 

95 

96 This allows secrets manager to work before observability initialization. 

97 """ 

98 try: 

99 from mcp_server_langgraph.observability.telemetry import get_logger, is_initialized 

100 

101 if is_initialized(): 

102 return get_logger() # type: ignore[no-any-return] 

103 except (ImportError, RuntimeError): 

104 pass 

105 return _stdlib_logger 

106 

107 

108def _get_tracer() -> Optional["Tracer"]: 

109 """ 

110 Get tracer instance if observability is initialized, otherwise return None. 

111 """ 

112 try: 

113 from mcp_server_langgraph.observability.telemetry import get_tracer, is_initialized 

114 

115 if is_initialized(): 

116 return get_tracer() # type: ignore[no-any-return] 

117 except (ImportError, RuntimeError): 

118 pass 

119 return None 

120 

121 

122# Log warning if infisical not available 

123if not INFISICAL_AVAILABLE: 123 ↛ 130line 123 didn't jump to line 130 because the condition on line 123 was always true

124 _stdlib_logger.warning( 

125 "infisical-python not installed - secrets will fall back to environment variables. " 

126 "Install with: pip install 'mcp-server-langgraph[secrets]'" 

127 ) 

128 

129 

130class SecretsManager: 

131 """ 

132 Infisical-based secrets manager 

133 

134 Provides secure retrieval and caching of secrets with automatic rotation support. 

135 """ 

136 

137 def __init__( 

138 self, 

139 site_url: str = "https://app.infisical.com", 

140 client_id: str | None = None, 

141 client_secret: str | None = None, 

142 project_id: str | None = None, 

143 environment: str = "dev", 

144 ): 

145 """ 

146 Initialize Infisical secrets manager 

147 

148 Args: 

149 site_url: Infisical server URL 

150 client_id: Universal auth client ID 

151 client_secret: Universal auth client secret 

152 project_id: Project ID 

153 environment: Environment slug (dev, staging, prod) 

154 """ 

155 self.site_url = site_url 

156 self.project_id = project_id or os.getenv("INFISICAL_PROJECT_ID") 

157 self.environment = environment 

158 self._cache: dict[str, Any] = {} 

159 

160 # Use environment variables if not provided 

161 client_id = client_id or os.getenv("INFISICAL_CLIENT_ID") 

162 client_secret = client_secret or os.getenv("INFISICAL_CLIENT_SECRET") 

163 

164 # Check if infisical-python is available 

165 if not INFISICAL_AVAILABLE: 165 ↛ 170line 165 didn't jump to line 170 because the condition on line 165 was always true

166 _get_logger().warning("infisical-python not installed, using fallback mode (environment variables)") 

167 self.client = None 

168 return 

169 

170 if not client_id or not client_secret: 

171 _get_logger().warning("Infisical credentials not provided, using fallback mode") 

172 self.client = None 

173 return 

174 

175 # Configure Infisical client 

176 try: 

177 self.client = InfisicalClient( 

178 ClientSettings( 

179 site_url=site_url, 

180 auth=AuthenticationOptions( 

181 universal_auth=UniversalAuthMethod(client_id=client_id, client_secret=client_secret) 

182 ), 

183 ) 

184 ) 

185 

186 _get_logger().info( 

187 "Infisical secrets manager initialized", extra={"site_url": site_url, "environment": environment} 

188 ) 

189 except Exception: 

190 # SECURITY: Do not log exception details - may contain credentials (CWE-532) 

191 _get_logger().error("Failed to initialize Infisical client") 

192 self.client = None 

193 

194 def __str__(self) -> str: 

195 """Return safe string representation without exposing secrets.""" 

196 return f"SecretsManager(site_url={self.site_url!r}, project_id={self.project_id!r}, environment={self.environment!r})" 

197 

198 def __repr__(self) -> str: 

199 """Return safe repr without exposing secrets.""" 

200 client_status = "connected" if self.client else "disconnected" 

201 return ( 

202 f"SecretsManager(site_url={self.site_url!r}, " 

203 f"project_id={self.project_id!r}, " 

204 f"environment={self.environment!r}, " 

205 f"client={client_status!r})" 

206 ) 

207 

208 def get_secret(self, key: str, path: str = "/", use_cache: bool = True, fallback: str | None = None) -> str | None: 

209 """ 

210 Get a secret from Infisical 

211 

212 Args: 

213 key: Secret key name 

214 path: Secret path (default: root) 

215 use_cache: Whether to use cached value 

216 fallback: Fallback value if secret not found 

217 

218 Returns: 

219 Secret value or fallback 

220 """ 

221 tracer = _get_tracer() 

222 # Use tracing if available, otherwise proceed without it 

223 if tracer: 

224 with tracer.start_as_current_span("secrets.get_secret") as span: 

225 return self._get_secret_impl(key, path, use_cache, fallback, span) 

226 else: 

227 return self._get_secret_impl(key, path, use_cache, fallback, None) 

228 

229 def _get_secret_impl(self, key: str, path: str, use_cache: bool, fallback: str | None, span: Any) -> str | None: 

230 """Implementation of get_secret with optional tracing span.""" 

231 if span: 

232 span.set_attribute("secret.key", key) 

233 span.set_attribute("secret.path", path) 

234 

235 cache_key = f"{path}:{key}" 

236 

237 # Check cache first 

238 if use_cache and cache_key in self._cache: 238 ↛ 239line 238 didn't jump to line 239 because the condition on line 238 was never true

239 _get_logger().debug("Secret retrieved from cache", extra={"key": key}) 

240 return self._cache[cache_key] # type: ignore[no-any-return] 

241 

242 # Fallback if client not initialized 

243 if not self.client: 243 ↛ 254line 243 didn't jump to line 254 because the condition on line 243 was always true

244 _get_logger().warning( 

245 "Infisical client not available, using fallback", 

246 extra={"key_hash": hashlib.sha256(key.encode()).hexdigest()[:8]}, 

247 ) 

248 # Try environment variable first 

249 env_value = os.getenv(key) 

250 if env_value: 

251 return env_value 

252 return fallback 

253 

254 try: 

255 secret = self.client.get_secret( 

256 secret_name=key, project_id=self.project_id, environment=self.environment, path=path 

257 ) 

258 

259 value = secret.secret_value 

260 

261 # Cache the value 

262 if use_cache: 

263 self._cache[cache_key] = value 

264 

265 _get_logger().info("Secret retrieved from Infisical", extra={"key": key, "path": path}) 

266 

267 if span: 

268 span.set_attribute("secret.found", True) 

269 return value # type: ignore[no-any-return] 

270 

271 except Exception as e: 

272 _get_logger().error( 

273 "Failed to retrieve secret", 

274 extra={"key_hash": hashlib.sha256(key.encode()).hexdigest()[:8], "path": path, "error_type": type(e).__name__}, 

275 exc_info=True, 

276 ) 

277 if span: 

278 span.record_exception(e) 

279 span.set_attribute("secret.found", False) 

280 

281 # Try environment variable fallback 

282 env_value = os.getenv(key) 

283 if env_value: 

284 _get_logger().info( 

285 "Using environment variable fallback", extra={"key_hash": hashlib.sha256(key.encode()).hexdigest()[:8]} 

286 ) 

287 return env_value 

288 

289 return fallback 

290 

291 def get_all_secrets(self, path: str = "/", use_cache: bool = True) -> dict[str, str]: 

292 """ 

293 Get all secrets from a path 

294 

295 Args: 

296 path: Secret path 

297 use_cache: Whether to use cached values 

298 

299 Returns: 

300 Dictionary of all secrets 

301 """ 

302 tracer = _get_tracer() 

303 if tracer: 303 ↛ 307line 303 didn't jump to line 307 because the condition on line 303 was always true

304 with tracer.start_as_current_span("secrets.get_all_secrets"): 

305 return self._get_all_secrets_impl(path, use_cache) 

306 else: 

307 return self._get_all_secrets_impl(path, use_cache) 

308 

309 def _get_all_secrets_impl(self, path: str, use_cache: bool) -> dict[str, str]: 

310 """Implementation of get_all_secrets.""" 

311 if not self.client: 311 ↛ 315line 311 didn't jump to line 315 because the condition on line 311 was always true

312 _get_logger().warning("Infisical client not available") 

313 return {} 

314 

315 try: 

316 secrets = self.client.list_secrets(project_id=self.project_id, environment=self.environment, path=path) 

317 

318 result = {secret.secret_key: secret.secret_value for secret in secrets} 

319 

320 # Cache all secrets 

321 if use_cache: 

322 for key, value in result.items(): 

323 cache_key = f"{path}:{key}" 

324 self._cache[cache_key] = value 

325 

326 _get_logger().info("Retrieved all secrets from path", extra={"path": path, "count": len(result)}) 

327 

328 return result 

329 

330 except Exception as e: 

331 _get_logger().error(f"Failed to retrieve secrets from path '{path}': {e}", exc_info=True) 

332 return {} 

333 

334 def create_secret(self, key: str, value: str, path: str = "/", secret_comment: str | None = None) -> bool: 

335 """ 

336 Create a new secret in Infisical 

337 

338 Args: 

339 key: Secret key name 

340 value: Secret value 

341 path: Secret path 

342 secret_comment: Optional comment 

343 

344 Returns: 

345 True if successful 

346 """ 

347 if not self.client: 347 ↛ 351line 347 didn't jump to line 351 because the condition on line 347 was always true

348 _get_logger().error("Infisical client not available") 

349 return False 

350 

351 try: 

352 self.client.create_secret( 

353 secret_name=key, 

354 secret_value=value, 

355 project_id=self.project_id, 

356 environment=self.environment, 

357 path=path, 

358 secret_comment=secret_comment, 

359 ) 

360 

361 # Update cache 

362 cache_key = f"{path}:{key}" 

363 self._cache[cache_key] = value 

364 

365 _get_logger().info("Secret created in Infisical", extra={"key": key, "path": path}) 

366 

367 return True 

368 

369 except Exception as e: 

370 _get_logger().error(f"Failed to create secret '{key}': {e}", exc_info=True) 

371 return False 

372 

373 def update_secret(self, key: str, value: str, path: str = "/") -> bool: 

374 """ 

375 Update an existing secret 

376 

377 Args: 

378 key: Secret key name 

379 value: New secret value 

380 path: Secret path 

381 

382 Returns: 

383 True if successful 

384 """ 

385 if not self.client: 385 ↛ 389line 385 didn't jump to line 389 because the condition on line 385 was always true

386 _get_logger().error("Infisical client not available") 

387 return False 

388 

389 try: 

390 self.client.update_secret( 

391 secret_name=key, secret_value=value, project_id=self.project_id, environment=self.environment, path=path 

392 ) 

393 

394 # Update cache 

395 cache_key = f"{path}:{key}" 

396 self._cache[cache_key] = value 

397 

398 _get_logger().info("Secret updated in Infisical", extra={"key": key, "path": path}) 

399 

400 return True 

401 

402 except Exception as e: 

403 _get_logger().error(f"Failed to update secret '{key}': {e}", exc_info=True) 

404 return False 

405 

406 def delete_secret(self, key: str, path: str = "/") -> bool: 

407 """ 

408 Delete a secret 

409 

410 Args: 

411 key: Secret key name 

412 path: Secret path 

413 

414 Returns: 

415 True if successful 

416 """ 

417 if not self.client: 417 ↛ 421line 417 didn't jump to line 421 because the condition on line 417 was always true

418 _get_logger().error("Infisical client not available") 

419 return False 

420 

421 try: 

422 self.client.delete_secret(secret_name=key, project_id=self.project_id, environment=self.environment, path=path) 

423 

424 # Remove from cache 

425 cache_key = f"{path}:{key}" 

426 self._cache.pop(cache_key, None) 

427 

428 _get_logger().info("Secret deleted from Infisical", extra={"key": key, "path": path}) 

429 

430 return True 

431 

432 except Exception as e: 

433 _get_logger().error(f"Failed to delete secret '{key}': {e}", exc_info=True) 

434 return False 

435 

436 def invalidate_cache(self, key: str | None = None) -> None: 

437 """ 

438 Invalidate secret cache 

439 

440 Args: 

441 key: Specific key to invalidate, or None for all 

442 """ 

443 if key: 

444 # Remove all cache entries for this key 

445 keys_to_remove = [k for k in self._cache if k.endswith(f":{key}")] 

446 for k in keys_to_remove: 

447 self._cache.pop(k, None) 

448 _get_logger().info(f"Cache invalidated for secret: {key}") 

449 else: 

450 self._cache.clear() 

451 _get_logger().info("All secret cache invalidated") 

452 

453 

454# Singleton instance 

455_secrets_manager: SecretsManager | None = None 

456 

457 

458def get_secrets_manager() -> SecretsManager: 

459 """Get or create the global secrets manager instance""" 

460 global _secrets_manager 

461 

462 if _secrets_manager is None: 

463 _secrets_manager = SecretsManager( 

464 site_url=os.getenv("INFISICAL_SITE_URL", "https://app.infisical.com"), environment=os.getenv("ENVIRONMENT", "dev") 

465 ) 

466 

467 return _secrets_manager