Coverage for src / mcp_server_langgraph / core / interrupts / approval.py: 99%
84 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
"""
Approval System for Human-in-the-Loop Workflows

Provides approval nodes that pause agent execution pending human review.

Architecture:
    Agent Graph → Approval Node (pauses) → Human Decision → Resume/Reject

Use Cases:
    - Financial transactions requiring approval
    - Content publishing workflows
    - High-stakes automation
    - Compliance checkpoints
    - Security-sensitive operations

Example:
    from langgraph.graph import StateGraph
    from mcp_server_langgraph.core.interrupts import ApprovalNode

    graph = StateGraph(MyState)
    graph.add_node("action", risky_action)
    graph.add_node("approval", ApprovalNode("approve_risky_action"))

    # Route the approval node into the action it guards
    graph.add_edge("approval", "action")

    # Execution pauses once the approval node has run, before "action" executes
    app = graph.compile(checkpointer=checkpointer, interrupt_before=["action"])
"""
31from datetime import datetime, UTC
32from enum import Enum
33from typing import Any
35from pydantic import BaseModel, Field, model_validator
36from typing_extensions import Self
class ApprovalStatus(str, Enum):
    """Lifecycle states of an approval request.

    The ``str`` mixin makes every member compare equal to its plain string
    value, so a status can be rebuilt from stored text via
    ``ApprovalStatus("approved")``.
    """

    # No decision has been recorded for the request yet.
    PENDING = "pending"

    # A reviewer accepted the proposed action.
    APPROVED = "approved"

    # A reviewer declined the proposed action.
    REJECTED = "rejected"

    # NOTE(review): presumably for requests that outlived their expiry time;
    # nothing visible in this module assigns it - confirm against callers.
    EXPIRED = "expired"
class ApprovalRequired(BaseModel):
    """
    Approval request model.

    Represents a pending approval requiring human decision.
    """

    # Identity of the request and of the node that raised it.
    approval_id: str = Field(description="Unique approval ID")
    node_name: str = Field(description="Node requiring approval")

    # What the reviewer is being asked to decide, and how risky it is.
    # risk_level is a free-form string; the listed levels are not enforced here.
    action_description: str = Field(description="What action needs approval")
    risk_level: str = Field(
        default="medium",
        description="Risk level: low, medium, high, critical",
    )
    context: dict[str, Any] = Field(
        default_factory=dict,
        description="Additional context for decision",
    )

    # Left as None by default so the validator below can stamp it at
    # validation time instead of at class-definition time.
    requested_at: str | None = Field(
        default=None,
        description="When approval was requested (set at validation time)",
    )
    requested_by: str = Field(default="system", description="Who/what requested approval")
    expires_at: str | None = Field(default=None, description="Optional expiration time")
    metadata: dict[str, Any] = Field(default_factory=dict, description="Additional metadata")

    @model_validator(mode="after")
    def set_requested_at_timestamp(self) -> Self:
        """Fill in ``requested_at`` with the current UTC time when the caller omitted it.

        The clock is read during validation (not in a field default), which
        keeps the timestamp controllable by time-freezing test tools.

        Returns:
            The validated model instance.
        """
        if self.requested_at is not None:
            # Caller supplied an explicit timestamp; keep it untouched.
            return self

        self.requested_at = datetime.now(UTC).isoformat()
        return self
class ApprovalResponse(BaseModel):
    """
    Approval decision from human reviewer.
    """

    # Which request this decision answers, and what was decided.
    approval_id: str = Field(description="ID of the approval request")
    status: ApprovalStatus = Field(description="Approval decision")

    # The same field carries the reviewer identity for approvals and rejections.
    approved_by: str = Field(description="Who approved/rejected")

    # Left as None by default so the validator below can stamp it at
    # validation time instead of at class-definition time.
    approved_at: str | None = Field(
        default=None,
        description="When decision was made (set at validation time)",
    )
    reason: str | None = Field(default=None, description="Reason for decision")
    modifications: dict[str, Any] | None = Field(
        default=None,
        description="Modifications to proposed action",
    )

    @model_validator(mode="after")
    def set_approved_at_timestamp(self) -> Self:
        """Fill in ``approved_at`` with the current UTC time when the caller omitted it.

        The clock is read during validation (not in a field default), which
        keeps the timestamp controllable by time-freezing test tools.

        Returns:
            The validated model instance.
        """
        if self.approved_at is not None:
            # Caller supplied an explicit decision time; keep it untouched.
            return self

        self.approved_at = datetime.now(UTC).isoformat()
        return self
class ApprovalNode:
    """
    Approval node that records a request for human review in the agent state.

    Calling the node appends an ``ApprovalRequired`` record to
    ``state["approval_requests"]`` and flags the state as pending approval.
    The node does not pause anything by itself: the pause comes from the
    graph being compiled with an interrupt around the guarded node (for
    example ``interrupt_before=[...]``).
    """

    def __init__(
        self,
        approval_name: str,
        description: str = "",
        risk_level: str = "medium",
        auto_approve_timeout: int | None = None,
        notification_webhook: str | None = None,
    ) -> None:
        """
        Initialize approval node.

        Args:
            approval_name: Unique name for this approval type; also used as
                the prefix of generated approval IDs.
            description: Description of what needs approval. When empty, a
                default of the form "Approval required: <approval_name>" is used.
            risk_level: Risk level (low, medium, high, critical). Stored as
                given; it is not validated here.
            auto_approve_timeout: Auto-approve after N seconds (optional).
                Stored only; nothing in this class acts on it.
            notification_webhook: Webhook URL to notify approvers. Any truthy
                value enables the notification hook on each call.
        """
        self.approval_name = approval_name
        self.description = description or f"Approval required: {approval_name}"
        self.risk_level = risk_level
        self.auto_approve_timeout = auto_approve_timeout
        self.notification_webhook = notification_webhook

    def __call__(self, state: dict[str, Any]) -> dict[str, Any]:
        """
        Execute approval node.

        Creates an approval request, stores it in the state, and marks the
        state as awaiting a decision.

        Args:
            state: Current agent state. It is modified in place.

        Returns:
            The same state object, with the new request appended to
            ``approval_requests``, ``pending_approval`` set to True and
            ``current_approval_id`` set to the generated ID.
        """
        # The ID is the approval name plus the current UTC epoch timestamp.
        approval_id = f"{self.approval_name}_{datetime.now(UTC).timestamp()}"

        # Snapshot the state for the reviewer, but leave out the list of
        # earlier approval requests. Each of those already carries its own
        # context snapshot, so embedding the list again would nest every prior
        # request inside the new one and roughly double the stored state on
        # each call. The full history stays available at the top level.
        context = {key: value for key, value in state.items() if key != "approval_requests"}

        approval_request = ApprovalRequired(
            approval_id=approval_id,
            node_name=self.approval_name,
            action_description=self.description,
            risk_level=self.risk_level,
            context=context,
        )

        # Requests are stored as plain dicts, creating the list on first use.
        state.setdefault("approval_requests", []).append(approval_request.model_dump())

        # Mark as pending approval
        state["pending_approval"] = True
        state["current_approval_id"] = approval_id

        # Send notification if webhook configured
        if self.notification_webhook:
            self._send_notification(approval_request)

        return state

    def _send_notification(self, approval: ApprovalRequired) -> None:
        """
        Send notification to approvers.

        Currently a placeholder that prints a summary to stdout; the
        configured webhook URL is not contacted.

        Args:
            approval: Approval request

        In production, implement:
        - Webhook POST to notification service
        - Email notification
        - Slack/Teams message
        - Mobile push notification
        """
        # Placeholder for notification logic
        print(f"📧 Notification: Approval required for {approval.node_name}")
        print(f" Risk Level: {approval.risk_level}")
        print(f" Description: {approval.action_description}")
def create_approval_workflow(
    graph: Any,
    approval_points: list[str],
    notification_webhook: str | None = None,
) -> Any:
    """
    Register one approval node per approval point on an existing graph.

    For every name in ``approval_points`` an ``ApprovalNode`` is added under
    the node name ``approve_<name>``. Only nodes are registered: no edges are
    created here, so wiring each approval node to its target and compiling
    the graph with an interrupt is left to the caller.

    Args:
        graph: LangGraph StateGraph (any object exposing ``add_node(name, node)``)
        approval_points: List of node names requiring approval
        notification_webhook: Optional webhook passed to every approval node

    Returns:
        The same graph object, with the approval nodes added

    Example:
        from langgraph.graph import StateGraph
        from mcp_server_langgraph.core.interrupts import create_approval_workflow

        graph = StateGraph(MyState)
        graph.add_node("risky_action", perform_action)

        # Add approval before risky action
        graph = create_approval_workflow(
            graph,
            approval_points=["risky_action"],
            notification_webhook="https://api.example.com/notify"
        )

        # Compile with interrupt
        app = graph.compile(interrupt_before=["risky_action"])
    """
    for target in approval_points:
        graph.add_node(
            f"approve_{target}",
            ApprovalNode(target, notification_webhook=notification_webhook),
        )

    return graph
def check_approval_status(state: dict[str, Any], approval_id: str) -> ApprovalStatus:
    """
    Look up the decision recorded for an approval request.

    Args:
        state: Current state; decisions are read from ``state["approval_responses"]``
        approval_id: Approval ID to check

    Returns:
        The recorded ApprovalStatus, or ``ApprovalStatus.PENDING`` when no
        response is stored under ``approval_id``

    Example:
        status = check_approval_status(state, "approval_123")
        if status == ApprovalStatus.APPROVED:
            # Continue execution
            pass
    """
    recorded = state.get("approval_responses", {})

    # No stored response means nobody has decided yet.
    if approval_id not in recorded:
        return ApprovalStatus.PENDING

    decision = recorded[approval_id]
    # Normalise to the enum whether the stored value is a member or its string.
    return ApprovalStatus(decision["status"])
def approve_action(state: dict[str, Any], approval_id: str, approved_by: str, reason: str | None = None) -> dict[str, Any]:
    """
    Record an approval decision and clear the pending flag.

    The decision is stored as a dumped ``ApprovalResponse`` under
    ``state["approval_responses"][approval_id]``. The function does not check
    that ``approval_id`` matches a previously recorded request.

    Args:
        state: Current state (modified in place)
        approval_id: Approval ID
        approved_by: Who is approving
        reason: Optional approval reason

    Returns:
        The same state object, updated

    Example:
        # Resume execution with approval
        state = approve_action(
            state,
            approval_id="action_123",
            approved_by="john@example.com",
            reason="Verified with finance team"
        )
    """
    # Make sure the response store exists before building the decision.
    responses = state.setdefault("approval_responses", {})

    decision = ApprovalResponse(
        approval_id=approval_id,
        status=ApprovalStatus.APPROVED,
        approved_by=approved_by,
        reason=reason,
    )

    responses[approval_id] = decision.model_dump()
    state["pending_approval"] = False

    return state
def reject_action(state: dict[str, Any], approval_id: str, rejected_by: str, reason: str) -> dict[str, Any]:
    """
    Record a rejection decision, clear the pending flag and halt the workflow.

    The decision is stored as a dumped ``ApprovalResponse`` under
    ``state["approval_responses"][approval_id]``; the rejecting reviewer is
    kept in the response's ``approved_by`` field. The function does not check
    that ``approval_id`` matches a previously recorded request.

    Args:
        state: Current state (modified in place)
        approval_id: Approval ID
        rejected_by: Who is rejecting
        reason: Rejection reason (required)

    Returns:
        The same state object, updated; ``workflow_halted`` is set to True

    Example:
        # Reject action and halt
        state = reject_action(
            state,
            approval_id="action_123",
            rejected_by="jane@example.com",
            reason="Risk too high without additional review"
        )
    """
    # Make sure the response store exists before building the decision.
    responses = state.setdefault("approval_responses", {})

    decision = ApprovalResponse(
        approval_id=approval_id,
        status=ApprovalStatus.REJECTED,
        approved_by=rejected_by,
        reason=reason,
    )

    responses[approval_id] = decision.model_dump()
    state["pending_approval"] = False
    state["workflow_halted"] = True

    return state