Coverage for src / mcp_server_langgraph / builder / importer / layout_engine.py: 94%
115 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
"""
Layout Engine for Auto-Arranging Workflow Nodes

Automatically positions nodes on the visual canvas using graph layout algorithms.

Algorithms:
- Hierarchical (top-down, layered)
- Force-directed (organic, balanced)
- Grid (simple, aligned)

Example:
    from mcp_server_langgraph.builder.importer import LayoutEngine

    engine = LayoutEngine()
    positioned_nodes = engine.layout(nodes, edges, algorithm="hierarchical")
"""
import math
import random
from collections import deque
from typing import Any, Literal
22class LayoutEngine:
23 """
24 Auto-layout engine for workflow nodes.
26 Positions nodes on canvas using graph layout algorithms.
27 """
29 def __init__(
30 self,
31 canvas_width: int = 1200,
32 canvas_height: int = 800,
33 node_width: int = 180,
34 node_height: int = 60,
35 spacing_x: int = 250,
36 spacing_y: int = 150,
37 ):
38 """
39 Initialize layout engine.
41 Args:
42 canvas_width: Canvas width in pixels
43 canvas_height: Canvas height in pixels
44 node_width: Node width
45 node_height: Node height
46 spacing_x: Horizontal spacing between nodes
47 spacing_y: Vertical spacing between nodes
48 """
49 self.canvas_width = canvas_width
50 self.canvas_height = canvas_height
51 self.node_width = node_width
52 self.node_height = node_height
53 self.spacing_x = spacing_x
54 self.spacing_y = spacing_y
56 def layout(
57 self,
58 nodes: list[dict[str, Any]],
59 edges: list[dict[str, str]],
60 algorithm: Literal["hierarchical", "force", "grid"] = "hierarchical",
61 entry_point: str | None = None,
62 ) -> list[dict[str, Any]]:
63 """
64 Layout nodes using specified algorithm.
66 Args:
67 nodes: List of node definitions
68 edges: List of edge definitions
69 algorithm: Layout algorithm to use
70 entry_point: Entry point node (for hierarchical)
72 Returns:
73 Nodes with updated positions
75 Example:
76 >>> positioned = engine.layout(nodes, edges, "hierarchical")
77 >>> positioned[0]["position"]
78 {'x': 250, 'y': 50}
79 """
80 if algorithm == "hierarchical":
81 return self._hierarchical_layout(nodes, edges, entry_point)
82 elif algorithm == "force":
83 return self._force_directed_layout(nodes, edges)
84 elif algorithm == "grid": 84 ↛ 87line 84 didn't jump to line 87 because the condition on line 84 was always true
85 return self._grid_layout(nodes)
86 else:
87 raise ValueError(f"Unknown layout algorithm: {algorithm}") # noqa: EM102, TRY003
89 def _hierarchical_layout(
90 self, nodes: list[dict[str, Any]], edges: list[dict[str, str]], entry_point: str | None
91 ) -> list[dict[str, Any]]:
92 """
93 Hierarchical top-down layout.
95 Arranges nodes in layers based on graph depth.
97 Args:
98 nodes: Nodes to layout
99 edges: Edges defining connections
100 entry_point: Starting node
102 Returns:
103 Positioned nodes
104 """
105 # Build adjacency list
106 adjacency: dict[str, list[str]] = {node["id"]: [] for node in nodes}
107 for edge in edges:
108 if edge["from"] in adjacency: 108 ↛ 107line 108 didn't jump to line 107 because the condition on line 108 was always true
109 adjacency[edge["from"]].append(edge["to"])
111 # Calculate depth for each node (BFS from entry point)
112 depths: dict[str, int] = {}
113 if entry_point: 113 ↛ 131line 113 didn't jump to line 131 because the condition on line 113 was always true
114 queue = [(entry_point, 0)]
115 visited = set()
117 while queue:
118 node_id, depth = queue.pop(0)
119 if node_id in visited: 119 ↛ 120line 119 didn't jump to line 120 because the condition on line 119 was never true
120 continue
122 visited.add(node_id)
123 depths[node_id] = depth
125 # Add children to queue
126 for child in adjacency.get(node_id, []):
127 if child not in visited: 127 ↛ 126line 127 didn't jump to line 126 because the condition on line 127 was always true
128 queue.append((child, depth + 1))
130 # Assign remaining nodes (not reachable from entry)
131 max_depth = max(depths.values()) if depths else 0
132 for node in nodes:
133 if node["id"] not in depths:
134 depths[node["id"]] = max_depth + 1
136 # Group nodes by depth (layer)
137 layers: dict[int, list[str]] = {}
138 for node_id, depth in depths.items():
139 if depth not in layers:
140 layers[depth] = []
141 layers[depth].append(node_id)
143 # Position nodes by layer
144 positioned_nodes = []
145 for node in nodes:
146 node_id = node["id"]
147 depth = depths.get(node_id, 0)
148 layer_nodes = layers[depth]
149 position_in_layer = layer_nodes.index(node_id)
151 # Calculate position
152 y = 50 + depth * self.spacing_y
153 x = self._center_nodes_in_layer(len(layer_nodes), position_in_layer)
155 # Update node with position
156 node["position"] = {"x": x, "y": y}
157 positioned_nodes.append(node)
159 return positioned_nodes
161 def _center_nodes_in_layer(self, num_nodes: int, position: int) -> int:
162 """
163 Calculate x position to center nodes in layer.
165 Args:
166 num_nodes: Number of nodes in layer
167 position: Position index in layer
169 Returns:
170 X coordinate
171 """
172 total_width = num_nodes * self.spacing_x
173 start_x = (self.canvas_width - total_width) / 2
175 return int(start_x + position * self.spacing_x)
177 def _force_directed_layout(self, nodes: list[dict[str, Any]], edges: list[dict[str, str]]) -> list[dict[str, Any]]:
178 """
179 Force-directed layout (Fruchterman-Reingold algorithm).
181 Uses physics simulation to create organic layout.
183 Args:
184 nodes: Nodes to layout
185 edges: Edges
187 Returns:
188 Positioned nodes
189 """
190 # Initialize random positions
191 import random
193 for node in nodes:
194 node["position"] = {
195 "x": random.randint(100, self.canvas_width - 100),
196 "y": random.randint(100, self.canvas_height - 100),
197 }
199 # Physics simulation parameters
200 iterations = 50
201 k = math.sqrt((self.canvas_width * self.canvas_height) / len(nodes)) # Optimal distance
202 temperature = self.canvas_width / 10
204 # Simulation
205 for iteration in range(iterations):
206 # Calculate repulsive forces (all pairs)
207 forces = {node["id"]: {"x": 0.0, "y": 0.0} for node in nodes}
209 for i, node1 in enumerate(nodes):
210 for node2 in nodes[i + 1 :]:
211 dx = node1["position"]["x"] - node2["position"]["x"]
212 dy = node1["position"]["y"] - node2["position"]["y"]
213 distance = math.sqrt(dx * dx + dy * dy) or 1
215 # Repulsive force (nodes repel)
216 force = k * k / distance
217 fx = (dx / distance) * force
218 fy = (dy / distance) * force
220 forces[node1["id"]]["x"] += fx
221 forces[node1["id"]]["y"] += fy
222 forces[node2["id"]]["x"] -= fx
223 forces[node2["id"]]["y"] -= fy
225 # Calculate attractive forces (connected pairs)
226 for edge in edges:
227 node1_result = next((n for n in nodes if n["id"] == edge["from"]), {})
228 node2_result = next((n for n in nodes if n["id"] == edge["to"]), {})
230 if not node1_result or not node2_result: 230 ↛ 231line 230 didn't jump to line 231 because the condition on line 230 was never true
231 continue
233 node1 = node1_result
234 node2 = node2_result
236 dx = node1["position"]["x"] - node2["position"]["x"]
237 dy = node1["position"]["y"] - node2["position"]["y"]
238 distance = math.sqrt(dx * dx + dy * dy) or 1
240 # Attractive force (edges pull)
241 force = distance * distance / k
242 fx = (dx / distance) * force
243 fy = (dy / distance) * force
245 forces[node1["id"]]["x"] -= fx
246 forces[node1["id"]]["y"] -= fy
247 forces[node2["id"]]["x"] += fx
248 forces[node2["id"]]["y"] += fy
250 # Apply forces with temperature cooling
251 temp = temperature * (1 - iteration / iterations)
253 for node in nodes:
254 force_vec = forces[node["id"]]
255 displacement = math.sqrt(force_vec["x"] ** 2 + force_vec["y"] ** 2) or 1
257 # Limit displacement by temperature
258 node["position"]["x"] += (force_vec["x"] / displacement) * min(displacement, temp)
259 node["position"]["y"] += (force_vec["y"] / displacement) * min(displacement, temp)
261 # Keep within canvas bounds
262 node["position"]["x"] = max(50, min(self.canvas_width - 50, node["position"]["x"]))
263 node["position"]["y"] = max(50, min(self.canvas_height - 50, node["position"]["y"]))
265 return nodes
267 def _grid_layout(self, nodes: list[dict[str, Any]]) -> list[dict[str, Any]]:
268 """
269 Simple grid layout.
271 Args:
272 nodes: Nodes to layout
274 Returns:
275 Positioned nodes in grid
276 """
277 # Calculate grid dimensions
278 num_nodes = len(nodes)
279 cols = math.ceil(math.sqrt(num_nodes))
281 for i, node in enumerate(nodes):
282 row = i // cols
283 col = i % cols
285 node["position"] = {"x": 100 + col * self.spacing_x, "y": 100 + row * self.spacing_y}
287 return nodes
# ==============================================================================
# Example Usage
# ==============================================================================

if __name__ == "__main__":
    # Sample nodes and edges: a simple five-step linear pipeline
    nodes = [
        {"id": "search", "type": "tool", "label": "Search"},
        {"id": "filter", "type": "custom", "label": "Filter"},
        {"id": "summarize", "type": "llm", "label": "Summarize"},
        {"id": "validate", "type": "conditional", "label": "Validate"},
        {"id": "respond", "type": "custom", "label": "Respond"},
    ]

    edges = [
        {"from": "search", "to": "filter"},
        {"from": "filter", "to": "summarize"},
        {"from": "summarize", "to": "validate"},
        {"from": "validate", "to": "respond"},
    ]

    engine = LayoutEngine()

    print("=" * 80)
    print("LAYOUT ENGINE - TEST RUN")
    print("=" * 80)

    # Typed as literals so the call below type-checks without an ignore comment
    algorithms: tuple[Literal["hierarchical", "force", "grid"], ...] = ("hierarchical", "force", "grid")

    for algorithm in algorithms:
        print(f"\n{algorithm.upper()} LAYOUT:")
        # layout() writes "position" into the dicts it is given. Copy each node
        # dict (not just the list) so the sample data stays unmodified and every
        # algorithm starts from unpositioned nodes.
        fresh_nodes = [dict(node) for node in nodes]
        positioned = engine.layout(fresh_nodes, edges, algorithm=algorithm, entry_point="search")

        for node in positioned:
            print(f"  {node['id']:15} @ ({node['position']['x']:6.1f}, {node['position']['y']:6.1f})")

    print("\n" + "=" * 80)