Coverage for src / mcp_server_langgraph / resilience / timeout.py: 83%

83 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 00:43 +0000

1""" 

2Timeout enforcement for async operations. 

3 

4Prevents hanging requests by enforcing time limits on all async operations. 

5Uses asyncio.timeout(), which requires Python 3.11+. 

6 

7See ADR-0026 for design rationale. 

8""" 

9 

10import asyncio 

11import functools 

12import logging 

13from collections.abc import Callable 

14from typing import Any, ParamSpec, TypeVar 

15 

16from opentelemetry import trace 

17 

18from mcp_server_langgraph.observability.telemetry import timeout_exceeded_counter 

19from mcp_server_langgraph.resilience.config import get_resilience_config 

20 

# Module-level logger, named after this module; used for timeout warnings.
21logger = logging.getLogger(__name__) 

# OpenTelemetry tracer used to open one span per timeout-guarded call.
22tracer = trace.get_tracer(__name__) 

23 

# Generic placeholders so the decorator's annotations can carry the wrapped
# callable's parameters (P) and return type (T).
24P = ParamSpec("P") 

25T = TypeVar("T") 

26 

27 

class TimeoutConfig:
    """Per-operation-type timeout settings, expressed in seconds."""

    def __init__(
        self,
        default: int = 30,
        llm: int = 60,
        auth: int = 5,
        db: int = 10,
        http: int = 15,
    ):
        # One attribute per supported operation type; ``default`` is the
        # fallback used when no more specific type applies.
        self.default, self.llm, self.auth = default, llm, auth
        self.db, self.http = db, http

    @classmethod
    def from_resilience_config(cls) -> "TimeoutConfig":
        """Build an instance from the ``timeout`` section of the global resilience config."""
        timeouts = get_resilience_config().timeout
        field_names = ("default", "llm", "auth", "db", "http")
        return cls(**{name: getattr(timeouts, name) for name in field_names})

56 

57 

def get_timeout_for_operation(operation_type: str) -> int:
    """
    Get timeout value for an operation type.

    The timeout configuration is rebuilt from the global resilience
    configuration on every call.

    Args:
        operation_type: Type of operation (llm, auth, db, http, default)

    Returns:
        Timeout in seconds. Unknown operation types fall back to the
        default timeout.
    """
    config = TimeoutConfig.from_resilience_config()
    candidate = getattr(config, operation_type, None)

    # getattr() resolves *any* attribute, so a name such as
    # "from_resilience_config" or "__class__" would otherwise hand back a
    # method or a class instead of a number of seconds. Only numeric values
    # are treated as timeouts; everything else uses the default.
    is_number = isinstance(candidate, (int, float)) and not isinstance(candidate, bool)
    return candidate if is_number else config.default

70 

71 

72def with_timeout( 

73 seconds: int | None = None, 

74 operation_type: str | None = None, 

75) -> Callable[[Callable[P, T]], Callable[P, T]]: 

76 """ 

77 Decorator to enforce timeout on async functions. 

78 

79 Args: 

80 seconds: Timeout in seconds (overrides operation_type) 

81 operation_type: Type of operation (llm, auth, db, http) for auto-timeout 

82 

83 Usage: 

84 # Explicit timeout 

85 @with_timeout(seconds=30) 

86 async def call_external_api() -> dict[str, Any]: 

87 async with httpx.AsyncClient() as client: 

88 return await client.get("https://api.example.com") 

89 

90 # Auto-timeout based on operation type 

91 @with_timeout(operation_type="llm") 

92 async def generate_response(prompt: str) -> str: 

93 # Uses LLM timeout (60s by default) 

94 return await llm_client.generate(prompt) 

95 

96 # Combine with other resilience patterns 

97 @circuit_breaker(name="llm") 

98 @retry_with_backoff(max_attempts=3) 

99 @with_timeout(operation_type="llm") 

100 async def call_llm(prompt: str) -> str: 

101 return await llm_client.generate(prompt) 

102 """ 

103 

104 def decorator(func: Callable[P, T]) -> Callable[P, T]: 

105 # Determine timeout value 

106 if seconds is not None: 

107 timeout_value = seconds 

108 elif operation_type is not None: 108 ↛ 111line 108 didn't jump to line 111 because the condition on line 108 was always true

109 timeout_value = get_timeout_for_operation(operation_type) 

110 else: 

111 timeout_value = get_timeout_for_operation("default") 

112 

113 @functools.wraps(func) 

114 async def async_wrapper(*args: P.args, **kwargs: P.kwargs) -> T: 

115 """Async wrapper with timeout enforcement""" 

116 with tracer.start_as_current_span( 

117 f"timeout.{func.__name__}", 

118 attributes={ 

119 "timeout.seconds": timeout_value, 

120 "timeout.operation_type": operation_type or "default", 

121 }, 

122 ) as span: 

123 try: 

124 # Use asyncio.timeout() for Python 3.11+ 

125 async with asyncio.timeout(timeout_value): 

126 result = await func(*args, **kwargs) # type: ignore[misc] 

127 

128 span.set_attribute("timeout.exceeded", False) 

129 return result # type: ignore[no-any-return] 

130 

131 except TimeoutError as e: 

132 # Timeout exceeded 

133 span.set_attribute("timeout.exceeded", True) 

134 

135 logger.warning( 

136 f"Timeout exceeded: {func.__name__} ({timeout_value}s)", 

137 extra={ 

138 "function": func.__name__, 

139 "timeout_seconds": timeout_value, 

140 "operation_type": operation_type or "default", 

141 }, 

142 ) 

143 

144 # Emit metric 

145 timeout_exceeded_counter.add( 

146 1, 

147 attributes={ 

148 "function": func.__name__, 

149 "operation_type": operation_type or "default", 

150 "timeout_seconds": timeout_value, 

151 }, 

152 ) 

153 

154 # Wrap in our custom exception 

155 from mcp_server_langgraph.core.exceptions import TimeoutError as MCPTimeoutError 

156 

157 raise MCPTimeoutError( 

158 message=f"Operation timed out after {timeout_value}s", 

159 metadata={ 

160 "function": func.__name__, 

161 "timeout_seconds": timeout_value, 

162 "operation_type": operation_type or "default", 

163 }, 

164 ) from e 

165 

166 # Only works with async functions 

167 import asyncio as aio 

168 

169 if not aio.iscoroutinefunction(func): 

170 msg = f"@with_timeout can only be applied to async functions, got {func.__name__}" 

171 raise TypeError(msg) 

172 

173 return async_wrapper # type: ignore 

174 

175 return decorator 

176 

177 

178class TimeoutContext: 

179 """ 

180 Context manager for timeout enforcement (alternative to decorator). 

181 

182 Usage: 

183 async with TimeoutContext(seconds=30) as timeout: 

184 result = await some_async_operation() 

185 """ 

186 

187 def __init__( 

188 self, 

189 seconds: int | None = None, 

190 operation_type: str | None = None, 

191 ): 

192 if seconds is not None: 

193 self.timeout_value = seconds 

194 elif operation_type is not None: 194 ↛ 197line 194 didn't jump to line 197 because the condition on line 194 was always true

195 self.timeout_value = get_timeout_for_operation(operation_type) 

196 else: 

197 self.timeout_value = get_timeout_for_operation("default") 

198 

199 self.operation_type = operation_type or "default" 

200 self._context_manager = None 

201 

202 async def __aenter__(self) -> None: 

203 """Enter timeout context""" 

204 self._context_manager = asyncio.timeout(self.timeout_value) # type: ignore[assignment] 

205 

206 if self._context_manager: 206 ↛ 209line 206 didn't jump to line 209 because the condition on line 206 was always true

207 await self._context_manager.__aenter__() # type: ignore[unreachable] 

208 

209 return self # type: ignore[return-value] 

210 

211 async def __aexit__(self, exc_type, exc_val, exc_tb: Any) -> None: # type: ignore[no-untyped-def] 

212 """Exit timeout context""" 

213 if self._context_manager: 213 ↛ 227line 213 didn't jump to line 227 because the condition on line 213 was always true

214 try: # type: ignore[unreachable] 

215 await self._context_manager.__aexit__(exc_type, exc_val, exc_tb) 

216 except TimeoutError as e: 

217 # Convert to our custom exception 

218 from mcp_server_langgraph.core.exceptions import TimeoutError as MCPTimeoutError 

219 

220 raise MCPTimeoutError( 

221 message=f"Operation timed out after {self.timeout_value}s", 

222 metadata={ 

223 "timeout_seconds": self.timeout_value, 

224 "operation_type": self.operation_type, 

225 }, 

226 ) from e 

227 elif hasattr(self, "_start_time"): 

228 # Manual timeout check for Python 3.10 

229 elapsed = asyncio.get_event_loop().time() - self._start_time 

230 if elapsed > self.timeout_value and exc_type is None: 

231 from mcp_server_langgraph.core.exceptions import TimeoutError as MCPTimeoutError 

232 

233 raise MCPTimeoutError( 

234 message=f"Operation timed out after {self.timeout_value}s", 

235 metadata={ 

236 "timeout_seconds": self.timeout_value, 

237 "operation_type": self.operation_type, 

238 "elapsed_seconds": elapsed, 

239 }, 

240 ) 

241 

242 return None # Don't suppress exceptions 

243 

244 

# Convenience functions for common timeout patterns
async def run_with_timeout(  # type: ignore[no-untyped-def]
    coro,
    seconds: int | None = None,
    operation_type: str | None = None,
):
    """
    Await a coroutine inside a TimeoutContext.

    Args:
        coro: Coroutine to run
        seconds: Timeout in seconds
        operation_type: Operation type for auto-timeout

    Returns:
        Whatever the coroutine returns

    Raises:
        TimeoutError: If operation exceeds timeout

    Usage:
        result = await run_with_timeout(
            llm_client.generate(prompt),
            operation_type="llm"
        )
    """
    guard = TimeoutContext(seconds=seconds, operation_type=operation_type)
    async with guard:
        outcome = await coro
    return outcome