Coverage for src / mcp_server_langgraph / core / interrupts / interrupts.py: 100%
48 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
1"""
2Interrupt System for Human-in-the-Loop Workflows
4Core interrupt handling for pausing and resuming agent execution.
6Supports:
7- Manual approval points
8- Automatic validation checkpoints
9- Time-based interrupts
10- Conditional interrupts based on state
11"""
13from collections.abc import Callable
14from datetime import datetime, UTC
15from enum import Enum
16from typing import Any
18from pydantic import BaseModel, Field
class InterruptType(str, Enum):
    """Kinds of interrupt that can pause an agent workflow.

    Members mix in ``str``, so each one compares equal to its lowercase
    string value (for example ``InterruptType.APPROVAL == "approval"``).
    """

    # A human has to approve before execution continues.
    APPROVAL = "approval"
    # Automatic validation checkpoint.
    VALIDATION = "validation"
    # Time-based interrupt.
    TIMEOUT = "timeout"
    # Fires conditionally, depending on the workflow state.
    CONDITIONAL = "conditional"
class InterruptConfig(BaseModel):
    """Declarative description of a single interrupt point.

    Only ``interrupt_type`` and ``node_name`` are required; every other
    field has a default.
    """

    # What kind of interrupt this is.
    interrupt_type: InterruptType = Field(
        description="Type of interrupt",
    )
    # Name of the workflow node the interrupt is attached to.
    node_name: str = Field(
        description="Node where interrupt occurs",
    )
    # Optional predicate over the state; ``None`` means no predicate is set.
    condition: Callable[[dict[str, Any]], bool] | None = Field(
        default=None,
        description="Optional condition function",
    )
    timeout_seconds: int | None = Field(
        default=None,
        description="Timeout in seconds",
    )
    # default_factory gives every instance its own list.
    notification_channels: list[str] = Field(
        default_factory=list,
        description="Channels to notify",
    )
    auto_resume: bool = Field(
        default=False,
        description="Whether to auto-resume after timeout",
    )
41class InterruptHandler:
42 """
43 Handler for managing interrupts in agent workflows.
45 Provides mechanisms to pause, resume, and manage interrupted executions.
46 """
48 def __init__(self) -> None:
49 """Initialize interrupt handler."""
50 self.pending_interrupts: dict[str, InterruptConfig] = {}
51 self.interrupt_history: list[dict[str, Any]] = []
53 def register_interrupt(self, config: InterruptConfig) -> str:
54 """
55 Register an interrupt point.
57 Args:
58 config: Interrupt configuration
60 Returns:
61 Interrupt ID
62 """
63 interrupt_id = f"{config.node_name}_{len(self.pending_interrupts)}"
64 self.pending_interrupts[interrupt_id] = config
66 return interrupt_id
68 def should_interrupt(self, node_name: str, state: dict[str, Any]) -> bool:
69 """
70 Check if execution should interrupt at this node.
72 Args:
73 node_name: Current node name
74 state: Current state
76 Returns:
77 True if should interrupt
78 """
79 for config in self.pending_interrupts.values():
80 if config.node_name == node_name:
81 # Check condition if provided
82 if config.condition:
83 return config.condition(state)
84 return True
86 return False
88 def handle_interrupt(self, node_name: str, state: dict[str, Any]) -> dict[str, Any]:
89 """
90 Handle interrupt at node.
92 Args:
93 node_name: Node name
94 state: Current state
96 Returns:
97 Updated state with interrupt metadata
98 """
99 state["interrupted"] = True
100 state["interrupt_node"] = node_name
101 state["interrupt_timestamp"] = datetime.now(UTC).isoformat()
103 # Add to history
104 self.interrupt_history.append({"node": node_name, "state": state.copy(), "timestamp": state["interrupt_timestamp"]})
106 return state
108 def resume(self, interrupt_id: str, state: dict[str, Any]) -> dict[str, Any]:
109 """
110 Resume execution after interrupt.
112 Args:
113 interrupt_id: Interrupt to resume
114 state: State to resume with
116 Returns:
117 Updated state
118 """
119 state["interrupted"] = False
120 state["resumed_at"] = datetime.now(UTC).isoformat()
122 return state
def create_interrupt_handler() -> InterruptHandler:
    """
    Build a fresh interrupt handler.

    Returns:
        A new InterruptHandler instance

    Example:
        handler = create_interrupt_handler()
        handler.register_interrupt(InterruptConfig(
            interrupt_type=InterruptType.APPROVAL,
            node_name="risky_action"
        ))
    """
    handler = InterruptHandler()
    return handler
142# ==============================================================================
143# Conditional Interrupts
144# ==============================================================================
def create_conditional_interrupt(condition: Callable[[dict[str, Any]], bool], node_name: str) -> InterruptConfig:
    """
    Build a CONDITIONAL interrupt configuration for a node.

    Args:
        condition: Predicate over the state; True means "interrupt"
        node_name: Node the interrupt is attached to

    Returns:
        InterruptConfig with the given node name and condition

    Example:
        # Interrupt if amount exceeds threshold
        def high_value_transaction(state):
            return state.get("amount", 0) > 10000

        interrupt = create_conditional_interrupt(
            high_value_transaction,
            "process_payment"
        )
    """
    conditional_config = InterruptConfig(
        node_name=node_name,
        condition=condition,
        interrupt_type=InterruptType.CONDITIONAL,
    )
    return conditional_config
def create_timeout_interrupt(node_name: str, timeout_seconds: int) -> InterruptConfig:
    """
    Build a TIMEOUT interrupt configuration for a node.

    Args:
        node_name: Node the interrupt is attached to
        timeout_seconds: Timeout duration in seconds

    Returns:
        InterruptConfig with the given node name and timeout

    Example:
        # Interrupt if node takes >30 seconds
        interrupt = create_timeout_interrupt("long_running_task", 30)
    """
    timeout_config = InterruptConfig(
        node_name=node_name,
        timeout_seconds=timeout_seconds,
        interrupt_type=InterruptType.TIMEOUT,
    )
    return timeout_config
189# ==============================================================================
190# Example Usage
191# ==============================================================================
if __name__ == "__main__":
    # Demo: register an approval interrupt, trigger it, then resume.
    handler = create_interrupt_handler()

    # Approval interrupt on the trade-execution node.
    approval_config = InterruptConfig(
        interrupt_type=InterruptType.APPROVAL,
        node_name="execute_trade",
        notification_channels=["email", "slack"],
    )
    interrupt_id = handler.register_interrupt(approval_config)
    print(f"Registered interrupt: {interrupt_id}")

    # Simulated state arriving at the node.
    initial_state = {"action": "execute_trade", "amount": 50000}

    if handler.should_interrupt("execute_trade", initial_state):
        print("✋ Interrupt triggered at node: execute_trade")
        state = handler.handle_interrupt("execute_trade", initial_state)
        print(f" State marked as interrupted: {state['interrupted']}")
        print(f" Timestamp: {state['interrupt_timestamp']}")

        # Simulated approval step.
        print("\n⏳ Waiting for human approval...")
        print(" (In production: webhook notification sent)")

        # Resume once approved.
        state = handler.resume(interrupt_id, state)
        print(f"\n✅ Execution resumed at: {state['resumed_at']}")