Coverage for src / mcp_server_langgraph / core / interrupts / approval.py: 99%

84 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 00:43 +0000

1""" 

2Approval System for Human-in-the-Loop Workflows 

3 

4Provides approval nodes that pause agent execution pending human review. 

5 

6Architecture: 

7 Agent Graph → Approval Node (pauses) → Human Decision → Resume/Reject 

8 

9Use Cases: 

10 - Financial transactions requiring approval 

11 - Content publishing workflows 

12 - High-stakes automation 

13 - Compliance checkpoints 

14 - Security-sensitive operations 

15 

16Example: 

17 from langgraph.graph import StateGraph 

18 from mcp_server_langgraph.core.interrupts import ApprovalNode 

19 

20 graph = StateGraph(MyState) 

21 graph.add_node("action", risky_action) 

22 graph.add_node("approval", ApprovalNode("approve_risky_action")) 

23 

24 # Add edge with interrupt 

25 graph.add_edge("approval", "action") 

26 

27 # Execution will pause at approval node 

28 app = graph.compile(checkpointer=checkpointer, interrupt_before=["action"]) 

29""" 

30 

31from datetime import datetime, UTC 

32from enum import Enum 

33from typing import Any 

34 

35from pydantic import BaseModel, Field, model_validator 

36from typing_extensions import Self 

37 

38 

39class ApprovalStatus(str, Enum): 

40 """Status of approval request.""" 

41 

42 PENDING = "pending" 

43 APPROVED = "approved" 

44 REJECTED = "rejected" 

45 EXPIRED = "expired" 

46 

47 

48class ApprovalRequired(BaseModel): 

49 """ 

50 Approval request model. 

51 

52 Represents a pending approval requiring human decision. 

53 """ 

54 

55 approval_id: str = Field(description="Unique approval ID") 

56 node_name: str = Field(description="Node requiring approval") 

57 action_description: str = Field(description="What action needs approval") 

58 risk_level: str = Field(default="medium", description="Risk level: low, medium, high, critical") 

59 context: dict[str, Any] = Field(default_factory=dict, description="Additional context for decision") 

60 requested_at: str | None = Field(default=None, description="When approval was requested (set at validation time)") 

61 requested_by: str = Field(default="system", description="Who/what requested approval") 

62 expires_at: str | None = Field(default=None, description="Optional expiration time") 

63 metadata: dict[str, Any] = Field(default_factory=dict, description="Additional metadata") 

64 

65 @model_validator(mode="after") 

66 def set_requested_at_timestamp(self) -> Self: 

67 """Set requested_at timestamp if not provided. Ensures freezegun compatibility.""" 

68 if self.requested_at is None: 

69 self.requested_at = datetime.now(UTC).isoformat() 

70 return self 

71 

72 

73class ApprovalResponse(BaseModel): 

74 """ 

75 Approval decision from human reviewer. 

76 """ 

77 

78 approval_id: str = Field(description="ID of the approval request") 

79 status: ApprovalStatus = Field(description="Approval decision") 

80 approved_by: str = Field(description="Who approved/rejected") 

81 approved_at: str | None = Field(default=None, description="When decision was made (set at validation time)") 

82 reason: str | None = Field(default=None, description="Reason for decision") 

83 modifications: dict[str, Any] | None = Field(default=None, description="Modifications to proposed action") 

84 

85 @model_validator(mode="after") 

86 def set_approved_at_timestamp(self) -> Self: 

87 """Set approved_at timestamp if not provided. Ensures freezegun compatibility.""" 

88 if self.approved_at is None: 88 ↛ 90line 88 didn't jump to line 90 because the condition on line 88 was always true

89 self.approved_at = datetime.now(UTC).isoformat() 

90 return self 

91 

92 

93class ApprovalNode: 

94 """ 

95 Approval node that pauses execution for human review. 

96 

97 This node integrates with LangGraph's interrupt mechanism to pause 

98 execution until a human makes an approval decision. 

99 """ 

100 

101 def __init__( 

102 self, 

103 approval_name: str, 

104 description: str = "", 

105 risk_level: str = "medium", 

106 auto_approve_timeout: int | None = None, 

107 notification_webhook: str | None = None, 

108 ): 

109 """ 

110 Initialize approval node. 

111 

112 Args: 

113 approval_name: Unique name for this approval type 

114 description: Description of what needs approval 

115 risk_level: Risk level (low, medium, high, critical) 

116 auto_approve_timeout: Auto-approve after N seconds (optional) 

117 notification_webhook: Webhook URL to notify approvers 

118 """ 

119 self.approval_name = approval_name 

120 self.description = description or f"Approval required: {approval_name}" 

121 self.risk_level = risk_level 

122 self.auto_approve_timeout = auto_approve_timeout 

123 self.notification_webhook = notification_webhook 

124 

125 def __call__(self, state: dict[str, Any]) -> dict[str, Any]: 

126 """ 

127 Execute approval node. 

128 

129 Creates approval request and pauses execution. 

130 

131 Args: 

132 state: Current agent state 

133 

134 Returns: 

135 Updated state with approval request 

136 """ 

137 # Generate approval ID 

138 approval_id = f"{self.approval_name}_{datetime.now(UTC).timestamp()}" 

139 

140 # Create approval request 

141 approval_request = ApprovalRequired( 

142 approval_id=approval_id, 

143 node_name=self.approval_name, 

144 action_description=self.description, 

145 risk_level=self.risk_level, 

146 context=state.copy(), # Include full state for context 

147 ) 

148 

149 # Store approval request in state 

150 if "approval_requests" not in state: 

151 state["approval_requests"] = [] 

152 

153 state["approval_requests"].append(approval_request.model_dump()) 

154 

155 # Mark as pending approval 

156 state["pending_approval"] = True 

157 state["current_approval_id"] = approval_id 

158 

159 # Send notification if webhook configured 

160 if self.notification_webhook: 

161 self._send_notification(approval_request) 

162 

163 # Return state (execution will interrupt here) 

164 return state 

165 

166 def _send_notification(self, approval: ApprovalRequired) -> None: 

167 """ 

168 Send notification to approvers. 

169 

170 Args: 

171 approval: Approval request 

172 

173 In production, implement: 

174 - Webhook POST to notification service 

175 - Email notification 

176 - Slack/Teams message 

177 - Mobile push notification 

178 """ 

179 # Placeholder for notification logic 

180 print(f"📧 Notification: Approval required for {approval.node_name}") 

181 print(f" Risk Level: {approval.risk_level}") 

182 print(f" Description: {approval.action_description}") 

183 

184 

185def create_approval_workflow( 

186 graph: Any, 

187 approval_points: list[str], 

188 notification_webhook: str | None = None, 

189) -> Any: 

190 """ 

191 Add approval points to an existing graph. 

192 

193 Args: 

194 graph: LangGraph StateGraph 

195 approval_points: List of node names requiring approval 

196 notification_webhook: Optional webhook for notifications 

197 

198 Returns: 

199 Modified graph with approval nodes 

200 

201 Example: 

202 from langgraph.graph import StateGraph 

203 from mcp_server_langgraph.core.interrupts import create_approval_workflow 

204 

205 graph = StateGraph(MyState) 

206 graph.add_node("risky_action", perform_action) 

207 

208 # Add approval before risky action 

209 graph = create_approval_workflow( 

210 graph, 

211 approval_points=["risky_action"], 

212 notification_webhook="https://api.example.com/notify" 

213 ) 

214 

215 # Compile with interrupt 

216 app = graph.compile(interrupt_before=["risky_action"]) 

217 """ 

218 for node_name in approval_points: 

219 approval_node = ApprovalNode(node_name, notification_webhook=notification_webhook) 

220 

221 # Add approval node before the target 

222 graph.add_node(f"approve_{node_name}", approval_node) 

223 

224 return graph 

225 

226 

227def check_approval_status(state: dict[str, Any], approval_id: str) -> ApprovalStatus: 

228 """ 

229 Check status of an approval request. 

230 

231 Args: 

232 state: Current state 

233 approval_id: Approval ID to check 

234 

235 Returns: 

236 ApprovalStatus 

237 

238 Example: 

239 status = check_approval_status(state, "approval_123") 

240 if status == ApprovalStatus.APPROVED: 

241 # Continue execution 

242 pass 

243 """ 

244 approvals = state.get("approval_responses", {}) 

245 

246 if approval_id in approvals: 

247 return ApprovalStatus(approvals[approval_id]["status"]) 

248 

249 return ApprovalStatus.PENDING 

250 

251 

252def approve_action(state: dict[str, Any], approval_id: str, approved_by: str, reason: str | None = None) -> dict[str, Any]: 

253 """ 

254 Approve a pending action. 

255 

256 Args: 

257 state: Current state 

258 approval_id: Approval ID 

259 approved_by: Who is approving 

260 reason: Optional approval reason 

261 

262 Returns: 

263 Updated state 

264 

265 Example: 

266 # Resume execution with approval 

267 state = approve_action( 

268 state, 

269 approval_id="action_123", 

270 approved_by="john@example.com", 

271 reason="Verified with finance team" 

272 ) 

273 """ 

274 if "approval_responses" not in state: 

275 state["approval_responses"] = {} 

276 

277 response = ApprovalResponse( 

278 approval_id=approval_id, status=ApprovalStatus.APPROVED, approved_by=approved_by, reason=reason 

279 ) 

280 

281 state["approval_responses"][approval_id] = response.model_dump() 

282 state["pending_approval"] = False 

283 

284 return state 

285 

286 

287def reject_action(state: dict[str, Any], approval_id: str, rejected_by: str, reason: str) -> dict[str, Any]: 

288 """ 

289 Reject a pending action. 

290 

291 Args: 

292 state: Current state 

293 approval_id: Approval ID 

294 rejected_by: Who is rejecting 

295 reason: Rejection reason (required) 

296 

297 Returns: 

298 Updated state 

299 

300 Example: 

301 # Reject action and halt 

302 state = reject_action( 

303 state, 

304 approval_id="action_123", 

305 rejected_by="jane@example.com", 

306 reason="Risk too high without additional review" 

307 ) 

308 """ 

309 if "approval_responses" not in state: 

310 state["approval_responses"] = {} 

311 

312 response = ApprovalResponse( 

313 approval_id=approval_id, status=ApprovalStatus.REJECTED, approved_by=rejected_by, reason=reason 

314 ) 

315 

316 state["approval_responses"][approval_id] = response.model_dump() 

317 state["pending_approval"] = False 

318 state["workflow_halted"] = True 

319 

320 return state