Coverage for src / mcp_server_langgraph / builder / api / server.py: 89%
160 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
1"""
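Visual Workflow Builder API

FastAPI backend for the visual workflow builder.

Endpoints:
- GET / - API information
- POST /api/builder/generate - Generate Python code from workflow
- POST /api/builder/validate - Validate workflow structure
- GET /api/builder/templates - List workflow templates
- GET /api/builder/templates/{template_id} - Get a specific workflow template
- POST /api/builder/save - Save workflow to file
- POST /api/builder/import - Import Python code into a workflow
- GET /api/builder/node-types - List available node types

Example:
    uvicorn mcp_server_langgraph.builder.api.server:app --reload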
14"""
import hmac
import os
import tempfile
from collections import defaultdict, deque
from pathlib import Path
from typing import Any, Literal

from fastapi import Depends, FastAPI, Header, HTTPException, status
from fastapi.middleware.cors import CORSMiddleware
from pydantic import BaseModel, field_validator

from ..codegen import CodeGenerator, WorkflowDefinition
from ..workflow import WorkflowBuilder
28# ==============================================================================
29# API Models
30# ==============================================================================
class GenerateCodeRequest(BaseModel):
    """Body of a code-generation request."""

    # Workflow definition as a plain mapping of field name to value.
    workflow: dict[str, Any]
class GenerateCodeResponse(BaseModel):
    """Body of a code-generation response."""

    # Generated Python source text.
    code: str
    # Whether the returned code is reported as formatted.
    formatted: bool
    # Non-fatal advisory messages; empty when there are none.
    warnings: list[str] = []
class ValidateWorkflowRequest(BaseModel):
    """Body of a workflow-validation request."""

    # Workflow definition as a plain mapping of field name to value.
    workflow: dict[str, Any]
class ValidateWorkflowResponse(BaseModel):
    """Outcome of validating a workflow."""

    # Overall verdict for the workflow.
    valid: bool
    # Problems that make the workflow invalid; empty when there are none.
    errors: list[str] = []
    # Advisory findings that do not affect ``valid``; empty when there are none.
    warnings: list[str] = []
class SaveWorkflowRequest(BaseModel):
    """Body of a save request: a workflow and the file it should be written to."""

    workflow: dict[str, Any]
    output_path: str

    @field_validator("output_path")
    @classmethod
    def validate_output_path(cls, v: str) -> str:
        """
        Resolve ``output_path`` and accept it only inside the safe directory.

        SECURITY: Guards against CWE-73 (External Control of File Name or Path)
        and CWE-434 (Unrestricted Upload of File with Dangerous Type).

        The safe directory is ``BUILDER_OUTPUT_DIR`` when that environment
        variable is set, otherwise ``<system temp dir>/mcp-server-workflows``.
        It is created with mode 0o700 as a side effect of validation.

        Args:
            v: Path supplied by the client.

        Returns:
            The fully resolved path as a string.

        Raises:
            ValueError: If the resolved path lies outside the safe directory,
                matches a traversal/system-directory pattern, or does not have
                a ``.py`` suffix.
        """
        # Normalise first so every later check sees the real target location.
        candidate = Path(v).resolve()

        # Application-specific temp directory rather than a bare /tmp.
        fallback_root = Path(tempfile.gettempdir()) / "mcp-server-workflows"
        allowed_base = os.getenv("BUILDER_OUTPUT_DIR", str(fallback_root))
        safe_root = Path(allowed_base).resolve()
        safe_root.mkdir(mode=0o700, parents=True, exist_ok=True)

        # Containment check: the target must live under the safe directory.
        if not candidate.is_relative_to(safe_root):
            raise ValueError(
                f"Invalid output path. Must be within allowed directory: {allowed_base}. "
                "Use BUILDER_OUTPUT_DIR environment variable to configure safe directory."
            )

        # Defence in depth against common attack patterns.
        candidate_text = str(candidate)
        if ".." in candidate_text or candidate_text.startswith(("/etc/", "/sys/")):
            raise ValueError("Path traversal detected. Invalid output path.")

        # Only Python source files may be written.
        if candidate.suffix != ".py":
            raise ValueError("Output path must have .py extension")

        return candidate_text
class ImportWorkflowRequest(BaseModel):
    """Body of a request to turn Python code into a workflow."""

    # Python source text to import.
    code: str
    # Layout algorithm name; one of the three literals, "hierarchical" by default.
    layout: Literal["hierarchical", "force", "grid"] = "hierarchical"
122# ==============================================================================
123# Security & Authentication
124# ==============================================================================
def verify_builder_auth(authorization: str = Header(None)) -> None:
    """
    Verify authentication for builder endpoints.

    SECURITY: Prevents unauthenticated access to code generation and file write endpoints.
    Addresses OWASP A01:2021 - Broken Access Control.

    In production, this should integrate with your main authentication system.
    For now, a single shared bearer token read from ``BUILDER_AUTH_TOKEN`` is used.

    Behaviour:
    - ``ENVIRONMENT`` unset or equal to ``"development"``: a request without
      an Authorization header is allowed (a warning is printed), and a
      supplied bearer token is only compared when ``BUILDER_AUTH_TOKEN`` is set.
    - Any other environment: a bearer token is mandatory and must equal
      ``BUILDER_AUTH_TOKEN``. If that variable is unset, every request is
      rejected instead of every token being accepted.

    Args:
        authorization: Authorization header (Bearer token)

    Raises:
        HTTPException: 401 if not authenticated

    TODO: Integrate with main auth system (Keycloak, etc.)
    """
    # Every 401 carries the challenge header so clients know which scheme to use.
    challenge = {"WWW-Authenticate": "Bearer"}
    is_development = os.getenv("ENVIRONMENT", "development") == "development"

    if not authorization:
        if is_development:
            # Log warning but allow (local testing)
            print("WARNING: Builder accessed without auth in development mode")
            return
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Authentication required. Provide Bearer token in Authorization header.",
            headers=challenge,
        )

    if not authorization.startswith("Bearer "):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authorization header format. Expected: Bearer <token>",
            headers=challenge,
        )

    token = authorization[len("Bearer "):]

    # TODO: Validate token against your auth system
    expected_token = os.getenv("BUILDER_AUTH_TOKEN")
    if not expected_token:
        if is_development:
            # No token configured for local testing: keep the permissive behaviour.
            return
        # Fail closed: without a configured secret there is nothing to verify
        # against, so accepting the token would let anyone through.
        print("ERROR: BUILDER_AUTH_TOKEN is not set; rejecting builder request")
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication token",
            headers=challenge,
        )

    # Constant-time comparison; bytes are used because compare_digest rejects
    # non-ASCII str arguments.
    if not hmac.compare_digest(token.encode("utf-8"), expected_token.encode("utf-8")):
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid authentication token",
            headers=challenge,
        )
179# ==============================================================================
180# FastAPI Application
181# ==============================================================================
app = FastAPI(
    version="1.0.0",
    title="MCP Server Visual Workflow Builder",
    description="Visual editor for LangGraph agent workflows with code export",
)

# Let the browser-based frontend call this API across origins.
app.add_middleware(
    CORSMiddleware,
    allow_credentials=True,
    allow_headers=["*"],
    allow_methods=["*"],
    # React dev servers
    allow_origins=["http://localhost:3000", "http://localhost:5173"],
)
199# ==============================================================================
200# Endpoints
201# ==============================================================================
@app.get("/")
def root() -> dict[str, Any]:
    """Return static service metadata: name, version and feature list."""
    feature_list = [
        "Generate Python code from visual workflows",
        "Validate workflow structure",
        "Export production-ready code",
        "Template library",
    ]
    info: dict[str, Any] = {
        "name": "Visual Workflow Builder API",
        "version": "1.0.0",
        "features": feature_list,
        "unique_feature": "Code export (OpenAI AgentKit doesn't have this!)",
    }
    return info
@app.post("/api/builder/generate")
async def generate_code(
    request: GenerateCodeRequest,
    _auth: None = Depends(verify_builder_auth),
) -> GenerateCodeResponse:
    """
    Generate Python code from visual workflow.

    Args:
        request: Workflow definition
        _auth: Unused; present so ``verify_builder_auth`` runs for this route

    Returns:
        Generated Python code plus advisory warnings

    Raises:
        HTTPException: 400 if the workflow cannot be parsed or code generation fails

    Example:
        POST /api/builder/generate
        {
            "workflow": {
                "name": "my_agent",
                "nodes": [...],
                "edges": [...]
            }
        }
    """
    try:
        # Parse workflow
        workflow = WorkflowDefinition(**request.workflow)

        # Generate code
        code = CodeGenerator().generate(workflow)

        # Advisory checks only; none of these block generation.
        warnings = []
        if not workflow.description:
            warnings.append("Workflow description is empty - consider adding one")
        if len(workflow.nodes) < 2:
            warnings.append("Workflow has only one node - consider adding more steps")

        return GenerateCodeResponse(code=code, formatted=True, warnings=warnings)

    except Exception as e:
        # Chain the cause so the original traceback is kept for debugging.
        raise HTTPException(status_code=400, detail=f"Code generation failed: {e!s}") from e
@app.post("/api/builder/validate")
async def validate_workflow(request: ValidateWorkflowRequest) -> ValidateWorkflowResponse:
    """
    Validate workflow structure.

    Checks for:
    - At least one node
    - Entry point exists
    - Valid edge connections (both endpoints are known node IDs)
    - No circular dependencies (future)
    - Unreachable nodes (warning)
    - More than three terminal nodes (warning)

    Args:
        request: Workflow to validate

    Returns:
        Validation results. Any exception raised while parsing or checking the
        workflow is reported as a single validation error rather than an HTTP error.
    """
    try:
        workflow = WorkflowDefinition(**request.workflow)

        errors: list[str] = []
        warnings: list[str] = []

        # Validate nodes
        if not workflow.nodes:
            errors.append("Workflow must have at least one node")

        # Validate entry point
        node_ids = {n.id for n in workflow.nodes}
        if workflow.entry_point not in node_ids:
            errors.append(f"Entry point '{workflow.entry_point}' not found in nodes")

        # Validate edges and, in the same pass, index them by source node so
        # the reachability walk below does not rescan every edge per node.
        successors: defaultdict[Any, list[Any]] = defaultdict(list)
        for edge in workflow.edges:
            if edge.from_node not in node_ids:
                errors.append(f"Edge source '{edge.from_node}' not found")
            if edge.to_node not in node_ids:
                errors.append(f"Edge target '{edge.to_node}' not found")
            successors[edge.from_node].append(edge.to_node)

        # Breadth-first walk from the entry point to find unreachable nodes.
        reachable = {workflow.entry_point}
        pending = deque([workflow.entry_point])
        while pending:
            current = pending.popleft()
            # .get() avoids inserting empty entries into the defaultdict.
            for target in successors.get(current, ()):
                if target not in reachable:
                    reachable.add(target)
                    pending.append(target)

        unreachable = node_ids - reachable
        if unreachable:
            # Sorted so the message is deterministic (set order is arbitrary).
            warnings.append(f"Unreachable nodes: {', '.join(sorted(unreachable))}")

        # Check for nodes with no outgoing edges (potential dead ends)
        terminal_nodes = node_ids - set(successors)
        if len(terminal_nodes) > 3:
            preview = ", ".join(sorted(terminal_nodes)[:3])
            warnings.append(f"Many terminal nodes ({len(terminal_nodes)}): {preview}...")

        return ValidateWorkflowResponse(valid=len(errors) == 0, errors=errors, warnings=warnings)

    except Exception as e:
        # Deliberate best-effort: report the failure as a validation result.
        return ValidateWorkflowResponse(valid=False, errors=[f"Validation error: {e!s}"])
@app.post("/api/builder/save")
async def save_workflow(
    request: SaveWorkflowRequest,
    _auth: None = Depends(verify_builder_auth),
) -> dict[str, Any]:
    """
    Save workflow to Python file.

    ``request.output_path`` has already been resolved and checked by
    ``SaveWorkflowRequest.validate_output_path`` by the time this handler
    runs, so it must point inside the configured output directory
    (``BUILDER_OUTPUT_DIR``) and end in ``.py``.

    Args:
        request: Workflow and output path
        _auth: Unused; present so ``verify_builder_auth`` runs for this route

    Returns:
        Success message with the path that was written

    Raises:
        HTTPException: 400 if a ``ValueError`` is raised while parsing the
            workflow or generating the file; 500 for any other failure

    Example:
        POST /api/builder/save
        {
            "workflow": {...},
            "output_path": "<BUILDER_OUTPUT_DIR>/my_agent.py"
        }
    """
    try:
        # Parse and generate
        workflow = WorkflowDefinition(**request.workflow)
        generator = CodeGenerator()

        # SECURITY: Ensure directory exists and is writable (but path is already validated)
        output_path = Path(request.output_path)
        output_path.parent.mkdir(parents=True, exist_ok=True)

        # Generate and save
        generator.generate_to_file(workflow, str(output_path))

        return {"success": True, "message": f"Workflow saved to {request.output_path}", "path": request.output_path}

    except ValueError as e:
        # Treated as bad input; presumably raised by workflow parsing or the generator.
        raise HTTPException(status_code=400, detail=str(e)) from e
    except Exception as e:
        # Chain the cause so the original traceback is kept for debugging.
        raise HTTPException(status_code=500, detail=f"Save failed: {e!s}") from e
@app.get("/api/builder/templates")
async def list_templates() -> dict[str, Any]:
    """
    Return the catalogue of workflow templates.

    Returns:
        ``{"templates": [...]}`` where each entry has ``id``, ``name``,
        ``description``, ``complexity`` and ``nodes`` keys.

    Example:
        GET /api/builder/templates
    """
    columns = ("id", "name", "description", "complexity", "nodes")
    rows = (
        ("simple_agent", "Simple Agent", "Basic single-node agent", "beginner", 1),
        ("research_agent", "Research Agent", "Search and summarize workflow", "intermediate", 3),
        ("customer_support", "Customer Support", "Multi-path support agent with escalation", "advanced", 6),
        (
            "multi_agent_supervisor",
            "Multi-Agent Supervisor",
            "Supervisor coordinating multiple specialists",
            "advanced",
            5,
        ),
    )
    # zip keeps the column order, so each dict has the same key order as before.
    return {"templates": [dict(zip(columns, row)) for row in rows]}
@app.get("/api/builder/templates/{template_id}")
async def get_template(template_id: str) -> dict[str, Any]:
    """
    Return the definition of one workflow template.

    Only ``research_agent`` is built here; every other ID yields a 404.

    Args:
        template_id: Template identifier

    Returns:
        ``{"template": ...}`` holding the builder's JSON output

    Raises:
        HTTPException: 404 if the template ID is not recognised

    Example:
        GET /api/builder/templates/research_agent
    """
    # Guard clause: reject unknown IDs before building anything.
    if template_id != "research_agent":
        raise HTTPException(status_code=404, detail=f"Template '{template_id}' not found")

    # Simple research agent template
    research = WorkflowBuilder("research_agent", "Research and summarize topics")

    state_fields = (("query", "str"), ("search_results", "List[str]"), ("summary", "str"))
    for field_name, field_type in state_fields:
        research.add_state_field(field_name, field_type)

    node_specs = (
        ("search", "tool", {"tool": "web_search"}, "Web Search"),
        ("filter", "custom", {}, "Filter Results"),
        ("summarize", "llm", {"model": "gemini-flash"}, "Summarize"),
    )
    for node_spec in node_specs:
        research.add_node(*node_spec)

    for source, target in (("search", "filter"), ("filter", "summarize")):
        research.add_edge(source, target)

    return {"template": research.to_json()}
@app.post("/api/builder/import")
async def import_workflow(
    request: ImportWorkflowRequest,
    _auth: None = Depends(verify_builder_auth),
) -> dict[str, Any]:
    """
    Import Python code into visual workflow.

    Round-trip capability: Code → Visual (import) + Visual → Code (export)

    Args:
        request: Import request containing code and layout
        _auth: Unused; present so ``verify_builder_auth`` runs for this route

    Returns:
        Imported workflow, its validation result and a status message

    Raises:
        HTTPException: 400 if a ``SyntaxError`` is raised while importing the
            code; 500 for any other failure

    Example:
        POST /api/builder/import
        {
            "code": "from langgraph.graph import StateGraph\\n...",
            "layout": "hierarchical"
        }
    """
    try:
        # Imported here rather than at module top; a failure to import the
        # importer package is therefore reported as a 500 by the handler below.
        from ..importer import import_from_code, validate_import

        # Import code
        workflow = import_from_code(request.code, layout_algorithm=request.layout)

        # Validate
        validation = validate_import(workflow)

        return {
            "workflow": workflow,
            "validation": validation,
            "message": "Code imported successfully",
        }

    except SyntaxError as e:
        raise HTTPException(status_code=400, detail=f"Invalid Python syntax: {e!s}") from e
    except Exception as e:
        # Chain the cause so the original traceback is kept for debugging.
        raise HTTPException(status_code=500, detail=f"Import failed: {e!s}") from e
@app.get("/api/builder/node-types")
async def list_node_types() -> dict[str, Any]:
    """
    Return the node types offered by the builder.

    Returns:
        ``{"node_types": [...]}`` where each entry has ``type``, ``name``,
        ``description``, ``icon`` and ``config_schema`` keys.
    """
    tool_schema = {"tool": {"type": "string", "required": True, "description": "Tool name"}}
    llm_schema = {
        "model": {"type": "string", "required": True, "description": "Model name"},
        "temperature": {"type": "number", "required": False, "default": 0.7},
    }
    approval_schema = {"risk_level": {"type": "string", "enum": ["low", "medium", "high", "critical"]}}

    columns = ("type", "name", "description", "icon", "config_schema")
    rows = (
        ("tool", "Tool", "Execute a tool or function", "tool", tool_schema),
        ("llm", "LLM", "Call language model", "brain", llm_schema),
        ("conditional", "Conditional", "Conditional routing based on state", "split", {}),
        ("approval", "Approval", "Human approval checkpoint", "user-check", approval_schema),
        ("custom", "Custom", "Custom Python function", "code", {}),
    )
    # zip keeps the column order, so each dict has the same key order as before.
    return {"node_types": [dict(zip(columns, row)) for row in rows]}
553# ==============================================================================
554# Run Server
555# ==============================================================================
if __name__ == "__main__":
    # Development entry point: print a short banner, then start uvicorn on port 8001.
    import uvicorn

    print("=" * 80)
    print("🎨 Visual Workflow Builder API")
    print("=" * 80)
    print("\nStarting server...")
    print("\n📍 Endpoints:")
    print(" • Info: http://localhost:8001/")
    print(" • Generate: POST http://localhost:8001/api/builder/generate")
    print(" • Validate: POST http://localhost:8001/api/builder/validate")
    print(" • Templates: GET http://localhost:8001/api/builder/templates")
    print(" • Docs: http://localhost:8001/docs")
    print("\n🌟 Unique Feature: Code Export (OpenAI AgentKit doesn't have this!)")
    print("=" * 80)
    print()

    # uvicorn only supports reload=True when the app is given as an import
    # string; passing the app object makes it log a warning and exit.
    uvicorn.run(
        "mcp_server_langgraph.builder.api.server:app",
        host="0.0.0.0",  # nosec B104
        port=8001,
        reload=True,
    )