Coverage for src / mcp_server_langgraph / builder / workflow.py: 96%

65 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 00:43 +0000

1""" 

2Workflow Builder API 

3 

4Programmatic API for building agent workflows. 

5 

6Provides fluent interface for: 

7- Adding nodes and edges 

8- Exporting to Python code 

9- Importing from existing code 

10- Validating workflow structure 

11 

12Example: 

13 from mcp_server_langgraph.builder import WorkflowBuilder 

14 

15 # Create builder 

16 builder = WorkflowBuilder("customer_support") 

17 

18 # Add nodes 

19 builder.add_node("classify", "llm", {"model": "gemini-flash"}) 

20 builder.add_node("search_kb", "tool", {"tool": "knowledge_base"}) 

21 builder.add_node("respond", "llm", {"model": "gemini-flash"}) 

22 

23 # Add edges 

24 builder.add_edge("classify", "search_kb") 

25 builder.add_edge("search_kb", "respond") 

26 

27 # Export to code 

28 code = builder.export_code() 

29 builder.save_code("agent.py") 

30""" 

31 

32from typing import Any 

33 

34from .codegen import CodeGenerator, EdgeDefinition, NodeDefinition, WorkflowDefinition 

35 

36 

37class WorkflowBuilder: 

38 """ 

39 Fluent API for building agent workflows. 

40 

41 Provides convenient methods for programmatic workflow construction. 

42 """ 

43 

44 def __init__(self, name: str, description: str = ""): 

45 """ 

46 Initialize workflow builder. 

47 

48 Args: 

49 name: Workflow name 

50 description: Workflow description 

51 """ 

52 self.name = name 

53 self.description = description 

54 self.nodes: list[NodeDefinition] = [] 

55 self.edges: list[EdgeDefinition] = [] 

56 self.entry_point: str | None = None 

57 self.state_schema: dict[str, str] = {} 

58 

59 def add_node( 

60 self, node_id: str, node_type: str = "custom", config: dict[str, Any] | None = None, label: str = "" 

61 ) -> "WorkflowBuilder": 

62 """ 

63 Add a node to the workflow. 

64 

65 Args: 

66 node_id: Unique node identifier 

67 node_type: Type of node (tool, llm, conditional, approval, custom) 

68 config: Node configuration 

69 label: Display label 

70 

71 Returns: 

72 Self for chaining 

73 

74 Example: 

75 >>> builder.add_node("search", "tool", {"tool": "web_search"}) 

76 """ 

77 node = NodeDefinition(id=node_id, type=node_type, label=label or node_id, config=config or {}) 

78 

79 self.nodes.append(node) 

80 

81 # Set first node as entry point if not set 

82 if self.entry_point is None: 

83 self.entry_point = node_id 

84 

85 return self 

86 

87 def add_edge(self, from_node: str, to_node: str, condition: str | None = None, label: str = "") -> "WorkflowBuilder": 

88 """ 

89 Add an edge between nodes. 

90 

91 Args: 

92 from_node: Source node ID 

93 to_node: Target node ID 

94 condition: Optional condition expression 

95 label: Edge label 

96 

97 Returns: 

98 Self for chaining 

99 

100 Example: 

101 >>> builder.add_edge("search", "summarize") 

102 >>> builder.add_edge("validate", "approved", condition='state["score"] > 0.8') 

103 """ 

104 edge = EdgeDefinition(from_node=from_node, to_node=to_node, condition=condition, label=label) # type: ignore 

105 

106 self.edges.append(edge) 

107 

108 return self 

109 

110 def set_entry_point(self, node_id: str) -> "WorkflowBuilder": 

111 """ 

112 Set the entry point node. 

113 

114 Args: 

115 node_id: Node to use as entry point 

116 

117 Returns: 

118 Self for chaining 

119 """ 

120 self.entry_point = node_id 

121 return self 

122 

123 def add_state_field(self, field_name: str, field_type: str = "str") -> "WorkflowBuilder": 

124 """ 

125 Add a field to the state schema. 

126 

127 Args: 

128 field_name: Name of the field 

129 field_type: Python type annotation (default: str) 

130 

131 Returns: 

132 Self for chaining 

133 

134 Example: 

135 >>> builder.add_state_field("query", "str") 

136 >>> builder.add_state_field("results", "List[str]") 

137 """ 

138 self.state_schema[field_name] = field_type 

139 return self 

140 

141 def build(self) -> WorkflowDefinition: 

142 """ 

143 Build the workflow definition. 

144 

145 Returns: 

146 WorkflowDefinition ready for code generation 

147 

148 Raises: 

149 ValueError: If workflow is invalid 

150 """ 

151 if not self.nodes: 

152 msg = "Workflow must have at least one node" 

153 raise ValueError(msg) 

154 

155 if not self.entry_point: 155 ↛ 156line 155 didn't jump to line 156 because the condition on line 155 was never true

156 msg = "Workflow must have an entry point" 

157 raise ValueError(msg) 

158 

159 # Validate entry point exists 

160 node_ids = {n.id for n in self.nodes} 

161 if self.entry_point not in node_ids: 

162 msg = f"Entry point '{self.entry_point}' not found in nodes" 

163 raise ValueError(msg) 

164 

165 # Validate all edges reference existing nodes 

166 for edge in self.edges: 

167 if edge.from_node not in node_ids: 

168 msg = f"Edge references non-existent node: {edge.from_node}" 

169 raise ValueError(msg) 

170 if edge.to_node not in node_ids: 

171 msg = f"Edge references non-existent node: {edge.to_node}" 

172 raise ValueError(msg) 

173 

174 return WorkflowDefinition( 

175 name=self.name, 

176 description=self.description, 

177 nodes=self.nodes, 

178 edges=self.edges, 

179 entry_point=self.entry_point, 

180 state_schema=self.state_schema, 

181 ) 

182 

183 def export_code(self) -> str: 

184 """ 

185 Export workflow to Python code. 

186 

187 Returns: 

188 Production-ready Python code 

189 

190 Example: 

191 >>> code = builder.export_code() 

192 >>> print(code) 

193 """ 

194 workflow = self.build() 

195 generator = CodeGenerator() 

196 

197 return generator.generate(workflow) 

198 

199 def save_code(self, output_path: str) -> None: 

200 """ 

201 Export workflow to Python file. 

202 

203 Args: 

204 output_path: Output file path 

205 

206 Example: 

207 >>> builder.save_code("src/agents/my_agent.py") 

208 """ 

209 code = self.export_code() 

210 

211 with open(output_path, "w") as f: 

212 f.write(code) 

213 

214 def to_json(self) -> dict[str, Any]: 

215 """ 

216 Export workflow to JSON for visual builder. 

217 

218 Returns: 

219 JSON-serializable workflow definition 

220 

221 Example: 

222 >>> json_data = builder.to_json() 

223 >>> # Send to frontend visual builder 

224 """ 

225 workflow = self.build() 

226 return workflow.model_dump() 

227 

228 @classmethod 

229 def from_json(cls, data: dict[str, Any]) -> "WorkflowBuilder": 

230 """ 

231 Create builder from JSON (from visual builder). 

232 

233 Args: 

234 data: JSON workflow data 

235 

236 Returns: 

237 WorkflowBuilder instance 

238 

239 Example: 

240 >>> builder = WorkflowBuilder.from_json(visual_builder_data) 

241 >>> code = builder.export_code() 

242 """ 

243 workflow = WorkflowDefinition(**data) 

244 

245 builder = cls(workflow.name, workflow.description) 

246 builder.nodes = workflow.nodes 

247 builder.edges = workflow.edges 

248 builder.entry_point = workflow.entry_point 

249 builder.state_schema = workflow.state_schema 

250 

251 return builder 

252 

253 

254# ============================================================================== 

255# Example Usage 

256# ============================================================================== 

257 

258if __name__ == "__main__": 

259 # Build workflow programmatically 

260 builder = WorkflowBuilder("research_assistant", "Agent that researches and summarizes topics") 

261 

262 # Define state 

263 builder.add_state_field("query", "str") 

264 builder.add_state_field("search_results", "List[str]") 

265 builder.add_state_field("summary", "str") 

266 

267 # Add nodes 

268 builder.add_node("search", "tool", {"tool": "tavily_search"}, "Search Web") 

269 builder.add_node("summarize", "llm", {"model": "gemini-2.5-flash"}, "Summarize Results") 

270 builder.add_node("validate", "conditional", {}, "Validate Quality") 

271 

272 # Add edges 

273 builder.add_edge("search", "summarize") 

274 builder.add_edge("summarize", "validate") 

275 

276 # Set entry 

277 builder.set_entry_point("search") 

278 

279 # Export to code 

280 code = builder.export_code() 

281 

282 print("=" * 80) 

283 print("WORKFLOW BUILDER - GENERATED CODE") 

284 print("=" * 80) 

285 print(code) 

286 print("=" * 80) 

287 

288 # Also show JSON representation 

289 print("\nJSON REPRESENTATION:") 

290 import json 

291 

292 print(json.dumps(builder.to_json(), indent=2))