Coverage for src/mcp_server_langgraph/tools/filesystem_tools.py: 79%

130 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 00:43 +0000

1""" 

2Filesystem tools for file operations 

3 

4Provides READ-ONLY file system access for the agent. 

5All tools are restricted to safe operations for security. 

6""" 

7 

8from pathlib import Path 

9from typing import Annotated 

10 

11from langchain_core.tools import tool 

12from pydantic import Field 

13 

14from mcp_server_langgraph.observability.telemetry import logger, metrics 

15 

# Upper bound, in bytes, on the on-disk size of any file the tools will read (1 MiB).
MAX_FILE_SIZE = 1024 * 1024

# Lower-case suffixes (leading dot included) of the text formats read_file accepts.
SAFE_EXTENSIONS = {
    ".txt",
    ".md",
    ".json",
    ".yaml",
    ".yml",
    ".log",
    ".csv",
    ".xml",
    ".html",
    ".py",
    ".js",
    ".ts",
}

21 

22 

23def _is_safe_path(path: str) -> bool: 

24 """ 

25 Check if path is safe to access. 

26 

27 Args: 

28 path: File or directory path 

29 

30 Returns: 

31 True if path is safe, False otherwise 

32 """ 

33 try: 

34 # Resolve to absolute path and check it exists 

35 abs_path = Path(path).resolve() 

36 

37 # Block access to system directories 

38 dangerous_paths = [ 

39 "/etc", 

40 "/sys", 

41 "/proc", 

42 "/dev", 

43 "/boot", 

44 "/root", 

45 Path.home() / ".ssh", 

46 Path.home() / ".aws", 

47 ] 

48 

49 for dangerous in dangerous_paths: 

50 dangerous_path = Path(dangerous) if isinstance(dangerous, str) else dangerous 

51 # Resolve dangerous path too (handles macOS /etc -> /private/etc symlinks) 

52 dangerous_path_resolved = dangerous_path.resolve() # type: ignore[attr-defined] 

53 if abs_path.is_relative_to(dangerous_path_resolved): 

54 return False 

55 

56 return True 

57 

58 except Exception: 

59 return False 

60 

61 

# NOTE: langchain's @tool exposes the function docstring as the tool description
# shown to the model, so the docstring below is runtime text and is kept verbatim.
# Maintainer documentation lives in comments instead.
#
# Returns a formatted text report (header, separator, content) on success, or a
# string starting with "Error:" on any failure; exceptions are logged and never
# propagate to the caller.
@tool
def read_file(
    file_path: Annotated[str, Field(description="Path to file to read")],
    max_bytes: Annotated[
        int, Field(ge=100, le=MAX_FILE_SIZE, description="Maximum bytes to read (100-1048576)")
    ] = 10000,
) -> str:
    """
    Read contents of a text file.

    Supports: .txt, .md, .json, .yaml, .yml, .log, .csv, .xml, .html, .py, .js, .ts
    Maximum file size: 1MB for safety.

    Use this to:
    - Read configuration files
    - View log files
    - Inspect code or documentation

    SECURITY: Read-only access, blocks system directories.
    """
    try:
        logger.info("Read file tool invoked", extra={"file_path": file_path})
        metrics.tool_calls.add(1, {"tool": "read_file"})

        # Validate path safety
        if not _is_safe_path(file_path):
            return f"Error: Access denied - path '{file_path}' is not safe to read"

        path = Path(file_path).resolve()

        # Check file exists
        if not path.exists():
            return f"Error: File '{file_path}' does not exist"

        if not path.is_file():
            return f"Error: Path '{file_path}' is not a file"

        # Check file extension (sorted so the error message is deterministic;
        # iterating the set directly yields an arbitrary order)
        if path.suffix.lower() not in SAFE_EXTENSIONS:
            return f"Error: File type '{path.suffix}' not allowed. Allowed: {', '.join(sorted(SAFE_EXTENSIONS))}"

        # Check file size
        file_size = path.stat().st_size
        if file_size > MAX_FILE_SIZE:
            return f"Error: File too large ({file_size} bytes). Maximum: {MAX_FILE_SIZE} bytes"

        # Read file (up to max_bytes). In text mode read() counts decoded
        # characters, so the limit is applied per character, not per raw byte.
        with open(path, encoding="utf-8", errors="replace") as f:
            content = f.read(max_bytes)
            # Probe one character past the limit: only report truncation when
            # something was actually left unread. Comparing len(content) with
            # max_bytes would flag a file of exactly max_bytes characters.
            truncated = f.read(1) != ""

        result = f"File: {file_path}\nSize: {file_size} bytes\n"
        if truncated:
            result += f"Content (first {max_bytes} bytes):\n"
        else:
            result += "Content:\n"
        result += "-" * 40 + "\n"
        result += content
        if truncated:
            result += f"\n\n[... truncated at {max_bytes} bytes ...]"

        logger.info("File read successfully", extra={"file_path": file_path, "bytes_read": len(content)})
        return result

    except Exception as e:
        # Tool boundary: report the failure to the agent as text instead of raising.
        error_msg = f"Error reading file '{file_path}': {e}"
        logger.error(error_msg, exc_info=True)
        return f"Error: {e}"

128 

129 

# NOTE: langchain's @tool exposes the function docstring as the tool description
# shown to the model, so the docstring below is runtime text and is kept verbatim.
#
# Returns a text listing (one line per entry, sorted by path) on success, or a
# string starting with "Error:" on any failure; exceptions are logged, not raised.
@tool
def list_directory(
    directory_path: Annotated[str, Field(description="Path to directory to list")],
    show_hidden: Annotated[bool, Field(description="Whether to show hidden files (starting with .)")] = False,
) -> str:
    """
    List contents of a directory.

    Returns file and directory names with types and sizes.

    Use this to:
    - Explore directory structure
    - Find available files
    - Check what exists before reading

    SECURITY: Read-only access, blocks system directories.
    """
    try:
        logger.info("List directory tool invoked", extra={"directory_path": directory_path})
        metrics.tool_calls.add(1, {"tool": "list_directory"})

        # Refuse anything on the deny-list before touching the filesystem further
        if not _is_safe_path(directory_path):
            return f"Error: Access denied - path '{directory_path}' is not safe to access"

        target = Path(directory_path).resolve()

        if not target.exists():
            return f"Error: Directory '{directory_path}' does not exist"

        if not target.is_dir():
            return f"Error: Path '{directory_path}' is not a directory"

        # Build one display line per visible entry
        lines = []
        for entry in sorted(target.iterdir()):
            # Dot-files are omitted unless the caller asked for them
            if entry.name.startswith(".") and not show_hidden:
                continue

            label = "DIR" if entry.is_dir() else "FILE"
            suffix = ""
            if entry.is_file():
                try:
                    n_bytes = entry.stat().st_size
                    # Pick the largest unit that keeps the number readable
                    if n_bytes >= 1024 * 1024:
                        suffix = f" ({n_bytes / (1024 * 1024):.1f} MB)"
                    elif n_bytes >= 1024:
                        suffix = f" ({n_bytes / 1024:.1f} KB)"
                    else:
                        suffix = f" ({n_bytes} B)"
                except Exception:
                    # A failing stat() must not abort the whole listing
                    suffix = " (size unknown)"

            lines.append(f" [{label}] {entry.name}{suffix}")

        header = f"Directory: {directory_path}\nItems: {len(lines)}\n" + "-" * 40 + "\n"
        result = header + ("\n".join(lines) if lines else " (empty)")

        logger.info("Directory listed successfully", extra={"directory_path": directory_path, "item_count": len(lines)})
        return result

    except Exception as e:
        # Tool boundary: report the failure to the agent as text instead of raising.
        error_msg = f"Error listing directory '{directory_path}': {e}"
        logger.error(error_msg, exc_info=True)
        return f"Error: {e}"

199 

200 

# NOTE: langchain's @tool exposes the function docstring as the tool description
# shown to the model, so the docstring below is runtime text and is kept verbatim.
#
# Returns a text report of matching files (paths relative to directory_path) on
# success, or a string starting with "Error:" on any failure; exceptions are
# logged, not raised.
@tool
def search_files(
    directory_path: Annotated[str, Field(description="Directory to search in")],
    pattern: Annotated[str, Field(description="Filename pattern to search for (e.g., '*.py', 'config.yaml')")],
    max_results: Annotated[int, Field(ge=1, le=100, description="Maximum number of results (1-100)")] = 20,
) -> str:
    """
    Search for files matching a pattern in a directory (recursive).

    Supports wildcards: * (any characters), ? (single character)

    Use this to:
    - Find files by name or extension
    - Locate configuration files
    - Search for specific file patterns

    SECURITY: Read-only access, blocks system directories.
    """
    try:
        logger.info("Search files tool invoked", extra={"directory_path": directory_path, "pattern": pattern})
        metrics.tool_calls.add(1, {"tool": "search_files"})

        # Validate path safety
        if not _is_safe_path(directory_path):
            return f"Error: Access denied - path '{directory_path}' is not safe to access"

        # pathlib glob patterns honour ".." segments, so a pattern such as
        # "../../*" would walk out of directory_path and list files elsewhere.
        # The pattern comes from the agent, so reject traversal outright.
        if ".." in Path(pattern).parts:
            return f"Error: Pattern '{pattern}' must not contain '..' path components"

        path = Path(directory_path).resolve()

        # Check directory exists
        if not path.exists():
            return f"Error: Directory '{directory_path}' does not exist"

        if not path.is_dir():
            return f"Error: Path '{directory_path}' is not a directory"

        # Search for files
        matches = []
        for match in path.rglob(pattern):
            # Only regular files located outside the deny-list are reported
            if not match.is_file() or not _is_safe_path(str(match)):
                continue

            rel_path = match.relative_to(path)
            try:
                file_size = match.stat().st_size
            except OSError:
                # A file that vanishes or becomes unreadable mid-scan must not
                # abort the whole search (same policy as list_directory)
                size = "size unknown"
            else:
                if file_size < 1024:
                    size = f"{file_size} B"
                elif file_size < 1024 * 1024:
                    size = f"{file_size / 1024:.1f} KB"
                else:
                    size = f"{file_size / (1024 * 1024):.1f} MB"

            matches.append(f" {rel_path} ({size})")

            # Stop scanning as soon as the cap is reached
            if len(matches) >= max_results:
                break

        result = f"Search: {pattern} in {directory_path}\n"
        result += f"Found: {len(matches)} files\n"
        result += "-" * 40 + "\n"
        result += "\n".join(matches) if matches else " (no matches)"

        if len(matches) >= max_results:
            result += f"\n\n[... limited to {max_results} results ...]"

        logger.info(
            "File search completed", extra={"directory_path": directory_path, "pattern": pattern, "matches": len(matches)}
        )
        return result

    except Exception as e:
        # Tool boundary: report the failure to the agent as text instead of raising.
        error_msg = f"Error searching files in '{directory_path}': {e}"
        logger.error(error_msg, exc_info=True)
        return f"Error: {e}"