Coverage for src / mcp_server_langgraph / builder / codegen / generator.py: 91%

166 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 00:43 +0000

1""" 

2Code Generator for Visual Workflow Builder 

3 

4Unique Feature: Export visual workflows to production-ready Python code. 

5 

6This is what differentiates us from OpenAI AgentKit - they have visual builder 

7but NO code export! We provide: 

8- Full Python code generation 

9- Production-ready patterns 

10- Type-safe code with Pydantic 

11- Black-formatted output 

12- Import/export round-trip capability 

13 

14Example: 

15 from mcp_server_langgraph.builder.codegen import CodeGenerator, WorkflowDefinition 

16 

17 # Define workflow 

18 workflow = WorkflowDefinition( 

19 name="research_agent", 

20 nodes=[ 

21 {"id": "search", "type": "tool", "config": {"tool": "web_search"}}, 

22 {"id": "summarize", "type": "llm", "config": {"model": "gemini-flash"}} 

23 ], 

24 edges=[{"from": "search", "to": "summarize"}] 

25 ) 

26 

27 # Generate Python code 

28 generator = CodeGenerator() 

29 python_code = generator.generate(workflow) 

30 

31 # Result: Production-ready Python code 

32 print(python_code) 

33""" 

34 

35import os 

36import tempfile 

37from pathlib import Path 

38from typing import Any 

39 

40from pydantic import BaseModel, ConfigDict, Field 

41 

42 

43def _validate_output_path(output_path: str) -> Path: 

44 """ 

45 Validate output path for security (defense-in-depth). 

46 

47 Security: Prevents path injection attacks (CWE-73) by: 

48 1. Resolving to absolute path (prevents path traversal via ..) 

49 2. Validating against allowed directories (temp dir or BUILDER_OUTPUT_DIR) 

50 3. Blocking system directories 

51 4. Requiring .py extension 

52 

53 This validation runs even when called programmatically (not just via API), 

54 providing defense-in-depth against path injection. 

55 

56 Args: 

57 output_path: The output file path to validate 

58 

59 Returns: 

60 Validated and resolved Path object 

61 

62 Raises: 

63 ValueError: If path fails validation 

64 """ 

65 # nosemgrep: python.lang.security.audit.path-traversal.path-traversal-open 

66 # Security: Path is validated against allowlist below (temp dir, custom allowed dir) 

67 path = Path(output_path).resolve() 

68 path_str = str(path) 

69 

70 # System directories are NEVER allowed (check first for fail-fast) 

71 forbidden_prefixes = ("/etc/", "/sys/", "/proc/", "/dev/", "/var/log/", "/root/") 

72 if any(path_str.startswith(prefix) for prefix in forbidden_prefixes): 72 ↛ 73line 72 didn't jump to line 73 because the condition on line 72 was never true

73 msg = "Output path cannot target system directories" 

74 raise ValueError(msg) 

75 

76 # Ensure .py extension 

77 if path.suffix != ".py": 77 ↛ 78line 77 didn't jump to line 78 because the condition on line 77 was never true

78 msg = "Output path must have .py extension" 

79 raise ValueError(msg) 

80 

81 # Get allowed directories 

82 temp_dir = Path(tempfile.gettempdir()).resolve() 

83 custom_dir = os.getenv("BUILDER_OUTPUT_DIR") 

84 

85 # Build list of allowed base directories 

86 allowed_dirs = [temp_dir] 

87 if custom_dir: 87 ↛ 88line 87 didn't jump to line 88 because the condition on line 87 was never true

88 allowed_dirs.append(Path(custom_dir).resolve()) 

89 

90 # Check if path is within any allowed directory 

91 is_allowed = False 

92 for allowed_base in allowed_dirs: 

93 try: 

94 path.relative_to(allowed_base) 

95 is_allowed = True 

96 break 

97 except ValueError: 

98 continue 

99 

100 if not is_allowed: 

101 allowed_str = ", ".join(str(d) for d in allowed_dirs) 

102 msg = ( 

103 f"Invalid output path: must be within allowed directories ({allowed_str}). " 

104 f"Set BUILDER_OUTPUT_DIR environment variable to add custom directory." 

105 ) 

106 raise ValueError(msg) 

107 

108 # Additional check for path traversal (defense-in-depth after resolution) 

109 # Note: resolve() already handles .., but this catches edge cases 

110 if ".." in output_path: 110 ↛ 111line 110 didn't jump to line 111 because the condition on line 110 was never true

111 msg = "Path traversal detected: '..' not allowed in output path" 

112 raise ValueError(msg) 

113 

114 return path 

115 

116 

117def _format_with_ruff(code: str) -> str: 

118 """ 

119 Format code using ruff format. 

120 

121 Ruff is the project's standardized formatter (replaces black). 

122 Falls back to unformatted code if ruff is not available or fails. 

123 """ 

124 import shutil 

125 import subprocess 

126 

127 if not shutil.which("ruff"): 127 ↛ 129line 127 didn't jump to line 129 because the condition on line 127 was never true

128 # Ruff not available, return unformatted 

129 return code 

130 

131 try: 

132 result = subprocess.run( 

133 ["ruff", "format", "--stdin-filename", "generated.py", "-"], 

134 input=code, 

135 capture_output=True, 

136 text=True, 

137 timeout=10, 

138 ) 

139 if result.returncode == 0: 139 ↛ 142line 139 didn't jump to line 142 because the condition on line 139 was always true

140 return result.stdout 

141 # Formatting failed, return original 

142 return code 

143 except Exception: 

144 # Any error, return unformatted 

145 return code 

146 

147 

148class NodeDefinition(BaseModel): 

149 """Definition of a workflow node.""" 

150 

151 id: str = Field(description="Unique node ID") 

152 type: str = Field(description="Node type: tool, llm, conditional, approval, custom") 

153 label: str = Field(default="", description="Display label") 

154 config: dict[str, Any] = Field(default_factory=dict, description="Node configuration") 

155 position: dict[str, float] = Field(default_factory=dict, description="Canvas position {x, y}") 

156 

157 

158class EdgeDefinition(BaseModel): 

159 """Definition of a workflow edge.""" 

160 

161 from_node: str = Field(description="Source node ID", alias="from") 

162 to_node: str = Field(description="Target node ID", alias="to") 

163 condition: str | None = Field(default=None, description="Optional condition for edge") 

164 label: str = Field(default="", description="Edge label") 

165 

166 model_config = ConfigDict(populate_by_name=True) 

167 

168 

169class WorkflowDefinition(BaseModel): 

170 """Complete workflow definition from visual builder.""" 

171 

172 name: str = Field(description="Workflow name") 

173 description: str = Field(default="", description="Workflow description") 

174 nodes: list[NodeDefinition] = Field(description="List of nodes") 

175 edges: list[EdgeDefinition] = Field(description="List of edges") 

176 entry_point: str = Field(description="Entry node ID") 

177 state_schema: dict[str, str] = Field(default_factory=dict, description="State field definitions") 

178 metadata: dict[str, Any] = Field(default_factory=dict, description="Additional metadata") 

179 

180 

181class CodeGenerator: 

182 """ 

183 Generate production-ready Python code from visual workflows. 

184 

185 This is our unique differentiator vs OpenAI AgentKit! 

186 """ 

187 

188 # Code generation template 

189 AGENT_TEMPLATE = '''""" 

190{description} 

191 

192Auto-generated from Visual Workflow Builder. 

193""" 

194 

195from typing import Any, Dict, List, TypedDict 

196from langgraph.graph import StateGraph 

197from pydantic import BaseModel, Field 

198 

199 

200# ============================================================================== 

201# State Definition 

202# ============================================================================== 

203 

204 

205class {class_name}State(TypedDict): 

206 """State for {workflow_name} workflow.""" 

207{state_fields} 

208 

209 

210# ============================================================================== 

211# Node Functions 

212# ============================================================================== 

213 

214{node_functions} 

215 

216# ============================================================================== 

217# Routing Functions 

218# ============================================================================== 

219 

220{routing_functions} 

221 

222# ============================================================================== 

223# Graph Construction 

224# ============================================================================== 

225 

226 

227def create_{workflow_name}() -> StateGraph: 

228 """ 

229 Create {workflow_name} workflow. 

230 

231 Returns: 

232 Compiled LangGraph application 

233 

234 Example: 

235 >>> agent = create_{workflow_name}() 

236 >>> result = agent.invoke({{"query": "test"}}) 

237 """ 

238 # Create graph 

239 graph = StateGraph({class_name}State) 

240 

241{graph_construction} 

242 

243 return graph 

244 

245 

246# ============================================================================== 

247# Execution 

248# ============================================================================== 

249 

250 

251def run_{workflow_name}(input_data: Dict[str, Any], config: Dict[str, Any] = None): 

252 """ 

253 Execute {workflow_name} workflow. 

254 

255 Args: 

256 input_data: Input state 

257 config: Optional configuration 

258 

259 Returns: 

260 Final state 

261 """ 

262 graph = create_{workflow_name}() 

263 app = graph.compile() 

264 

265 result = app.invoke(input_data, config=config or {{}}) 

266 

267 return result 

268 

269 

270if __name__ == "__main__": 

271 # Test the generated workflow 

272 result = run_{workflow_name}({{"query": "test input"}}) 

273 print(result) 

274''' 

275 

276 def __init__(self) -> None: 

277 """Initialize code generator.""" 

278 self.templates: dict[str, str] = {} 

279 

280 def _generate_state_fields(self, state_schema: dict[str, str]) -> str: 

281 """ 

282 Generate state field definitions. 

283 

284 Args: 

285 state_schema: Dict of {field_name: field_type} 

286 

287 Returns: 

288 Formatted state fields 

289 """ 

290 if not state_schema: 

291 # Default state 

292 return " query: str\n result: str\n metadata: Dict[str, Any]" 

293 

294 fields = [] 

295 for field_name, field_type in state_schema.items(): 

296 fields.append(f" {field_name}: {field_type}") 

297 

298 return "\n".join(fields) 

299 

300 def _generate_node_function(self, node: NodeDefinition) -> str: 

301 """ 

302 Generate node function code. 

303 

304 Args: 

305 node: Node definition 

306 

307 Returns: 

308 Python function code 

309 """ 

310 function_name = f"node_{node.id.replace('-', '_')}" 

311 

312 if node.type == "tool": 

313 # Tool node 

314 tool_name = node.config.get("tool", "unknown_tool") 

315 return f'''def {function_name}(state: Dict[str, Any]) -> Dict[str, Any]: 

316 """Execute {node.label or node.id} - tool: {tool_name}.""" 

317 # TODO: Implement {tool_name} integration 

318 result = call_tool("{tool_name}", state) 

319 state["result"] = result 

320 return state 

321''' 

322 

323 elif node.type == "llm": 

324 # LLM node 

325 model = node.config.get("model", "gemini-flash") 

326 return f'''def {function_name}(state: Dict[str, Any]) -> Dict[str, Any]: 

327 """Execute {node.label or node.id} - LLM: {model}.""" 

328 # TODO: Implement LLM call 

329 from litellm import completion 

330 

331 response = completion( 

332 model="{model}", 

333 messages=[{{"role": "user", "content": state["query"]}}] 

334 ) 

335 state["llm_response"] = response.choices[0].message.content 

336 return state 

337''' 

338 

339 elif node.type == "conditional": 

340 # Conditional node 

341 return f'''def {function_name}(state: Dict[str, Any]) -> Dict[str, Any]: 

342 """Conditional: {node.label or node.id}.""" 

343 # TODO: Implement conditional logic 

344 return state 

345''' 

346 

347 elif node.type == "approval": 

348 # Approval node (human-in-the-loop) 

349 return f'''def {function_name}(state: Dict[str, Any]) -> Dict[str, Any]: 

350 """Approval checkpoint: {node.label or node.id}.""" 

351 from mcp_server_langgraph.core.interrupts import ApprovalNode 

352 

353 approval = ApprovalNode("{node.id}", description="{node.label}") 

354 return approval(state) 

355''' 

356 

357 else: 

358 # Custom node 

359 return f'''def {function_name}(state: Dict[str, Any]) -> Dict[str, Any]: 

360 """Custom node: {node.label or node.id}.""" 

361 # TODO: Implement custom logic for {node.id} 

362 return state 

363''' 

364 

365 def _generate_routing_function(self, node: NodeDefinition, edges: list[EdgeDefinition]) -> str | None: 

366 """ 

367 Generate routing function for conditional edges. 

368 

369 Args: 

370 node: Node with conditional outgoing edges 

371 edges: All edges from this node 

372 

373 Returns: 

374 Routing function code or None 

375 """ 

376 outgoing_edges = [e for e in edges if e.from_node == node.id] 

377 

378 if not outgoing_edges or len(outgoing_edges) <= 1: 

379 return None # No routing needed for single edge 

380 

381 # Check if any edges have conditions 

382 conditional_edges = [e for e in outgoing_edges if e.condition] 

383 

384 if not conditional_edges: 

385 return None # No conditions, no routing needed 

386 

387 function_name = f"route_from_{node.id.replace('-', '_')}" 

388 

389 # Generate routing function 

390 code = f'''def {function_name}(state: Dict[str, Any]) -> str: 

391 """Route from {node.label or node.id}.""" 

392''' 

393 

394 for edge in conditional_edges: 

395 condition = edge.condition or "True" 

396 code += f""" if {condition}: 

397 return "{edge.to_node}" 

398""" 

399 

400 # Default route (last edge without condition or first edge) 

401 default_edge = outgoing_edges[0] 

402 code += f""" return "{default_edge.to_node}" 

403""" 

404 

405 return code 

406 

407 def _generate_graph_construction(self, workflow: WorkflowDefinition) -> str: 

408 """ 

409 Generate graph construction code. 

410 

411 Args: 

412 workflow: Workflow definition 

413 

414 Returns: 

415 Graph construction code 

416 """ 

417 lines = [] 

418 

419 # Add nodes 

420 lines.append(" # Add nodes") 

421 for node in workflow.nodes: 

422 function_name = f"node_{node.id.replace('-', '_')}" 

423 lines.append(f' graph.add_node("{node.id}", {function_name})') 

424 

425 # Add edges 

426 lines.append("\n # Add edges") 

427 

428 # Group edges by source node 

429 edges_by_source: dict[str, list[EdgeDefinition]] = {} 

430 for edge in workflow.edges: 

431 if edge.from_node not in edges_by_source: 

432 edges_by_source[edge.from_node] = [] 

433 edges_by_source[edge.from_node].append(edge) 

434 

435 # Generate edges 

436 for source, edges in edges_by_source.items(): 

437 if len(edges) == 1 and not edges[0].condition: 

438 # Simple edge 

439 lines.append(f' graph.add_edge("{source}", "{edges[0].to_node}")') 

440 else: 

441 # Conditional edges 

442 routing_func = f"route_from_{source.replace('-', '_')}" 

443 lines.append(f' graph.add_conditional_edges("{source}", {routing_func})') 

444 

445 # Set entry and exit 

446 lines.append("\n # Set entry point") 

447 lines.append(f' graph.set_entry_point("{workflow.entry_point}")') 

448 

449 # Find terminal nodes (nodes with no outgoing edges) 

450 terminal_nodes = [] 

451 all_sources = {e.from_node for e in workflow.edges} 

452 all_nodes = {n.id for n in workflow.nodes} 

453 terminal_nodes = list(all_nodes - all_sources) 

454 

455 if terminal_nodes: 

456 lines.append("\n # Set finish points") 

457 for terminal in terminal_nodes: 

458 lines.append(f' graph.set_finish_point("{terminal}")') 

459 

460 return "\n".join(lines) 

461 

462 def _sanitize_workflow_name(self, name: str) -> str: 

463 """ 

464 Sanitize workflow name to prevent code injection. 

465 

466 SECURITY: Prevents code injection via malicious workflow names. 

467 Only allows alphanumeric characters and underscores. 

468 

469 Args: 

470 name: Raw workflow name from user input 

471 

472 Returns: 

473 Sanitized workflow name safe for code generation 

474 """ 

475 import re 

476 

477 # Remove all non-alphanumeric characters except underscores 

478 sanitized = re.sub(r"[^a-zA-Z0-9_]", "_", name) 

479 # Ensure it starts with a letter 

480 if sanitized and not sanitized[0].isalpha(): 480 ↛ 481line 480 didn't jump to line 481 because the condition on line 480 was never true

481 sanitized = "workflow_" + sanitized 

482 # Fallback if completely empty 

483 if not sanitized: 483 ↛ 484line 483 didn't jump to line 484 because the condition on line 483 was never true

484 sanitized = "workflow" 

485 return sanitized 

486 

487 def generate(self, workflow: WorkflowDefinition) -> str: 

488 """ 

489 Generate production-ready Python code from workflow. 

490 

491 Args: 

492 workflow: Workflow definition from visual builder 

493 

494 Returns: 

495 Formatted Python code 

496 

497 Example: 

498 >>> workflow = WorkflowDefinition( 

499 ... name="my_agent", 

500 ... nodes=[...], 

501 ... edges=[...] 

502 ... ) 

503 >>> code = generator.generate(workflow) 

504 >>> print(code) 

505 """ 

506 # SECURITY: Sanitize workflow name to prevent code injection 

507 sanitized_name = self._sanitize_workflow_name(workflow.name) 

508 

509 # Generate components 

510 class_name = "".join(word.capitalize() for word in sanitized_name.split("_")) 

511 state_fields = self._generate_state_fields(workflow.state_schema) 

512 

513 # Generate node functions 

514 node_functions = [] 

515 for node in workflow.nodes: 

516 node_func = self._generate_node_function(node) 

517 node_functions.append(node_func) 

518 

519 node_functions_code = "\n".join(node_functions) 

520 

521 # Generate routing functions 

522 routing_functions = [] 

523 for node in workflow.nodes: 

524 routing_func = self._generate_routing_function(node, workflow.edges) 

525 if routing_func: 

526 routing_functions.append(routing_func) 

527 

528 routing_functions_code = "\n".join(routing_functions) if routing_functions else "# No routing functions needed" 

529 

530 # Generate graph construction 

531 graph_construction = self._generate_graph_construction(workflow) 

532 

533 # Fill template (use sanitized name) 

534 code = self.AGENT_TEMPLATE.format( 

535 description=workflow.description or f"{sanitized_name} workflow", 

536 class_name=class_name, 

537 workflow_name=sanitized_name, 

538 state_fields=state_fields, 

539 node_functions=node_functions_code, 

540 routing_functions=routing_functions_code, 

541 graph_construction=graph_construction, 

542 ) 

543 

544 # Format with ruff (standardized formatter, replaces black) 

545 return _format_with_ruff(code) 

546 

547 def generate_to_file(self, workflow: WorkflowDefinition, output_path: str) -> None: 

548 """ 

549 Generate code and save to file. 

550 

551 Security: Path is validated to prevent path injection attacks. 

552 Only paths within BUILDER_OUTPUT_DIR (default: temp directory) are allowed. 

553 

554 Args: 

555 workflow: Workflow definition 

556 output_path: Output file path (must be within allowed directory) 

557 

558 Raises: 

559 ValueError: If output_path fails security validation 

560 

561 Example: 

562 >>> generator.generate_to_file(workflow, "/tmp/mcp-server-workflows/my_agent.py") 

563 """ 

564 # Security: Validate path before writing (defense-in-depth) 

565 validated_path = _validate_output_path(output_path) 

566 

567 code = self.generate(workflow) 

568 

569 # Ensure parent directory exists 

570 # nosemgrep: python.lang.security.audit.path-traversal.path-traversal-open 

571 # Security: validated_path was verified by _validate_output_path() above 

572 validated_path.parent.mkdir(parents=True, exist_ok=True) 

573 

574 # nosemgrep: python.lang.security.audit.path-traversal.path-traversal-open 

575 # Security: validated_path was verified by _validate_output_path() above 

576 with open(str(validated_path), "w") as f: 

577 f.write(code) 

578 

579 

580# ============================================================================== 

581# Example Usage 

582# ============================================================================== 

583 

584if __name__ == "__main__": 

585 # Example workflow: Simple research agent 

586 workflow = WorkflowDefinition( 

587 name="research_agent", 

588 description="Research agent that searches and summarizes", 

589 nodes=[ 

590 NodeDefinition(id="search", type="tool", label="Web Search", config={"tool": "tavily_search"}), 

591 NodeDefinition(id="summarize", type="llm", label="Summarize", config={"model": "gemini-2.5-flash"}), 

592 NodeDefinition(id="validate", type="conditional", label="Validate Quality"), 

593 ], 

594 edges=[ 

595 EdgeDefinition(from_node="search", to_node="summarize"), # type: ignore 

596 EdgeDefinition(from_node="summarize", to_node="validate"), # type: ignore 

597 ], 

598 entry_point="search", 

599 state_schema={"query": "str", "search_results": "List[str]", "summary": "str", "validated": "bool"}, 

600 ) 

601 

602 # Generate code 

603 generator = CodeGenerator() 

604 code = generator.generate(workflow) 

605 

606 print("=" * 80) 

607 print("GENERATED PYTHON CODE") 

608 print("=" * 80) 

609 print(code) 

610 print("=" * 80)