Coverage for src / mcp_server_langgraph / tools / filesystem_tools.py: 79%
130 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
1"""
2Filesystem tools for file operations
4Provides READ-ONLY file system access for the agent.
5All tools are restricted to safe operations for security.
6"""
8from pathlib import Path
9from typing import Annotated
11from langchain_core.tools import tool
12from pydantic import Field
14from mcp_server_langgraph.observability.telemetry import logger, metrics
# Upper bound on the size of a file the tools will read: 1024 * 1024 bytes (1MB, for safety)
MAX_FILE_SIZE = 1024 * 1024

# File extensions the read tool accepts (read-only, safe formats)
SAFE_EXTENSIONS = {
    ".txt",
    ".md",
    ".json",
    ".yaml",
    ".yml",
    ".log",
    ".csv",
    ".xml",
    ".html",
    ".py",
    ".js",
    ".ts",
}
23def _is_safe_path(path: str) -> bool:
24 """
25 Check if path is safe to access.
27 Args:
28 path: File or directory path
30 Returns:
31 True if path is safe, False otherwise
32 """
33 try:
34 # Resolve to absolute path and check it exists
35 abs_path = Path(path).resolve()
37 # Block access to system directories
38 dangerous_paths = [
39 "/etc",
40 "/sys",
41 "/proc",
42 "/dev",
43 "/boot",
44 "/root",
45 Path.home() / ".ssh",
46 Path.home() / ".aws",
47 ]
49 for dangerous in dangerous_paths:
50 dangerous_path = Path(dangerous) if isinstance(dangerous, str) else dangerous
51 # Resolve dangerous path too (handles macOS /etc -> /private/etc symlinks)
52 dangerous_path_resolved = dangerous_path.resolve() # type: ignore[attr-defined]
53 if abs_path.is_relative_to(dangerous_path_resolved):
54 return False
56 return True
58 except Exception:
59 return False
# NOTE(review): the docstring below is presumably consumed by @tool as the
# tool description shown to the agent, so its wording is kept verbatim.
@tool
def read_file(
    file_path: Annotated[str, Field(description="Path to file to read")],
    max_bytes: Annotated[int, Field(ge=100, le=MAX_FILE_SIZE, description="Maximum bytes to read (100-1048576)")] = 10000,
) -> str:
    """
    Read contents of a text file.

    Supports: .txt, .md, .json, .yaml, .yml, .log, .csv, .xml, .html, .py, .js, .ts
    Maximum file size: 1MB for safety.

    Use this to:
    - Read configuration files
    - View log files
    - Inspect code or documentation

    SECURITY: Read-only access, blocks system directories.
    """
    # Returns a formatted report (header + content) on success, or a string
    # starting with "Error:" on any failure; exceptions are never propagated.
    try:
        logger.info("Read file tool invoked", extra={"file_path": file_path})
        metrics.tool_calls.add(1, {"tool": "read_file"})

        # Validate path safety
        if not _is_safe_path(file_path):
            return f"Error: Access denied - path '{file_path}' is not safe to read"

        path = Path(file_path).resolve()

        # Check file exists
        if not path.exists():
            return f"Error: File '{file_path}' does not exist"

        if not path.is_file():
            return f"Error: Path '{file_path}' is not a file"

        # Check file extension (sorted so the message is stable between runs;
        # iterating the set directly gives an arbitrary order)
        if path.suffix.lower() not in SAFE_EXTENSIONS:
            return f"Error: File type '{path.suffix}' not allowed. Allowed: {', '.join(sorted(SAFE_EXTENSIONS))}"

        # Check file size
        file_size = path.stat().st_size
        if file_size > MAX_FILE_SIZE:
            return f"Error: File too large ({file_size} bytes). Maximum: {MAX_FILE_SIZE} bytes"

        # Read one unit more than the limit so we can tell "exactly at the
        # limit" apart from "more content follows". Note the file is opened in
        # text mode, so the limit is applied to decoded characters.
        with open(path, encoding="utf-8", errors="replace") as f:
            content = f.read(max_bytes + 1)

        truncated = len(content) > max_bytes
        if truncated:
            content = content[:max_bytes]

        result = f"File: {file_path}\nSize: {file_size} bytes\n"
        if truncated:
            result += f"Content (first {max_bytes} bytes):\n"
        else:
            result += "Content:\n"
        result += "-" * 40 + "\n"
        result += content
        if truncated:
            result += f"\n\n[... truncated at {max_bytes} bytes ...]"

        logger.info("File read successfully", extra={"file_path": file_path, "bytes_read": len(content)})
        return result

    except Exception as e:
        # Tool boundary: log the full traceback, hand the agent a short message.
        error_msg = f"Error reading file '{file_path}': {e}"
        logger.error(error_msg, exc_info=True)
        return f"Error: {e}"
# NOTE(review): the docstring below is presumably consumed by @tool as the
# tool description shown to the agent, so its wording is kept verbatim.
@tool
def list_directory(
    directory_path: Annotated[str, Field(description="Path to directory to list")],
    show_hidden: Annotated[bool, Field(description="Whether to show hidden files (starting with .)")] = False,
) -> str:
    """
    List contents of a directory.

    Returns file and directory names with types and sizes.

    Use this to:
    - Explore directory structure
    - Find available files
    - Check what exists before reading

    SECURITY: Read-only access, blocks system directories.
    """
    # Returns a formatted listing on success, or a string starting with
    # "Error:" on any failure; exceptions are never propagated.
    try:
        logger.info("List directory tool invoked", extra={"directory_path": directory_path})
        metrics.tool_calls.add(1, {"tool": "list_directory"})

        # Refuse blocked locations before touching the filesystem
        if not _is_safe_path(directory_path):
            return f"Error: Access denied - path '{directory_path}' is not safe to access"

        path = Path(directory_path).resolve()

        # The target must exist and be a directory
        if not path.exists():
            return f"Error: Directory '{directory_path}' does not exist"

        if not path.is_dir():
            return f"Error: Path '{directory_path}' is not a directory"

        # Build one display line per entry, in sorted order
        lines = []
        for entry in sorted(path.iterdir()):
            # Dot-files are omitted unless the caller asked for them
            if not show_hidden and entry.name.startswith("."):
                continue

            if entry.is_dir():
                kind = "DIR"
            else:
                kind = "FILE"

            # Only regular files get a human-readable size suffix
            suffix = ""
            if entry.is_file():
                try:
                    nbytes = entry.stat().st_size
                    if nbytes >= 1024 * 1024:
                        suffix = f" ({nbytes / (1024 * 1024):.1f} MB)"
                    elif nbytes >= 1024:
                        suffix = f" ({nbytes / 1024:.1f} KB)"
                    else:
                        suffix = f" ({nbytes} B)"
                except Exception:
                    # A failed stat must not abort the whole listing
                    suffix = " (size unknown)"

            lines.append(f" [{kind}] {entry.name}{suffix}")

        listing = "\n".join(lines) if lines else " (empty)"
        result = f"Directory: {directory_path}\n" + f"Items: {len(lines)}\n" + "-" * 40 + "\n" + listing

        logger.info("Directory listed successfully", extra={"directory_path": directory_path, "item_count": len(lines)})
        return result

    except Exception as e:
        # Tool boundary: log the full traceback, hand the agent a short message.
        error_msg = f"Error listing directory '{directory_path}': {e}"
        logger.error(error_msg, exc_info=True)
        return f"Error: {e}"
# NOTE(review): the docstring below is presumably consumed by @tool as the
# tool description shown to the agent, so its wording is kept verbatim.
@tool
def search_files(
    directory_path: Annotated[str, Field(description="Directory to search in")],
    pattern: Annotated[str, Field(description="Filename pattern to search for (e.g., '*.py', 'config.yaml')")],
    max_results: Annotated[int, Field(ge=1, le=100, description="Maximum number of results (1-100)")] = 20,
) -> str:
    """
    Search for files matching a pattern in a directory (recursive).

    Supports wildcards: * (any characters), ? (single character)

    Use this to:
    - Find files by name or extension
    - Locate configuration files
    - Search for specific file patterns

    SECURITY: Read-only access, blocks system directories.
    """
    # Returns a formatted result list on success, or a string starting with
    # "Error:" on any failure; exceptions are never propagated.
    try:
        logger.info("Search files tool invoked", extra={"directory_path": directory_path, "pattern": pattern})
        metrics.tool_calls.add(1, {"tool": "search_files"})

        # Validate path safety
        if not _is_safe_path(directory_path):
            return f"Error: Access denied - path '{directory_path}' is not safe to access"

        path = Path(directory_path).resolve()

        # Check directory exists
        if not path.exists():
            return f"Error: Directory '{directory_path}' does not exist"

        if not path.is_dir():
            return f"Error: Path '{directory_path}' is not a directory"

        # Search for files
        matches = []
        for match in path.rglob(pattern):
            if not match.is_file():
                continue
            # Check each match is in a safe location
            if not _is_safe_path(str(match)):
                continue

            rel_path = match.relative_to(path)
            try:
                file_size = match.stat().st_size
            except OSError:
                # One unreadable or vanished file must not abort the whole
                # search; report it the same way list_directory does.
                size = "size unknown"
            else:
                if file_size < 1024:
                    size = f"{file_size} B"
                elif file_size < 1024 * 1024:
                    size = f"{file_size / 1024:.1f} KB"
                else:
                    size = f"{file_size / (1024 * 1024):.1f} MB"

            matches.append(f" {rel_path} ({size})")

            # Stop walking the tree as soon as the cap is reached
            if len(matches) >= max_results:
                break

        result = f"Search: {pattern} in {directory_path}\n"
        result += f"Found: {len(matches)} files\n"
        result += "-" * 40 + "\n"
        result += "\n".join(matches) if matches else " (no matches)"

        if len(matches) >= max_results:
            result += f"\n\n[... limited to {max_results} results ...]"

        logger.info(
            "File search completed", extra={"directory_path": directory_path, "pattern": pattern, "matches": len(matches)}
        )
        return result

    except Exception as e:
        # Tool boundary: log the full traceback, hand the agent a short message.
        error_msg = f"Error searching files in '{directory_path}': {e}"
        logger.error(error_msg, exc_info=True)
        return f"Error: {e}"