Coverage for src / mcp_server_langgraph / builder / importer / layout_engine.py: 94%

115 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 00:43 +0000

1""" 

2Layout Engine for Auto-Arranging Workflow Nodes 

3 

4Automatically positions nodes on the visual canvas using graph layout algorithms. 

5 

6Algorithms: 

7- Hierarchical (top-down, layered) 

8- Force-directed (organic, balanced) 

9- Grid (simple, aligned) 

10 

11Example: 

12 from mcp_server_langgraph.builder.importer import LayoutEngine 

13 

14 engine = LayoutEngine() 

15 positioned_nodes = engine.layout(nodes, edges, algorithm="hierarchical") 

16""" 

17 

18import math 

19from typing import Any, Literal 

20 

21 

22class LayoutEngine: 

23 """ 

24 Auto-layout engine for workflow nodes. 

25 

26 Positions nodes on canvas using graph layout algorithms. 

27 """ 

28 

29 def __init__( 

30 self, 

31 canvas_width: int = 1200, 

32 canvas_height: int = 800, 

33 node_width: int = 180, 

34 node_height: int = 60, 

35 spacing_x: int = 250, 

36 spacing_y: int = 150, 

37 ): 

38 """ 

39 Initialize layout engine. 

40 

41 Args: 

42 canvas_width: Canvas width in pixels 

43 canvas_height: Canvas height in pixels 

44 node_width: Node width 

45 node_height: Node height 

46 spacing_x: Horizontal spacing between nodes 

47 spacing_y: Vertical spacing between nodes 

48 """ 

49 self.canvas_width = canvas_width 

50 self.canvas_height = canvas_height 

51 self.node_width = node_width 

52 self.node_height = node_height 

53 self.spacing_x = spacing_x 

54 self.spacing_y = spacing_y 

55 

56 def layout( 

57 self, 

58 nodes: list[dict[str, Any]], 

59 edges: list[dict[str, str]], 

60 algorithm: Literal["hierarchical", "force", "grid"] = "hierarchical", 

61 entry_point: str | None = None, 

62 ) -> list[dict[str, Any]]: 

63 """ 

64 Layout nodes using specified algorithm. 

65 

66 Args: 

67 nodes: List of node definitions 

68 edges: List of edge definitions 

69 algorithm: Layout algorithm to use 

70 entry_point: Entry point node (for hierarchical) 

71 

72 Returns: 

73 Nodes with updated positions 

74 

75 Example: 

76 >>> positioned = engine.layout(nodes, edges, "hierarchical") 

77 >>> positioned[0]["position"] 

78 {'x': 250, 'y': 50} 

79 """ 

80 if algorithm == "hierarchical": 

81 return self._hierarchical_layout(nodes, edges, entry_point) 

82 elif algorithm == "force": 

83 return self._force_directed_layout(nodes, edges) 

84 elif algorithm == "grid": 84 ↛ 87line 84 didn't jump to line 87 because the condition on line 84 was always true

85 return self._grid_layout(nodes) 

86 else: 

87 raise ValueError(f"Unknown layout algorithm: {algorithm}") # noqa: EM102, TRY003 

88 

89 def _hierarchical_layout( 

90 self, nodes: list[dict[str, Any]], edges: list[dict[str, str]], entry_point: str | None 

91 ) -> list[dict[str, Any]]: 

92 """ 

93 Hierarchical top-down layout. 

94 

95 Arranges nodes in layers based on graph depth. 

96 

97 Args: 

98 nodes: Nodes to layout 

99 edges: Edges defining connections 

100 entry_point: Starting node 

101 

102 Returns: 

103 Positioned nodes 

104 """ 

105 # Build adjacency list 

106 adjacency: dict[str, list[str]] = {node["id"]: [] for node in nodes} 

107 for edge in edges: 

108 if edge["from"] in adjacency: 108 ↛ 107line 108 didn't jump to line 107 because the condition on line 108 was always true

109 adjacency[edge["from"]].append(edge["to"]) 

110 

111 # Calculate depth for each node (BFS from entry point) 

112 depths: dict[str, int] = {} 

113 if entry_point: 113 ↛ 131line 113 didn't jump to line 131 because the condition on line 113 was always true

114 queue = [(entry_point, 0)] 

115 visited = set() 

116 

117 while queue: 

118 node_id, depth = queue.pop(0) 

119 if node_id in visited: 119 ↛ 120line 119 didn't jump to line 120 because the condition on line 119 was never true

120 continue 

121 

122 visited.add(node_id) 

123 depths[node_id] = depth 

124 

125 # Add children to queue 

126 for child in adjacency.get(node_id, []): 

127 if child not in visited: 127 ↛ 126line 127 didn't jump to line 126 because the condition on line 127 was always true

128 queue.append((child, depth + 1)) 

129 

130 # Assign remaining nodes (not reachable from entry) 

131 max_depth = max(depths.values()) if depths else 0 

132 for node in nodes: 

133 if node["id"] not in depths: 

134 depths[node["id"]] = max_depth + 1 

135 

136 # Group nodes by depth (layer) 

137 layers: dict[int, list[str]] = {} 

138 for node_id, depth in depths.items(): 

139 if depth not in layers: 

140 layers[depth] = [] 

141 layers[depth].append(node_id) 

142 

143 # Position nodes by layer 

144 positioned_nodes = [] 

145 for node in nodes: 

146 node_id = node["id"] 

147 depth = depths.get(node_id, 0) 

148 layer_nodes = layers[depth] 

149 position_in_layer = layer_nodes.index(node_id) 

150 

151 # Calculate position 

152 y = 50 + depth * self.spacing_y 

153 x = self._center_nodes_in_layer(len(layer_nodes), position_in_layer) 

154 

155 # Update node with position 

156 node["position"] = {"x": x, "y": y} 

157 positioned_nodes.append(node) 

158 

159 return positioned_nodes 

160 

161 def _center_nodes_in_layer(self, num_nodes: int, position: int) -> int: 

162 """ 

163 Calculate x position to center nodes in layer. 

164 

165 Args: 

166 num_nodes: Number of nodes in layer 

167 position: Position index in layer 

168 

169 Returns: 

170 X coordinate 

171 """ 

172 total_width = num_nodes * self.spacing_x 

173 start_x = (self.canvas_width - total_width) / 2 

174 

175 return int(start_x + position * self.spacing_x) 

176 

177 def _force_directed_layout(self, nodes: list[dict[str, Any]], edges: list[dict[str, str]]) -> list[dict[str, Any]]: 

178 """ 

179 Force-directed layout (Fruchterman-Reingold algorithm). 

180 

181 Uses physics simulation to create organic layout. 

182 

183 Args: 

184 nodes: Nodes to layout 

185 edges: Edges 

186 

187 Returns: 

188 Positioned nodes 

189 """ 

190 # Initialize random positions 

191 import random 

192 

193 for node in nodes: 

194 node["position"] = { 

195 "x": random.randint(100, self.canvas_width - 100), 

196 "y": random.randint(100, self.canvas_height - 100), 

197 } 

198 

199 # Physics simulation parameters 

200 iterations = 50 

201 k = math.sqrt((self.canvas_width * self.canvas_height) / len(nodes)) # Optimal distance 

202 temperature = self.canvas_width / 10 

203 

204 # Simulation 

205 for iteration in range(iterations): 

206 # Calculate repulsive forces (all pairs) 

207 forces = {node["id"]: {"x": 0.0, "y": 0.0} for node in nodes} 

208 

209 for i, node1 in enumerate(nodes): 

210 for node2 in nodes[i + 1 :]: 

211 dx = node1["position"]["x"] - node2["position"]["x"] 

212 dy = node1["position"]["y"] - node2["position"]["y"] 

213 distance = math.sqrt(dx * dx + dy * dy) or 1 

214 

215 # Repulsive force (nodes repel) 

216 force = k * k / distance 

217 fx = (dx / distance) * force 

218 fy = (dy / distance) * force 

219 

220 forces[node1["id"]]["x"] += fx 

221 forces[node1["id"]]["y"] += fy 

222 forces[node2["id"]]["x"] -= fx 

223 forces[node2["id"]]["y"] -= fy 

224 

225 # Calculate attractive forces (connected pairs) 

226 for edge in edges: 

227 node1_result = next((n for n in nodes if n["id"] == edge["from"]), {}) 

228 node2_result = next((n for n in nodes if n["id"] == edge["to"]), {}) 

229 

230 if not node1_result or not node2_result: 230 ↛ 231line 230 didn't jump to line 231 because the condition on line 230 was never true

231 continue 

232 

233 node1 = node1_result 

234 node2 = node2_result 

235 

236 dx = node1["position"]["x"] - node2["position"]["x"] 

237 dy = node1["position"]["y"] - node2["position"]["y"] 

238 distance = math.sqrt(dx * dx + dy * dy) or 1 

239 

240 # Attractive force (edges pull) 

241 force = distance * distance / k 

242 fx = (dx / distance) * force 

243 fy = (dy / distance) * force 

244 

245 forces[node1["id"]]["x"] -= fx 

246 forces[node1["id"]]["y"] -= fy 

247 forces[node2["id"]]["x"] += fx 

248 forces[node2["id"]]["y"] += fy 

249 

250 # Apply forces with temperature cooling 

251 temp = temperature * (1 - iteration / iterations) 

252 

253 for node in nodes: 

254 force_vec = forces[node["id"]] 

255 displacement = math.sqrt(force_vec["x"] ** 2 + force_vec["y"] ** 2) or 1 

256 

257 # Limit displacement by temperature 

258 node["position"]["x"] += (force_vec["x"] / displacement) * min(displacement, temp) 

259 node["position"]["y"] += (force_vec["y"] / displacement) * min(displacement, temp) 

260 

261 # Keep within canvas bounds 

262 node["position"]["x"] = max(50, min(self.canvas_width - 50, node["position"]["x"])) 

263 node["position"]["y"] = max(50, min(self.canvas_height - 50, node["position"]["y"])) 

264 

265 return nodes 

266 

267 def _grid_layout(self, nodes: list[dict[str, Any]]) -> list[dict[str, Any]]: 

268 """ 

269 Simple grid layout. 

270 

271 Args: 

272 nodes: Nodes to layout 

273 

274 Returns: 

275 Positioned nodes in grid 

276 """ 

277 # Calculate grid dimensions 

278 num_nodes = len(nodes) 

279 cols = math.ceil(math.sqrt(num_nodes)) 

280 

281 for i, node in enumerate(nodes): 

282 row = i // cols 

283 col = i % cols 

284 

285 node["position"] = {"x": 100 + col * self.spacing_x, "y": 100 + row * self.spacing_y} 

286 

287 return nodes 

288 

289 

290# ============================================================================== 

291# Example Usage 

292# ============================================================================== 

293 

if __name__ == "__main__":
    # Demo: lay out a small linear workflow with each algorithm and print the
    # resulting coordinates.

    # Sample nodes and edges
    nodes = [
        {"id": "search", "type": "tool", "label": "Search"},
        {"id": "filter", "type": "custom", "label": "Filter"},
        {"id": "summarize", "type": "llm", "label": "Summarize"},
        {"id": "validate", "type": "conditional", "label": "Validate"},
        {"id": "respond", "type": "custom", "label": "Respond"},
    ]

    edges = [
        {"from": "search", "to": "filter"},
        {"from": "filter", "to": "summarize"},
        {"from": "summarize", "to": "validate"},
        {"from": "validate", "to": "respond"},
    ]

    engine = LayoutEngine()

    print("=" * 80)
    print("LAYOUT ENGINE - TEST RUN")
    print("=" * 80)

    for algorithm in ["hierarchical", "force", "grid"]:
        print(f"\n{algorithm.upper()} LAYOUT:")
        # layout() writes "position" into the node dicts it receives, and a
        # shallow list copy would share those dicts between runs, so give each
        # algorithm its own copies of the sample nodes.
        fresh_nodes = [dict(node) for node in nodes]
        positioned = engine.layout(fresh_nodes, edges, algorithm=algorithm, entry_point="search")  # type: ignore

        for node in positioned:
            print(f"  {node['id']:15} @ ({node['position']['x']:6.1f}, {node['position']['y']:6.1f})")

    print("\n" + "=" * 80)