Coverage for src / mcp_server_langgraph / core / interrupts / interrupts.py: 100%

48 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 00:43 +0000

1""" 

2Interrupt System for Human-in-the-Loop Workflows 

3 

4Core interrupt handling for pausing and resuming agent execution. 

5 

6Supports: 

7- Manual approval points 

8- Automatic validation checkpoints 

9- Time-based interrupts 

10- Conditional interrupts based on state 

11""" 

12 

13from collections.abc import Callable 

14from datetime import datetime, UTC 

15from enum import Enum 

16from typing import Any 

17 

18from pydantic import BaseModel, Field 

19 

20 

class InterruptType(str, Enum):
    """Kinds of interrupt that can be attached to a workflow node.

    Members also subclass ``str``, so each one compares equal to its plain
    string value (e.g. ``InterruptType.APPROVAL == "approval"``).
    """

    # Human approval required
    APPROVAL = "approval"
    # Automatic validation checkpoint
    VALIDATION = "validation"
    # Time-based interrupt
    TIMEOUT = "timeout"
    # Conditional based on state
    CONDITIONAL = "conditional"

28 

29 

class InterruptConfig(BaseModel):
    """Settings describing a single interrupt point in a workflow.

    Only ``interrupt_type`` and ``node_name`` are required; every other field
    has a default.
    """

    interrupt_type: InterruptType = Field(
        description="Type of interrupt",
    )
    node_name: str = Field(
        description="Node where interrupt occurs",
    )
    # Predicate over the state dict; None means no condition is attached.
    condition: Callable[[dict[str, Any]], bool] | None = Field(
        default=None,
        description="Optional condition function",
    )
    timeout_seconds: int | None = Field(
        default=None,
        description="Timeout in seconds",
    )
    # default_factory gives every instance its own fresh list.
    notification_channels: list[str] = Field(
        default_factory=list,
        description="Channels to notify",
    )
    auto_resume: bool = Field(
        default=False,
        description="Whether to auto-resume after timeout",
    )

39 

40 

41class InterruptHandler: 

42 """ 

43 Handler for managing interrupts in agent workflows. 

44 

45 Provides mechanisms to pause, resume, and manage interrupted executions. 

46 """ 

47 

48 def __init__(self) -> None: 

49 """Initialize interrupt handler.""" 

50 self.pending_interrupts: dict[str, InterruptConfig] = {} 

51 self.interrupt_history: list[dict[str, Any]] = [] 

52 

53 def register_interrupt(self, config: InterruptConfig) -> str: 

54 """ 

55 Register an interrupt point. 

56 

57 Args: 

58 config: Interrupt configuration 

59 

60 Returns: 

61 Interrupt ID 

62 """ 

63 interrupt_id = f"{config.node_name}_{len(self.pending_interrupts)}" 

64 self.pending_interrupts[interrupt_id] = config 

65 

66 return interrupt_id 

67 

68 def should_interrupt(self, node_name: str, state: dict[str, Any]) -> bool: 

69 """ 

70 Check if execution should interrupt at this node. 

71 

72 Args: 

73 node_name: Current node name 

74 state: Current state 

75 

76 Returns: 

77 True if should interrupt 

78 """ 

79 for config in self.pending_interrupts.values(): 

80 if config.node_name == node_name: 

81 # Check condition if provided 

82 if config.condition: 

83 return config.condition(state) 

84 return True 

85 

86 return False 

87 

88 def handle_interrupt(self, node_name: str, state: dict[str, Any]) -> dict[str, Any]: 

89 """ 

90 Handle interrupt at node. 

91 

92 Args: 

93 node_name: Node name 

94 state: Current state 

95 

96 Returns: 

97 Updated state with interrupt metadata 

98 """ 

99 state["interrupted"] = True 

100 state["interrupt_node"] = node_name 

101 state["interrupt_timestamp"] = datetime.now(UTC).isoformat() 

102 

103 # Add to history 

104 self.interrupt_history.append({"node": node_name, "state": state.copy(), "timestamp": state["interrupt_timestamp"]}) 

105 

106 return state 

107 

108 def resume(self, interrupt_id: str, state: dict[str, Any]) -> dict[str, Any]: 

109 """ 

110 Resume execution after interrupt. 

111 

112 Args: 

113 interrupt_id: Interrupt to resume 

114 state: State to resume with 

115 

116 Returns: 

117 Updated state 

118 """ 

119 state["interrupted"] = False 

120 state["resumed_at"] = datetime.now(UTC).isoformat() 

121 

122 return state 

123 

124 

def create_interrupt_handler() -> InterruptHandler:
    """
    Build a new interrupt handler.

    Returns:
        A freshly constructed InterruptHandler

    Example:
        handler = create_interrupt_handler()
        handler.register_interrupt(InterruptConfig(
            interrupt_type=InterruptType.APPROVAL,
            node_name="risky_action"
        ))
    """
    new_handler = InterruptHandler()
    return new_handler

140 

141 

# ==============================================================================
# Conditional and Timeout Interrupts
# ==============================================================================

145 

146 

def create_conditional_interrupt(condition: Callable[[dict[str, Any]], bool], node_name: str) -> InterruptConfig:
    """
    Build the configuration for a state-dependent interrupt.

    Args:
        condition: Predicate over the state; True means "interrupt"
        node_name: Node the interrupt is attached to

    Returns:
        InterruptConfig of type CONDITIONAL carrying ``condition``

    Example:
        # Interrupt if amount exceeds threshold
        def high_value_transaction(state):
            return state.get("amount", 0) > 10000

        interrupt = create_conditional_interrupt(
            high_value_transaction,
            "process_payment"
        )
    """
    conditional_config = InterruptConfig(
        interrupt_type=InterruptType.CONDITIONAL,
        node_name=node_name,
        condition=condition,
    )
    return conditional_config

169 

170 

def create_timeout_interrupt(node_name: str, timeout_seconds: int) -> InterruptConfig:
    """
    Build the configuration for a time-based interrupt.

    This only records the timeout on the returned config; no timing is done
    by this function itself.

    Args:
        node_name: Node the interrupt is attached to
        timeout_seconds: Timeout duration, in seconds

    Returns:
        InterruptConfig of type TIMEOUT carrying ``timeout_seconds``

    Example:
        # Interrupt if node takes >30 seconds
        interrupt = create_timeout_interrupt("long_running_task", 30)
    """
    timeout_config = InterruptConfig(
        interrupt_type=InterruptType.TIMEOUT,
        node_name=node_name,
        timeout_seconds=timeout_seconds,
    )
    return timeout_config

187 

188 

189# ============================================================================== 

190# Example Usage 

191# ============================================================================== 

192 

if __name__ == "__main__":
    # Demo of the full cycle: register an approval interrupt, detect it at the
    # node, mark the state as interrupted, then resume.

    # Create handler
    handler = create_interrupt_handler()

    # Register approval interrupt
    approval_interrupt = InterruptConfig(
        interrupt_type=InterruptType.APPROVAL, node_name="execute_trade", notification_channels=["email", "slack"]
    )

    interrupt_id = handler.register_interrupt(approval_interrupt)

    print(f"Registered interrupt: {interrupt_id}")

    # Simulate checking interrupt
    test_state = {"action": "execute_trade", "amount": 50000}

    # The approval interrupt has no condition, so this check succeeds for the
    # "execute_trade" node regardless of the state contents.
    if handler.should_interrupt("execute_trade", test_state):
        print("✋ Interrupt triggered at node: execute_trade")
        state = handler.handle_interrupt("execute_trade", test_state)
        print(f" State marked as interrupted: {state['interrupted']}")
        print(f" Timestamp: {state['interrupt_timestamp']}")

        # Simulate approval
        print("\n⏳ Waiting for human approval...")
        print(" (In production: webhook notification sent)")

        # Resume (kept inside the `if`: `state` is only bound in this branch)
        state = handler.resume(interrupt_id, state)
        print(f"\n✅ Execution resumed at: {state['resumed_at']}")