Coverage for src / mcp_server_langgraph / builder / codegen / generator.py: 91%
166 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
1"""
Code Generator for Visual Workflow Builder

Unique Feature: Export visual workflows to production-ready Python code.

This is what differentiates us from OpenAI AgentKit - they have a visual builder
but NO code export! We provide:
- Full Python code generation
- Production-ready patterns
- Type-safe code with Pydantic
- Ruff-formatted output (when ruff is installed; unformatted otherwise)
- Import/export round-trip capability

Example:
    from mcp_server_langgraph.builder.codegen import CodeGenerator, WorkflowDefinition

    # Define workflow (entry_point is required)
    workflow = WorkflowDefinition(
        name="research_agent",
        nodes=[
            {"id": "search", "type": "tool", "config": {"tool": "web_search"}},
            {"id": "summarize", "type": "llm", "config": {"model": "gemini-flash"}},
        ],
        edges=[{"from": "search", "to": "summarize"}],
        entry_point="search",
    )

    # Generate Python code
    generator = CodeGenerator()
    python_code = generator.generate(workflow)

    # Result: Production-ready Python code
    print(python_code)
33"""
35import os
36import tempfile
37from pathlib import Path
38from typing import Any
40from pydantic import BaseModel, ConfigDict, Field
def _validate_output_path(output_path: str) -> Path:
    """
    Validate output path for security (defense-in-depth).

    Security: Prevents path injection attacks (CWE-73) by:
    1. Resolving to absolute path (prevents path traversal via ..)
    2. Blocking system directories
    3. Requiring .py extension
    4. Validating against allowed directories (temp dir or BUILDER_OUTPUT_DIR)
    5. Rejecting any ".." path component in the raw input

    This validation runs even when called programmatically (not just via API),
    providing defense-in-depth against path injection.

    Args:
        output_path: The output file path to validate

    Returns:
        Validated and resolved Path object

    Raises:
        ValueError: If path fails validation
    """
    # nosemgrep: python.lang.security.audit.path-traversal.path-traversal-open
    # Security: Path is validated against allowlist below (temp dir, custom allowed dir)
    path = Path(output_path).resolve()

    # System directories are NEVER allowed (check first for fail-fast).
    # str.startswith accepts a tuple, so one call covers every prefix.
    forbidden_prefixes = ("/etc/", "/sys/", "/proc/", "/dev/", "/var/log/", "/root/")
    if str(path).startswith(forbidden_prefixes):
        msg = "Output path cannot target system directories"
        raise ValueError(msg)

    # Ensure .py extension
    if path.suffix != ".py":
        msg = "Output path must have .py extension"
        raise ValueError(msg)

    # Allowed base directories: the system temp dir, plus an optional
    # operator-configured directory from the environment.
    allowed_dirs = [Path(tempfile.gettempdir()).resolve()]
    custom_dir = os.getenv("BUILDER_OUTPUT_DIR")
    if custom_dir:
        allowed_dirs.append(Path(custom_dir).resolve())

    if not any(path.is_relative_to(allowed_base) for allowed_base in allowed_dirs):
        allowed_str = ", ".join(str(d) for d in allowed_dirs)
        msg = (
            f"Invalid output path: must be within allowed directories ({allowed_str}). "
            f"Set BUILDER_OUTPUT_DIR environment variable to add custom directory."
        )
        raise ValueError(msg)

    # Additional check for path traversal (defense-in-depth after resolution).
    # Compare whole path components (either separator style) rather than
    # substrings, so a harmless file name such as "my..agent.py" is not
    # misreported as traversal while "a/../b.py" still is.
    raw_components = str(output_path).replace("\\", "/").split("/")
    if ".." in raw_components:
        msg = "Path traversal detected: '..' not allowed in output path"
        raise ValueError(msg)

    return path
def _format_with_ruff(code: str) -> str:
    """
    Format code using ruff format.

    Ruff is the project's standardized formatter (replaces black).
    Formatting is best-effort: the input is returned unchanged when ruff is
    not on PATH, exits non-zero, times out, or cannot be executed.

    Args:
        code: Python source text to format

    Returns:
        The formatted source, or ``code`` unchanged on any failure
    """
    import shutil
    import subprocess

    ruff_path = shutil.which("ruff")
    if ruff_path is None:
        # Ruff not available, return unformatted
        return code

    try:
        result = subprocess.run(
            # Run the executable that was actually resolved instead of
            # making the OS look up "ruff" a second time.
            [ruff_path, "format", "--stdin-filename", "generated.py", "-"],
            input=code,
            capture_output=True,
            text=True,
            # Pin UTF-8 so non-ASCII source does not depend on the locale.
            encoding="utf-8",
            timeout=10,
            check=False,
        )
    except (OSError, subprocess.SubprocessError, ValueError):
        # Launch failure, timeout, or encode/decode problem: keep the input.
        return code

    # Non-zero exit means ruff rejected the input (e.g. a syntax error).
    return result.stdout if result.returncode == 0 else code
class NodeDefinition(BaseModel):
    """One node of a workflow as drawn in the visual builder."""

    # Required identifier; edges refer to nodes by this value.
    id: str = Field(..., description="Unique node ID")
    # Required kind of node; selects which code template is generated.
    type: str = Field(..., description="Node type: tool, llm, conditional, approval, custom")
    # Optional human-readable name; empty when not provided.
    label: str = Field("", description="Display label")
    # Free-form settings; a fresh dict is created per instance.
    config: dict[str, Any] = Field(description="Node configuration", default_factory=dict)
    # Canvas coordinates; a fresh dict is created per instance.
    position: dict[str, float] = Field(description="Canvas position {x, y}", default_factory=dict)
class EdgeDefinition(BaseModel):
    """A directed connection between two workflow nodes."""

    # Accept both the field names (from_node/to_node) and the aliases (from/to).
    model_config = ConfigDict(populate_by_name=True)

    # "from" is a Python keyword, hence the alias for the serialized name.
    from_node: str = Field(..., alias="from", description="Source node ID")
    to_node: str = Field(..., alias="to", description="Target node ID")
    # None means the edge is unconditional.
    condition: str | None = Field(None, description="Optional condition for edge")
    label: str = Field("", description="Edge label")
class WorkflowDefinition(BaseModel):
    """Full description of a workflow exported by the visual builder."""

    # Required fields
    name: str = Field(..., description="Workflow name")
    nodes: list[NodeDefinition] = Field(..., description="List of nodes")
    edges: list[EdgeDefinition] = Field(..., description="List of edges")
    entry_point: str = Field(..., description="Entry node ID")

    # Optional fields; mutable defaults are built per instance via default_factory.
    description: str = Field("", description="Workflow description")
    state_schema: dict[str, str] = Field(description="State field definitions", default_factory=dict)
    metadata: dict[str, Any] = Field(description="Additional metadata", default_factory=dict)
class CodeGenerator:
    """
    Generate production-ready Python code from visual workflows.

    This is our unique differentiator vs OpenAI AgentKit!

    User-supplied text (workflow name, node ids, labels, config values,
    description) is escaped before it is embedded in the generated source:
    identifiers are reduced to ``[A-Za-z0-9_]``, string values are emitted as
    quoted literals, and docstring/comment text cannot terminate its context.
    Edge conditions and state field types are emitted verbatim because they
    are code by design.
    """

    # Code generation template
    AGENT_TEMPLATE = '''"""
{description}

Auto-generated from Visual Workflow Builder.
"""

from typing import Any, Dict, List, TypedDict
from langgraph.graph import StateGraph
from pydantic import BaseModel, Field


# ==============================================================================
# State Definition
# ==============================================================================


class {class_name}State(TypedDict):
    """State for {workflow_name} workflow."""
{state_fields}


# ==============================================================================
# Node Functions
# ==============================================================================

{node_functions}

# ==============================================================================
# Routing Functions
# ==============================================================================

{routing_functions}

# ==============================================================================
# Graph Construction
# ==============================================================================


def create_{workflow_name}() -> StateGraph:
    """
    Create {workflow_name} workflow.

    Returns:
        Uncompiled StateGraph; call .compile() to get a runnable application

    Example:
        >>> agent = create_{workflow_name}().compile()
        >>> result = agent.invoke({{"query": "test"}})
    """
    # Create graph
    graph = StateGraph({class_name}State)

{graph_construction}

    return graph


# ==============================================================================
# Execution
# ==============================================================================


def run_{workflow_name}(input_data: Dict[str, Any], config: Dict[str, Any] = None):
    """
    Execute {workflow_name} workflow.

    Args:
        input_data: Input state
        config: Optional configuration

    Returns:
        Final state
    """
    graph = create_{workflow_name}()
    app = graph.compile()

    result = app.invoke(input_data, config=config or {{}})

    return result


if __name__ == "__main__":
    # Test the generated workflow
    result = run_{workflow_name}({{"query": "test input"}})
    print(result)
'''

    def __init__(self) -> None:
        """Initialize code generator."""
        self.templates: dict[str, str] = {}

    # --------------------------------------------------------------------------
    # Escaping helpers (private)
    # --------------------------------------------------------------------------

    @staticmethod
    def _identifier_fragment(raw: Any) -> str:
        """Return ``raw`` with every character outside ``[A-Za-z0-9_]`` replaced by ``_``."""
        import re

        return re.sub(r"[^a-zA-Z0-9_]", "_", str(raw))

    @staticmethod
    def _string_literal(value: Any) -> str:
        """Return ``value`` as a double-quoted, fully escaped Python string literal."""
        import json

        # ensure_ascii=False keeps non-ASCII text readable and avoids emitting
        # surrogate-pair escapes, which Python would not recombine.
        return json.dumps(str(value), ensure_ascii=False)

    @staticmethod
    def _docstring_text(value: Any) -> str:
        """Escape ``value`` so it cannot close a triple-double-quoted docstring."""
        return str(value).replace("\\", "\\\\").replace('"', '\\"')

    @staticmethod
    def _comment_text(value: Any) -> str:
        """Flatten ``value`` to one line so it cannot escape a ``#`` comment."""
        return " ".join(str(value).splitlines())

    # --------------------------------------------------------------------------
    # Fragment generators
    # --------------------------------------------------------------------------

    def _generate_state_fields(self, state_schema: dict[str, str]) -> str:
        """
        Generate state field definitions.

        Args:
            state_schema: Dict of {field_name: field_type}

        Returns:
            Formatted state fields, one ``name: type`` line per entry, indented
            for the TypedDict body. Falls back to a default query/result/metadata
            state when the schema is empty.
        """
        if not state_schema:
            # Default state
            return "    query: str\n    result: str\n    metadata: Dict[str, Any]"

        # NOTE: names and type expressions are emitted verbatim (types such as
        # "List[str]" are code), so the schema must come from a trusted source.
        return "\n".join(f"    {field_name}: {field_type}" for field_name, field_type in state_schema.items())

    def _generate_node_function(self, node: NodeDefinition) -> str:
        """
        Generate node function code.

        Args:
            node: Node definition

        Returns:
            Python source for a ``node_<id>`` function, ending with a newline.
            Unknown node types get the generic "custom" stub.
        """
        function_name = f"node_{self._identifier_fragment(node.id)}"
        title = self._docstring_text(node.label or node.id)
        header = f"def {function_name}(state: Dict[str, Any]) -> Dict[str, Any]:"

        if node.type == "tool":
            # Tool node
            tool_name = node.config.get("tool", "unknown_tool")
            tool_doc = self._docstring_text(tool_name)
            tool_note = self._comment_text(tool_name)
            tool_literal = self._string_literal(tool_name)
            body = [
                f'    """Execute {title} - tool: {tool_doc}."""',
                f"    # TODO: Implement {tool_note} integration",
                f"    result = call_tool({tool_literal}, state)",
                '    state["result"] = result',
                "    return state",
            ]
        elif node.type == "llm":
            # LLM node
            model = node.config.get("model", "gemini-flash")
            model_doc = self._docstring_text(model)
            model_literal = self._string_literal(model)
            body = [
                f'    """Execute {title} - LLM: {model_doc}."""',
                "    # TODO: Implement LLM call",
                "    from litellm import completion",
                "",
                "    response = completion(",
                f"        model={model_literal},",
                '        messages=[{"role": "user", "content": state["query"]}]',
                "    )",
                '    state["llm_response"] = response.choices[0].message.content',
                "    return state",
            ]
        elif node.type == "conditional":
            # Conditional node
            body = [
                f'    """Conditional: {title}."""',
                "    # TODO: Implement conditional logic",
                "    return state",
            ]
        elif node.type == "approval":
            # Approval node (human-in-the-loop)
            id_literal = self._string_literal(node.id)
            label_literal = self._string_literal(node.label)
            body = [
                f'    """Approval checkpoint: {title}."""',
                "    from mcp_server_langgraph.core.interrupts import ApprovalNode",
                "",
                f"    approval = ApprovalNode({id_literal}, description={label_literal})",
                "    return approval(state)",
            ]
        else:
            # Custom node
            id_note = self._comment_text(node.id)
            body = [
                f'    """Custom node: {title}."""',
                f"    # TODO: Implement custom logic for {id_note}",
                "    return state",
            ]

        return "\n".join([header, *body]) + "\n"

    def _generate_routing_function(self, node: NodeDefinition, edges: list[EdgeDefinition]) -> str | None:
        """
        Generate routing function for conditional edges.

        A routing function is produced exactly when at least one edge leaving
        ``node`` carries a condition; this is the same rule
        ``_generate_graph_construction`` uses to emit ``add_conditional_edges``,
        so the generated graph never references a missing function.

        Args:
            node: Node with conditional outgoing edges
            edges: All workflow edges (filtered here to those leaving ``node``)

        Returns:
            Routing function code or None
        """
        outgoing_edges = [e for e in edges if e.from_node == node.id]
        conditional_edges = [e for e in outgoing_edges if e.condition]

        if not conditional_edges:
            return None  # No conditions, no routing needed

        function_name = f"route_from_{self._identifier_fragment(node.id)}"
        title = self._docstring_text(node.label or node.id)

        lines = [
            f"def {function_name}(state: Dict[str, Any]) -> str:",
            f'    """Route from {title}."""',
        ]

        # NOTE(security): conditions are Python expressions emitted verbatim;
        # they must come from a trusted source.
        for edge in conditional_edges:
            lines.append(f"    if {edge.condition}:")
            lines.append(f"        return {self._string_literal(edge.to_node)}")

        # Default route: last edge without condition, else the first edge.
        unconditional_edges = [e for e in outgoing_edges if not e.condition]
        default_edge = unconditional_edges[-1] if unconditional_edges else outgoing_edges[0]
        lines.append(f"    return {self._string_literal(default_edge.to_node)}")

        return "\n".join(lines) + "\n"

    def _generate_graph_construction(self, workflow: WorkflowDefinition) -> str:
        """
        Generate graph construction code.

        Args:
            workflow: Workflow definition

        Returns:
            Graph construction code (indented for the body of ``create_*``):
            node registration, edges, entry point, and finish points.
        """
        ident = self._identifier_fragment
        literal = self._string_literal

        # Add nodes
        lines = ["    # Add nodes"]
        for node in workflow.nodes:
            lines.append(f"    graph.add_node({literal(node.id)}, node_{ident(node.id)})")

        # Add edges
        lines.append("\n    # Add edges")

        # Group edges by source node (dicts keep first-seen order)
        edges_by_source: dict[str, list[EdgeDefinition]] = {}
        for edge in workflow.edges:
            edges_by_source.setdefault(edge.from_node, []).append(edge)

        for source, edges in edges_by_source.items():
            if any(e.condition for e in edges):
                # Conditional edges: routed by the function produced in
                # _generate_routing_function for this source node.
                lines.append(f"    graph.add_conditional_edges({literal(source)}, route_from_{ident(source)})")
            else:
                # Unconditional edges (including fan-out) need no router.
                lines.extend(f"    graph.add_edge({literal(source)}, {literal(e.to_node)})" for e in edges)

        # Set entry and exit
        lines.append("\n    # Set entry point")
        lines.append(f"    graph.set_entry_point({literal(workflow.entry_point)})")

        # Terminal nodes have no outgoing edges. Keep declaration order (and
        # drop duplicates) so the generated code is identical across runs.
        all_sources = {e.from_node for e in workflow.edges}
        terminal_nodes = list(dict.fromkeys(n.id for n in workflow.nodes if n.id not in all_sources))

        if terminal_nodes:
            lines.append("\n    # Set finish points")
            lines.extend(f"    graph.set_finish_point({literal(terminal)})" for terminal in terminal_nodes)

        return "\n".join(lines)

    def _sanitize_workflow_name(self, name: str) -> str:
        """
        Sanitize workflow name to prevent code injection.

        SECURITY: Prevents code injection via malicious workflow names.
        Only allows alphanumeric characters and underscores.

        Args:
            name: Raw workflow name from user input

        Returns:
            Sanitized workflow name safe for code generation
        """
        # Replace all non-alphanumeric characters except underscores
        sanitized = self._identifier_fragment(name)
        # Fallback if completely empty
        if not sanitized:
            return "workflow"
        # Ensure it starts with a letter
        if not sanitized[0].isalpha():
            sanitized = "workflow_" + sanitized
        return sanitized

    # --------------------------------------------------------------------------
    # Public API
    # --------------------------------------------------------------------------

    def generate(self, workflow: WorkflowDefinition) -> str:
        """
        Generate production-ready Python code from workflow.

        Args:
            workflow: Workflow definition from visual builder

        Returns:
            Formatted Python code

        Example:
            >>> workflow = WorkflowDefinition(
            ...     name="my_agent",
            ...     nodes=[...],
            ...     edges=[...],
            ...     entry_point="...",
            ... )
            >>> code = generator.generate(workflow)
            >>> print(code)
        """
        # SECURITY: Sanitize workflow name to prevent code injection
        sanitized_name = self._sanitize_workflow_name(workflow.name)

        # Generate components
        class_name = "".join(word.capitalize() for word in sanitized_name.split("_"))
        state_fields = self._generate_state_fields(workflow.state_schema)

        # Generate node functions
        node_functions_code = "\n".join(self._generate_node_function(node) for node in workflow.nodes)

        # Generate routing functions (only nodes with conditional edges get one)
        routing_functions = [
            routing_func
            for node in workflow.nodes
            if (routing_func := self._generate_routing_function(node, workflow.edges))
        ]
        routing_functions_code = "\n".join(routing_functions) if routing_functions else "# No routing functions needed"

        # Generate graph construction
        graph_construction = self._generate_graph_construction(workflow)

        # The description lands inside the generated module docstring, so it
        # must not be able to close that docstring.
        description = self._docstring_text(workflow.description or f"{sanitized_name} workflow")

        # Fill template (use sanitized name)
        code = self.AGENT_TEMPLATE.format(
            description=description,
            class_name=class_name,
            workflow_name=sanitized_name,
            state_fields=state_fields,
            node_functions=node_functions_code,
            routing_functions=routing_functions_code,
            graph_construction=graph_construction,
        )

        # Format with ruff (standardized formatter, replaces black)
        return _format_with_ruff(code)

    def generate_to_file(self, workflow: WorkflowDefinition, output_path: str) -> None:
        """
        Generate code and save to file.

        Security: Path is validated to prevent path injection attacks.
        Only paths within the temp directory or BUILDER_OUTPUT_DIR are allowed.

        Args:
            workflow: Workflow definition
            output_path: Output file path (must be within allowed directory)

        Raises:
            ValueError: If output_path fails security validation

        Example:
            >>> generator.generate_to_file(workflow, "/tmp/mcp-server-workflows/my_agent.py")
        """
        # Security: Validate path before writing (defense-in-depth)
        validated_path = _validate_output_path(output_path)

        code = self.generate(workflow)

        # Ensure parent directory exists
        # nosemgrep: python.lang.security.audit.path-traversal.path-traversal-open
        # Security: validated_path was verified by _validate_output_path() above
        validated_path.parent.mkdir(parents=True, exist_ok=True)

        # nosemgrep: python.lang.security.audit.path-traversal.path-traversal-open
        # Security: validated_path was verified by _validate_output_path() above
        # Explicit UTF-8 so the written file does not depend on the locale.
        with open(validated_path, "w", encoding="utf-8") as f:
            f.write(code)
# ==============================================================================
# Example Usage
# ==============================================================================

if __name__ == "__main__":
    # Demo: build a small research-agent workflow and print the code generated for it.
    workflow = WorkflowDefinition(
        name="research_agent",
        description="Research agent that searches and summarizes",
        entry_point="search",
        state_schema={"query": "str", "search_results": "List[str]", "summary": "str", "validated": "bool"},
        nodes=[
            NodeDefinition(id="search", type="tool", label="Web Search", config={"tool": "tavily_search"}),
            NodeDefinition(id="summarize", type="llm", label="Summarize", config={"model": "gemini-2.5-flash"}),
            NodeDefinition(id="validate", type="conditional", label="Validate Quality"),
        ],
        edges=[
            # Edges are linear: search -> summarize -> validate.
            EdgeDefinition(from_node="search", to_node="summarize"),  # type: ignore
            EdgeDefinition(from_node="summarize", to_node="validate"),  # type: ignore
        ],
    )

    # Render the workflow to Python source.
    generator = CodeGenerator()
    code = generator.generate(workflow)

    # Show the result framed by 80-column rules, one item per line.
    print("=" * 80, "GENERATED PYTHON CODE", "=" * 80, code, "=" * 80, sep="\n")