Coverage for src / mcp_server_langgraph / patterns / hierarchical.py: 18%
91 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
1"""
2Hierarchical Pattern for Multi-Level Agent Delegation
4The hierarchical pattern organizes agents in a tree structure where
5top-level agents can delegate to sub-agents, creating a chain of command.
7Architecture:
8 User Query → CEO Agent → [Manager A, Manager B]
9 ↓ ↓
10 [Worker 1, 2, 3] [Worker 4, 5, 6]
12Use Cases:
13 - Complex project management (CEO → Managers → Workers)
14 - Enterprise workflows (Executive → Department → Team → Individual)
15 - Cascading decision-making
16 - Organizational hierarchies
18Example:
19 from mcp_server_langgraph.patterns import HierarchicalCoordinator
21 hierarchy = HierarchicalCoordinator(
22 ceo_agent=executive_agent,
23 managers={
24 "research_manager": research_mgr,
25 "dev_manager": dev_mgr
26 },
27 workers={
28 "research_manager": [researcher1, researcher2],
29 "dev_manager": [developer1, developer2, developer3]
30 }
31 )
33 result = hierarchy.invoke({"project": "Build AI feature"})
34"""
36from collections.abc import Callable
37from typing import Any
39from langgraph.graph import StateGraph
40from pydantic import BaseModel, Field
43class HierarchicalState(BaseModel):
44 """State for hierarchical pattern."""
46 project: str = Field(description="The project or task")
47 ceo_decision: str = Field(default="", description="Top-level strategic decision")
48 manager_assignments: dict[str, str] = Field(default_factory=dict, description="Tasks assigned to each manager")
49 worker_results: dict[str, list[Any]] = Field(default_factory=dict, description="Results from workers by manager")
50 manager_reports: dict[str, str] = Field(default_factory=dict, description="Manager summary reports")
51 final_report: str = Field(default="", description="Final consolidated report")
52 execution_path: list[str] = Field(default_factory=list, description="Execution path through hierarchy")
55class HierarchicalCoordinator:
56 """
57 Hierarchical pattern for multi-level agent delegation.
59 Organizes agents in a tree: CEO → Managers → Workers
60 """
62 def __init__(
63 self,
64 ceo_agent: Callable[[str], str],
65 managers: dict[str, Callable[[str], str]],
66 workers: dict[str, list[Callable[[str], Any]]],
67 delegation_strategy: str = "balanced",
68 ):
69 """
70 Initialize hierarchical coordinator.
72 Args:
73 ceo_agent: Top-level decision-making agent
74 managers: Dict of {manager_name: manager_function}
75 workers: Dict of {manager_name: [worker_functions]}
76 delegation_strategy: How to distribute work
77 - balanced: Distribute evenly
78 - specialized: Route based on expertise
79 """
80 self.ceo_agent = ceo_agent
81 self.managers = managers
82 self.workers = workers
83 self.delegation_strategy = delegation_strategy
84 self._graph: StateGraph[HierarchicalState] | None = None
86 def _ceo_node(self, state: HierarchicalState) -> HierarchicalState:
87 """
88 CEO makes top-level decisions and delegates to managers.
90 In production, CEO agent would use LLM to analyze project
91 and decide delegation strategy.
92 """
93 # CEO analyzes the project
94 ceo_analysis = self.ceo_agent(state.project)
95 state.ceo_decision = ceo_analysis
96 state.execution_path.append("CEO")
98 # Delegate to managers (simplified delegation logic)
99 if self.delegation_strategy == "balanced":
100 # Distribute work evenly
101 managers_list = list(self.managers.keys())
102 for manager_name in managers_list:
103 state.manager_assignments[manager_name] = f"Handle aspect of '{state.project}' - assigned by CEO"
104 else:
105 # Specialized delegation (in production, use LLM)
106 for manager_name in self.managers:
107 state.manager_assignments[manager_name] = f"Specialized task for {manager_name} regarding '{state.project}'"
109 return state
111 def _create_manager_node(
112 self, manager_name: str, manager_func: Callable[[str], str]
113 ) -> Callable[[HierarchicalState], HierarchicalState]:
114 """
115 Create manager node that delegates to workers.
117 Args:
118 manager_name: Name of the manager
119 manager_func: Manager function
121 Returns:
122 Manager node function
123 """
125 def manager_node(state: HierarchicalState) -> HierarchicalState:
126 """Manager delegates to workers and summarizes results."""
127 state.execution_path.append(f"Manager:{manager_name}")
129 # Get assignment from CEO
130 assignment = state.manager_assignments.get(manager_name, "")
132 # Manager analyzes and delegates to workers
133 manager_analysis = manager_func(assignment)
135 # Collect worker results
136 worker_funcs = self.workers.get(manager_name, [])
137 worker_results = []
139 for i, worker_func in enumerate(worker_funcs):
140 state.execution_path.append(f"Worker:{manager_name}_{i}")
141 worker_result = worker_func(assignment)
142 worker_results.append(worker_result)
144 # Store worker results
145 state.worker_results[manager_name] = worker_results
147 # Manager creates summary report
148 report = f"**{manager_name.replace('_', ' ').title()} Report:**\n\n"
149 report += f"Assignment: {assignment}\n\n"
150 report += f"Manager Analysis: {manager_analysis}\n\n"
151 report += f"Team Results ({len(worker_results)} workers):\n"
153 for i, result in enumerate(worker_results, 1):
154 report += f"{i}. {result}\n"
156 state.manager_reports[manager_name] = report
158 return state
160 return manager_node
162 def _consolidate_node(self, state: HierarchicalState) -> HierarchicalState:
163 """
164 Consolidate all manager reports into final report.
166 In production, CEO agent would synthesize all reports.
167 """
168 state.execution_path.append("Consolidation")
170 # Build final report
171 report_parts = []
173 report_parts.append(f"# Project: {state.project}\n")
174 report_parts.append(f"## CEO Strategic Decision\n{state.ceo_decision}\n")
175 report_parts.append("## Execution Summary\n")
177 # Add all manager reports
178 for report in state.manager_reports.values():
179 report_parts.append(f"\n### {report}\n")
181 # Add execution path
182 report_parts.append("\n## Execution Path\n")
183 report_parts.append(" → ".join(state.execution_path))
185 # Statistics
186 total_workers = sum(len(results) for results in state.worker_results.values())
187 report_parts.append("\n## Statistics\n")
188 report_parts.append(f"- Managers: {len(self.managers)}\n")
189 report_parts.append(f"- Workers: {total_workers}\n")
190 report_parts.append("- Hierarchy Depth: 3 levels\n")
192 state.final_report = "\n".join(report_parts)
194 return state
196 def build(self) -> "StateGraph[HierarchicalState]":
197 """Build the hierarchical graph."""
198 graph: StateGraph[HierarchicalState] = StateGraph(HierarchicalState)
200 # Add CEO node
201 graph.add_node("ceo", self._ceo_node)
203 # Add manager nodes
204 for manager_name, manager_func in self.managers.items():
205 manager_node = self._create_manager_node(manager_name, manager_func)
206 graph.add_node(f"manager_{manager_name}", manager_node) # type: ignore[arg-type]
208 # Add consolidation node
209 graph.add_node("consolidate", self._consolidate_node)
211 # Define edges
212 graph.set_entry_point("ceo")
214 # CEO delegates to all managers
215 for manager_name in self.managers:
216 graph.add_edge("ceo", f"manager_{manager_name}")
218 # All managers report to consolidation
219 for manager_name in self.managers:
220 graph.add_edge(f"manager_{manager_name}", "consolidate")
222 # Consolidation is the end
223 graph.set_finish_point("consolidate")
225 return graph
227 def compile(self, checkpointer: Any = None) -> Any:
228 """
229 Compile the hierarchical graph.
231 Args:
232 checkpointer: Optional checkpointer
234 Returns:
235 Compiled graph
236 """
237 if self._graph is None:
238 self._graph = self.build()
240 return self._graph.compile(checkpointer=checkpointer)
242 def invoke(self, project: str, config: dict[str, Any] | None = None) -> dict[str, Any]:
243 """
244 Execute the hierarchical pattern.
246 Args:
247 project: Project description
248 config: Optional configuration
250 Returns:
251 Final report and execution details
252 """
253 compiled = self.compile()
254 state = HierarchicalState(project=project)
256 result = compiled.invoke(state, config=config or {})
258 return {
259 "project": result.project,
260 "final_report": result.final_report,
261 "ceo_decision": result.ceo_decision,
262 "manager_reports": result.manager_reports,
263 "execution_path": result.execution_path,
264 "total_agents": len(result.execution_path),
265 }
268# Example usage and testing
269if __name__ == "__main__":
270 # Define organizational hierarchy
271 def ceo_agent(project: str) -> str:
272 """CEO strategic planning."""
273 return f"Strategic analysis: {project} requires research and development coordination."
275 def research_manager(task: str) -> str:
276 """Research manager planning."""
277 return "Research plan: Investigate technical requirements and feasibility."
279 def dev_manager(task: str) -> str:
280 """Development manager planning."""
281 return "Development plan: Design architecture and implementation roadmap."
283 def researcher_1(task: str) -> str:
284 """Research worker."""
285 return "Conducted market analysis and competitor research"
287 def researcher_2(task: str) -> str:
288 """Research worker."""
289 return "Reviewed academic papers and technical documentation"
291 def developer_1(task: str) -> str:
292 """Development worker."""
293 return "Designed system architecture and database schema"
295 def developer_2(task: str) -> str:
296 """Development worker."""
297 return "Implemented core features and API endpoints"
299 def developer_3(task: str) -> str:
300 """Development worker."""
301 return "Created tests and deployment configurations"
303 # Create hierarchical coordinator
304 hierarchy = HierarchicalCoordinator(
305 ceo_agent=ceo_agent,
306 managers={"research_manager": research_manager, "dev_manager": dev_manager},
307 workers={"research_manager": [researcher_1, researcher_2], "dev_manager": [developer_1, developer_2, developer_3]},
308 )
310 # Test
311 print("=" * 80)
312 print("HIERARCHICAL PATTERN - TEST RUN")
313 print("=" * 80)
315 result = hierarchy.invoke("Build AI-powered search feature")
317 print(f"\nProject: {result['project']}")
318 print(f"Total Agents Involved: {result['total_agents']}")
319 print(f"Execution Path: {' → '.join(result['execution_path'][:5])}...")
320 print(f"\nFINAL REPORT:\n{result['final_report']}")
322 print("\n" + "=" * 80)