Coverage for src/mcp_server_langgraph/execution/kubernetes_sandbox.py: 11%
121 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
1"""
2Kubernetes-based sandbox for code execution
4Provides secure isolated Python code execution using Kubernetes Jobs.
5Supports resource limits, automatic cleanup with TTL, and pod security policies.
6"""
8import logging
9import time
11from kubernetes import client, config
12from kubernetes.client.rest import ApiException
14from mcp_server_langgraph.execution.resource_limits import ResourceLimits
15from mcp_server_langgraph.execution.sandbox import ExecutionResult, Sandbox, SandboxError
16import contextlib
18logger = logging.getLogger(__name__)
class KubernetesSandbox(Sandbox):
    """
    Kubernetes-based sandbox for executing Python code in isolated pods.

    Features:
    - Ephemeral Jobs (created and destroyed per execution)
    - Resource limits (CPU, memory, timeout)
    - Pod/container security contexts (non-root user, all capabilities dropped)
    - Automatic cleanup with TTL
    - No privilege escalation

    Note:
        The container root filesystem is left writable because executed code
        needs a writable /tmp.

    Example:
        >>> limits = ResourceLimits(timeout_seconds=30, memory_limit_mb=512)
        >>> sandbox = KubernetesSandbox(limits=limits, namespace="default")
        >>> result = sandbox.execute("print('Hello')")
        >>> assert result.success
        >>> assert "Hello" in result.stdout
    """

    def __init__(
        self,
        limits: ResourceLimits,
        namespace: str = "default",
        image: str = "python:3.12-slim",
        job_ttl: int = 300,  # TTL in seconds for job cleanup
    ) -> None:
        """
        Initialize Kubernetes sandbox.

        Args:
            limits: Resource limits to enforce
            namespace: Kubernetes namespace for jobs
            image: Container image to use
            job_ttl: Time to live for completed jobs (seconds)

        Raises:
            SandboxError: If Kubernetes is not available (configuration cannot
                be loaded, or the namespace cannot be verified)
        """
        super().__init__(limits)
        self.namespace = namespace
        self.image = image
        self.job_ttl = job_ttl

        try:
            # Load kubeconfig (in-cluster or from ~/.kube/config)
            try:
                config.load_incluster_config()  # Try in-cluster first
                logger.debug("Loaded in-cluster Kubernetes config")
            except config.ConfigException:
                config.load_kube_config()  # Fall back to kubeconfig file
                logger.debug("Loaded Kubernetes config from kubeconfig file")

            self.batch_v1 = client.BatchV1Api()
            self.core_v1 = client.CoreV1Api()

            # Verify namespace exists
            self._verify_namespace()

        except Exception as e:
            msg = f"Kubernetes not available: {e}"
            # Chain the cause so the original traceback is kept for debugging.
            raise SandboxError(msg) from e

    def _verify_namespace(self) -> None:
        """
        Verify that the configured namespace exists.

        Raises:
            SandboxError: If the namespace is missing (HTTP 404) or the lookup
                fails with any other API error
        """
        try:
            self.core_v1.read_namespace(name=self.namespace)
        except ApiException as e:
            if e.status == 404:
                msg = f"Namespace '{self.namespace}' does not exist"
            else:
                msg = f"Failed to verify namespace: {e}"
            raise SandboxError(msg) from e

    def execute(self, code: str) -> ExecutionResult:
        """
        Execute Python code in a Kubernetes Job.

        Args:
            code: Python source code to execute

        Returns:
            ExecutionResult with execution status and outputs

        Raises:
            SandboxError: If job creation or execution fails
        """
        if not code or not code.strip():
            return self._create_failure_result(
                stdout="",
                stderr="Error: Empty code provided",
                exit_code=1,
                execution_time=0.0,
                error_message="Empty code provided",
            )

        job_name = None
        start_time = time.time()

        try:
            # Create job
            job_name = self._create_job(code)

            # Wait for completion with timeout
            timed_out, exit_code = self._wait_for_job(job_name, start_time)

            execution_time = self._measure_time(start_time)

            # Get logs from pod (must happen before the job is deleted)
            stdout, stderr = self._get_job_logs(job_name, exit_code, timed_out)

            # Create result
            if timed_out:
                return self._create_failure_result(
                    stdout=stdout,
                    stderr=stderr or f"Execution timed out after {self.limits.timeout_seconds}s",
                    exit_code=exit_code,
                    execution_time=execution_time,
                    timed_out=True,
                    error_message=f"Timeout after {self.limits.timeout_seconds}s",
                )
            if exit_code == 0:
                return self._create_success_result(
                    stdout=stdout,
                    stderr=stderr,
                    execution_time=execution_time,
                )
            return self._create_failure_result(
                stdout=stdout,
                stderr=stderr,
                exit_code=exit_code,
                execution_time=execution_time,
                error_message=f"Process exited with code {exit_code}",
            )

        except Exception as e:
            logger.error("Kubernetes execution failed: %s", e, exc_info=True)
            msg = f"Kubernetes execution failed: {e}"
            raise SandboxError(msg) from e

        finally:
            # Cleanup job on every path (TTL will also clean up, but we can do
            # it immediately). job_name is None when creation itself failed.
            if job_name:
                self._cleanup_job(job_name)

    def _create_job(self, code: str) -> str:
        """
        Create Kubernetes Job for code execution.

        Args:
            code: Python code to execute

        Returns:
            Job name

        Raises:
            SandboxError: If job creation fails
        """
        import hashlib
        import uuid

        # Generate unique job name.
        # The SHA-256 prefix of the code is for traceability only (not
        # cryptographic security). The random suffix keeps names unique when
        # the same code is submitted more than once within the same second;
        # timestamp + hash alone would collide in that case.
        code_hash = hashlib.sha256(code.encode()).hexdigest()[:8]
        timestamp = int(time.time())
        unique_suffix = uuid.uuid4().hex[:8]
        job_name = f"code-exec-{timestamp}-{code_hash}-{unique_suffix}"

        try:
            # Configure resource requests and limits (requests == limits)
            resource_spec = {
                "cpu": str(self.limits.cpu_quota),
                "memory": f"{self.limits.memory_limit_mb}Mi",
            }
            resources = client.V1ResourceRequirements(
                requests=dict(resource_spec),
                limits=dict(resource_spec),
            )

            # Configure security context
            security_context = client.V1SecurityContext(
                allow_privilege_escalation=False,
                run_as_non_root=True,
                run_as_user=1000,  # Non-root user
                read_only_root_filesystem=False,  # Need writable /tmp
                capabilities=client.V1Capabilities(drop=["ALL"]),
            )

            # Configure container; the code is passed as an argv element, so
            # no shell is involved.
            container = client.V1Container(
                name="executor",
                image=self.image,
                command=["python", "-c", code],
                resources=resources,
                security_context=security_context,
            )

            # Configure pod template
            pod_template = client.V1PodTemplateSpec(
                metadata=client.V1ObjectMeta(labels={"app": "code-execution"}),
                spec=client.V1PodSpec(
                    containers=[container],
                    restart_policy="Never",
                    # Pod security
                    security_context=client.V1PodSecurityContext(
                        run_as_non_root=True,
                        run_as_user=1000,
                        fs_group=1000,
                    ),
                ),
            )

            # Configure job
            job = client.V1Job(
                api_version="batch/v1",
                kind="Job",
                metadata=client.V1ObjectMeta(name=job_name),
                spec=client.V1JobSpec(
                    template=pod_template,
                    backoff_limit=0,  # Don't retry on failure
                    ttl_seconds_after_finished=self.job_ttl,  # Auto-cleanup
                    active_deadline_seconds=self.limits.timeout_seconds,  # Timeout
                ),
            )

            # Create job
            self.batch_v1.create_namespaced_job(namespace=self.namespace, body=job)
            logger.debug("Created Kubernetes job: %s", job_name)

            return job_name

        except Exception as e:
            logger.error("Failed to create Kubernetes job: %s", e, exc_info=True)
            msg = f"Failed to create Kubernetes job: {e}"
            raise SandboxError(msg) from e

    @staticmethod
    def _deadline_exceeded(status: "client.V1JobStatus") -> bool:
        """Return True if the job status carries a Failed/DeadlineExceeded condition."""
        return any(
            cond.type == "Failed" and cond.status == "True" and cond.reason == "DeadlineExceeded"
            for cond in (status.conditions or [])
        )

    def _wait_for_job(self, job_name: str, start_time: float) -> tuple[bool, int]:
        """
        Wait for job to complete.

        The job is NOT deleted here on timeout: the caller still has to read
        the pod logs, and deleting first can remove the pod before its output
        is collected. `execute` deletes the job once the logs are retrieved.

        Args:
            job_name: Name of the job
            start_time: Start time (`time.time()` based) for timeout calculation

        Returns:
            Tuple of (timed_out, exit_code). exit_code is 0 on success, 124 on
            timeout, and 1 for a failed or missing job.

        Raises:
            ApiException: If reading the job status fails with anything other
                than a 404
        """
        timeout = self.limits.timeout_seconds
        poll_interval = 1  # Poll every second

        while True:
            remaining = timeout - (time.time() - start_time)
            if remaining <= 0:
                return True, 124  # Timeout exit code

            try:
                # Check job status
                job = self.batch_v1.read_namespaced_job(name=job_name, namespace=self.namespace)
            except ApiException as e:
                if e.status == 404:
                    # Job not found (might have been deleted)
                    return False, 1
                raise

            status = job.status
            if status.succeeded:
                return False, 0
            if self._deadline_exceeded(status):
                # Kubernetes enforced active_deadline_seconds before our own
                # clock expired; report it as a timeout, not a plain failure.
                return True, 124
            if status.failed:
                return False, 1

            # Job still running; never sleep past the deadline.
            time.sleep(min(poll_interval, remaining))

    def _get_job_logs(self, job_name: str, exit_code: int, timed_out: bool) -> tuple[str, str]:
        """
        Get logs from job pod and assign them to stdout or stderr.

        Kubernetes combines stdout and stderr into a single log stream, so the
        whole stream is attributed to one side based on the outcome:
        a failed (non-timeout) run reports everything as stderr; a successful
        or timed-out run reports everything as stdout (the caller supplies the
        timeout message).

        Args:
            job_name: Name of the job
            exit_code: Job exit code (0 for success, non-zero for failure)
            timed_out: Whether the job timed out

        Returns:
            Tuple of (stdout, stderr). On retrieval problems stdout is empty
            and stderr carries an error description.
        """
        try:
            # Find pod for job
            pods = self.core_v1.list_namespaced_pod(namespace=self.namespace, label_selector=f"job-name={job_name}")

            if not pods.items:
                return "", "Error: No pod found for job"

            pod_name = pods.items[0].metadata.name

            # Get logs
            logs = self.core_v1.read_namespaced_pod_log(name=pod_name, namespace=self.namespace)

        except Exception as e:
            logger.warning("Failed to get logs for job %s: %s", job_name, e)
            return "", f"Error retrieving logs: {e}"

        if exit_code != 0 and not timed_out:
            # Execution failed - report the combined output as stderr
            return "", logs

        # Success or timeout - everything is stdout
        return logs, ""

    def _cleanup_job(self, job_name: str) -> None:
        """
        Clean up Kubernetes job (best effort; failures are logged, not raised).

        Args:
            job_name: Name of the job to delete
        """
        try:
            self.batch_v1.delete_namespaced_job(
                name=job_name,
                namespace=self.namespace,
                propagation_policy="Background",  # Delete pods too
            )
            logger.debug("Deleted Kubernetes job: %s", job_name)
        except ApiException as e:
            if e.status != 404:  # Ignore if already deleted
                logger.warning("Failed to delete job %s: %s", job_name, e)
        except Exception as e:
            logger.warning("Error during job cleanup: %s", e)