Coverage for src / mcp_server_langgraph / execution / kubernetes_sandbox.py: 11%

121 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 00:43 +0000

1""" 

2Kubernetes-based sandbox for code execution 

3 

4Provides secure isolated Python code execution using Kubernetes Jobs. 

5Supports resource limits, automatic cleanup with TTL, and pod security policies. 

6""" 

7 

8import logging 

9import time 

10 

11from kubernetes import client, config 

12from kubernetes.client.rest import ApiException 

13 

14from mcp_server_langgraph.execution.resource_limits import ResourceLimits 

15from mcp_server_langgraph.execution.sandbox import ExecutionResult, Sandbox, SandboxError 

16import contextlib 

17 

# Module-level logger named after this module; used for debug/warning/error
# messages emitted by KubernetesSandbox.
logger = logging.getLogger(__name__)

19 

20 

class KubernetesSandbox(Sandbox):
    """
    Kubernetes-based sandbox for executing Python code in isolated pods.

    Features:
    - Ephemeral Jobs (created and destroyed per execution)
    - Resource limits (CPU, memory, timeout)
    - Hardened pod: non-root user, no privilege escalation, all Linux
      capabilities dropped, RuntimeDefault seccomp profile
    - No service-account token mounted into the sandbox pod
    - Read-only root filesystem (a scratch emptyDir volume is mounted at /tmp)
    - Automatic cleanup with TTL

    Example:
        >>> limits = ResourceLimits(timeout_seconds=30, memory_limit_mb=512)
        >>> sandbox = KubernetesSandbox(limits=limits, namespace="default")
        >>> result = sandbox.execute("print('Hello')")
        >>> assert result.success
        >>> assert "Hello" in result.stdout
    """

    # Seconds to sleep between Job status polls in _wait_for_job.
    _POLL_INTERVAL_SECONDS = 1
    # Exit code reported for runs that hit the timeout.
    _TIMEOUT_EXIT_CODE = 124

    def __init__(
        self,
        limits: ResourceLimits,
        namespace: str = "default",
        image: str = "python:3.12-slim",
        job_ttl: int = 300,  # TTL in seconds for job cleanup
    ):
        """
        Initialize Kubernetes sandbox.

        Args:
            limits: Resource limits to enforce
            namespace: Kubernetes namespace for jobs
            image: Container image to use
            job_ttl: Time to live for completed jobs (seconds)

        Raises:
            SandboxError: If Kubernetes config cannot be loaded, the API clients
                cannot be created, or the namespace cannot be verified. The
                original exception is chained as ``__cause__``.
        """
        super().__init__(limits)
        self.namespace = namespace
        self.image = image
        self.job_ttl = job_ttl

        try:
            # Load kubeconfig: in-cluster service account first, then ~/.kube/config.
            try:
                config.load_incluster_config()
                logger.debug("Loaded in-cluster Kubernetes config")
            except config.ConfigException:
                config.load_kube_config()
                logger.debug("Loaded Kubernetes config from kubeconfig file")

            self.batch_v1 = client.BatchV1Api()
            self.core_v1 = client.CoreV1Api()

            # Fail fast if the target namespace is missing or unreadable.
            self._verify_namespace()

        except Exception as e:
            msg = f"Kubernetes not available: {e}"
            raise SandboxError(msg) from e

    def _verify_namespace(self) -> None:
        """
        Verify that ``self.namespace`` exists.

        Raises:
            SandboxError: If the namespace does not exist (HTTP 404) or the
                lookup fails with any other API error.
        """
        try:
            self.core_v1.read_namespace(name=self.namespace)
        except ApiException as e:
            if e.status == 404:
                msg = f"Namespace '{self.namespace}' does not exist"
                raise SandboxError(msg) from e
            msg = f"Failed to verify namespace: {e}"
            raise SandboxError(msg) from e

    def execute(self, code: str) -> ExecutionResult:
        """
        Execute Python code in a Kubernetes Job.

        Args:
            code: Python source code to execute

        Returns:
            ExecutionResult with execution status and outputs. Empty or
            whitespace-only code yields a failure result without creating a Job.

        Raises:
            SandboxError: If job creation, polling, or any other step raises;
                the Job (if created) is deleted before the error propagates.
        """
        if not code or not code.strip():
            return self._create_failure_result(
                stdout="",
                stderr="Error: Empty code provided",
                exit_code=1,
                execution_time=0.0,
                error_message="Empty code provided",
            )

        job_name = None
        start_time = time.time()

        try:
            job_name = self._create_job(code)

            # Block until the Job finishes or the timeout elapses.
            timed_out, exit_code = self._wait_for_job(job_name, start_time)

            execution_time = self._measure_time(start_time)

            stdout, stderr = self._get_job_logs(job_name, exit_code, timed_out)

            # Delete immediately rather than waiting for ttl_seconds_after_finished.
            self._cleanup_job(job_name)

            if timed_out:
                return self._create_failure_result(
                    stdout=stdout,
                    stderr=stderr or f"Execution timed out after {self.limits.timeout_seconds}s",
                    exit_code=exit_code,
                    execution_time=execution_time,
                    timed_out=True,
                    error_message=f"Timeout after {self.limits.timeout_seconds}s",
                )
            if exit_code == 0:
                return self._create_success_result(
                    stdout=stdout,
                    stderr=stderr,
                    execution_time=execution_time,
                )
            return self._create_failure_result(
                stdout=stdout,
                stderr=stderr,
                exit_code=exit_code,
                execution_time=execution_time,
                error_message=f"Process exited with code {exit_code}",
            )

        except Exception as e:
            # Best-effort cleanup so a failed run does not leave a Job behind.
            if job_name:
                self._cleanup_job(job_name)

            logger.exception("Kubernetes execution failed: %s", e)
            msg = f"Kubernetes execution failed: {e}"
            raise SandboxError(msg) from e

    def _create_job(self, code: str) -> str:
        """
        Create a Kubernetes Job that runs ``python -c <code>``.

        The pod runs as UID/GID 1000 with a read-only root filesystem, no
        privilege escalation, all capabilities dropped, the RuntimeDefault
        seccomp profile, and no service-account token. A writable emptyDir
        volume is mounted at /tmp for scratch files.

        Args:
            code: Python code to execute

        Returns:
            Job name

        Raises:
            SandboxError: If job creation fails
        """
        import hashlib
        import uuid

        # Job names must be unique within the namespace. Timestamp + code hash
        # alone collide when identical code is submitted twice within the same
        # second (the API would reject the second Job with 409 Conflict), so a
        # short random suffix is appended. The result stays a valid DNS-1123
        # label well under 63 characters.
        # SHA-256 is used only for naming, not for any security property.
        code_hash = hashlib.sha256(code.encode()).hexdigest()[:8]
        timestamp = int(time.time())
        suffix = uuid.uuid4().hex[:6]
        job_name = f"code-exec-{timestamp}-{code_hash}-{suffix}"

        try:
            # Requests == limits so the pod gets exactly the configured resources.
            quantity = {
                "cpu": str(self.limits.cpu_quota),
                "memory": f"{self.limits.memory_limit_mb}Mi",
            }
            resources = client.V1ResourceRequirements(
                requests=dict(quantity),
                limits=dict(quantity),
            )

            security_context = client.V1SecurityContext(
                allow_privilege_escalation=False,
                run_as_non_root=True,
                run_as_user=1000,  # Non-root user
                # Root FS is read-only; /tmp stays writable via the emptyDir below.
                read_only_root_filesystem=True,
                capabilities=client.V1Capabilities(drop=["ALL"]),
            )

            tmp_volume_name = "tmp"
            container = client.V1Container(
                name="executor",
                image=self.image,
                command=["python", "-c", code],
                resources=resources,
                security_context=security_context,
                volume_mounts=[client.V1VolumeMount(name=tmp_volume_name, mount_path="/tmp")],
            )

            pod_template = client.V1PodTemplateSpec(
                metadata=client.V1ObjectMeta(labels={"app": "code-execution"}),
                spec=client.V1PodSpec(
                    containers=[container],
                    restart_policy="Never",
                    # Untrusted code must not be able to use the namespace's
                    # service-account credentials against the Kubernetes API.
                    automount_service_account_token=False,
                    volumes=[
                        client.V1Volume(
                            name=tmp_volume_name,
                            empty_dir=client.V1EmptyDirVolumeSource(),
                        )
                    ],
                    security_context=client.V1PodSecurityContext(
                        run_as_non_root=True,
                        run_as_user=1000,
                        fs_group=1000,
                        seccomp_profile=client.V1SeccompProfile(type="RuntimeDefault"),
                    ),
                ),
            )

            job = client.V1Job(
                api_version="batch/v1",
                kind="Job",
                metadata=client.V1ObjectMeta(name=job_name),
                spec=client.V1JobSpec(
                    template=pod_template,
                    backoff_limit=0,  # Don't retry on failure
                    ttl_seconds_after_finished=self.job_ttl,  # Auto-cleanup
                    active_deadline_seconds=self.limits.timeout_seconds,  # Server-side timeout
                ),
            )

            self.batch_v1.create_namespaced_job(namespace=self.namespace, body=job)
            logger.debug("Created Kubernetes job: %s", job_name)

            return job_name

        except Exception as e:
            logger.exception("Failed to create Kubernetes job: %s", e)
            msg = f"Failed to create Kubernetes job: {e}"
            raise SandboxError(msg) from e

    @staticmethod
    def _deadline_exceeded(status) -> bool:
        """Return True if the Job status carries a Failed/DeadlineExceeded condition."""
        # conditions is None until the Job controller records one.
        return any(
            cond.type == "Failed" and cond.reason == "DeadlineExceeded"
            for cond in (status.conditions or ())
        )

    def _wait_for_job(self, job_name: str, start_time: float) -> tuple[bool, int]:
        """
        Poll the Job until it finishes or the timeout elapses.

        Args:
            job_name: Name of the job
            start_time: Start time (``time.time()``) for timeout calculation

        Returns:
            Tuple of (timed_out, exit_code). The exit code is 0 on success, 1 on
            failure (or if the Job disappeared), and 124 on timeout. A timeout is
            reported both when the local clock exceeds ``limits.timeout_seconds``
            and when Kubernetes fails the Job for exceeding
            ``active_deadline_seconds``.

        Raises:
            ApiException: For API errors other than 404 while reading the Job.
        """
        timeout = self.limits.timeout_seconds

        while True:
            if time.time() - start_time >= timeout:
                # Local timeout: best-effort delete, then report the timeout.
                with contextlib.suppress(Exception):
                    self.batch_v1.delete_namespaced_job(
                        name=job_name,
                        namespace=self.namespace,
                        propagation_policy="Background",
                    )
                return True, self._TIMEOUT_EXIT_CODE

            try:
                job = self.batch_v1.read_namespaced_job(name=job_name, namespace=self.namespace)
            except ApiException as e:
                if e.status == 404:
                    # Job not found (might have been deleted)
                    return False, 1
                raise

            status = job.status
            if status is not None:
                if status.succeeded:
                    return False, 0
                # Checked before `failed`: a deadline kill also marks the Job as
                # failed, but it is a timeout, not a code failure.
                if self._deadline_exceeded(status):
                    return True, self._TIMEOUT_EXIT_CODE
                if status.failed:
                    return False, 1

            # Job still running (or status not yet populated); check again later.
            time.sleep(self._POLL_INTERVAL_SECONDS)

    def _get_job_logs(self, job_name: str, exit_code: int, timed_out: bool) -> tuple[str, str]:
        """
        Fetch the Job pod's log and assign it to stdout or stderr.

        Kubernetes merges stdout and stderr into a single log stream, so the
        two cannot be separated. For a non-timeout failure (``exit_code != 0``)
        the whole log is returned as stderr; otherwise (success or timeout) it is
        returned as stdout, and the caller supplies any timeout message.

        Args:
            job_name: Name of the job
            exit_code: Job exit code (0 for success, non-zero for failure)
            timed_out: Whether the job timed out

        Returns:
            Tuple of (stdout, stderr). If no pod exists or log retrieval fails,
            stdout is empty and stderr holds an error description.
        """
        try:
            # Pods created by a Job carry the `job-name` label.
            pods = self.core_v1.list_namespaced_pod(namespace=self.namespace, label_selector=f"job-name={job_name}")

            if not pods.items:
                return "", "Error: No pod found for job"

            # backoff_limit=0 means at most one pod is expected.
            pod_name = pods.items[0].metadata.name
            logs = self.core_v1.read_namespaced_pod_log(name=pod_name, namespace=self.namespace)

            if exit_code != 0 and not timed_out:
                return "", logs
            return logs, ""

        except Exception as e:
            logger.warning("Failed to get logs for job %s: %s", job_name, e)
            return "", f"Error retrieving logs: {e}"

    def _cleanup_job(self, job_name: str) -> None:
        """
        Delete a Kubernetes Job (best effort; never raises).

        Args:
            job_name: Name of the job to delete
        """
        try:
            self.batch_v1.delete_namespaced_job(
                name=job_name,
                namespace=self.namespace,
                propagation_policy="Background",  # Delete pods too
            )
            logger.debug("Deleted Kubernetes job: %s", job_name)
        except ApiException as e:
            if e.status != 404:  # Ignore if already deleted
                logger.warning("Failed to delete job %s: %s", job_name, e)
        except Exception as e:
            logger.warning("Error during job cleanup: %s", e)