Coverage for src / mcp_server_langgraph / builder / api / server.py: 88%
190 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-08 06:31 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-08 06:31 +0000
1"""
2Visual Workflow Builder API
4FastAPI backend for the visual workflow builder.
6Endpoints:
7- POST /api/builder/generate - Generate Python code from workflow
8- POST /api/builder/validate - Validate workflow structure
9- GET /api/builder/templates - List workflow templates
10- POST /api/builder/save - Save workflow to file
12Example:
13 uvicorn mcp_server_langgraph.builder.api.server:app --reload
14"""
16import os
17import tempfile
18from contextlib import asynccontextmanager
19from pathlib import Path
20from typing import Any, AsyncIterator, Literal
22from fastapi import Depends, FastAPI, Header, HTTPException, status
23from fastapi.middleware.cors import CORSMiddleware
24from pydantic import BaseModel, field_validator
26from mcp_server_langgraph.observability.telemetry import (
27 init_observability,
28 is_initialized,
29 logger,
30 shutdown_observability,
31 tracer,
32)
34from ..codegen import CodeGenerator, WorkflowDefinition
35from ..workflow import WorkflowBuilder
37# ==============================================================================
38# API Models
39# ==============================================================================
42class GenerateCodeRequest(BaseModel):
43 """Request to generate code from workflow."""
45 workflow: dict[str, Any]
48class GenerateCodeResponse(BaseModel):
49 """Response with generated code."""
51 code: str
52 formatted: bool
53 warnings: list[str] = []
56class ValidateWorkflowRequest(BaseModel):
57 """Request to validate workflow."""
59 workflow: dict[str, Any]
62class ValidateWorkflowResponse(BaseModel):
63 """Response with validation results."""
65 valid: bool
66 errors: list[str] = []
67 warnings: list[str] = []
70class SaveWorkflowRequest(BaseModel):
71 """Request to save workflow to file."""
73 workflow: dict[str, Any]
74 output_path: str
76 @field_validator("output_path")
77 @classmethod
78 def validate_output_path(cls, v: str) -> str:
79 """
80 Validate output path for security.
82 SECURITY: Prevents CWE-73 (External Control of File Name or Path) and
83 CWE-434 (Unrestricted Upload of File with Dangerous Type) by enforcing
84 strict path validation.
86 Only allows paths within designated safe directories.
87 """
88 # Convert to Path for normalization
89 path = Path(v).resolve()
91 # Define allowed base directories (configurable via environment)
92 # Use application-specific temp directory instead of hardcoded /tmp for better security
93 default_dir = Path(tempfile.gettempdir()) / "mcp-server-workflows"
94 allowed_base = os.getenv("BUILDER_OUTPUT_DIR", str(default_dir))
95 allowed_base_path = Path(allowed_base).resolve()
97 # Ensure the allowed base directory exists with secure permissions
98 allowed_base_path.mkdir(mode=0o700, parents=True, exist_ok=True)
100 # Check if path is within allowed directory
101 try:
102 path.relative_to(allowed_base_path)
103 except ValueError:
104 msg = (
105 f"Invalid output path. Must be within allowed directory: {allowed_base}. "
106 f"Use BUILDER_OUTPUT_DIR environment variable to configure safe directory."
107 )
108 raise ValueError(msg)
110 # Additional checks for common attack patterns
111 path_str = str(path)
112 if ".." in path_str or path_str.startswith("/etc/") or path_str.startswith("/sys/"): 112 ↛ 113line 112 didn't jump to line 113 because the condition on line 112 was never true
113 msg = "Path traversal detected. Invalid output path."
114 raise ValueError(msg)
116 # Ensure .py extension
117 if path.suffix != ".py": 117 ↛ 118line 117 didn't jump to line 118 because the condition on line 117 was never true
118 msg = "Output path must have .py extension"
119 raise ValueError(msg)
121 return str(path)
124class ImportWorkflowRequest(BaseModel):
125 """Request to import Python code into workflow."""
127 code: str
128 layout: Literal["hierarchical", "force", "grid"] = "hierarchical"
131# ==============================================================================
132# Security & Authentication
133# ==============================================================================
136def verify_builder_auth(authorization: str = Header(None)) -> None:
137 """
138 Verify authentication for builder endpoints.
140 SECURITY: Prevents unauthenticated access to code generation and file write endpoints.
141 Addresses OWASP A01:2021 - Broken Access Control.
143 In production, this should integrate with your main authentication system.
144 For now, we use a simple bearer token approach.
146 Args:
147 authorization: Authorization header (Bearer token)
149 Raises:
150 HTTPException: 401 if not authenticated
152 TODO: Integrate with main auth system (Keycloak, etc.)
153 """
154 # Allow unauthenticated access in development (local testing)
155 environment = os.getenv("ENVIRONMENT", "development")
156 if environment == "development" and not authorization:
157 # Log warning but allow
158 print("WARNING: Builder accessed without auth in development mode")
159 return
161 # In production, require authentication
162 if not authorization: 162 ↛ 170line 162 didn't jump to line 170 because the condition on line 162 was always true
163 raise HTTPException(
164 status_code=status.HTTP_401_UNAUTHORIZED,
165 detail="Authentication required. Provide Bearer token in Authorization header.",
166 headers={"WWW-Authenticate": "Bearer"},
167 )
169 # Validate bearer token
170 if not authorization.startswith("Bearer "):
171 raise HTTPException(
172 status_code=status.HTTP_401_UNAUTHORIZED,
173 detail="Invalid authorization header format. Expected: Bearer <token>",
174 )
176 token = authorization[7:] # Remove "Bearer " prefix
178 # TODO: Validate token against your auth system
179 # For now, we check against environment variable
180 expected_token = os.getenv("BUILDER_AUTH_TOKEN")
181 if expected_token and token != expected_token:
182 raise HTTPException(
183 status_code=status.HTTP_401_UNAUTHORIZED,
184 detail="Invalid authentication token",
185 )
188# ==============================================================================
189# FastAPI Application
190# ==============================================================================
193@asynccontextmanager
194async def lifespan(app: FastAPI) -> AsyncIterator[None]:
195 """
196 Builder service lifecycle with observability.
198 Initializes OpenTelemetry tracing and metrics on startup,
199 gracefully shuts down exporters on termination.
200 """
201 # STARTUP
202 if not is_initialized(): 202 ↛ 203line 202 didn't jump to line 203 because the condition on line 202 was never true
203 try:
204 from mcp_server_langgraph.core.config import Settings
206 settings = Settings()
207 init_observability(
208 settings=settings,
209 enable_file_logging=False,
210 )
211 logger.info("Builder service started with observability")
212 except Exception as e:
213 # Don't fail startup if observability init fails
214 print(f"WARNING: Observability initialization failed: {e}")
216 yield # Application runs here
218 # SHUTDOWN
219 logger.info("Builder service shutting down")
220 shutdown_observability()
223app = FastAPI(
224 title="MCP Server Visual Workflow Builder",
225 description="Visual editor for LangGraph agent workflows with code export",
226 version="1.0.0",
227 lifespan=lifespan,
228)
230# CORS for frontend
231app.add_middleware(
232 CORSMiddleware,
233 allow_origins=["http://localhost:3000", "http://localhost:5173"], # React dev servers
234 allow_credentials=True,
235 allow_methods=["*"],
236 allow_headers=["*"],
237)
240# ==============================================================================
241# Endpoints
242# ==============================================================================
245@app.get("/")
246def root() -> dict[str, Any]:
247 """API information."""
248 return {
249 "name": "Visual Workflow Builder API",
250 "version": "1.0.0",
251 "features": [
252 "Generate Python code from visual workflows",
253 "Validate workflow structure",
254 "Export production-ready code",
255 "Template library",
256 ],
257 "unique_feature": "Code export (OpenAI AgentKit doesn't have this!)",
258 }
261@app.get("/api/builder/health")
262def health_check() -> dict[str, str]:
263 """
264 Health check endpoint for Kubernetes probes.
266 This endpoint is publicly accessible (no authentication required)
267 to support liveness and readiness probes.
269 Returns:
270 Health status
271 """
272 return {"status": "healthy"}
275@app.post("/api/builder/generate")
276async def generate_code(
277 request: GenerateCodeRequest,
278 _auth: None = Depends(verify_builder_auth),
279) -> GenerateCodeResponse:
280 """
281 Generate Python code from visual workflow.
283 This is our unique feature! OpenAI AgentKit can't export code.
285 Args:
286 request: Workflow definition
288 Returns:
289 Generated Python code
291 Example:
292 POST /api/builder/generate
293 {
294 "workflow": {
295 "name": "my_agent",
296 "nodes": [...],
297 "edges": [...]
298 }
299 }
300 """
301 with tracer.start_as_current_span(
302 "builder.generate_code",
303 attributes={
304 "workflow.name": request.workflow.get("name", "unknown"),
305 "workflow.node_count": len(request.workflow.get("nodes", [])),
306 },
307 ):
308 try:
309 # Parse workflow
310 workflow = WorkflowDefinition(**request.workflow)
312 # Generate code
313 generator = CodeGenerator()
314 code = generator.generate(workflow)
316 # Check for warnings
317 warnings = []
318 if not workflow.description:
319 warnings.append("Workflow description is empty - consider adding one")
321 if len(workflow.nodes) < 2:
322 warnings.append("Workflow has only one node - consider adding more steps")
324 logger.info(
325 "Code generated successfully",
326 extra={
327 "workflow_name": workflow.name,
328 "node_count": len(workflow.nodes),
329 "code_length": len(code),
330 },
331 )
333 return GenerateCodeResponse(code=code, formatted=True, warnings=warnings)
335 except Exception as e:
336 logger.warning(f"Code generation failed: {e!s}")
337 raise HTTPException(status_code=400, detail=f"Code generation failed: {e!s}")
340@app.post("/api/builder/validate")
341async def validate_workflow(request: ValidateWorkflowRequest) -> ValidateWorkflowResponse:
342 """
343 Validate workflow structure.
345 Checks for:
346 - Valid node IDs
347 - Valid edge connections
348 - Entry point exists
349 - No circular dependencies (future)
350 - Unreachable nodes
352 Args:
353 request: Workflow to validate
355 Returns:
356 Validation results
357 """
358 with tracer.start_as_current_span(
359 "builder.validate_workflow",
360 attributes={
361 "workflow.name": request.workflow.get("name", "unknown"),
362 "workflow.node_count": len(request.workflow.get("nodes", [])),
363 "workflow.edge_count": len(request.workflow.get("edges", [])),
364 },
365 ):
366 try:
367 workflow = WorkflowDefinition(**request.workflow)
369 errors = []
370 warnings = []
372 # Validate nodes
373 if not workflow.nodes:
374 errors.append("Workflow must have at least one node")
376 # Validate entry point
377 node_ids = {n.id for n in workflow.nodes}
378 if workflow.entry_point not in node_ids:
379 errors.append(f"Entry point '{workflow.entry_point}' not found in nodes")
381 # Validate edges
382 for edge in workflow.edges:
383 if edge.from_node not in node_ids:
384 errors.append(f"Edge source '{edge.from_node}' not found")
385 if edge.to_node not in node_ids:
386 errors.append(f"Edge target '{edge.to_node}' not found")
388 # Check for unreachable nodes
389 reachable = {workflow.entry_point}
390 queue = [workflow.entry_point]
392 while queue:
393 current = queue.pop(0)
394 for edge in workflow.edges:
395 if edge.from_node == current and edge.to_node not in reachable:
396 reachable.add(edge.to_node)
397 queue.append(edge.to_node)
399 unreachable = node_ids - reachable
400 if unreachable:
401 warnings.append(f"Unreachable nodes: {', '.join(unreachable)}")
403 # Check for nodes with no outgoing edges (potential dead ends)
404 nodes_with_outgoing = {e.from_node for e in workflow.edges}
405 terminal_nodes = node_ids - nodes_with_outgoing
407 if len(terminal_nodes) > 3:
408 warnings.append(f"Many terminal nodes ({len(terminal_nodes)}): {', '.join(list(terminal_nodes)[:3])}...")
410 is_valid = len(errors) == 0
411 logger.info(
412 "Workflow validation complete",
413 extra={
414 "workflow_name": workflow.name,
415 "valid": is_valid,
416 "error_count": len(errors),
417 "warning_count": len(warnings),
418 },
419 )
421 return ValidateWorkflowResponse(valid=is_valid, errors=errors, warnings=warnings)
423 except Exception as e:
424 logger.warning(f"Workflow validation failed: {e!s}")
425 return ValidateWorkflowResponse(valid=False, errors=[f"Validation error: {e!s}"])
428@app.post("/api/builder/save")
429async def save_workflow(
430 request: SaveWorkflowRequest,
431 _auth: None = Depends(verify_builder_auth),
432) -> dict[str, Any]:
433 """
434 Save workflow to Python file.
436 Args:
437 request: Workflow and output path
439 Returns:
440 Success message
442 Example:
443 POST /api/builder/save
444 {
445 "workflow": {...},
446 "output_path": "src/agents/my_agent.py"
447 }
448 """
449 try:
450 # Parse and generate
451 workflow = WorkflowDefinition(**request.workflow)
452 generator = CodeGenerator()
454 # SECURITY: Ensure directory exists and is writable (but path is already validated)
455 output_path = Path(request.output_path)
456 output_path.parent.mkdir(parents=True, exist_ok=True)
458 # Generate and save
459 generator.generate_to_file(workflow, str(output_path))
461 return {"success": True, "message": f"Workflow saved to {request.output_path}", "path": request.output_path}
463 except ValueError as e:
464 # Path validation errors
465 raise HTTPException(status_code=400, detail=str(e))
466 except Exception as e:
467 raise HTTPException(status_code=500, detail=f"Save failed: {e!s}")
470@app.get("/api/builder/templates")
471async def list_templates() -> dict[str, Any]:
472 """
473 List available workflow templates.
475 Returns:
476 List of templates
478 Example:
479 GET /api/builder/templates
480 """
481 templates = [
482 {
483 "id": "simple_agent",
484 "name": "Simple Agent",
485 "description": "Basic single-node agent",
486 "complexity": "beginner",
487 "nodes": 1,
488 },
489 {
490 "id": "research_agent",
491 "name": "Research Agent",
492 "description": "Search and summarize workflow",
493 "complexity": "intermediate",
494 "nodes": 3,
495 },
496 {
497 "id": "customer_support",
498 "name": "Customer Support",
499 "description": "Multi-path support agent with escalation",
500 "complexity": "advanced",
501 "nodes": 6,
502 },
503 {
504 "id": "multi_agent_supervisor",
505 "name": "Multi-Agent Supervisor",
506 "description": "Supervisor coordinating multiple specialists",
507 "complexity": "advanced",
508 "nodes": 5,
509 },
510 ]
512 return {"templates": templates}
515@app.get("/api/builder/templates/{template_id}")
516async def get_template(template_id: str) -> dict[str, Any]:
517 """
518 Get a specific workflow template.
520 Args:
521 template_id: Template identifier
523 Returns:
524 Template definition
526 Example:
527 GET /api/builder/templates/research_agent
528 """
529 # Simple research agent template
530 if template_id == "research_agent":
531 builder = WorkflowBuilder("research_agent", "Research and summarize topics")
533 builder.add_state_field("query", "str")
534 builder.add_state_field("search_results", "List[str]")
535 builder.add_state_field("summary", "str")
537 builder.add_node("search", "tool", {"tool": "web_search"}, "Web Search")
538 builder.add_node("filter", "custom", {}, "Filter Results")
539 builder.add_node("summarize", "llm", {"model": "gemini-flash"}, "Summarize")
541 builder.add_edge("search", "filter")
542 builder.add_edge("filter", "summarize")
544 return {"template": builder.to_json()}
546 raise HTTPException(status_code=404, detail=f"Template '{template_id}' not found")
549@app.post("/api/builder/import")
550async def import_workflow(
551 request: ImportWorkflowRequest,
552 _auth: None = Depends(verify_builder_auth),
553) -> dict[str, Any]:
554 """
555 Import Python code into visual workflow.
557 Round-trip capability: Code → Visual (import) + Visual → Code (export)
559 Args:
560 request: Import request containing code and layout
562 Returns:
563 Workflow definition ready for visual builder
565 Example:
566 POST /api/builder/import
567 {
568 "code": "from langgraph.graph import StateGraph\\n...",
569 "layout": "hierarchical"
570 }
571 """
572 try:
573 from ..importer import import_from_code, validate_import
575 # Import code
576 workflow = import_from_code(request.code, layout_algorithm=request.layout)
578 # Validate
579 validation = validate_import(workflow)
581 return {
582 "workflow": workflow,
583 "validation": validation,
584 "message": "Code imported successfully",
585 }
587 except SyntaxError as e:
588 raise HTTPException(status_code=400, detail=f"Invalid Python syntax: {e!s}")
589 except Exception as e:
590 raise HTTPException(status_code=500, detail=f"Import failed: {e!s}")
593@app.get("/api/builder/node-types")
594async def list_node_types() -> dict[str, Any]:
595 """
596 List available node types for the builder.
598 Returns:
599 Node type definitions with configuration schemas
600 """
601 node_types = [
602 {
603 "type": "tool",
604 "name": "Tool",
605 "description": "Execute a tool or function",
606 "icon": "tool",
607 "config_schema": {"tool": {"type": "string", "required": True, "description": "Tool name"}},
608 },
609 {
610 "type": "llm",
611 "name": "LLM",
612 "description": "Call language model",
613 "icon": "brain",
614 "config_schema": {
615 "model": {"type": "string", "required": True, "description": "Model name"},
616 "temperature": {"type": "number", "required": False, "default": 0.7},
617 },
618 },
619 {
620 "type": "conditional",
621 "name": "Conditional",
622 "description": "Conditional routing based on state",
623 "icon": "split",
624 "config_schema": {},
625 },
626 {
627 "type": "approval",
628 "name": "Approval",
629 "description": "Human approval checkpoint",
630 "icon": "user-check",
631 "config_schema": {"risk_level": {"type": "string", "enum": ["low", "medium", "high", "critical"]}},
632 },
633 {
634 "type": "custom",
635 "name": "Custom",
636 "description": "Custom Python function",
637 "icon": "code",
638 "config_schema": {},
639 },
640 ]
642 return {"node_types": node_types}
645# ==============================================================================
646# SPA Static Files Mount (React Frontend)
647# ==============================================================================
648# Mount AFTER all API routes - SPAStaticFiles is a catch-all for client-side routing
650from mcp_server_langgraph.utils.spa_static_files import create_spa_static_files
652# Calculate frontend dist path relative to this module
653_frontend_dist = Path(__file__).parent.parent / "frontend" / "dist"
655# Only mount if frontend is built (graceful degradation for API-only mode)
656_spa_handler = create_spa_static_files(str(_frontend_dist), caching=True)
657if _spa_handler is not None:
658 app.mount("/", _spa_handler, name="spa")
661# ==============================================================================
662# Run Server
663# ==============================================================================
665if __name__ == "__main__":
666 import uvicorn
668 print("=" * 80)
669 print("🎨 Visual Workflow Builder API")
670 print("=" * 80)
671 print("\nStarting server...")
672 print("\n📍 Endpoints:")
673 print(" • Info: http://localhost:8001/")
674 print(" • Generate: POST http://localhost:8001/api/builder/generate")
675 print(" • Validate: POST http://localhost:8001/api/builder/validate")
676 print(" • Templates: GET http://localhost:8001/api/builder/templates")
677 print(" • Docs: http://localhost:8001/docs")
678 print("\n🌟 Unique Feature: Code Export (OpenAI AgentKit doesn't have this!)")
679 print("=" * 80)
680 print()
682 uvicorn.run(app, host="0.0.0.0", port=8001, reload=True) # nosec B104