Coverage for src / mcp_server_langgraph / mcp / server_stdio.py: 52%

312 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 00:43 +0000

1""" 

2MCP Server implementation for LangGraph agent with OpenFGA and Infisical 

3 

4Implements Anthropic's best practices for writing tools for agents: 

5- Token-efficient responses with truncation 

6- Search-focused tools instead of list-all 

7- Response format control (concise vs detailed) 

8- Namespaced tools for clarity 

9- High-signal information in responses 

10""" 

11 

12import asyncio 

13import sys 

14import time 

15from typing import Any, Literal 

16 

17from langchain_core.messages import HumanMessage 

18from mcp.server import Server 

19from mcp.server.stdio import stdio_server 

20from mcp.types import Resource, TextContent, Tool 

21from pydantic import AnyUrl, BaseModel, Field 

22 

23from mcp_server_langgraph.auth.factory import create_auth_middleware 

24from mcp_server_langgraph.auth.middleware import AuthMiddleware 

25from mcp_server_langgraph.auth.openfga import OpenFGAClient 

26from mcp_server_langgraph.core.agent import AgentState, get_agent_graph 

27from mcp_server_langgraph.core.config import Settings, settings 

28from mcp_server_langgraph.observability.telemetry import logger, metrics, tracer 

29from mcp_server_langgraph.utils.response_optimizer import format_response 

30 

31 

class ChatInput(BaseModel):
    """
    Input schema for agent_chat tool.

    Follows Anthropic best practices:
    - Unambiguous parameter names (user_id not username, message not query)
    - Response format control for token efficiency
    - Clear field descriptions
    """

    # Bounded length keeps requests token-efficient and rejects empty messages.
    message: str = Field(description="The user message to send to the agent", min_length=1, max_length=10000)
    token: str = Field(
        description=(
            "JWT authentication token. Obtain via /auth/login endpoint (HTTP) "
            "or external authentication service. Required for all tool calls."
        )
    )
    user_id: str = Field(
        description=(
            "User identifier for authentication and authorization. "
            "Accepts both plain usernames ('alice') and OpenFGA-prefixed IDs ('user:alice'). "
            "The system will normalize both formats automatically."
        )
    )
    thread_id: str | None = Field(
        default=None, description="Optional thread ID for conversation continuity (e.g., 'conv_123')"
    )
    response_format: Literal["concise", "detailed"] = Field(
        default="concise",
        description=(
            "Response verbosity level. "
            "'concise' returns ~500 tokens (faster, less context). "
            "'detailed' returns ~2000 tokens (comprehensive, more context)."
        ),
    )

    # Backward compatibility - DEPRECATED
    username: str | None = Field(
        default=None, deprecated=True, description="DEPRECATED: Use 'user_id' instead. Maintained for backward compatibility."
    )

    @property
    def effective_user_id(self) -> str:
        """Get effective user ID, prioritizing user_id over deprecated username."""
        # `user_id` is a required field, so the previous hasattr() guard was
        # redundant; an empty user_id still falls through to the deprecated
        # `username` field, and finally to "".
        return self.user_id or self.username or ""

77 

78 

class SearchConversationsInput(BaseModel):
    """Input schema for conversation_search tool.

    Mirrors ChatInput's auth fields (token / user_id, deprecated username)
    plus search-specific query and limit parameters.
    """

    # Empty query is allowed and means "recent conversations".
    query: str = Field(
        description="Search query to filter conversations. Empty string returns recent conversations.",
        min_length=0,
        max_length=500,
    )
    token: str = Field(
        description=(
            "JWT authentication token. Obtain via /auth/login endpoint (HTTP) "
            "or external authentication service. Required for all tool calls."
        )
    )
    user_id: str = Field(description="User identifier for authentication and authorization")
    # Hard cap of 50 prevents context overflow in agent responses.
    limit: int = Field(default=10, ge=1, le=50, description="Maximum number of conversations to return (1-50)")

    # Backward compatibility - DEPRECATED
    username: str | None = Field(
        default=None, deprecated=True, description="DEPRECATED: Use 'user_id' instead. Maintained for backward compatibility."
    )

    @property
    def effective_user_id(self) -> str:
        """Get effective user ID, prioritizing user_id over deprecated username."""
        # `user_id` is required, so the previous hasattr() guard was redundant;
        # same simplification as ChatInput.effective_user_id for consistency.
        return self.user_id or self.username or ""

105 

106 

107class MCPAgentServer: 

108 """MCP Server exposing LangGraph agent with OpenFGA authorization""" 

109 

    def __init__(
        self,
        openfga_client: OpenFGAClient | None = None,
        auth: AuthMiddleware | None = None,
        settings: Settings | None = None,
    ) -> None:
        """
        Initialize MCP Agent Server with optional dependency injection.

        Args:
            openfga_client: Optional OpenFGA client for authorization.
                If None, creates one from settings.
            auth: Optional pre-configured AuthMiddleware instance.
                If provided, this takes precedence over creating auth from settings.
                This enables dependency injection for testing and custom configurations.
            settings: Optional Settings instance for runtime configuration.
                If provided, enables dynamic feature toggling (e.g., code execution).
                If None, uses global settings. This allows tests to inject custom
                configuration without module reloading.

        Raises:
            ValueError: If the in-memory auth provider is selected without a JWT
                secret key configured, or if running in production without OpenFGA.

        Example:
            # Default creation (production):
            server = MCPAgentServer()

            # Custom auth injection (testing):
            custom_auth = AuthMiddleware(user_provider=custom_provider, ...)
            server = MCPAgentServer(auth=custom_auth)

            # Custom settings injection (testing):
            test_settings = Settings(enable_code_execution=True)
            server = MCPAgentServer(settings=test_settings)

        OpenAI Codex Finding (2025-11-16):
        ===================================
        Added `auth` parameter for constructor-based dependency injection.
        This allows tests to inject pre-configured AuthMiddleware with registered users,
        fixing the "user not found" failures in integration tests.

        Added `settings` parameter (2025-11-16):
        ========================================
        Enables runtime configuration without module reloading. Fixes code execution
        tool visibility issues in integration tests where ENABLE_CODE_EXECUTION env
        var changes didn't take effect due to module-level settings caching.
        """
        # Store settings for runtime configuration
        # NOTE: When settings=None, we must reference the module-level 'settings'
        # imported at the top of this file (the parameter shadows it in this scope).
        # This allows tests to mock settings via
        # @patch("mcp_server_langgraph.mcp.server_stdio.settings", ...)
        # Using sys.modules[__name__] to avoid self-import (CodeQL py/import-own-module)
        self.settings = settings if settings is not None else sys.modules[__name__].settings

        self.server = Server("langgraph-agent")

        # Initialize OpenFGA client (must happen after self.settings is set,
        # since the factory method reads instance settings).
        self.openfga = openfga_client or self._create_openfga_client()

        # Initialize auth middleware
        if auth is not None:
            # Use injected auth (dependency injection pattern)
            logger.info("Using injected AuthMiddleware instance")
            self.auth = auth

            # If injected auth doesn't have OpenFGA but we have a client, update it
            # This handles the case where auth is injected but openfga_client is also provided
            if self.openfga is not None and self.auth.openfga is None:
                logger.info("Updating injected auth with provided OpenFGA client")
                self.auth.openfga = self.openfga
        else:
            # Create auth using factory (respects settings.auth_provider)
            # Validate JWT secret is configured for in-memory auth provider
            # Keycloak uses RS256 with JWKS (public key crypto) and doesn't need JWT_SECRET_KEY
            if self.settings.auth_provider == "inmemory" and not self.settings.jwt_secret_key:
                msg = (
                    "CRITICAL: JWT secret key not configured for in-memory auth provider. "
                    "Set JWT_SECRET_KEY environment variable or configure via Infisical. "
                    "The in-memory auth provider requires a secure secret key for HS256 token signing."
                )
                raise ValueError(msg)

            # SECURITY: Fail-closed pattern - require OpenFGA in production
            if self.settings.environment == "production" and self.openfga is None:
                msg = (
                    "CRITICAL: OpenFGA authorization is required in production mode. "
                    "Configure OPENFGA_STORE_ID and OPENFGA_MODEL_ID environment variables, "
                    "or set ENVIRONMENT=development for local testing. "
                    "Fallback authorization is not secure enough for production use."
                )
                raise ValueError(msg)

            self.auth = create_auth_middleware(self.settings, openfga_client=self.openfga)

        # Register MCP protocol handlers last, once auth/openfga are wired up.
        self._setup_handlers()

202 

203 def _create_openfga_client(self, settings_override: Settings | None = None) -> OpenFGAClient | None: 

204 """ 

205 Create OpenFGA client from settings. 

206 

207 Args: 

208 settings_override: Optional settings override for testing/dependency injection. 

209 If None, uses global settings. 

210 

211 Returns: 

212 OpenFGAClient instance or None if not configured 

213 """ 

214 _settings = settings_override or settings 

215 

216 if _settings.openfga_store_id and _settings.openfga_model_id: 

217 logger.info( 

218 "Initializing OpenFGA client", 

219 extra={"store_id": _settings.openfga_store_id, "model_id": _settings.openfga_model_id}, 

220 ) 

221 return OpenFGAClient( 

222 api_url=_settings.openfga_api_url, 

223 store_id=_settings.openfga_store_id, 

224 model_id=_settings.openfga_model_id, 

225 ) 

226 else: 

227 logger.warning("OpenFGA not configured, authorization will use fallback mode") 

228 return None 

229 

230 async def list_tools_public(self) -> list[Tool]: 

231 """ 

232 Public method to list available tools (used for testing and external access). 

233 

234 Returns the same tools list as the MCP protocol handler. 

235 """ 

236 tools = [ 

237 Tool( 

238 name="agent_chat", 

239 description=( 

240 "Chat with the AI agent for questions, research, and problem-solving. " 

241 "Returns responses optimized for agent consumption. " 

242 "Response format: 'concise' (~500 tokens, 2-5 sec) or 'detailed' (~2000 tokens, 5-10 sec). " 

243 "For specialized tasks like code execution or web search, use dedicated tools instead. " 

244 "Rate limit: 60 requests/minute per user." 

245 ), 

246 inputSchema=ChatInput.model_json_schema(), 

247 ), 

248 Tool( 

249 name="conversation_get", 

250 description=( 

251 "Retrieve a specific conversation thread by ID. " 

252 "Returns conversation history with messages, participants, and metadata. " 

253 "Response time: <1 second. " 

254 "Use conversation_search to find conversation IDs first." 

255 ), 

256 inputSchema={ 

257 "type": "object", 

258 "properties": { 

259 "thread_id": { 

260 "type": "string", 

261 "description": "Conversation thread identifier (e.g., 'conv_abc123')", 

262 }, 

263 "token": { 

264 "type": "string", 

265 "description": "JWT authentication token. Required for all tool calls.", 

266 }, 

267 "user_id": { 

268 "type": "string", 

269 "description": "User identifier for authentication and authorization", 

270 }, 

271 "username": {"type": "string", "description": "DEPRECATED: Use 'user_id' instead"}, 

272 }, 

273 "required": ["thread_id", "token", "user_id"], 

274 }, 

275 ), 

276 Tool( 

277 name="conversation_search", 

278 description=( 

279 "Search conversations using keywords or filters. " 

280 "Returns matching conversations sorted by relevance. " 

281 "Much more efficient than listing all conversations. " 

282 "Response time: <2 seconds. " 

283 "Examples: 'project updates', 'conversations with alice', 'last week'. " 

284 "Results limited to 50 conversations max to prevent context overflow." 

285 ), 

286 inputSchema=SearchConversationsInput.model_json_schema(), 

287 ), 

288 ] 

289 

290 # Add search_tools for progressive discovery (Anthropic best practice) 

291 tools.append( 

292 Tool( 

293 name="search_tools", 

294 description=( 

295 "Search and discover available tools using progressive disclosure. " 

296 "Query by keyword or category instead of loading all tool definitions. " 

297 "Saves 98%+ tokens compared to list-all approach. " 

298 "Detail levels: minimal (name+desc), standard (+params), full (+schema). " 

299 "Categories: calculator, search, filesystem, execution. " 

300 "Response time: <1 second." 

301 ), 

302 inputSchema={ 

303 "type": "object", 

304 "properties": { 

305 "query": {"type": "string", "description": "Search query (keyword)"}, 

306 "category": {"type": "string", "description": "Tool category filter"}, 

307 "detail_level": { 

308 "type": "string", 

309 "enum": ["minimal", "standard", "full"], 

310 "description": "Level of detail in results", 

311 }, 

312 }, 

313 }, 

314 ) 

315 ) 

316 

317 # Add execute_python if code execution is enabled 

318 # Use runtime settings evaluation (not module-level cached value) 

319 if self.settings.enable_code_execution: 

320 from mcp_server_langgraph.tools.code_execution_tools import ExecutePythonInput 

321 

322 tools.append( 

323 Tool( 

324 name="execute_python", 

325 description=( 

326 "Execute Python code in a secure sandboxed environment. " 

327 "Security: Import whitelist, no eval/exec, resource limits (CPU, memory, timeout). " 

328 "Backends: docker-engine (local/dev) or kubernetes (production). " 

329 "Network: Configurable isolation (none/allowlist/unrestricted). " 

330 "Response time: 1-30 seconds depending on code complexity. " 

331 "Use for data processing, calculations, and Python-specific tasks." 

332 ), 

333 inputSchema=ExecutePythonInput.model_json_schema(), 

334 ) 

335 ) 

336 

337 return tools 

338 

    async def call_tool_public(self, name: str, arguments: dict[str, Any]) -> list[TextContent]:
        """
        Public method to call tools (used for testing and external access).

        This method contains the core tool invocation logic including authentication,
        authorization, and routing. The MCP protocol handler delegates to this method.

        Pipeline (order matters — each step fails closed before the next runs):
        1. Require a 'token' argument.
        2. Verify the JWT via the auth middleware.
        3. Extract a username from token claims (preferred_username > username > sub).
        4. Authorize 'executor' on "tool:<name>" via OpenFGA.
        5. Route to the per-tool handler (old tool names accepted for compatibility).

        Args:
            name: Name of the tool to call
            arguments: Tool arguments including 'token' for authentication

        Returns:
            List of TextContent responses from the tool

        Raises:
            PermissionError: If authentication or authorization fails
            ValueError: If the tool name is unknown
        """
        with tracer.start_as_current_span("mcp.call_tool", attributes={"tool.name": name}) as span:
            # Local import keeps the security helper off the module import path.
            from mcp_server_langgraph.core.security import sanitize_for_logging

            # Arguments are sanitized before logging (they contain the JWT).
            logger.info(f"Tool called: {name}", extra={"tool": name, "args": sanitize_for_logging(arguments)})
            metrics.tool_calls.add(1, {"tool": name})

            # SECURITY: Require JWT token for all tool calls
            token = arguments.get("token")

            if not token:
                logger.warning("No authentication token provided")
                metrics.auth_failures.add(1)
                msg = (
                    "Authentication token required. Provide 'token' parameter with a valid JWT. "
                    "Obtain token via /auth/login endpoint or external authentication service."
                )
                raise PermissionError(msg)

            # Verify JWT token
            token_verification = await self.auth.verify_token(token)

            if not token_verification.valid:
                logger.warning("Token verification failed", extra={"error": token_verification.error})
                metrics.auth_failures.add(1)
                msg = f"Invalid authentication token: {token_verification.error or 'token verification failed'}"
                raise PermissionError(msg)

            # Extract user_id from validated token payload
            if not token_verification.payload or "sub" not in token_verification.payload:
                logger.error("Token payload missing 'sub' claim")
                metrics.auth_failures.add(1)
                msg = "Invalid token: missing user identifier"
                raise PermissionError(msg)

            # Extract username with defensive fallback
            # Priority: preferred_username > username claim > sub parsing
            username = token_verification.payload.get("preferred_username")
            if not username:
                # Try 'username' claim (alternative standard claim)
                username = token_verification.payload.get("username")
            if not username:
                # Fallback: extract from sub if it's in "user:username" format
                sub = token_verification.payload.get("sub", "")
                if sub.startswith("user:"):
                    username = sub.split(":", 1)[1]
                elif sub and ":" not in sub:
                    # Log warning for UUID-style subs (may cause issues)
                    logger.warning(
                        f"Using sub as username fallback (may be UUID): {sub[:8]}...",
                        extra={"sub_prefix": sub[:8]},
                    )
                    username = sub
                else:
                    # sub is empty or contains a non-"user:" namespace — reject.
                    msg = "Invalid token: cannot extract username from claims"
                    raise PermissionError(msg)

            # Normalize user_id to "user:username" format for OpenFGA compatibility
            user_id = f"user:{username}" if not username.startswith("user:") else username
            span.set_attribute("user.id", user_id)

            logger.info("User authenticated via token", extra={"user_id": user_id, "tool": name})

            # Check OpenFGA authorization
            resource = f"tool:{name}"

            authorized = await self.auth.authorize(user_id=user_id, relation="executor", resource=resource)

            if not authorized:
                logger.warning(
                    "Authorization failed (OpenFGA)",
                    extra={"user_id": user_id, "resource": resource, "relation": "executor"},
                )
                metrics.authz_failures.add(1, {"resource": resource})
                msg = f"Not authorized to execute {resource}"
                raise PermissionError(msg)

            logger.info("Authorization granted", extra={"user_id": user_id, "resource": resource})

            # Route to appropriate handler (with backward compatibility)
            if name == "agent_chat" or name == "chat":  # Support old name for compatibility
                return await self._handle_chat(arguments, span, user_id)
            elif name == "conversation_get" or name == "get_conversation":
                return await self._handle_get_conversation(arguments, span, user_id)
            elif name == "conversation_search" or name == "list_conversations":
                return await self._handle_search_conversations(arguments, span, user_id)
            elif name == "search_tools":
                return await self._handle_search_tools(arguments, span)
            elif name == "execute_python":
                return await self._handle_execute_python(arguments, span, user_id)
            else:
                msg = f"Unknown tool: {name}"
                raise ValueError(msg)

449 

    def _setup_handlers(self) -> None:
        """Setup MCP protocol handlers.

        Registers thin wrappers on the MCP ``Server`` instance that delegate to
        the public methods (``list_tools_public`` / ``call_tool_public``), so
        the same logic is reachable both via the stdio protocol and directly
        from tests.
        """

        @self.server.list_tools()  # type: ignore[no-untyped-call, untyped-decorator]
        async def list_tools() -> list[Tool]:
            """
            List available tools.

            Tools follow Anthropic best practices:
            - Namespaced for clarity (agent_*, conversation_*)
            - Search-focused instead of list-all
            - Clear usage guidance in descriptions
            - Token limits and expected response times documented
            """
            with tracer.start_as_current_span("mcp.list_tools"):
                logger.info("Listing available tools")
                return await self.list_tools_public()

        @self.server.call_tool()  # type: ignore[untyped-decorator]
        async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]:
            """Handle tool calls with OpenFGA authorization and tracing"""
            # All auth/authz happens inside call_tool_public.
            return await self.call_tool_public(name, arguments)

        @self.server.list_resources()  # type: ignore[no-untyped-call, untyped-decorator]
        async def list_resources() -> list[Resource]:
            """List available resources"""
            with tracer.start_as_current_span("mcp.list_resources"):
                return [Resource(uri=AnyUrl("agent://config"), name="Agent Configuration", mimeType="application/json")]

478 

    async def _handle_chat(self, arguments: dict[str, Any], span: Any, user_id: str) -> list[TextContent]:
        """
        Handle agent_chat tool invocation.

        Implements Anthropic best practices:
        - Response format control (concise vs detailed)
        - Token-efficient responses with truncation
        - Clear error messages
        - Performance tracking

        Args:
            arguments: Raw tool arguments; validated against ChatInput.
            span: Active tracing span from call_tool_public.
            user_id: Authenticated user in OpenFGA "user:username" format.

        Returns:
            Single TextContent containing the formatted agent response.

        Raises:
            ValueError: If the arguments fail ChatInput validation.
            PermissionError: If the user lacks 'editor' access to an existing thread.
        """
        with tracer.start_as_current_span("agent.chat"):
            # BUGFIX: Validate input with Pydantic schema to enforce length limits and required fields
            try:
                chat_input = ChatInput.model_validate(arguments)
            except Exception as e:
                logger.error(f"Invalid chat input: {e}", extra={"arguments": arguments})
                msg = f"Invalid chat input: {e}"
                raise ValueError(msg)

            message = chat_input.message
            thread_id = chat_input.thread_id or "default"
            response_format_type = chat_input.response_format

            span.set_attribute("message.length", len(message))
            span.set_attribute("thread.id", thread_id)
            span.set_attribute("user.id", user_id)
            span.set_attribute("response.format", response_format_type)

            # Check if user can access this conversation
            # BUGFIX: Allow first-time conversation creation without pre-existing OpenFGA tuples
            # For new conversations, we short-circuit authorization and will seed ownership after creation
            conversation_resource = f"conversation:{thread_id}"

            # Check if conversation exists by trying to get state from checkpointer
            graph = get_agent_graph()  # type: ignore[func-returns-value]
            conversation_exists = False
            if hasattr(graph, "checkpointer") and graph.checkpointer is not None:
                try:
                    config = {"configurable": {"thread_id": thread_id}}
                    state_snapshot = await graph.aget_state(config)
                    conversation_exists = state_snapshot is not None and state_snapshot.values is not None
                except Exception:
                    # If we can't check, assume it doesn't exist (fail-open for creation)
                    conversation_exists = False

            # Only check authorization for existing conversations
            if conversation_exists:
                can_edit = await self.auth.authorize(user_id=user_id, relation="editor", resource=conversation_resource)
                if not can_edit:
                    logger.warning("User cannot edit conversation", extra={"user_id": user_id, "thread_id": thread_id})
                    msg = (
                        f"Not authorized to edit conversation '{thread_id}'. "
                        f"Request access from conversation owner or use a different thread_id."
                    )
                    raise PermissionError(msg)
            else:
                # New conversation - user becomes implicit owner (OpenFGA tuples should be seeded after creation)
                logger.info(
                    "Creating new conversation, user granted implicit ownership",
                    extra={"user_id": user_id, "thread_id": thread_id},
                )

            logger.info(
                "Processing chat message",
                extra={
                    "thread_id": thread_id,
                    "user_id": user_id,
                    "message_preview": message[:100],
                    "response_format": response_format_type,
                },
            )

            # Create initial state with proper LangChain message objects
            initial_state: AgentState = {
                "messages": [HumanMessage(content=message)],  # Use HumanMessage, not dict
                "next_action": "",
                "user_id": user_id,
                # Trace ID doubles as the request ID for log correlation.
                "request_id": str(span.get_span_context().trace_id) if span.get_span_context() else None,
                "routing_confidence": None,
                "reasoning": None,
                "compaction_applied": None,
                "original_message_count": None,
                "verification_passed": None,
                "verification_score": None,
                "verification_feedback": None,
                "refinement_attempts": None,
                "user_request": message,
            }

            # Run the agent graph
            config = {"configurable": {"thread_id": thread_id}}

            try:
                result = await get_agent_graph().ainvoke(initial_state, config)  # type: ignore[func-returns-value]

                # Extract response (last message in the graph output is the reply)
                response_message = result["messages"][-1]
                response_text = response_message.content

                # Apply response formatting based on format type
                # Follows Anthropic guidance: offer response_format enum parameter
                formatted_response = format_response(response_text, format_type=response_format_type)

                span.set_attribute("response.length.original", len(response_text))
                span.set_attribute("response.length.formatted", len(formatted_response))
                metrics.successful_calls.add(1, {"tool": "agent_chat", "format": response_format_type})

                logger.info(
                    "Chat response generated",
                    extra={
                        "thread_id": thread_id,
                        "original_length": len(response_text),
                        "formatted_length": len(formatted_response),
                        "format": response_format_type,
                    },
                )

                # Record conversation metadata in store for search functionality
                try:
                    from mcp_server_langgraph.core.storage.conversation_store import get_conversation_store

                    store = get_conversation_store()
                    # Count messages in result
                    message_count = len(result.get("messages", []))
                    # Extract title from first few words of user message
                    title = message[:50] + "..." if len(message) > 50 else message

                    await store.record_conversation(
                        thread_id=thread_id, user_id=user_id, message_count=message_count, title=title
                    )

                    logger.debug(f"Recorded conversation metadata for {thread_id}")
                except Exception as e:
                    # Non-critical - don't fail the request
                    logger.debug(f"Failed to record conversation metadata: {e}")

                return [TextContent(type="text", text=formatted_response)]

            except Exception as e:
                logger.error(f"Error processing chat: {e}", extra={"error": str(e), "thread_id": thread_id}, exc_info=True)
                metrics.failed_calls.add(1, {"tool": "agent_chat", "error": type(e).__name__})
                span.record_exception(e)
                raise

622 

623 async def _handle_get_conversation(self, arguments: dict[str, Any], span: Any, user_id: str) -> list[TextContent]: 

624 """Retrieve conversation history from checkpointer""" 

625 with tracer.start_as_current_span("agent.get_conversation"): 

626 thread_id = arguments["thread_id"] 

627 

628 # Check if user can view this conversation 

629 conversation_resource = f"conversation:{thread_id}" 

630 

631 can_view = await self.auth.authorize(user_id=user_id, relation="viewer", resource=conversation_resource) 

632 

633 if not can_view: 

634 logger.warning("User cannot view conversation", extra={"user_id": user_id, "thread_id": thread_id}) 

635 msg = f"Not authorized to view conversation {thread_id}" 

636 raise PermissionError(msg) 

637 

638 # Retrieve conversation state from checkpointer 

639 try: 

640 # Get the checkpointer from agent_graph 

641 graph = get_agent_graph() # type: ignore[func-returns-value] 

642 if not hasattr(graph, "checkpointer") or graph.checkpointer is None: 

643 logger.warning("Checkpointing not enabled, cannot retrieve conversation history") 

644 return [ 

645 TextContent( 

646 type="text", 

647 text=f"Conversation history not available for thread {thread_id}. " 

648 "Checkpointing is disabled. Enable it by setting ENABLE_CHECKPOINTING=true.", 

649 ) 

650 ] 

651 

652 # Get state from checkpointer 

653 config = {"configurable": {"thread_id": thread_id}} 

654 state_snapshot = await graph.aget_state(config) 

655 

656 if not state_snapshot or not state_snapshot.values: 

657 logger.info("No conversation history found", extra={"thread_id": thread_id}) 

658 return [ 

659 TextContent( 

660 type="text", 

661 text=f"No conversation history found for thread {thread_id}. " 

662 "This thread may not exist or has no messages yet.", 

663 ) 

664 ] 

665 

666 # Extract messages from state 

667 messages = state_snapshot.values.get("messages", []) 

668 

669 if not messages: 

670 return [ 

671 TextContent( 

672 type="text", 

673 text=f"Thread {thread_id} exists but has no messages yet.", 

674 ) 

675 ] 

676 

677 # Format messages for display 

678 formatted_messages = [] 

679 for i, msg in enumerate(messages, 1): 

680 role = "unknown" 

681 content = str(msg) 

682 

683 if hasattr(msg, "type"): 683 ↛ 685line 683 didn't jump to line 685 because the condition on line 683 was always true

684 role = msg.type 

685 elif hasattr(msg, "__class__"): 

686 role = msg.__class__.__name__.replace("Message", "").lower() 

687 

688 if hasattr(msg, "content"): 688 ↛ 691line 688 didn't jump to line 691 because the condition on line 688 was always true

689 content = msg.content 

690 

691 formatted_messages.append(f"{i}. [{role}] {content[:200]}{'...' if len(content) > 200 else ''}") 

692 

693 # Build response 

694 response_text = ( 

695 f"Conversation history for thread {thread_id}\n" 

696 f"Total messages: {len(messages)}\n" 

697 f"User: {user_id}\n\n" 

698 f"Messages:\n" + "\n".join(formatted_messages) 

699 ) 

700 

701 logger.info( 

702 "Retrieved conversation history", 

703 extra={"thread_id": thread_id, "message_count": len(messages), "user_id": user_id}, 

704 ) 

705 

706 return [TextContent(type="text", text=response_text)] 

707 

708 except Exception as e: 

709 logger.error(f"Failed to retrieve conversation: {e}", extra={"thread_id": thread_id}, exc_info=True) 

710 return [ 

711 TextContent( 

712 type="text", 

713 text=f"Error retrieving conversation {thread_id}: {e!s}. " 

714 "This may indicate a checkpointer issue or the conversation may not exist.", 

715 ) 

716 ] 

717 

718 async def _handle_search_conversations(self, arguments: dict[str, Any], span: Any, user_id: str) -> list[TextContent]: 

719 """ 

720 Search conversations (replacing list-all approach). 

721 

722 Implements Anthropic best practice: 

723 "Implement search-focused tools (like search_contacts) rather than 

724 list-all tools (list_contacts)" 

725 

726 Benefits: 

727 - Prevents context overflow with large conversation lists 

728 - Forces agents to be specific in their requests 

729 - More token-efficient 

730 - Better for users with many conversations 

731 """ 

732 with tracer.start_as_current_span("agent.search_conversations"): 

733 # BUGFIX: Validate input with Pydantic schema to enforce query length and limit constraints 

734 try: 

735 search_input = SearchConversationsInput.model_validate(arguments) 

736 except Exception as e: 

737 logger.error(f"Invalid search input: {e}", extra={"arguments": arguments}) 

738 msg = f"Invalid search input: {e}" 

739 raise ValueError(msg) 

740 

741 query = search_input.query 

742 limit = search_input.limit 

743 

744 span.set_attribute("search.query", query) 

745 span.set_attribute("search.limit", limit) 

746 

747 # Initialize all_conversations for logging 

748 all_conversations = [] 

749 

750 # Try to get conversations from OpenFGA first, fall back to conversation store 

751 try: 

752 # Get all conversations user can view from OpenFGA 

753 all_conversations = await self.auth.list_accessible_resources( 

754 user_id=user_id, relation="viewer", resource_type="conversation" 

755 ) 

756 

757 # Filter conversations based on query 

758 # Normalize query and conversation names to handle spaces/underscores/hyphens 

759 if query: 

760 normalized_query = query.lower().replace(" ", "_").replace("-", "_") 

761 filtered_conversations = [ 

762 conv 

763 for conv in all_conversations 

764 if ( 

765 query.lower() in conv.lower() 

766 or normalized_query in conv.lower().replace(" ", "_").replace("-", "_") 

767 ) 

768 ] 

769 else: 

770 filtered_conversations = all_conversations 

771 

772 except Exception: 

773 # Fall back to conversation store 

774 logger.info("Using conversation store for search (OpenFGA unavailable or returning mock data)") 

775 

776 try: 

777 from mcp_server_langgraph.core.storage.conversation_store import get_conversation_store 

778 

779 store = get_conversation_store() 

780 metadata_list = await store.search_conversations(user_id=user_id, query=query, limit=limit) 

781 

782 # Convert metadata to conversation IDs 

783 filtered_conversations = [f"conversation:{m.thread_id}" for m in metadata_list] 

784 

785 except Exception as e: 

786 logger.warning(f"Conversation store also unavailable: {e}") 

787 # Ultimate fallback: empty list 

788 filtered_conversations = [] 

789 

790 # Apply limit to prevent context overflow 

791 # Follows Anthropic guidance: "Restrict responses to ~25,000 tokens" 

792 limited_conversations = filtered_conversations[:limit] 

793 

794 # Build response with high-signal information 

795 # Avoid technical IDs where possible 

796 if not limited_conversations: 

797 response_text = ( 

798 f"No conversations found matching '{query}'. " 

799 f"Try a different search query or request access to more conversations." 

800 ) 

801 else: 

802 response_lines = [ 

803 ( 

804 f"Found {len(limited_conversations)} conversation(s) matching '{query}':" 

805 if query 

806 else f"Showing {len(limited_conversations)} recent conversation(s):" 

807 ) 

808 ] 

809 

810 for i, conv_id in enumerate(limited_conversations, 1): 

811 # Extract human-readable info from conversation ID 

812 # In production, fetch metadata like title, date, participants 

813 response_lines.append(f"{i}. {conv_id}") 

814 

815 # Add guidance if results were truncated 

816 if len(filtered_conversations) > limit: 

817 response_lines.append( 

818 f"\n[Showing {limit} of {len(filtered_conversations)} results. " 

819 f"Use a more specific query to narrow results.]" 

820 ) 

821 

822 response_text = "\n".join(response_lines) 

823 

824 logger.info( 

825 "Searched conversations", 

826 extra={ 

827 "user_id": user_id, 

828 "query": query, 

829 "total_accessible": len(all_conversations), 

830 "filtered_count": len(filtered_conversations), 

831 "returned_count": len(limited_conversations), 

832 }, 

833 ) 

834 

835 return [TextContent(type="text", text=response_text)] 

836 

837 async def _handle_search_tools(self, arguments: dict[str, Any], span: Any) -> list[TextContent]: 

838 """ 

839 Handle search_tools invocation for progressive tool discovery. 

840 

841 Implements Anthropic best practice for token-efficient tool discovery. 

842 """ 

843 with tracer.start_as_current_span("tools.search"): 

844 from mcp_server_langgraph.tools.tool_discovery import search_tools 

845 

846 # Extract arguments 

847 query = arguments.get("query") 

848 category = arguments.get("category") 

849 detail_level = arguments.get("detail_level", "minimal") 

850 

851 logger.info( 

852 "Searching tools", 

853 extra={"query": query, "category": category, "detail_level": detail_level}, 

854 ) 

855 

856 # Execute search_tools 

857 result = search_tools.invoke( 

858 { 

859 "query": query, 

860 "category": category, 

861 "detail_level": detail_level, 

862 } 

863 ) 

864 

865 span.set_attribute("tools.query", query or "") 

866 span.set_attribute("tools.category", category or "") 

867 span.set_attribute("tools.detail_level", detail_level) 

868 

869 return [TextContent(type="text", text=result)] 

870 

871 async def _handle_execute_python(self, arguments: dict[str, Any], span: Any, user_id: str) -> list[TextContent]: 

872 """ 

873 Handle execute_python invocation for secure code execution. 

874 

875 Implements sandboxed Python execution with validation and resource limits. 

876 """ 

877 with tracer.start_as_current_span("code.execute"): 

878 from mcp_server_langgraph.tools.code_execution_tools import execute_python 

879 

880 # Extract arguments 

881 code = arguments.get("code", "") 

882 timeout = arguments.get("timeout") 

883 

884 logger.info( 

885 "Executing Python code", 

886 extra={ 

887 "user_id": user_id, 

888 "code_length": len(code), 

889 "timeout": timeout, 

890 }, 

891 ) 

892 

893 # Execute code 

894 start_time = time.time() 

895 result = execute_python.invoke({"code": code, "timeout": timeout}) 

896 execution_time = time.time() - start_time 

897 

898 span.set_attribute("code.length", len(code)) 

899 span.set_attribute("code.execution_time", execution_time) 

900 span.set_attribute("code.success", "success" in result.lower()) 

901 

902 metrics.code_executions.add(1, {"user_id": user_id, "success": "success" in result.lower()}) 

903 

904 return [TextContent(type="text", text=result)] 

905 

906 async def run(self) -> None: 

907 """Run the MCP server""" 

908 logger.info("Starting MCP Agent Server") 

909 async with stdio_server() as (read_stream, write_stream): 

910 await self.server.run(read_stream, write_stream, self.server.create_initialization_options()) 

911 

912 

async def main() -> None:
    """Main entry point: configure observability, then run the MCP server."""
    from mcp_server_langgraph.core.config import settings
    from mcp_server_langgraph.observability.telemetry import init_observability

    # Observability must be up before the server is constructed so startup
    # logs and traces are captured; file logging is opt-in via settings.
    file_logging = getattr(settings, "enable_file_logging", False)
    init_observability(settings=settings, enable_file_logging=file_logging)

    await MCPAgentServer().run()

924 

925 

# Script entry point: run the stdio MCP server directly (e.g. `python server_stdio.py`).
if __name__ == "__main__":
    asyncio.run(main())