Coverage for src / mcp_server_langgraph / builder / api / server.py: 89%

160 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 00:43 +0000

1""" 

2Visual Workflow Builder API 

3 

4FastAPI backend for the visual workflow builder. 

5 

6Endpoints: 

7- POST /api/builder/generate - Generate Python code from workflow 

8- POST /api/builder/validate - Validate workflow structure 

9- GET /api/builder/templates - List workflow templates 

10- POST /api/builder/save - Save workflow to file 

11 

12Example: 

13 uvicorn mcp_server_langgraph.builder.api.server:app --reload 

14""" 

15 

16import os 

17import tempfile 

18from pathlib import Path 

19from typing import Any, Literal 

20 

21from fastapi import Depends, FastAPI, Header, HTTPException, status 

22from fastapi.middleware.cors import CORSMiddleware 

23from pydantic import BaseModel, field_validator 

24 

25from ..codegen import CodeGenerator, WorkflowDefinition 

26from ..workflow import WorkflowBuilder 

27 

28# ============================================================================== 

29# API Models 

30# ============================================================================== 

31 

32 

33class GenerateCodeRequest(BaseModel): 

34 """Request to generate code from workflow.""" 

35 

36 workflow: dict[str, Any] 

37 

38 

39class GenerateCodeResponse(BaseModel): 

40 """Response with generated code.""" 

41 

42 code: str 

43 formatted: bool 

44 warnings: list[str] = [] 

45 

46 

47class ValidateWorkflowRequest(BaseModel): 

48 """Request to validate workflow.""" 

49 

50 workflow: dict[str, Any] 

51 

52 

53class ValidateWorkflowResponse(BaseModel): 

54 """Response with validation results.""" 

55 

56 valid: bool 

57 errors: list[str] = [] 

58 warnings: list[str] = [] 

59 

60 

61class SaveWorkflowRequest(BaseModel): 

62 """Request to save workflow to file.""" 

63 

64 workflow: dict[str, Any] 

65 output_path: str 

66 

67 @field_validator("output_path") 

68 @classmethod 

69 def validate_output_path(cls, v: str) -> str: 

70 """ 

71 Validate output path for security. 

72 

73 SECURITY: Prevents CWE-73 (External Control of File Name or Path) and 

74 CWE-434 (Unrestricted Upload of File with Dangerous Type) by enforcing 

75 strict path validation. 

76 

77 Only allows paths within designated safe directories. 

78 """ 

79 # Convert to Path for normalization 

80 path = Path(v).resolve() 

81 

82 # Define allowed base directories (configurable via environment) 

83 # Use application-specific temp directory instead of hardcoded /tmp for better security 

84 default_dir = Path(tempfile.gettempdir()) / "mcp-server-workflows" 

85 allowed_base = os.getenv("BUILDER_OUTPUT_DIR", str(default_dir)) 

86 allowed_base_path = Path(allowed_base).resolve() 

87 

88 # Ensure the allowed base directory exists with secure permissions 

89 allowed_base_path.mkdir(mode=0o700, parents=True, exist_ok=True) 

90 

91 # Check if path is within allowed directory 

92 try: 

93 path.relative_to(allowed_base_path) 

94 except ValueError: 

95 msg = ( 

96 f"Invalid output path. Must be within allowed directory: {allowed_base}. " 

97 f"Use BUILDER_OUTPUT_DIR environment variable to configure safe directory." 

98 ) 

99 raise ValueError(msg) 

100 

101 # Additional checks for common attack patterns 

102 path_str = str(path) 

103 if ".." in path_str or path_str.startswith("/etc/") or path_str.startswith("/sys/"): 103 ↛ 104line 103 didn't jump to line 104 because the condition on line 103 was never true

104 msg = "Path traversal detected. Invalid output path." 

105 raise ValueError(msg) 

106 

107 # Ensure .py extension 

108 if path.suffix != ".py": 108 ↛ 109line 108 didn't jump to line 109 because the condition on line 108 was never true

109 msg = "Output path must have .py extension" 

110 raise ValueError(msg) 

111 

112 return str(path) 

113 

114 

115class ImportWorkflowRequest(BaseModel): 

116 """Request to import Python code into workflow.""" 

117 

118 code: str 

119 layout: Literal["hierarchical", "force", "grid"] = "hierarchical" 

120 

121 

122# ============================================================================== 

123# Security & Authentication 

124# ============================================================================== 

125 

126 

127def verify_builder_auth(authorization: str = Header(None)) -> None: 

128 """ 

129 Verify authentication for builder endpoints. 

130 

131 SECURITY: Prevents unauthenticated access to code generation and file write endpoints. 

132 Addresses OWASP A01:2021 - Broken Access Control. 

133 

134 In production, this should integrate with your main authentication system. 

135 For now, we use a simple bearer token approach. 

136 

137 Args: 

138 authorization: Authorization header (Bearer token) 

139 

140 Raises: 

141 HTTPException: 401 if not authenticated 

142 

143 TODO: Integrate with main auth system (Keycloak, etc.) 

144 """ 

145 # Allow unauthenticated access in development (local testing) 

146 environment = os.getenv("ENVIRONMENT", "development") 

147 if environment == "development" and not authorization: 147 ↛ 149line 147 didn't jump to line 149 because the condition on line 147 was never true

148 # Log warning but allow 

149 print("WARNING: Builder accessed without auth in development mode") 

150 return 

151 

152 # In production, require authentication 

153 if not authorization: 153 ↛ 161line 153 didn't jump to line 161 because the condition on line 153 was always true

154 raise HTTPException( 

155 status_code=status.HTTP_401_UNAUTHORIZED, 

156 detail="Authentication required. Provide Bearer token in Authorization header.", 

157 headers={"WWW-Authenticate": "Bearer"}, 

158 ) 

159 

160 # Validate bearer token 

161 if not authorization.startswith("Bearer "): 

162 raise HTTPException( 

163 status_code=status.HTTP_401_UNAUTHORIZED, 

164 detail="Invalid authorization header format. Expected: Bearer <token>", 

165 ) 

166 

167 token = authorization[7:] # Remove "Bearer " prefix 

168 

169 # TODO: Validate token against your auth system 

170 # For now, we check against environment variable 

171 expected_token = os.getenv("BUILDER_AUTH_TOKEN") 

172 if expected_token and token != expected_token: 

173 raise HTTPException( 

174 status_code=status.HTTP_401_UNAUTHORIZED, 

175 detail="Invalid authentication token", 

176 ) 

177 

178 

179# ============================================================================== 

180# FastAPI Application 

181# ============================================================================== 

182 

183app = FastAPI( 

184 title="MCP Server Visual Workflow Builder", 

185 description="Visual editor for LangGraph agent workflows with code export", 

186 version="1.0.0", 

187) 

188 

189# CORS for frontend 

190app.add_middleware( 

191 CORSMiddleware, 

192 allow_origins=["http://localhost:3000", "http://localhost:5173"], # React dev servers 

193 allow_credentials=True, 

194 allow_methods=["*"], 

195 allow_headers=["*"], 

196) 

197 

198 

199# ============================================================================== 

200# Endpoints 

201# ============================================================================== 

202 

203 

204@app.get("/") 

205def root() -> dict[str, Any]: 

206 """API information.""" 

207 return { 

208 "name": "Visual Workflow Builder API", 

209 "version": "1.0.0", 

210 "features": [ 

211 "Generate Python code from visual workflows", 

212 "Validate workflow structure", 

213 "Export production-ready code", 

214 "Template library", 

215 ], 

216 "unique_feature": "Code export (OpenAI AgentKit doesn't have this!)", 

217 } 

218 

219 

220@app.post("/api/builder/generate") 

221async def generate_code( 

222 request: GenerateCodeRequest, 

223 _auth: None = Depends(verify_builder_auth), 

224) -> GenerateCodeResponse: 

225 """ 

226 Generate Python code from visual workflow. 

227 

228 This is our unique feature! OpenAI AgentKit can't export code. 

229 

230 Args: 

231 request: Workflow definition 

232 

233 Returns: 

234 Generated Python code 

235 

236 Example: 

237 POST /api/builder/generate 

238 { 

239 "workflow": { 

240 "name": "my_agent", 

241 "nodes": [...], 

242 "edges": [...] 

243 } 

244 } 

245 """ 

246 try: 

247 # Parse workflow 

248 workflow = WorkflowDefinition(**request.workflow) 

249 

250 # Generate code 

251 generator = CodeGenerator() 

252 code = generator.generate(workflow) 

253 

254 # Check for warnings 

255 warnings = [] 

256 if not workflow.description: 

257 warnings.append("Workflow description is empty - consider adding one") 

258 

259 if len(workflow.nodes) < 2: 

260 warnings.append("Workflow has only one node - consider adding more steps") 

261 

262 return GenerateCodeResponse(code=code, formatted=True, warnings=warnings) 

263 

264 except Exception as e: 

265 raise HTTPException(status_code=400, detail=f"Code generation failed: {e!s}") 

266 

267 

268@app.post("/api/builder/validate") 

269async def validate_workflow(request: ValidateWorkflowRequest) -> ValidateWorkflowResponse: 

270 """ 

271 Validate workflow structure. 

272 

273 Checks for: 

274 - Valid node IDs 

275 - Valid edge connections 

276 - Entry point exists 

277 - No circular dependencies (future) 

278 - Unreachable nodes 

279 

280 Args: 

281 request: Workflow to validate 

282 

283 Returns: 

284 Validation results 

285 """ 

286 try: 

287 workflow = WorkflowDefinition(**request.workflow) 

288 

289 errors = [] 

290 warnings = [] 

291 

292 # Validate nodes 

293 if not workflow.nodes: 

294 errors.append("Workflow must have at least one node") 

295 

296 # Validate entry point 

297 node_ids = {n.id for n in workflow.nodes} 

298 if workflow.entry_point not in node_ids: 

299 errors.append(f"Entry point '{workflow.entry_point}' not found in nodes") 

300 

301 # Validate edges 

302 for edge in workflow.edges: 

303 if edge.from_node not in node_ids: 

304 errors.append(f"Edge source '{edge.from_node}' not found") 

305 if edge.to_node not in node_ids: 

306 errors.append(f"Edge target '{edge.to_node}' not found") 

307 

308 # Check for unreachable nodes 

309 reachable = {workflow.entry_point} 

310 queue = [workflow.entry_point] 

311 

312 while queue: 

313 current = queue.pop(0) 

314 for edge in workflow.edges: 

315 if edge.from_node == current and edge.to_node not in reachable: 

316 reachable.add(edge.to_node) 

317 queue.append(edge.to_node) 

318 

319 unreachable = node_ids - reachable 

320 if unreachable: 

321 warnings.append(f"Unreachable nodes: {', '.join(unreachable)}") 

322 

323 # Check for nodes with no outgoing edges (potential dead ends) 

324 nodes_with_outgoing = {e.from_node for e in workflow.edges} 

325 terminal_nodes = node_ids - nodes_with_outgoing 

326 

327 if len(terminal_nodes) > 3: 

328 warnings.append(f"Many terminal nodes ({len(terminal_nodes)}): {', '.join(list(terminal_nodes)[:3])}...") 

329 

330 return ValidateWorkflowResponse(valid=len(errors) == 0, errors=errors, warnings=warnings) 

331 

332 except Exception as e: 

333 return ValidateWorkflowResponse(valid=False, errors=[f"Validation error: {e!s}"]) 

334 

335 

336@app.post("/api/builder/save") 

337async def save_workflow( 

338 request: SaveWorkflowRequest, 

339 _auth: None = Depends(verify_builder_auth), 

340) -> dict[str, Any]: 

341 """ 

342 Save workflow to Python file. 

343 

344 Args: 

345 request: Workflow and output path 

346 

347 Returns: 

348 Success message 

349 

350 Example: 

351 POST /api/builder/save 

352 { 

353 "workflow": {...}, 

354 "output_path": "src/agents/my_agent.py" 

355 } 

356 """ 

357 try: 

358 # Parse and generate 

359 workflow = WorkflowDefinition(**request.workflow) 

360 generator = CodeGenerator() 

361 

362 # SECURITY: Ensure directory exists and is writable (but path is already validated) 

363 output_path = Path(request.output_path) 

364 output_path.parent.mkdir(parents=True, exist_ok=True) 

365 

366 # Generate and save 

367 generator.generate_to_file(workflow, str(output_path)) 

368 

369 return {"success": True, "message": f"Workflow saved to {request.output_path}", "path": request.output_path} 

370 

371 except ValueError as e: 

372 # Path validation errors 

373 raise HTTPException(status_code=400, detail=str(e)) 

374 except Exception as e: 

375 raise HTTPException(status_code=500, detail=f"Save failed: {e!s}") 

376 

377 

378@app.get("/api/builder/templates") 

379async def list_templates() -> dict[str, Any]: 

380 """ 

381 List available workflow templates. 

382 

383 Returns: 

384 List of templates 

385 

386 Example: 

387 GET /api/builder/templates 

388 """ 

389 templates = [ 

390 { 

391 "id": "simple_agent", 

392 "name": "Simple Agent", 

393 "description": "Basic single-node agent", 

394 "complexity": "beginner", 

395 "nodes": 1, 

396 }, 

397 { 

398 "id": "research_agent", 

399 "name": "Research Agent", 

400 "description": "Search and summarize workflow", 

401 "complexity": "intermediate", 

402 "nodes": 3, 

403 }, 

404 { 

405 "id": "customer_support", 

406 "name": "Customer Support", 

407 "description": "Multi-path support agent with escalation", 

408 "complexity": "advanced", 

409 "nodes": 6, 

410 }, 

411 { 

412 "id": "multi_agent_supervisor", 

413 "name": "Multi-Agent Supervisor", 

414 "description": "Supervisor coordinating multiple specialists", 

415 "complexity": "advanced", 

416 "nodes": 5, 

417 }, 

418 ] 

419 

420 return {"templates": templates} 

421 

422 

423@app.get("/api/builder/templates/{template_id}") 

424async def get_template(template_id: str) -> dict[str, Any]: 

425 """ 

426 Get a specific workflow template. 

427 

428 Args: 

429 template_id: Template identifier 

430 

431 Returns: 

432 Template definition 

433 

434 Example: 

435 GET /api/builder/templates/research_agent 

436 """ 

437 # Simple research agent template 

438 if template_id == "research_agent": 

439 builder = WorkflowBuilder("research_agent", "Research and summarize topics") 

440 

441 builder.add_state_field("query", "str") 

442 builder.add_state_field("search_results", "List[str]") 

443 builder.add_state_field("summary", "str") 

444 

445 builder.add_node("search", "tool", {"tool": "web_search"}, "Web Search") 

446 builder.add_node("filter", "custom", {}, "Filter Results") 

447 builder.add_node("summarize", "llm", {"model": "gemini-flash"}, "Summarize") 

448 

449 builder.add_edge("search", "filter") 

450 builder.add_edge("filter", "summarize") 

451 

452 return {"template": builder.to_json()} 

453 

454 raise HTTPException(status_code=404, detail=f"Template '{template_id}' not found") 

455 

456 

457@app.post("/api/builder/import") 

458async def import_workflow( 

459 request: ImportWorkflowRequest, 

460 _auth: None = Depends(verify_builder_auth), 

461) -> dict[str, Any]: 

462 """ 

463 Import Python code into visual workflow. 

464 

465 Round-trip capability: Code → Visual (import) + Visual → Code (export) 

466 

467 Args: 

468 request: Import request containing code and layout 

469 

470 Returns: 

471 Workflow definition ready for visual builder 

472 

473 Example: 

474 POST /api/builder/import 

475 { 

476 "code": "from langgraph.graph import StateGraph\\n...", 

477 "layout": "hierarchical" 

478 } 

479 """ 

480 try: 

481 from ..importer import import_from_code, validate_import 

482 

483 # Import code 

484 workflow = import_from_code(request.code, layout_algorithm=request.layout) 

485 

486 # Validate 

487 validation = validate_import(workflow) 

488 

489 return { 

490 "workflow": workflow, 

491 "validation": validation, 

492 "message": "Code imported successfully", 

493 } 

494 

495 except SyntaxError as e: 

496 raise HTTPException(status_code=400, detail=f"Invalid Python syntax: {e!s}") 

497 except Exception as e: 

498 raise HTTPException(status_code=500, detail=f"Import failed: {e!s}") 

499 

500 

501@app.get("/api/builder/node-types") 

502async def list_node_types() -> dict[str, Any]: 

503 """ 

504 List available node types for the builder. 

505 

506 Returns: 

507 Node type definitions with configuration schemas 

508 """ 

509 node_types = [ 

510 { 

511 "type": "tool", 

512 "name": "Tool", 

513 "description": "Execute a tool or function", 

514 "icon": "tool", 

515 "config_schema": {"tool": {"type": "string", "required": True, "description": "Tool name"}}, 

516 }, 

517 { 

518 "type": "llm", 

519 "name": "LLM", 

520 "description": "Call language model", 

521 "icon": "brain", 

522 "config_schema": { 

523 "model": {"type": "string", "required": True, "description": "Model name"}, 

524 "temperature": {"type": "number", "required": False, "default": 0.7}, 

525 }, 

526 }, 

527 { 

528 "type": "conditional", 

529 "name": "Conditional", 

530 "description": "Conditional routing based on state", 

531 "icon": "split", 

532 "config_schema": {}, 

533 }, 

534 { 

535 "type": "approval", 

536 "name": "Approval", 

537 "description": "Human approval checkpoint", 

538 "icon": "user-check", 

539 "config_schema": {"risk_level": {"type": "string", "enum": ["low", "medium", "high", "critical"]}}, 

540 }, 

541 { 

542 "type": "custom", 

543 "name": "Custom", 

544 "description": "Custom Python function", 

545 "icon": "code", 

546 "config_schema": {}, 

547 }, 

548 ] 

549 

550 return {"node_types": node_types} 

551 

552 

553# ============================================================================== 

554# Run Server 

555# ============================================================================== 

556 

557if __name__ == "__main__": 

558 import uvicorn 

559 

560 print("=" * 80) 

561 print("🎨 Visual Workflow Builder API") 

562 print("=" * 80) 

563 print("\nStarting server...") 

564 print("\n📍 Endpoints:") 

565 print(" • Info: http://localhost:8001/") 

566 print(" • Generate: POST http://localhost:8001/api/builder/generate") 

567 print(" • Validate: POST http://localhost:8001/api/builder/validate") 

568 print(" • Templates: GET http://localhost:8001/api/builder/templates") 

569 print(" • Docs: http://localhost:8001/docs") 

570 print("\n🌟 Unique Feature: Code Export (OpenAI AgentKit doesn't have this!)") 

571 print("=" * 80) 

572 print() 

573 

574 uvicorn.run(app, host="0.0.0.0", port=8001, reload=True) # nosec B104