Coverage for src / mcp_server_langgraph / resilience / timeout.py: 83%
83 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
1"""
2Timeout enforcement for async operations.
4Prevents hanging requests by enforcing time limits on all async operations.
5Uses asyncio.timeout() (Python 3.11+) or asyncio.wait_for() (Python 3.10).
7See ADR-0026 for design rationale.
8"""
import asyncio
import functools
import logging
from collections.abc import Awaitable, Callable
from typing import Any, ParamSpec, TypeVar

from opentelemetry import trace

from mcp_server_langgraph.observability.telemetry import timeout_exceeded_counter
from mcp_server_langgraph.resilience.config import get_resilience_config
# Generic parameters used by the decorator below to preserve the wrapped
# function's signature (parameters via P, return type via T).
P = ParamSpec("P")
T = TypeVar("T")

# Module-level logger and OpenTelemetry tracer, both named after this module.
logger = logging.getLogger(__name__)
tracer = trace.get_tracer(__name__)
class TimeoutConfig:
    """
    Timeout limits, in seconds, for each kind of operation.

    Every operation type (default, llm, auth, db, http) is exposed as an
    attribute with the same name.
    """

    def __init__(
        self,
        default: int = 30,
        llm: int = 60,
        auth: int = 5,
        db: int = 10,
        http: int = 15,
    ):
        self.default, self.llm, self.auth, self.db, self.http = (default, llm, auth, db, http)

    @classmethod
    def from_resilience_config(cls) -> "TimeoutConfig":
        """Build an instance from the ``timeout`` section of the global resilience configuration."""
        timeouts = get_resilience_config().timeout
        return cls(
            default=timeouts.default,
            llm=timeouts.llm,
            auth=timeouts.auth,
            db=timeouts.db,
            http=timeouts.http,
        )
def get_timeout_for_operation(operation_type: str) -> int:
    """
    Get timeout value for an operation type.

    The timeout configuration is loaded via
    ``TimeoutConfig.from_resilience_config()`` on every call.

    Args:
        operation_type: Type of operation (llm, auth, db, http, default)

    Returns:
        Timeout in seconds. Unrecognised operation types fall back to the
        configured default timeout.
    """
    config = TimeoutConfig.from_resilience_config()
    # Explicit mapping instead of getattr(): getattr() would also resolve
    # unrelated attributes (e.g. "from_resilience_config" or "__class__") and
    # return a non-integer that would later be handed to asyncio.timeout().
    timeouts = {
        "default": config.default,
        "llm": config.llm,
        "auth": config.auth,
        "db": config.db,
        "http": config.http,
    }
    return timeouts.get(operation_type, config.default)
def with_timeout(
    seconds: int | None = None,
    operation_type: str | None = None,
) -> Callable[[Callable[P, T]], Callable[P, T]]:
    """
    Decorator to enforce timeout on async functions.

    The timeout value is resolved once, when the decorator is applied to a
    function, not on every call.

    Args:
        seconds: Timeout in seconds (overrides operation_type)
        operation_type: Type of operation (llm, auth, db, http) for auto-timeout

    Raises:
        TypeError: At decoration time, if the decorated function is not an
            ``async def`` function.
        mcp_server_langgraph.core.exceptions.TimeoutError: At call time, if the
            call does not finish within the timeout.

    Usage:
        # Explicit timeout
        @with_timeout(seconds=30)
        async def call_external_api() -> dict[str, Any]:
            async with httpx.AsyncClient() as client:
                return await client.get("https://api.example.com")

        # Auto-timeout based on operation type
        @with_timeout(operation_type="llm")
        async def generate_response(prompt: str) -> str:
            # Uses LLM timeout (60s by default)
            return await llm_client.generate(prompt)

        # Combine with other resilience patterns
        @circuit_breaker(name="llm")
        @retry_with_backoff(max_attempts=3)
        @with_timeout(operation_type="llm")
        async def call_llm(prompt: str) -> str:
            return await llm_client.generate(prompt)
    """
    # Label reported in spans, logs and metrics.
    op_label = operation_type or "default"
    # asyncio.timeout() only exists on Python 3.11+; older interpreters use wait_for().
    has_timeout_cm = hasattr(asyncio, "timeout")

    def decorator(func: Callable[P, T]) -> Callable[P, T]:
        # Reject sync functions up front, before loading config or building a wrapper.
        if not asyncio.iscoroutinefunction(func):
            msg = f"@with_timeout can only be applied to async functions, got {func.__name__}"
            raise TypeError(msg)

        # Explicit seconds win; otherwise use the configured value for the
        # operation type (or the default when no type was given).
        timeout_value = seconds if seconds is not None else get_timeout_for_operation(op_label)

        @functools.wraps(func)
        async def async_wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
            """Async wrapper with timeout enforcement"""
            with tracer.start_as_current_span(
                f"timeout.{func.__name__}",
                attributes={
                    "timeout.seconds": timeout_value,
                    "timeout.operation_type": op_label,
                },
            ) as span:
                try:
                    if has_timeout_cm:
                        async with asyncio.timeout(timeout_value):
                            result = await func(*args, **kwargs)  # type: ignore[misc]
                    else:
                        result = await asyncio.wait_for(
                            func(*args, **kwargs),  # type: ignore[arg-type]
                            timeout_value,
                        )
                # asyncio.TimeoutError is the builtin TimeoutError on 3.11+ and the
                # exception raised by wait_for() on 3.10.
                # NOTE(review): a TimeoutError raised by func itself is also reported
                # as a timeout here.
                except asyncio.TimeoutError as e:
                    span.set_attribute("timeout.exceeded", True)

                    logger.warning(
                        f"Timeout exceeded: {func.__name__} ({timeout_value}s)",
                        extra={
                            "function": func.__name__,
                            "timeout_seconds": timeout_value,
                            "operation_type": op_label,
                        },
                    )

                    timeout_exceeded_counter.add(
                        1,
                        attributes={
                            "function": func.__name__,
                            "operation_type": op_label,
                            "timeout_seconds": timeout_value,
                        },
                    )

                    # Local import, as in the original module.
                    from mcp_server_langgraph.core.exceptions import TimeoutError as MCPTimeoutError

                    raise MCPTimeoutError(
                        message=f"Operation timed out after {timeout_value}s",
                        metadata={
                            "function": func.__name__,
                            "timeout_seconds": timeout_value,
                            "operation_type": op_label,
                        },
                    ) from e

                span.set_attribute("timeout.exceeded", False)
                return result  # type: ignore[no-any-return]

        return async_wrapper  # type: ignore

    return decorator
178class TimeoutContext:
179 """
180 Context manager for timeout enforcement (alternative to decorator).
182 Usage:
183 async with TimeoutContext(seconds=30) as timeout:
184 result = await some_async_operation()
185 """
187 def __init__(
188 self,
189 seconds: int | None = None,
190 operation_type: str | None = None,
191 ):
192 if seconds is not None:
193 self.timeout_value = seconds
194 elif operation_type is not None: 194 ↛ 197line 194 didn't jump to line 197 because the condition on line 194 was always true
195 self.timeout_value = get_timeout_for_operation(operation_type)
196 else:
197 self.timeout_value = get_timeout_for_operation("default")
199 self.operation_type = operation_type or "default"
200 self._context_manager = None
202 async def __aenter__(self) -> None:
203 """Enter timeout context"""
204 self._context_manager = asyncio.timeout(self.timeout_value) # type: ignore[assignment]
206 if self._context_manager: 206 ↛ 209line 206 didn't jump to line 209 because the condition on line 206 was always true
207 await self._context_manager.__aenter__() # type: ignore[unreachable]
209 return self # type: ignore[return-value]
211 async def __aexit__(self, exc_type, exc_val, exc_tb: Any) -> None: # type: ignore[no-untyped-def]
212 """Exit timeout context"""
213 if self._context_manager: 213 ↛ 227line 213 didn't jump to line 227 because the condition on line 213 was always true
214 try: # type: ignore[unreachable]
215 await self._context_manager.__aexit__(exc_type, exc_val, exc_tb)
216 except TimeoutError as e:
217 # Convert to our custom exception
218 from mcp_server_langgraph.core.exceptions import TimeoutError as MCPTimeoutError
220 raise MCPTimeoutError(
221 message=f"Operation timed out after {self.timeout_value}s",
222 metadata={
223 "timeout_seconds": self.timeout_value,
224 "operation_type": self.operation_type,
225 },
226 ) from e
227 elif hasattr(self, "_start_time"):
228 # Manual timeout check for Python 3.10
229 elapsed = asyncio.get_event_loop().time() - self._start_time
230 if elapsed > self.timeout_value and exc_type is None:
231 from mcp_server_langgraph.core.exceptions import TimeoutError as MCPTimeoutError
233 raise MCPTimeoutError(
234 message=f"Operation timed out after {self.timeout_value}s",
235 metadata={
236 "timeout_seconds": self.timeout_value,
237 "operation_type": self.operation_type,
238 "elapsed_seconds": elapsed,
239 },
240 )
242 return None # Don't suppress exceptions
# Convenience functions for common timeout patterns
async def run_with_timeout(
    coro: Awaitable[T],
    seconds: int | None = None,
    operation_type: str | None = None,
) -> T:
    """
    Run a coroutine with timeout.

    Args:
        coro: Coroutine (or other awaitable) to run
        seconds: Timeout in seconds (takes precedence over operation_type)
        operation_type: Operation type for auto-timeout (llm, auth, db, http, default)

    Returns:
        Result of the coroutine

    Raises:
        mcp_server_langgraph.core.exceptions.TimeoutError: If the operation
            exceeds the timeout. This is the project's custom exception raised
            by TimeoutContext, not the builtin ``TimeoutError``.

    Usage:
        result = await run_with_timeout(
            llm_client.generate(prompt),
            operation_type="llm"
        )
    """
    async with TimeoutContext(seconds=seconds, operation_type=operation_type):
        return await coro