Coverage for src / mcp_server_langgraph / mcp / server_stdio.py: 52%
312 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
1"""
2MCP Server implementation for LangGraph agent with OpenFGA and Infisical
4Implements Anthropic's best practices for writing tools for agents:
5- Token-efficient responses with truncation
6- Search-focused tools instead of list-all
7- Response format control (concise vs detailed)
8- Namespaced tools for clarity
9- High-signal information in responses
10"""
12import asyncio
13import sys
14import time
15from typing import Any, Literal
17from langchain_core.messages import HumanMessage
18from mcp.server import Server
19from mcp.server.stdio import stdio_server
20from mcp.types import Resource, TextContent, Tool
21from pydantic import AnyUrl, BaseModel, Field
23from mcp_server_langgraph.auth.factory import create_auth_middleware
24from mcp_server_langgraph.auth.middleware import AuthMiddleware
25from mcp_server_langgraph.auth.openfga import OpenFGAClient
26from mcp_server_langgraph.core.agent import AgentState, get_agent_graph
27from mcp_server_langgraph.core.config import Settings, settings
28from mcp_server_langgraph.observability.telemetry import logger, metrics, tracer
29from mcp_server_langgraph.utils.response_optimizer import format_response
class ChatInput(BaseModel):
    """
    Input schema for agent_chat tool.

    Follows Anthropic best practices:
    - Unambiguous parameter names (user_id not username, message not query)
    - Response format control for token efficiency
    - Clear field descriptions
    """

    # Bounded length keeps requests token-efficient and rejects empty messages.
    message: str = Field(description="The user message to send to the agent", min_length=1, max_length=10000)
    token: str = Field(
        description=(
            "JWT authentication token. Obtain via /auth/login endpoint (HTTP) "
            "or external authentication service. Required for all tool calls."
        )
    )
    user_id: str = Field(
        description=(
            "User identifier for authentication and authorization. "
            "Accepts both plain usernames ('alice') and OpenFGA-prefixed IDs ('user:alice'). "
            "The system will normalize both formats automatically."
        )
    )
    thread_id: str | None = Field(
        default=None, description="Optional thread ID for conversation continuity (e.g., 'conv_123')"
    )
    response_format: Literal["concise", "detailed"] = Field(
        default="concise",
        description=(
            "Response verbosity level. "
            "'concise' returns ~500 tokens (faster, less context). "
            "'detailed' returns ~2000 tokens (comprehensive, more context)."
        ),
    )

    # Backward compatibility - DEPRECATED
    username: str | None = Field(
        default=None,
        deprecated=True,
        description="DEPRECATED: Use 'user_id' instead. Maintained for backward compatibility.",
    )

    @property
    def effective_user_id(self) -> str:
        """Return the effective user ID, preferring user_id over the deprecated username.

        BUGFIX: removed the dead ``hasattr(self, "user_id")`` guard — ``user_id`` is a
        required field and is always present on a validated model, so the guard could
        never be False. Behavior is unchanged: a truthy ``user_id`` wins, otherwise
        fall back to ``username`` or the empty string.
        """
        return self.user_id or (self.username or "")
class SearchConversationsInput(BaseModel):
    """Input schema for conversation_search tool."""

    # min_length=0: an empty query is valid and means "return recent conversations".
    query: str = Field(
        description="Search query to filter conversations. Empty string returns recent conversations.",
        min_length=0,
        max_length=500,
    )
    token: str = Field(
        description=(
            "JWT authentication token. Obtain via /auth/login endpoint (HTTP) "
            "or external authentication service. Required for all tool calls."
        )
    )
    user_id: str = Field(description="User identifier for authentication and authorization")
    limit: int = Field(default=10, ge=1, le=50, description="Maximum number of conversations to return (1-50)")

    # Backward compatibility - DEPRECATED
    username: str | None = Field(
        default=None,
        deprecated=True,
        description="DEPRECATED: Use 'user_id' instead. Maintained for backward compatibility.",
    )

    @property
    def effective_user_id(self) -> str:
        """Return the effective user ID, preferring user_id over the deprecated username.

        BUGFIX: removed the dead ``hasattr(self, "user_id")`` guard — ``user_id`` is a
        required field and is always present on a validated model, so the guard could
        never be False. Behavior is unchanged.
        """
        return self.user_id or (self.username or "")
107class MCPAgentServer:
108 """MCP Server exposing LangGraph agent with OpenFGA authorization"""
    def __init__(
        self,
        openfga_client: OpenFGAClient | None = None,
        auth: AuthMiddleware | None = None,
        settings: Settings | None = None,
    ) -> None:
        """
        Initialize MCP Agent Server with optional dependency injection.

        Args:
            openfga_client: Optional OpenFGA client for authorization.
                If None, creates one from settings.
            auth: Optional pre-configured AuthMiddleware instance.
                If provided, this takes precedence over creating auth from settings.
                This enables dependency injection for testing and custom configurations.
            settings: Optional Settings instance for runtime configuration.
                If provided, enables dynamic feature toggling (e.g., code execution).
                If None, uses global settings. This allows tests to inject custom
                configuration without module reloading.

        Raises:
            ValueError: If the in-memory auth provider is selected without a JWT
                secret key, or if running in production without OpenFGA configured
                (fail-closed security pattern). Only raised when `auth` is not injected.

        Example:
            # Default creation (production):
            server = MCPAgentServer()

            # Custom auth injection (testing):
            custom_auth = AuthMiddleware(user_provider=custom_provider, ...)
            server = MCPAgentServer(auth=custom_auth)

            # Custom settings injection (testing):
            test_settings = Settings(enable_code_execution=True)
            server = MCPAgentServer(settings=test_settings)

        History:
            - `auth` parameter added (OpenAI Codex finding, 2025-11-16) for
              constructor-based dependency injection, fixing "user not found"
              failures in integration tests.
            - `settings` parameter added (2025-11-16) so runtime configuration
              (e.g. ENABLE_CODE_EXECUTION) takes effect without module reloading.
        """
        # Store settings for runtime configuration
        # NOTE: When settings=None, we must reference the module-level 'settings'
        # imported at the top of this file. This allows tests to mock settings via
        # @patch("mcp_server_langgraph.mcp.server_stdio.settings", ...)
        # Using sys.modules[__name__] to avoid self-import (CodeQL py/import-own-module)
        self.settings = settings if settings is not None else sys.modules[__name__].settings

        self.server = Server("langgraph-agent")

        # Initialize OpenFGA client (None -> fallback authorization mode)
        self.openfga = openfga_client or self._create_openfga_client()

        # Initialize auth middleware
        if auth is not None:
            # Use injected auth (dependency injection pattern).
            # NOTE: the fail-closed production checks below are skipped on this path;
            # an injected auth is assumed to be fully configured by the caller.
            logger.info("Using injected AuthMiddleware instance")
            self.auth = auth

            # If injected auth doesn't have OpenFGA but we have a client, update it
            # This handles the case where auth is injected but openfga_client is also provided
            if self.openfga is not None and self.auth.openfga is None:
                logger.info("Updating injected auth with provided OpenFGA client")
                self.auth.openfga = self.openfga
        else:
            # Create auth using factory (respects settings.auth_provider)
            # Validate JWT secret is configured for in-memory auth provider
            # Keycloak uses RS256 with JWKS (public key crypto) and doesn't need JWT_SECRET_KEY
            if self.settings.auth_provider == "inmemory" and not self.settings.jwt_secret_key:
                msg = (
                    "CRITICAL: JWT secret key not configured for in-memory auth provider. "
                    "Set JWT_SECRET_KEY environment variable or configure via Infisical. "
                    "The in-memory auth provider requires a secure secret key for HS256 token signing."
                )
                raise ValueError(msg)

            # SECURITY: Fail-closed pattern - require OpenFGA in production
            if self.settings.environment == "production" and self.openfga is None:
                msg = (
                    "CRITICAL: OpenFGA authorization is required in production mode. "
                    "Configure OPENFGA_STORE_ID and OPENFGA_MODEL_ID environment variables, "
                    "or set ENVIRONMENT=development for local testing. "
                    "Fallback authorization is not secure enough for production use."
                )
                raise ValueError(msg)

            self.auth = create_auth_middleware(self.settings, openfga_client=self.openfga)

        self._setup_handlers()
203 def _create_openfga_client(self, settings_override: Settings | None = None) -> OpenFGAClient | None:
204 """
205 Create OpenFGA client from settings.
207 Args:
208 settings_override: Optional settings override for testing/dependency injection.
209 If None, uses global settings.
211 Returns:
212 OpenFGAClient instance or None if not configured
213 """
214 _settings = settings_override or settings
216 if _settings.openfga_store_id and _settings.openfga_model_id:
217 logger.info(
218 "Initializing OpenFGA client",
219 extra={"store_id": _settings.openfga_store_id, "model_id": _settings.openfga_model_id},
220 )
221 return OpenFGAClient(
222 api_url=_settings.openfga_api_url,
223 store_id=_settings.openfga_store_id,
224 model_id=_settings.openfga_model_id,
225 )
226 else:
227 logger.warning("OpenFGA not configured, authorization will use fallback mode")
228 return None
230 async def list_tools_public(self) -> list[Tool]:
231 """
232 Public method to list available tools (used for testing and external access).
234 Returns the same tools list as the MCP protocol handler.
235 """
236 tools = [
237 Tool(
238 name="agent_chat",
239 description=(
240 "Chat with the AI agent for questions, research, and problem-solving. "
241 "Returns responses optimized for agent consumption. "
242 "Response format: 'concise' (~500 tokens, 2-5 sec) or 'detailed' (~2000 tokens, 5-10 sec). "
243 "For specialized tasks like code execution or web search, use dedicated tools instead. "
244 "Rate limit: 60 requests/minute per user."
245 ),
246 inputSchema=ChatInput.model_json_schema(),
247 ),
248 Tool(
249 name="conversation_get",
250 description=(
251 "Retrieve a specific conversation thread by ID. "
252 "Returns conversation history with messages, participants, and metadata. "
253 "Response time: <1 second. "
254 "Use conversation_search to find conversation IDs first."
255 ),
256 inputSchema={
257 "type": "object",
258 "properties": {
259 "thread_id": {
260 "type": "string",
261 "description": "Conversation thread identifier (e.g., 'conv_abc123')",
262 },
263 "token": {
264 "type": "string",
265 "description": "JWT authentication token. Required for all tool calls.",
266 },
267 "user_id": {
268 "type": "string",
269 "description": "User identifier for authentication and authorization",
270 },
271 "username": {"type": "string", "description": "DEPRECATED: Use 'user_id' instead"},
272 },
273 "required": ["thread_id", "token", "user_id"],
274 },
275 ),
276 Tool(
277 name="conversation_search",
278 description=(
279 "Search conversations using keywords or filters. "
280 "Returns matching conversations sorted by relevance. "
281 "Much more efficient than listing all conversations. "
282 "Response time: <2 seconds. "
283 "Examples: 'project updates', 'conversations with alice', 'last week'. "
284 "Results limited to 50 conversations max to prevent context overflow."
285 ),
286 inputSchema=SearchConversationsInput.model_json_schema(),
287 ),
288 ]
290 # Add search_tools for progressive discovery (Anthropic best practice)
291 tools.append(
292 Tool(
293 name="search_tools",
294 description=(
295 "Search and discover available tools using progressive disclosure. "
296 "Query by keyword or category instead of loading all tool definitions. "
297 "Saves 98%+ tokens compared to list-all approach. "
298 "Detail levels: minimal (name+desc), standard (+params), full (+schema). "
299 "Categories: calculator, search, filesystem, execution. "
300 "Response time: <1 second."
301 ),
302 inputSchema={
303 "type": "object",
304 "properties": {
305 "query": {"type": "string", "description": "Search query (keyword)"},
306 "category": {"type": "string", "description": "Tool category filter"},
307 "detail_level": {
308 "type": "string",
309 "enum": ["minimal", "standard", "full"],
310 "description": "Level of detail in results",
311 },
312 },
313 },
314 )
315 )
317 # Add execute_python if code execution is enabled
318 # Use runtime settings evaluation (not module-level cached value)
319 if self.settings.enable_code_execution:
320 from mcp_server_langgraph.tools.code_execution_tools import ExecutePythonInput
322 tools.append(
323 Tool(
324 name="execute_python",
325 description=(
326 "Execute Python code in a secure sandboxed environment. "
327 "Security: Import whitelist, no eval/exec, resource limits (CPU, memory, timeout). "
328 "Backends: docker-engine (local/dev) or kubernetes (production). "
329 "Network: Configurable isolation (none/allowlist/unrestricted). "
330 "Response time: 1-30 seconds depending on code complexity. "
331 "Use for data processing, calculations, and Python-specific tasks."
332 ),
333 inputSchema=ExecutePythonInput.model_json_schema(),
334 )
335 )
337 return tools
    async def call_tool_public(self, name: str, arguments: dict[str, Any]) -> list[TextContent]:
        """
        Public method to call tools (used for testing and external access).

        This method contains the core tool invocation logic including authentication,
        authorization, and routing. The MCP protocol handler delegates to this method.

        Flow: token presence -> JWT verification -> username extraction from claims ->
        user_id normalization -> OpenFGA executor check on "tool:<name>" -> dispatch.

        Args:
            name: Name of the tool to call
            arguments: Tool arguments including 'token' for authentication

        Returns:
            List of TextContent responses from the tool

        Raises:
            PermissionError: If authentication or authorization fails
            ValueError: If the tool name is unknown
        """
        with tracer.start_as_current_span("mcp.call_tool", attributes={"tool.name": name}) as span:
            from mcp_server_langgraph.core.security import sanitize_for_logging

            # Sanitize arguments before logging so tokens/secrets never reach the logs.
            logger.info(f"Tool called: {name}", extra={"tool": name, "args": sanitize_for_logging(arguments)})
            metrics.tool_calls.add(1, {"tool": name})

            # SECURITY: Require JWT token for all tool calls
            token = arguments.get("token")

            if not token:
                logger.warning("No authentication token provided")
                metrics.auth_failures.add(1)
                msg = (
                    "Authentication token required. Provide 'token' parameter with a valid JWT. "
                    "Obtain token via /auth/login endpoint or external authentication service."
                )
                raise PermissionError(msg)

            # Verify JWT token
            token_verification = await self.auth.verify_token(token)

            if not token_verification.valid:
                logger.warning("Token verification failed", extra={"error": token_verification.error})
                metrics.auth_failures.add(1)
                msg = f"Invalid authentication token: {token_verification.error or 'token verification failed'}"
                raise PermissionError(msg)

            # Extract user_id from validated token payload
            if not token_verification.payload or "sub" not in token_verification.payload:
                logger.error("Token payload missing 'sub' claim")
                metrics.auth_failures.add(1)
                msg = "Invalid token: missing user identifier"
                raise PermissionError(msg)

            # Extract username with defensive fallback
            # Priority: preferred_username > username claim > sub parsing
            username = token_verification.payload.get("preferred_username")
            if not username:
                # Try 'username' claim (alternative standard claim)
                username = token_verification.payload.get("username")
            if not username:
                # Fallback: extract from sub if it's in "user:username" format
                sub = token_verification.payload.get("sub", "")
                if sub.startswith("user:"):
                    username = sub.split(":", 1)[1]
                elif sub and ":" not in sub:
                    # Log warning for UUID-style subs (may cause issues)
                    logger.warning(
                        f"Using sub as username fallback (may be UUID): {sub[:8]}...",
                        extra={"sub_prefix": sub[:8]},
                    )
                    username = sub
                else:
                    # sub is empty or carries a non-"user:" prefix we can't interpret.
                    msg = "Invalid token: cannot extract username from claims"
                    raise PermissionError(msg)

            # Normalize user_id to "user:username" format for OpenFGA compatibility
            user_id = f"user:{username}" if not username.startswith("user:") else username
            span.set_attribute("user.id", user_id)

            logger.info("User authenticated via token", extra={"user_id": user_id, "tool": name})

            # Check OpenFGA authorization
            resource = f"tool:{name}"

            authorized = await self.auth.authorize(user_id=user_id, relation="executor", resource=resource)

            if not authorized:
                logger.warning(
                    "Authorization failed (OpenFGA)",
                    extra={"user_id": user_id, "resource": resource, "relation": "executor"},
                )
                metrics.authz_failures.add(1, {"resource": resource})
                msg = f"Not authorized to execute {resource}"
                raise PermissionError(msg)

            logger.info("Authorization granted", extra={"user_id": user_id, "resource": resource})

            # Route to appropriate handler (with backward compatibility)
            if name == "agent_chat" or name == "chat":  # Support old name for compatibility
                return await self._handle_chat(arguments, span, user_id)
            elif name == "conversation_get" or name == "get_conversation":
                return await self._handle_get_conversation(arguments, span, user_id)
            elif name == "conversation_search" or name == "list_conversations":
                return await self._handle_search_conversations(arguments, span, user_id)
            elif name == "search_tools":
                return await self._handle_search_tools(arguments, span)
            elif name == "execute_python":
                return await self._handle_execute_python(arguments, span, user_id)
            else:
                msg = f"Unknown tool: {name}"
                raise ValueError(msg)
    def _setup_handlers(self) -> None:
        """Register MCP protocol handlers on the underlying Server instance.

        Each handler is a thin wrapper that delegates to the corresponding
        *_public method so the same logic is reachable outside the MCP
        transport (e.g., directly from tests).
        """

        @self.server.list_tools()  # type: ignore[no-untyped-call, untyped-decorator]
        async def list_tools() -> list[Tool]:
            """
            List available tools.

            Tools follow Anthropic best practices:
            - Namespaced for clarity (agent_*, conversation_*)
            - Search-focused instead of list-all
            - Clear usage guidance in descriptions
            - Token limits and expected response times documented
            """
            with tracer.start_as_current_span("mcp.list_tools"):
                logger.info("Listing available tools")
                return await self.list_tools_public()

        @self.server.call_tool()  # type: ignore[untyped-decorator]
        async def call_tool(name: str, arguments: dict[str, Any]) -> list[TextContent]:
            """Handle tool calls with OpenFGA authorization and tracing"""
            return await self.call_tool_public(name, arguments)

        @self.server.list_resources()  # type: ignore[no-untyped-call, untyped-decorator]
        async def list_resources() -> list[Resource]:
            """List available resources"""
            with tracer.start_as_current_span("mcp.list_resources"):
                # Single static resource exposing the agent configuration.
                return [Resource(uri=AnyUrl("agent://config"), name="Agent Configuration", mimeType="application/json")]
    async def _handle_chat(self, arguments: dict[str, Any], span: Any, user_id: str) -> list[TextContent]:
        """
        Handle agent_chat tool invocation.

        Implements Anthropic best practices:
        - Response format control (concise vs detailed)
        - Token-efficient responses with truncation
        - Clear error messages
        - Performance tracking

        Args:
            arguments: Raw tool arguments; validated here against ChatInput.
            span: Active tracing span from call_tool_public (attributes added here).
            user_id: Authenticated user ID derived from the JWT (not from arguments).

        Returns:
            Single-element list containing the formatted agent response.

        Raises:
            ValueError: If arguments fail ChatInput validation.
            PermissionError: If the user may not edit an existing conversation.
        """
        with tracer.start_as_current_span("agent.chat"):
            # BUGFIX: Validate input with Pydantic schema to enforce length limits and required fields
            try:
                chat_input = ChatInput.model_validate(arguments)
            except Exception as e:
                logger.error(f"Invalid chat input: {e}", extra={"arguments": arguments})
                msg = f"Invalid chat input: {e}"
                raise ValueError(msg)

            message = chat_input.message
            thread_id = chat_input.thread_id or "default"
            response_format_type = chat_input.response_format

            span.set_attribute("message.length", len(message))
            span.set_attribute("thread.id", thread_id)
            span.set_attribute("user.id", user_id)
            span.set_attribute("response.format", response_format_type)

            # Check if user can access this conversation
            # BUGFIX: Allow first-time conversation creation without pre-existing OpenFGA tuples
            # For new conversations, we short-circuit authorization and will seed ownership after creation
            conversation_resource = f"conversation:{thread_id}"

            # Check if conversation exists by trying to get state from checkpointer
            graph = get_agent_graph()  # type: ignore[func-returns-value]
            conversation_exists = False
            if hasattr(graph, "checkpointer") and graph.checkpointer is not None:
                try:
                    config = {"configurable": {"thread_id": thread_id}}
                    state_snapshot = await graph.aget_state(config)
                    conversation_exists = state_snapshot is not None and state_snapshot.values is not None
                except Exception:
                    # If we can't check, assume it doesn't exist (fail-open for creation)
                    conversation_exists = False

            # Only check authorization for existing conversations
            if conversation_exists:
                can_edit = await self.auth.authorize(user_id=user_id, relation="editor", resource=conversation_resource)
                if not can_edit:
                    logger.warning("User cannot edit conversation", extra={"user_id": user_id, "thread_id": thread_id})
                    msg = (
                        f"Not authorized to edit conversation '{thread_id}'. "
                        f"Request access from conversation owner or use a different thread_id."
                    )
                    raise PermissionError(msg)
            else:
                # New conversation - user becomes implicit owner (OpenFGA tuples should be seeded after creation)
                logger.info(
                    "Creating new conversation, user granted implicit ownership",
                    extra={"user_id": user_id, "thread_id": thread_id},
                )

            logger.info(
                "Processing chat message",
                extra={
                    "thread_id": thread_id,
                    "user_id": user_id,
                    "message_preview": message[:100],
                    "response_format": response_format_type,
                },
            )

            # Create initial state with proper LangChain message objects
            initial_state: AgentState = {
                "messages": [HumanMessage(content=message)],  # Use HumanMessage, not dict
                "next_action": "",
                "user_id": user_id,
                "request_id": str(span.get_span_context().trace_id) if span.get_span_context() else None,
                "routing_confidence": None,
                "reasoning": None,
                "compaction_applied": None,
                "original_message_count": None,
                "verification_passed": None,
                "verification_score": None,
                "verification_feedback": None,
                "refinement_attempts": None,
                "user_request": message,
            }

            # Run the agent graph
            config = {"configurable": {"thread_id": thread_id}}

            try:
                result = await get_agent_graph().ainvoke(initial_state, config)  # type: ignore[func-returns-value]

                # Extract response
                response_message = result["messages"][-1]
                response_text = response_message.content

                # Apply response formatting based on format type
                # Follows Anthropic guidance: offer response_format enum parameter
                formatted_response = format_response(response_text, format_type=response_format_type)

                span.set_attribute("response.length.original", len(response_text))
                span.set_attribute("response.length.formatted", len(formatted_response))
                metrics.successful_calls.add(1, {"tool": "agent_chat", "format": response_format_type})

                logger.info(
                    "Chat response generated",
                    extra={
                        "thread_id": thread_id,
                        "original_length": len(response_text),
                        "formatted_length": len(formatted_response),
                        "format": response_format_type,
                    },
                )

                # Record conversation metadata in store for search functionality
                try:
                    from mcp_server_langgraph.core.storage.conversation_store import get_conversation_store

                    store = get_conversation_store()
                    # Count messages in result
                    message_count = len(result.get("messages", []))
                    # Extract title from first few words of user message
                    title = message[:50] + "..." if len(message) > 50 else message

                    await store.record_conversation(
                        thread_id=thread_id, user_id=user_id, message_count=message_count, title=title
                    )

                    logger.debug(f"Recorded conversation metadata for {thread_id}")
                except Exception as e:
                    # Non-critical - don't fail the request
                    logger.debug(f"Failed to record conversation metadata: {e}")

                return [TextContent(type="text", text=formatted_response)]

            except Exception as e:
                logger.error(f"Error processing chat: {e}", extra={"error": str(e), "thread_id": thread_id}, exc_info=True)
                metrics.failed_calls.add(1, {"tool": "agent_chat", "error": type(e).__name__})
                span.record_exception(e)
                raise
    async def _handle_get_conversation(self, arguments: dict[str, Any], span: Any, user_id: str) -> list[TextContent]:
        """Retrieve conversation history from the checkpointer.

        Requires the "viewer" relation on the conversation. Missing checkpointer,
        missing thread, and retrieval errors all return explanatory TextContent
        rather than raising (only the authorization failure raises).

        Args:
            arguments: Must contain "thread_id" (enforced via KeyError if absent).
            span: Active tracing span (unused here but kept for handler signature parity).
            user_id: Authenticated user ID used for the viewer check.

        Raises:
            PermissionError: If the user lacks the viewer relation.
        """
        with tracer.start_as_current_span("agent.get_conversation"):
            thread_id = arguments["thread_id"]

            # Check if user can view this conversation
            conversation_resource = f"conversation:{thread_id}"

            can_view = await self.auth.authorize(user_id=user_id, relation="viewer", resource=conversation_resource)

            if not can_view:
                logger.warning("User cannot view conversation", extra={"user_id": user_id, "thread_id": thread_id})
                msg = f"Not authorized to view conversation {thread_id}"
                raise PermissionError(msg)

            # Retrieve conversation state from checkpointer
            try:
                # Get the checkpointer from agent_graph
                graph = get_agent_graph()  # type: ignore[func-returns-value]
                if not hasattr(graph, "checkpointer") or graph.checkpointer is None:
                    logger.warning("Checkpointing not enabled, cannot retrieve conversation history")
                    return [
                        TextContent(
                            type="text",
                            text=f"Conversation history not available for thread {thread_id}. "
                            "Checkpointing is disabled. Enable it by setting ENABLE_CHECKPOINTING=true.",
                        )
                    ]

                # Get state from checkpointer
                config = {"configurable": {"thread_id": thread_id}}
                state_snapshot = await graph.aget_state(config)

                if not state_snapshot or not state_snapshot.values:
                    logger.info("No conversation history found", extra={"thread_id": thread_id})
                    return [
                        TextContent(
                            type="text",
                            text=f"No conversation history found for thread {thread_id}. "
                            "This thread may not exist or has no messages yet.",
                        )
                    ]

                # Extract messages from state
                messages = state_snapshot.values.get("messages", [])

                if not messages:
                    return [
                        TextContent(
                            type="text",
                            text=f"Thread {thread_id} exists but has no messages yet.",
                        )
                    ]

                # Format messages for display; each message is truncated to 200 chars.
                formatted_messages = []
                for i, msg in enumerate(messages, 1):
                    role = "unknown"
                    content = str(msg)

                    # Prefer the LangChain message 'type' attr; fall back to class name.
                    if hasattr(msg, "type"):
                        role = msg.type
                    elif hasattr(msg, "__class__"):
                        role = msg.__class__.__name__.replace("Message", "").lower()

                    if hasattr(msg, "content"):
                        content = msg.content

                    formatted_messages.append(f"{i}. [{role}] {content[:200]}{'...' if len(content) > 200 else ''}")

                # Build response
                response_text = (
                    f"Conversation history for thread {thread_id}\n"
                    f"Total messages: {len(messages)}\n"
                    f"User: {user_id}\n\n"
                    f"Messages:\n" + "\n".join(formatted_messages)
                )

                logger.info(
                    "Retrieved conversation history",
                    extra={"thread_id": thread_id, "message_count": len(messages), "user_id": user_id},
                )

                return [TextContent(type="text", text=response_text)]

            except Exception as e:
                # Best-effort: report the failure as content instead of raising.
                logger.error(f"Failed to retrieve conversation: {e}", extra={"thread_id": thread_id}, exc_info=True)
                return [
                    TextContent(
                        type="text",
                        text=f"Error retrieving conversation {thread_id}: {e!s}. "
                        "This may indicate a checkpointer issue or the conversation may not exist.",
                    )
                ]
    async def _handle_search_conversations(self, arguments: dict[str, Any], span: Any, user_id: str) -> list[TextContent]:
        """
        Search conversations (replacing list-all approach).

        Implements Anthropic best practice:
        "Implement search-focused tools (like search_contacts) rather than
        list-all tools (list_contacts)"

        Benefits:
        - Prevents context overflow with large conversation lists
        - Forces agents to be specific in their requests
        - More token-efficient
        - Better for users with many conversations

        Lookup order: OpenFGA accessible-resources first; on failure, the
        conversation store; on a second failure, an empty result.

        Args:
            arguments: Raw tool arguments; validated against SearchConversationsInput.
            span: Active tracing span (search attributes recorded on it).
            user_id: Authenticated user ID used to scope accessible conversations.

        Raises:
            ValueError: If arguments fail SearchConversationsInput validation.
        """
        with tracer.start_as_current_span("agent.search_conversations"):
            # BUGFIX: Validate input with Pydantic schema to enforce query length and limit constraints
            try:
                search_input = SearchConversationsInput.model_validate(arguments)
            except Exception as e:
                logger.error(f"Invalid search input: {e}", extra={"arguments": arguments})
                msg = f"Invalid search input: {e}"
                raise ValueError(msg)

            query = search_input.query
            limit = search_input.limit

            span.set_attribute("search.query", query)
            span.set_attribute("search.limit", limit)

            # Initialize all_conversations for logging (stays empty on the fallback path)
            all_conversations = []

            # Try to get conversations from OpenFGA first, fall back to conversation store
            try:
                # Get all conversations user can view from OpenFGA
                all_conversations = await self.auth.list_accessible_resources(
                    user_id=user_id, relation="viewer", resource_type="conversation"
                )

                # Filter conversations based on query
                # Normalize query and conversation names to handle spaces/underscores/hyphens
                if query:
                    normalized_query = query.lower().replace(" ", "_").replace("-", "_")
                    filtered_conversations = [
                        conv
                        for conv in all_conversations
                        if (
                            query.lower() in conv.lower()
                            or normalized_query in conv.lower().replace(" ", "_").replace("-", "_")
                        )
                    ]
                else:
                    filtered_conversations = all_conversations

            except Exception:
                # Fall back to conversation store
                logger.info("Using conversation store for search (OpenFGA unavailable or returning mock data)")

                try:
                    from mcp_server_langgraph.core.storage.conversation_store import get_conversation_store

                    store = get_conversation_store()
                    metadata_list = await store.search_conversations(user_id=user_id, query=query, limit=limit)

                    # Convert metadata to conversation IDs
                    filtered_conversations = [f"conversation:{m.thread_id}" for m in metadata_list]

                except Exception as e:
                    logger.warning(f"Conversation store also unavailable: {e}")
                    # Ultimate fallback: empty list
                    filtered_conversations = []

            # Apply limit to prevent context overflow
            # Follows Anthropic guidance: "Restrict responses to ~25,000 tokens"
            limited_conversations = filtered_conversations[:limit]

            # Build response with high-signal information
            # Avoid technical IDs where possible
            if not limited_conversations:
                response_text = (
                    f"No conversations found matching '{query}'. "
                    f"Try a different search query or request access to more conversations."
                )
            else:
                response_lines = [
                    (
                        f"Found {len(limited_conversations)} conversation(s) matching '{query}':"
                        if query
                        else f"Showing {len(limited_conversations)} recent conversation(s):"
                    )
                ]

                for i, conv_id in enumerate(limited_conversations, 1):
                    # Extract human-readable info from conversation ID
                    # In production, fetch metadata like title, date, participants
                    response_lines.append(f"{i}. {conv_id}")

                # Add guidance if results were truncated
                if len(filtered_conversations) > limit:
                    response_lines.append(
                        f"\n[Showing {limit} of {len(filtered_conversations)} results. "
                        f"Use a more specific query to narrow results.]"
                    )

                response_text = "\n".join(response_lines)

            logger.info(
                "Searched conversations",
                extra={
                    "user_id": user_id,
                    "query": query,
                    "total_accessible": len(all_conversations),
                    "filtered_count": len(filtered_conversations),
                    "returned_count": len(limited_conversations),
                },
            )

            return [TextContent(type="text", text=response_text)]
837 async def _handle_search_tools(self, arguments: dict[str, Any], span: Any) -> list[TextContent]:
838 """
839 Handle search_tools invocation for progressive tool discovery.
841 Implements Anthropic best practice for token-efficient tool discovery.
842 """
843 with tracer.start_as_current_span("tools.search"):
844 from mcp_server_langgraph.tools.tool_discovery import search_tools
846 # Extract arguments
847 query = arguments.get("query")
848 category = arguments.get("category")
849 detail_level = arguments.get("detail_level", "minimal")
851 logger.info(
852 "Searching tools",
853 extra={"query": query, "category": category, "detail_level": detail_level},
854 )
856 # Execute search_tools
857 result = search_tools.invoke(
858 {
859 "query": query,
860 "category": category,
861 "detail_level": detail_level,
862 }
863 )
865 span.set_attribute("tools.query", query or "")
866 span.set_attribute("tools.category", category or "")
867 span.set_attribute("tools.detail_level", detail_level)
869 return [TextContent(type="text", text=result)]
871 async def _handle_execute_python(self, arguments: dict[str, Any], span: Any, user_id: str) -> list[TextContent]:
872 """
873 Handle execute_python invocation for secure code execution.
875 Implements sandboxed Python execution with validation and resource limits.
876 """
877 with tracer.start_as_current_span("code.execute"):
878 from mcp_server_langgraph.tools.code_execution_tools import execute_python
880 # Extract arguments
881 code = arguments.get("code", "")
882 timeout = arguments.get("timeout")
884 logger.info(
885 "Executing Python code",
886 extra={
887 "user_id": user_id,
888 "code_length": len(code),
889 "timeout": timeout,
890 },
891 )
893 # Execute code
894 start_time = time.time()
895 result = execute_python.invoke({"code": code, "timeout": timeout})
896 execution_time = time.time() - start_time
898 span.set_attribute("code.length", len(code))
899 span.set_attribute("code.execution_time", execution_time)
900 span.set_attribute("code.success", "success" in result.lower())
902 metrics.code_executions.add(1, {"user_id": user_id, "success": "success" in result.lower()})
904 return [TextContent(type="text", text=result)]
    async def run(self) -> None:
        """Run the MCP server over stdio until the client disconnects.

        Blocks for the lifetime of the connection; intended to be awaited
        from main().
        """
        logger.info("Starting MCP Agent Server")
        async with stdio_server() as (read_stream, write_stream):
            await self.server.run(read_stream, write_stream, self.server.create_initialization_options())
async def main() -> None:
    """Main entry point: initialize observability, then run the stdio MCP server."""
    # Initialize observability system before creating server
    from mcp_server_langgraph.core.config import settings
    from mcp_server_langgraph.observability.telemetry import init_observability

    # Initialize with settings and enable file logging if configured
    # (getattr guards against Settings versions without enable_file_logging).
    init_observability(settings=settings, enable_file_logging=getattr(settings, "enable_file_logging", False))

    server = MCPAgentServer()
    await server.run()
if __name__ == "__main__":
    # Script entry point: run the async main() on a fresh event loop.
    asyncio.run(main())