Coverage for src / mcp_server_langgraph / builder / api / server.py: 88%

190 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-08 06:31 +0000

1""" 

2Visual Workflow Builder API 

3 

4FastAPI backend for the visual workflow builder. 

5 

6Endpoints: 

7- POST /api/builder/generate - Generate Python code from workflow 

8- POST /api/builder/validate - Validate workflow structure 

9- GET /api/builder/templates - List workflow templates 

10- POST /api/builder/save - Save workflow to file 

11 

12Example: 

13 uvicorn mcp_server_langgraph.builder.api.server:app --reload 

14""" 

15 

16import os 

17import tempfile 

18from contextlib import asynccontextmanager 

19from pathlib import Path 

20from typing import Any, AsyncIterator, Literal 

21 

22from fastapi import Depends, FastAPI, Header, HTTPException, status 

23from fastapi.middleware.cors import CORSMiddleware 

24from pydantic import BaseModel, field_validator 

25 

26from mcp_server_langgraph.observability.telemetry import ( 

27 init_observability, 

28 is_initialized, 

29 logger, 

30 shutdown_observability, 

31 tracer, 

32) 

33 

34from ..codegen import CodeGenerator, WorkflowDefinition 

35from ..workflow import WorkflowBuilder 

36 

37# ============================================================================== 

38# API Models 

39# ============================================================================== 

40 

41 

42class GenerateCodeRequest(BaseModel): 

43 """Request to generate code from workflow.""" 

44 

45 workflow: dict[str, Any] 

46 

47 

48class GenerateCodeResponse(BaseModel): 

49 """Response with generated code.""" 

50 

51 code: str 

52 formatted: bool 

53 warnings: list[str] = [] 

54 

55 

56class ValidateWorkflowRequest(BaseModel): 

57 """Request to validate workflow.""" 

58 

59 workflow: dict[str, Any] 

60 

61 

62class ValidateWorkflowResponse(BaseModel): 

63 """Response with validation results.""" 

64 

65 valid: bool 

66 errors: list[str] = [] 

67 warnings: list[str] = [] 

68 

69 

70class SaveWorkflowRequest(BaseModel): 

71 """Request to save workflow to file.""" 

72 

73 workflow: dict[str, Any] 

74 output_path: str 

75 

76 @field_validator("output_path") 

77 @classmethod 

78 def validate_output_path(cls, v: str) -> str: 

79 """ 

80 Validate output path for security. 

81 

82 SECURITY: Prevents CWE-73 (External Control of File Name or Path) and 

83 CWE-434 (Unrestricted Upload of File with Dangerous Type) by enforcing 

84 strict path validation. 

85 

86 Only allows paths within designated safe directories. 

87 """ 

88 # Convert to Path for normalization 

89 path = Path(v).resolve() 

90 

91 # Define allowed base directories (configurable via environment) 

92 # Use application-specific temp directory instead of hardcoded /tmp for better security 

93 default_dir = Path(tempfile.gettempdir()) / "mcp-server-workflows" 

94 allowed_base = os.getenv("BUILDER_OUTPUT_DIR", str(default_dir)) 

95 allowed_base_path = Path(allowed_base).resolve() 

96 

97 # Ensure the allowed base directory exists with secure permissions 

98 allowed_base_path.mkdir(mode=0o700, parents=True, exist_ok=True) 

99 

100 # Check if path is within allowed directory 

101 try: 

102 path.relative_to(allowed_base_path) 

103 except ValueError: 

104 msg = ( 

105 f"Invalid output path. Must be within allowed directory: {allowed_base}. " 

106 f"Use BUILDER_OUTPUT_DIR environment variable to configure safe directory." 

107 ) 

108 raise ValueError(msg) 

109 

110 # Additional checks for common attack patterns 

111 path_str = str(path) 

112 if ".." in path_str or path_str.startswith("/etc/") or path_str.startswith("/sys/"): 112 ↛ 113line 112 didn't jump to line 113 because the condition on line 112 was never true

113 msg = "Path traversal detected. Invalid output path." 

114 raise ValueError(msg) 

115 

116 # Ensure .py extension 

117 if path.suffix != ".py": 117 ↛ 118line 117 didn't jump to line 118 because the condition on line 117 was never true

118 msg = "Output path must have .py extension" 

119 raise ValueError(msg) 

120 

121 return str(path) 

122 

123 

124class ImportWorkflowRequest(BaseModel): 

125 """Request to import Python code into workflow.""" 

126 

127 code: str 

128 layout: Literal["hierarchical", "force", "grid"] = "hierarchical" 

129 

130 

131# ============================================================================== 

132# Security & Authentication 

133# ============================================================================== 

134 

135 

136def verify_builder_auth(authorization: str = Header(None)) -> None: 

137 """ 

138 Verify authentication for builder endpoints. 

139 

140 SECURITY: Prevents unauthenticated access to code generation and file write endpoints. 

141 Addresses OWASP A01:2021 - Broken Access Control. 

142 

143 In production, this should integrate with your main authentication system. 

144 For now, we use a simple bearer token approach. 

145 

146 Args: 

147 authorization: Authorization header (Bearer token) 

148 

149 Raises: 

150 HTTPException: 401 if not authenticated 

151 

152 TODO: Integrate with main auth system (Keycloak, etc.) 

153 """ 

154 # Allow unauthenticated access in development (local testing) 

155 environment = os.getenv("ENVIRONMENT", "development") 

156 if environment == "development" and not authorization: 

157 # Log warning but allow 

158 print("WARNING: Builder accessed without auth in development mode") 

159 return 

160 

161 # In production, require authentication 

162 if not authorization: 162 ↛ 170line 162 didn't jump to line 170 because the condition on line 162 was always true

163 raise HTTPException( 

164 status_code=status.HTTP_401_UNAUTHORIZED, 

165 detail="Authentication required. Provide Bearer token in Authorization header.", 

166 headers={"WWW-Authenticate": "Bearer"}, 

167 ) 

168 

169 # Validate bearer token 

170 if not authorization.startswith("Bearer "): 

171 raise HTTPException( 

172 status_code=status.HTTP_401_UNAUTHORIZED, 

173 detail="Invalid authorization header format. Expected: Bearer <token>", 

174 ) 

175 

176 token = authorization[7:] # Remove "Bearer " prefix 

177 

178 # TODO: Validate token against your auth system 

179 # For now, we check against environment variable 

180 expected_token = os.getenv("BUILDER_AUTH_TOKEN") 

181 if expected_token and token != expected_token: 

182 raise HTTPException( 

183 status_code=status.HTTP_401_UNAUTHORIZED, 

184 detail="Invalid authentication token", 

185 ) 

186 

187 

188# ============================================================================== 

189# FastAPI Application 

190# ============================================================================== 

191 

192 

193@asynccontextmanager 

194async def lifespan(app: FastAPI) -> AsyncIterator[None]: 

195 """ 

196 Builder service lifecycle with observability. 

197 

198 Initializes OpenTelemetry tracing and metrics on startup, 

199 gracefully shuts down exporters on termination. 

200 """ 

201 # STARTUP 

202 if not is_initialized(): 202 ↛ 203line 202 didn't jump to line 203 because the condition on line 202 was never true

203 try: 

204 from mcp_server_langgraph.core.config import Settings 

205 

206 settings = Settings() 

207 init_observability( 

208 settings=settings, 

209 enable_file_logging=False, 

210 ) 

211 logger.info("Builder service started with observability") 

212 except Exception as e: 

213 # Don't fail startup if observability init fails 

214 print(f"WARNING: Observability initialization failed: {e}") 

215 

216 yield # Application runs here 

217 

218 # SHUTDOWN 

219 logger.info("Builder service shutting down") 

220 shutdown_observability() 

221 

222 

223app = FastAPI( 

224 title="MCP Server Visual Workflow Builder", 

225 description="Visual editor for LangGraph agent workflows with code export", 

226 version="1.0.0", 

227 lifespan=lifespan, 

228) 

229 

230# CORS for frontend 

231app.add_middleware( 

232 CORSMiddleware, 

233 allow_origins=["http://localhost:3000", "http://localhost:5173"], # React dev servers 

234 allow_credentials=True, 

235 allow_methods=["*"], 

236 allow_headers=["*"], 

237) 

238 

239 

240# ============================================================================== 

241# Endpoints 

242# ============================================================================== 

243 

244 

245@app.get("/") 

246def root() -> dict[str, Any]: 

247 """API information.""" 

248 return { 

249 "name": "Visual Workflow Builder API", 

250 "version": "1.0.0", 

251 "features": [ 

252 "Generate Python code from visual workflows", 

253 "Validate workflow structure", 

254 "Export production-ready code", 

255 "Template library", 

256 ], 

257 "unique_feature": "Code export (OpenAI AgentKit doesn't have this!)", 

258 } 

259 

260 

261@app.get("/api/builder/health") 

262def health_check() -> dict[str, str]: 

263 """ 

264 Health check endpoint for Kubernetes probes. 

265 

266 This endpoint is publicly accessible (no authentication required) 

267 to support liveness and readiness probes. 

268 

269 Returns: 

270 Health status 

271 """ 

272 return {"status": "healthy"} 

273 

274 

275@app.post("/api/builder/generate") 

276async def generate_code( 

277 request: GenerateCodeRequest, 

278 _auth: None = Depends(verify_builder_auth), 

279) -> GenerateCodeResponse: 

280 """ 

281 Generate Python code from visual workflow. 

282 

283 This is our unique feature! OpenAI AgentKit can't export code. 

284 

285 Args: 

286 request: Workflow definition 

287 

288 Returns: 

289 Generated Python code 

290 

291 Example: 

292 POST /api/builder/generate 

293 { 

294 "workflow": { 

295 "name": "my_agent", 

296 "nodes": [...], 

297 "edges": [...] 

298 } 

299 } 

300 """ 

301 with tracer.start_as_current_span( 

302 "builder.generate_code", 

303 attributes={ 

304 "workflow.name": request.workflow.get("name", "unknown"), 

305 "workflow.node_count": len(request.workflow.get("nodes", [])), 

306 }, 

307 ): 

308 try: 

309 # Parse workflow 

310 workflow = WorkflowDefinition(**request.workflow) 

311 

312 # Generate code 

313 generator = CodeGenerator() 

314 code = generator.generate(workflow) 

315 

316 # Check for warnings 

317 warnings = [] 

318 if not workflow.description: 

319 warnings.append("Workflow description is empty - consider adding one") 

320 

321 if len(workflow.nodes) < 2: 

322 warnings.append("Workflow has only one node - consider adding more steps") 

323 

324 logger.info( 

325 "Code generated successfully", 

326 extra={ 

327 "workflow_name": workflow.name, 

328 "node_count": len(workflow.nodes), 

329 "code_length": len(code), 

330 }, 

331 ) 

332 

333 return GenerateCodeResponse(code=code, formatted=True, warnings=warnings) 

334 

335 except Exception as e: 

336 logger.warning(f"Code generation failed: {e!s}") 

337 raise HTTPException(status_code=400, detail=f"Code generation failed: {e!s}") 

338 

339 

340@app.post("/api/builder/validate") 

341async def validate_workflow(request: ValidateWorkflowRequest) -> ValidateWorkflowResponse: 

342 """ 

343 Validate workflow structure. 

344 

345 Checks for: 

346 - Valid node IDs 

347 - Valid edge connections 

348 - Entry point exists 

349 - No circular dependencies (future) 

350 - Unreachable nodes 

351 

352 Args: 

353 request: Workflow to validate 

354 

355 Returns: 

356 Validation results 

357 """ 

358 with tracer.start_as_current_span( 

359 "builder.validate_workflow", 

360 attributes={ 

361 "workflow.name": request.workflow.get("name", "unknown"), 

362 "workflow.node_count": len(request.workflow.get("nodes", [])), 

363 "workflow.edge_count": len(request.workflow.get("edges", [])), 

364 }, 

365 ): 

366 try: 

367 workflow = WorkflowDefinition(**request.workflow) 

368 

369 errors = [] 

370 warnings = [] 

371 

372 # Validate nodes 

373 if not workflow.nodes: 

374 errors.append("Workflow must have at least one node") 

375 

376 # Validate entry point 

377 node_ids = {n.id for n in workflow.nodes} 

378 if workflow.entry_point not in node_ids: 

379 errors.append(f"Entry point '{workflow.entry_point}' not found in nodes") 

380 

381 # Validate edges 

382 for edge in workflow.edges: 

383 if edge.from_node not in node_ids: 

384 errors.append(f"Edge source '{edge.from_node}' not found") 

385 if edge.to_node not in node_ids: 

386 errors.append(f"Edge target '{edge.to_node}' not found") 

387 

388 # Check for unreachable nodes 

389 reachable = {workflow.entry_point} 

390 queue = [workflow.entry_point] 

391 

392 while queue: 

393 current = queue.pop(0) 

394 for edge in workflow.edges: 

395 if edge.from_node == current and edge.to_node not in reachable: 

396 reachable.add(edge.to_node) 

397 queue.append(edge.to_node) 

398 

399 unreachable = node_ids - reachable 

400 if unreachable: 

401 warnings.append(f"Unreachable nodes: {', '.join(unreachable)}") 

402 

403 # Check for nodes with no outgoing edges (potential dead ends) 

404 nodes_with_outgoing = {e.from_node for e in workflow.edges} 

405 terminal_nodes = node_ids - nodes_with_outgoing 

406 

407 if len(terminal_nodes) > 3: 

408 warnings.append(f"Many terminal nodes ({len(terminal_nodes)}): {', '.join(list(terminal_nodes)[:3])}...") 

409 

410 is_valid = len(errors) == 0 

411 logger.info( 

412 "Workflow validation complete", 

413 extra={ 

414 "workflow_name": workflow.name, 

415 "valid": is_valid, 

416 "error_count": len(errors), 

417 "warning_count": len(warnings), 

418 }, 

419 ) 

420 

421 return ValidateWorkflowResponse(valid=is_valid, errors=errors, warnings=warnings) 

422 

423 except Exception as e: 

424 logger.warning(f"Workflow validation failed: {e!s}") 

425 return ValidateWorkflowResponse(valid=False, errors=[f"Validation error: {e!s}"]) 

426 

427 

428@app.post("/api/builder/save") 

429async def save_workflow( 

430 request: SaveWorkflowRequest, 

431 _auth: None = Depends(verify_builder_auth), 

432) -> dict[str, Any]: 

433 """ 

434 Save workflow to Python file. 

435 

436 Args: 

437 request: Workflow and output path 

438 

439 Returns: 

440 Success message 

441 

442 Example: 

443 POST /api/builder/save 

444 { 

445 "workflow": {...}, 

446 "output_path": "src/agents/my_agent.py" 

447 } 

448 """ 

449 try: 

450 # Parse and generate 

451 workflow = WorkflowDefinition(**request.workflow) 

452 generator = CodeGenerator() 

453 

454 # SECURITY: Ensure directory exists and is writable (but path is already validated) 

455 output_path = Path(request.output_path) 

456 output_path.parent.mkdir(parents=True, exist_ok=True) 

457 

458 # Generate and save 

459 generator.generate_to_file(workflow, str(output_path)) 

460 

461 return {"success": True, "message": f"Workflow saved to {request.output_path}", "path": request.output_path} 

462 

463 except ValueError as e: 

464 # Path validation errors 

465 raise HTTPException(status_code=400, detail=str(e)) 

466 except Exception as e: 

467 raise HTTPException(status_code=500, detail=f"Save failed: {e!s}") 

468 

469 

470@app.get("/api/builder/templates") 

471async def list_templates() -> dict[str, Any]: 

472 """ 

473 List available workflow templates. 

474 

475 Returns: 

476 List of templates 

477 

478 Example: 

479 GET /api/builder/templates 

480 """ 

481 templates = [ 

482 { 

483 "id": "simple_agent", 

484 "name": "Simple Agent", 

485 "description": "Basic single-node agent", 

486 "complexity": "beginner", 

487 "nodes": 1, 

488 }, 

489 { 

490 "id": "research_agent", 

491 "name": "Research Agent", 

492 "description": "Search and summarize workflow", 

493 "complexity": "intermediate", 

494 "nodes": 3, 

495 }, 

496 { 

497 "id": "customer_support", 

498 "name": "Customer Support", 

499 "description": "Multi-path support agent with escalation", 

500 "complexity": "advanced", 

501 "nodes": 6, 

502 }, 

503 { 

504 "id": "multi_agent_supervisor", 

505 "name": "Multi-Agent Supervisor", 

506 "description": "Supervisor coordinating multiple specialists", 

507 "complexity": "advanced", 

508 "nodes": 5, 

509 }, 

510 ] 

511 

512 return {"templates": templates} 

513 

514 

515@app.get("/api/builder/templates/{template_id}") 

516async def get_template(template_id: str) -> dict[str, Any]: 

517 """ 

518 Get a specific workflow template. 

519 

520 Args: 

521 template_id: Template identifier 

522 

523 Returns: 

524 Template definition 

525 

526 Example: 

527 GET /api/builder/templates/research_agent 

528 """ 

529 # Simple research agent template 

530 if template_id == "research_agent": 

531 builder = WorkflowBuilder("research_agent", "Research and summarize topics") 

532 

533 builder.add_state_field("query", "str") 

534 builder.add_state_field("search_results", "List[str]") 

535 builder.add_state_field("summary", "str") 

536 

537 builder.add_node("search", "tool", {"tool": "web_search"}, "Web Search") 

538 builder.add_node("filter", "custom", {}, "Filter Results") 

539 builder.add_node("summarize", "llm", {"model": "gemini-flash"}, "Summarize") 

540 

541 builder.add_edge("search", "filter") 

542 builder.add_edge("filter", "summarize") 

543 

544 return {"template": builder.to_json()} 

545 

546 raise HTTPException(status_code=404, detail=f"Template '{template_id}' not found") 

547 

548 

549@app.post("/api/builder/import") 

550async def import_workflow( 

551 request: ImportWorkflowRequest, 

552 _auth: None = Depends(verify_builder_auth), 

553) -> dict[str, Any]: 

554 """ 

555 Import Python code into visual workflow. 

556 

557 Round-trip capability: Code → Visual (import) + Visual → Code (export) 

558 

559 Args: 

560 request: Import request containing code and layout 

561 

562 Returns: 

563 Workflow definition ready for visual builder 

564 

565 Example: 

566 POST /api/builder/import 

567 { 

568 "code": "from langgraph.graph import StateGraph\\n...", 

569 "layout": "hierarchical" 

570 } 

571 """ 

572 try: 

573 from ..importer import import_from_code, validate_import 

574 

575 # Import code 

576 workflow = import_from_code(request.code, layout_algorithm=request.layout) 

577 

578 # Validate 

579 validation = validate_import(workflow) 

580 

581 return { 

582 "workflow": workflow, 

583 "validation": validation, 

584 "message": "Code imported successfully", 

585 } 

586 

587 except SyntaxError as e: 

588 raise HTTPException(status_code=400, detail=f"Invalid Python syntax: {e!s}") 

589 except Exception as e: 

590 raise HTTPException(status_code=500, detail=f"Import failed: {e!s}") 

591 

592 

593@app.get("/api/builder/node-types") 

594async def list_node_types() -> dict[str, Any]: 

595 """ 

596 List available node types for the builder. 

597 

598 Returns: 

599 Node type definitions with configuration schemas 

600 """ 

601 node_types = [ 

602 { 

603 "type": "tool", 

604 "name": "Tool", 

605 "description": "Execute a tool or function", 

606 "icon": "tool", 

607 "config_schema": {"tool": {"type": "string", "required": True, "description": "Tool name"}}, 

608 }, 

609 { 

610 "type": "llm", 

611 "name": "LLM", 

612 "description": "Call language model", 

613 "icon": "brain", 

614 "config_schema": { 

615 "model": {"type": "string", "required": True, "description": "Model name"}, 

616 "temperature": {"type": "number", "required": False, "default": 0.7}, 

617 }, 

618 }, 

619 { 

620 "type": "conditional", 

621 "name": "Conditional", 

622 "description": "Conditional routing based on state", 

623 "icon": "split", 

624 "config_schema": {}, 

625 }, 

626 { 

627 "type": "approval", 

628 "name": "Approval", 

629 "description": "Human approval checkpoint", 

630 "icon": "user-check", 

631 "config_schema": {"risk_level": {"type": "string", "enum": ["low", "medium", "high", "critical"]}}, 

632 }, 

633 { 

634 "type": "custom", 

635 "name": "Custom", 

636 "description": "Custom Python function", 

637 "icon": "code", 

638 "config_schema": {}, 

639 }, 

640 ] 

641 

642 return {"node_types": node_types} 

643 

644 

645# ============================================================================== 

646# SPA Static Files Mount (React Frontend) 

647# ============================================================================== 

648# Mount AFTER all API routes - SPAStaticFiles is a catch-all for client-side routing 

649 

650from mcp_server_langgraph.utils.spa_static_files import create_spa_static_files 

651 

652# Calculate frontend dist path relative to this module 

653_frontend_dist = Path(__file__).parent.parent / "frontend" / "dist" 

654 

655# Only mount if frontend is built (graceful degradation for API-only mode) 

656_spa_handler = create_spa_static_files(str(_frontend_dist), caching=True) 

657if _spa_handler is not None: 

658 app.mount("/", _spa_handler, name="spa") 

659 

660 

661# ============================================================================== 

662# Run Server 

663# ============================================================================== 

664 

665if __name__ == "__main__": 

666 import uvicorn 

667 

668 print("=" * 80) 

669 print("🎨 Visual Workflow Builder API") 

670 print("=" * 80) 

671 print("\nStarting server...") 

672 print("\n📍 Endpoints:") 

673 print(" • Info: http://localhost:8001/") 

674 print(" • Generate: POST http://localhost:8001/api/builder/generate") 

675 print(" • Validate: POST http://localhost:8001/api/builder/validate") 

676 print(" • Templates: GET http://localhost:8001/api/builder/templates") 

677 print(" • Docs: http://localhost:8001/docs") 

678 print("\n🌟 Unique Feature: Code Export (OpenAI AgentKit doesn't have this!)") 

679 print("=" * 80) 

680 print() 

681 

682 uvicorn.run(app, host="0.0.0.0", port=8001, reload=True) # nosec B104