Coverage for src / mcp_server_langgraph / builder / workflow.py: 96%
65 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
1"""
2Workflow Builder API
4Programmatic API for building agent workflows.
6Provides fluent interface for:
7- Adding nodes and edges
8- Exporting to Python code
9- Importing from existing code
10- Validating workflow structure
12Example:
13 from mcp_server_langgraph.builder import WorkflowBuilder
15 # Create builder
16 builder = WorkflowBuilder("customer_support")
18 # Add nodes
19 builder.add_node("classify", "llm", {"model": "gemini-flash"})
20 builder.add_node("search_kb", "tool", {"tool": "knowledge_base"})
21 builder.add_node("respond", "llm", {"model": "gemini-flash"})
23 # Add edges
24 builder.add_edge("classify", "search_kb")
25 builder.add_edge("search_kb", "respond")
27 # Export to code
28 code = builder.export_code()
29 builder.save_code("agent.py")
30"""
32from typing import Any
34from .codegen import CodeGenerator, EdgeDefinition, NodeDefinition, WorkflowDefinition
37class WorkflowBuilder:
38 """
39 Fluent API for building agent workflows.
41 Provides convenient methods for programmatic workflow construction.
42 """
44 def __init__(self, name: str, description: str = ""):
45 """
46 Initialize workflow builder.
48 Args:
49 name: Workflow name
50 description: Workflow description
51 """
52 self.name = name
53 self.description = description
54 self.nodes: list[NodeDefinition] = []
55 self.edges: list[EdgeDefinition] = []
56 self.entry_point: str | None = None
57 self.state_schema: dict[str, str] = {}
59 def add_node(
60 self, node_id: str, node_type: str = "custom", config: dict[str, Any] | None = None, label: str = ""
61 ) -> "WorkflowBuilder":
62 """
63 Add a node to the workflow.
65 Args:
66 node_id: Unique node identifier
67 node_type: Type of node (tool, llm, conditional, approval, custom)
68 config: Node configuration
69 label: Display label
71 Returns:
72 Self for chaining
74 Example:
75 >>> builder.add_node("search", "tool", {"tool": "web_search"})
76 """
77 node = NodeDefinition(id=node_id, type=node_type, label=label or node_id, config=config or {})
79 self.nodes.append(node)
81 # Set first node as entry point if not set
82 if self.entry_point is None:
83 self.entry_point = node_id
85 return self
87 def add_edge(self, from_node: str, to_node: str, condition: str | None = None, label: str = "") -> "WorkflowBuilder":
88 """
89 Add an edge between nodes.
91 Args:
92 from_node: Source node ID
93 to_node: Target node ID
94 condition: Optional condition expression
95 label: Edge label
97 Returns:
98 Self for chaining
100 Example:
101 >>> builder.add_edge("search", "summarize")
102 >>> builder.add_edge("validate", "approved", condition='state["score"] > 0.8')
103 """
104 edge = EdgeDefinition(from_node=from_node, to_node=to_node, condition=condition, label=label) # type: ignore
106 self.edges.append(edge)
108 return self
110 def set_entry_point(self, node_id: str) -> "WorkflowBuilder":
111 """
112 Set the entry point node.
114 Args:
115 node_id: Node to use as entry point
117 Returns:
118 Self for chaining
119 """
120 self.entry_point = node_id
121 return self
123 def add_state_field(self, field_name: str, field_type: str = "str") -> "WorkflowBuilder":
124 """
125 Add a field to the state schema.
127 Args:
128 field_name: Name of the field
129 field_type: Python type annotation (default: str)
131 Returns:
132 Self for chaining
134 Example:
135 >>> builder.add_state_field("query", "str")
136 >>> builder.add_state_field("results", "List[str]")
137 """
138 self.state_schema[field_name] = field_type
139 return self
141 def build(self) -> WorkflowDefinition:
142 """
143 Build the workflow definition.
145 Returns:
146 WorkflowDefinition ready for code generation
148 Raises:
149 ValueError: If workflow is invalid
150 """
151 if not self.nodes:
152 msg = "Workflow must have at least one node"
153 raise ValueError(msg)
155 if not self.entry_point: 155 ↛ 156line 155 didn't jump to line 156 because the condition on line 155 was never true
156 msg = "Workflow must have an entry point"
157 raise ValueError(msg)
159 # Validate entry point exists
160 node_ids = {n.id for n in self.nodes}
161 if self.entry_point not in node_ids:
162 msg = f"Entry point '{self.entry_point}' not found in nodes"
163 raise ValueError(msg)
165 # Validate all edges reference existing nodes
166 for edge in self.edges:
167 if edge.from_node not in node_ids:
168 msg = f"Edge references non-existent node: {edge.from_node}"
169 raise ValueError(msg)
170 if edge.to_node not in node_ids:
171 msg = f"Edge references non-existent node: {edge.to_node}"
172 raise ValueError(msg)
174 return WorkflowDefinition(
175 name=self.name,
176 description=self.description,
177 nodes=self.nodes,
178 edges=self.edges,
179 entry_point=self.entry_point,
180 state_schema=self.state_schema,
181 )
183 def export_code(self) -> str:
184 """
185 Export workflow to Python code.
187 Returns:
188 Production-ready Python code
190 Example:
191 >>> code = builder.export_code()
192 >>> print(code)
193 """
194 workflow = self.build()
195 generator = CodeGenerator()
197 return generator.generate(workflow)
199 def save_code(self, output_path: str) -> None:
200 """
201 Export workflow to Python file.
203 Args:
204 output_path: Output file path
206 Example:
207 >>> builder.save_code("src/agents/my_agent.py")
208 """
209 code = self.export_code()
211 with open(output_path, "w") as f:
212 f.write(code)
214 def to_json(self) -> dict[str, Any]:
215 """
216 Export workflow to JSON for visual builder.
218 Returns:
219 JSON-serializable workflow definition
221 Example:
222 >>> json_data = builder.to_json()
223 >>> # Send to frontend visual builder
224 """
225 workflow = self.build()
226 return workflow.model_dump()
228 @classmethod
229 def from_json(cls, data: dict[str, Any]) -> "WorkflowBuilder":
230 """
231 Create builder from JSON (from visual builder).
233 Args:
234 data: JSON workflow data
236 Returns:
237 WorkflowBuilder instance
239 Example:
240 >>> builder = WorkflowBuilder.from_json(visual_builder_data)
241 >>> code = builder.export_code()
242 """
243 workflow = WorkflowDefinition(**data)
245 builder = cls(workflow.name, workflow.description)
246 builder.nodes = workflow.nodes
247 builder.edges = workflow.edges
248 builder.entry_point = workflow.entry_point
249 builder.state_schema = workflow.state_schema
251 return builder
254# ==============================================================================
255# Example Usage
256# ==============================================================================
258if __name__ == "__main__":
259 # Build workflow programmatically
260 builder = WorkflowBuilder("research_assistant", "Agent that researches and summarizes topics")
262 # Define state
263 builder.add_state_field("query", "str")
264 builder.add_state_field("search_results", "List[str]")
265 builder.add_state_field("summary", "str")
267 # Add nodes
268 builder.add_node("search", "tool", {"tool": "tavily_search"}, "Search Web")
269 builder.add_node("summarize", "llm", {"model": "gemini-2.5-flash"}, "Summarize Results")
270 builder.add_node("validate", "conditional", {}, "Validate Quality")
272 # Add edges
273 builder.add_edge("search", "summarize")
274 builder.add_edge("summarize", "validate")
276 # Set entry
277 builder.set_entry_point("search")
279 # Export to code
280 code = builder.export_code()
282 print("=" * 80)
283 print("WORKFLOW BUILDER - GENERATED CODE")
284 print("=" * 80)
285 print(code)
286 print("=" * 80)
288 # Also show JSON representation
289 print("\nJSON REPRESENTATION:")
290 import json
292 print(json.dumps(builder.to_json(), indent=2))