Coverage for src / mcp_server_langgraph / patterns / hierarchical.py: 18%

91 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 00:43 +0000

1""" 

2Hierarchical Pattern for Multi-Level Agent Delegation 

3 

4The hierarchical pattern organizes agents in a tree structure where 

5top-level agents can delegate to sub-agents, creating a chain of command. 

6 

7Architecture: 

8 User Query → CEO Agent → [Manager A, Manager B] 

9 ↓ ↓ 

10 [Worker 1, 2, 3] [Worker 4, 5, 6] 

11 

12Use Cases: 

13 - Complex project management (CEO → Managers → Workers) 

14 - Enterprise workflows (Executive → Department → Team → Individual) 

15 - Cascading decision-making 

16 - Organizational hierarchies 

17 

18Example: 

19 from mcp_server_langgraph.patterns import HierarchicalCoordinator 

20 

21 hierarchy = HierarchicalCoordinator( 

22 ceo_agent=executive_agent, 

23 managers={ 

24 "research_manager": research_mgr, 

25 "dev_manager": dev_mgr 

26 }, 

27 workers={ 

28 "research_manager": [researcher1, researcher2], 

29 "dev_manager": [developer1, developer2, developer3] 

30 } 

31 ) 

32 

33 result = hierarchy.invoke({"project": "Build AI feature"}) 

34""" 

35 

36from collections.abc import Callable 

37from typing import Any 

38 

39from langgraph.graph import StateGraph 

40from pydantic import BaseModel, Field 

41 

42 

43class HierarchicalState(BaseModel): 

44 """State for hierarchical pattern.""" 

45 

46 project: str = Field(description="The project or task") 

47 ceo_decision: str = Field(default="", description="Top-level strategic decision") 

48 manager_assignments: dict[str, str] = Field(default_factory=dict, description="Tasks assigned to each manager") 

49 worker_results: dict[str, list[Any]] = Field(default_factory=dict, description="Results from workers by manager") 

50 manager_reports: dict[str, str] = Field(default_factory=dict, description="Manager summary reports") 

51 final_report: str = Field(default="", description="Final consolidated report") 

52 execution_path: list[str] = Field(default_factory=list, description="Execution path through hierarchy") 

53 

54 

55class HierarchicalCoordinator: 

56 """ 

57 Hierarchical pattern for multi-level agent delegation. 

58 

59 Organizes agents in a tree: CEO → Managers → Workers 

60 """ 

61 

62 def __init__( 

63 self, 

64 ceo_agent: Callable[[str], str], 

65 managers: dict[str, Callable[[str], str]], 

66 workers: dict[str, list[Callable[[str], Any]]], 

67 delegation_strategy: str = "balanced", 

68 ): 

69 """ 

70 Initialize hierarchical coordinator. 

71 

72 Args: 

73 ceo_agent: Top-level decision-making agent 

74 managers: Dict of {manager_name: manager_function} 

75 workers: Dict of {manager_name: [worker_functions]} 

76 delegation_strategy: How to distribute work 

77 - balanced: Distribute evenly 

78 - specialized: Route based on expertise 

79 """ 

80 self.ceo_agent = ceo_agent 

81 self.managers = managers 

82 self.workers = workers 

83 self.delegation_strategy = delegation_strategy 

84 self._graph: StateGraph[HierarchicalState] | None = None 

85 

86 def _ceo_node(self, state: HierarchicalState) -> HierarchicalState: 

87 """ 

88 CEO makes top-level decisions and delegates to managers. 

89 

90 In production, CEO agent would use LLM to analyze project 

91 and decide delegation strategy. 

92 """ 

93 # CEO analyzes the project 

94 ceo_analysis = self.ceo_agent(state.project) 

95 state.ceo_decision = ceo_analysis 

96 state.execution_path.append("CEO") 

97 

98 # Delegate to managers (simplified delegation logic) 

99 if self.delegation_strategy == "balanced": 

100 # Distribute work evenly 

101 managers_list = list(self.managers.keys()) 

102 for manager_name in managers_list: 

103 state.manager_assignments[manager_name] = f"Handle aspect of '{state.project}' - assigned by CEO" 

104 else: 

105 # Specialized delegation (in production, use LLM) 

106 for manager_name in self.managers: 

107 state.manager_assignments[manager_name] = f"Specialized task for {manager_name} regarding '{state.project}'" 

108 

109 return state 

110 

111 def _create_manager_node( 

112 self, manager_name: str, manager_func: Callable[[str], str] 

113 ) -> Callable[[HierarchicalState], HierarchicalState]: 

114 """ 

115 Create manager node that delegates to workers. 

116 

117 Args: 

118 manager_name: Name of the manager 

119 manager_func: Manager function 

120 

121 Returns: 

122 Manager node function 

123 """ 

124 

125 def manager_node(state: HierarchicalState) -> HierarchicalState: 

126 """Manager delegates to workers and summarizes results.""" 

127 state.execution_path.append(f"Manager:{manager_name}") 

128 

129 # Get assignment from CEO 

130 assignment = state.manager_assignments.get(manager_name, "") 

131 

132 # Manager analyzes and delegates to workers 

133 manager_analysis = manager_func(assignment) 

134 

135 # Collect worker results 

136 worker_funcs = self.workers.get(manager_name, []) 

137 worker_results = [] 

138 

139 for i, worker_func in enumerate(worker_funcs): 

140 state.execution_path.append(f"Worker:{manager_name}_{i}") 

141 worker_result = worker_func(assignment) 

142 worker_results.append(worker_result) 

143 

144 # Store worker results 

145 state.worker_results[manager_name] = worker_results 

146 

147 # Manager creates summary report 

148 report = f"**{manager_name.replace('_', ' ').title()} Report:**\n\n" 

149 report += f"Assignment: {assignment}\n\n" 

150 report += f"Manager Analysis: {manager_analysis}\n\n" 

151 report += f"Team Results ({len(worker_results)} workers):\n" 

152 

153 for i, result in enumerate(worker_results, 1): 

154 report += f"{i}. {result}\n" 

155 

156 state.manager_reports[manager_name] = report 

157 

158 return state 

159 

160 return manager_node 

161 

162 def _consolidate_node(self, state: HierarchicalState) -> HierarchicalState: 

163 """ 

164 Consolidate all manager reports into final report. 

165 

166 In production, CEO agent would synthesize all reports. 

167 """ 

168 state.execution_path.append("Consolidation") 

169 

170 # Build final report 

171 report_parts = [] 

172 

173 report_parts.append(f"# Project: {state.project}\n") 

174 report_parts.append(f"## CEO Strategic Decision\n{state.ceo_decision}\n") 

175 report_parts.append("## Execution Summary\n") 

176 

177 # Add all manager reports 

178 for report in state.manager_reports.values(): 

179 report_parts.append(f"\n### {report}\n") 

180 

181 # Add execution path 

182 report_parts.append("\n## Execution Path\n") 

183 report_parts.append(" → ".join(state.execution_path)) 

184 

185 # Statistics 

186 total_workers = sum(len(results) for results in state.worker_results.values()) 

187 report_parts.append("\n## Statistics\n") 

188 report_parts.append(f"- Managers: {len(self.managers)}\n") 

189 report_parts.append(f"- Workers: {total_workers}\n") 

190 report_parts.append("- Hierarchy Depth: 3 levels\n") 

191 

192 state.final_report = "\n".join(report_parts) 

193 

194 return state 

195 

196 def build(self) -> "StateGraph[HierarchicalState]": 

197 """Build the hierarchical graph.""" 

198 graph: StateGraph[HierarchicalState] = StateGraph(HierarchicalState) 

199 

200 # Add CEO node 

201 graph.add_node("ceo", self._ceo_node) 

202 

203 # Add manager nodes 

204 for manager_name, manager_func in self.managers.items(): 

205 manager_node = self._create_manager_node(manager_name, manager_func) 

206 graph.add_node(f"manager_{manager_name}", manager_node) # type: ignore[arg-type] 

207 

208 # Add consolidation node 

209 graph.add_node("consolidate", self._consolidate_node) 

210 

211 # Define edges 

212 graph.set_entry_point("ceo") 

213 

214 # CEO delegates to all managers 

215 for manager_name in self.managers: 

216 graph.add_edge("ceo", f"manager_{manager_name}") 

217 

218 # All managers report to consolidation 

219 for manager_name in self.managers: 

220 graph.add_edge(f"manager_{manager_name}", "consolidate") 

221 

222 # Consolidation is the end 

223 graph.set_finish_point("consolidate") 

224 

225 return graph 

226 

227 def compile(self, checkpointer: Any = None) -> Any: 

228 """ 

229 Compile the hierarchical graph. 

230 

231 Args: 

232 checkpointer: Optional checkpointer 

233 

234 Returns: 

235 Compiled graph 

236 """ 

237 if self._graph is None: 

238 self._graph = self.build() 

239 

240 return self._graph.compile(checkpointer=checkpointer) 

241 

242 def invoke(self, project: str, config: dict[str, Any] | None = None) -> dict[str, Any]: 

243 """ 

244 Execute the hierarchical pattern. 

245 

246 Args: 

247 project: Project description 

248 config: Optional configuration 

249 

250 Returns: 

251 Final report and execution details 

252 """ 

253 compiled = self.compile() 

254 state = HierarchicalState(project=project) 

255 

256 result = compiled.invoke(state, config=config or {}) 

257 

258 return { 

259 "project": result.project, 

260 "final_report": result.final_report, 

261 "ceo_decision": result.ceo_decision, 

262 "manager_reports": result.manager_reports, 

263 "execution_path": result.execution_path, 

264 "total_agents": len(result.execution_path), 

265 } 

266 

267 

268# Example usage and testing 

269if __name__ == "__main__": 

270 # Define organizational hierarchy 

271 def ceo_agent(project: str) -> str: 

272 """CEO strategic planning.""" 

273 return f"Strategic analysis: {project} requires research and development coordination." 

274 

275 def research_manager(task: str) -> str: 

276 """Research manager planning.""" 

277 return "Research plan: Investigate technical requirements and feasibility." 

278 

279 def dev_manager(task: str) -> str: 

280 """Development manager planning.""" 

281 return "Development plan: Design architecture and implementation roadmap." 

282 

283 def researcher_1(task: str) -> str: 

284 """Research worker.""" 

285 return "Conducted market analysis and competitor research" 

286 

287 def researcher_2(task: str) -> str: 

288 """Research worker.""" 

289 return "Reviewed academic papers and technical documentation" 

290 

291 def developer_1(task: str) -> str: 

292 """Development worker.""" 

293 return "Designed system architecture and database schema" 

294 

295 def developer_2(task: str) -> str: 

296 """Development worker.""" 

297 return "Implemented core features and API endpoints" 

298 

299 def developer_3(task: str) -> str: 

300 """Development worker.""" 

301 return "Created tests and deployment configurations" 

302 

303 # Create hierarchical coordinator 

304 hierarchy = HierarchicalCoordinator( 

305 ceo_agent=ceo_agent, 

306 managers={"research_manager": research_manager, "dev_manager": dev_manager}, 

307 workers={"research_manager": [researcher_1, researcher_2], "dev_manager": [developer_1, developer_2, developer_3]}, 

308 ) 

309 

310 # Test 

311 print("=" * 80) 

312 print("HIERARCHICAL PATTERN - TEST RUN") 

313 print("=" * 80) 

314 

315 result = hierarchy.invoke("Build AI-powered search feature") 

316 

317 print(f"\nProject: {result['project']}") 

318 print(f"Total Agents Involved: {result['total_agents']}") 

319 print(f"Execution Path: {' → '.join(result['execution_path'][:5])}...") 

320 print(f"\nFINAL REPORT:\n{result['final_report']}") 

321 

322 print("\n" + "=" * 80)