Coverage for src / mcp_server_langgraph / utils / retry.py: 88%
50 statements
« prev ^ index » next — coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
1"""
Retry utilities with exponential backoff and jitter.

Provides decorators and functions for retrying operations that may fail
transiently, particularly useful for database and network operations.
6"""
8import asyncio
9import random
10import time
11from typing import Callable, TypeVar, Any
12from functools import wraps
14from mcp_server_langgraph.observability.telemetry import logger
16T = TypeVar("T")
19async def retry_with_backoff(
20 func: Callable[..., Any],
21 max_retries: int = 3,
22 initial_delay: float = 1.0,
23 max_delay: float = 32.0,
24 exponential_base: float = 2.0,
25 jitter: bool = True,
26 max_timeout: float | None = 60.0,
27 retryable_exceptions: tuple[type[Exception], ...] | None = None,
28) -> Any:
29 """
30 Retry an async function with exponential backoff and jitter.
32 Args:
33 func: Async function to retry
34 max_retries: Maximum number of retry attempts (default: 3)
35 initial_delay: Initial delay in seconds (default: 1.0)
36 max_delay: Maximum delay between retries (default: 32.0)
37 exponential_base: Base for exponential backoff (default: 2.0)
38 jitter: Add random jitter to avoid thundering herd (default: True)
39 max_timeout: Maximum total time to spend retrying in seconds (default: 60.0)
40 retryable_exceptions: Tuple of exception types to retry (default: all)
42 Returns:
43 Result of the function call
45 Raises:
46 Exception: Re-raises the last exception if all retries are exhausted
48 Example:
49 async def connect_db():
50 return await asyncpg.connect("postgresql://...")
52 result = await retry_with_backoff(
53 connect_db,
54 max_retries=3,
55 initial_delay=1.0
56 )
57 """
58 start_time = time.time()
59 last_exception: Exception | None = None
61 for attempt in range(max_retries + 1): # +1 for initial attempt 61 ↛ 103line 61 didn't jump to line 103 because the loop on line 61 didn't complete
62 try:
63 result = await func()
64 if attempt > 0:
65 logger.info(f"Success after {attempt} retry attempts")
66 return result
68 except Exception as e:
69 last_exception = e
71 # Check if this exception type should be retried
72 if retryable_exceptions and not isinstance(e, retryable_exceptions):
73 logger.debug(f"Non-retryable exception: {type(e).__name__}")
74 raise
76 # Check if we've exhausted retries
77 if attempt >= max_retries:
78 logger.error(f"Failed after {attempt + 1} attempts (including initial): {e}")
79 msg = f"Failed after {attempt + 1} attempts: {e}"
80 raise Exception(msg) from e
82 # Check if we've exceeded max timeout
83 if max_timeout: 83 ↛ 91line 83 didn't jump to line 91 because the condition on line 83 was always true
84 elapsed = time.time() - start_time
85 if elapsed >= max_timeout:
86 logger.error(f"Retry timeout exceeded ({elapsed:.1f}s >= {max_timeout}s)")
87 msg = f"Retry timeout exceeded after {elapsed:.1f}s (max: {max_timeout}s)"
88 raise Exception(msg) from e
90 # Calculate delay with exponential backoff
91 delay = min(initial_delay * (exponential_base**attempt), max_delay)
93 # Add jitter (±20% randomness)
94 if jitter:
95 jitter_factor = random.uniform(0.8, 1.2)
96 delay = delay * jitter_factor
98 logger.warning(f"Attempt {attempt + 1}/{max_retries + 1} failed: {e}. Retrying in {delay:.2f}s...")
100 await asyncio.sleep(delay)
102 # Should never reach here, but just in case
103 if last_exception:
104 raise last_exception
105 msg = "Retry logic error: no exception recorded"
106 raise RuntimeError(msg)
109def async_retry(
110 max_retries: int = 3,
111 initial_delay: float = 1.0,
112 max_delay: float = 32.0,
113 exponential_base: float = 2.0,
114 jitter: bool = True,
115 max_timeout: float | None = 60.0,
116 retryable_exceptions: tuple[type[Exception], ...] | None = None,
117) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
118 """
119 Decorator for retrying async functions with exponential backoff.
121 Args:
122 Same as retry_with_backoff()
124 Returns:
125 Decorated function with retry logic
127 Example:
128 @async_retry(max_retries=3, initial_delay=1.0)
129 async def connect_to_database():
130 return await asyncpg.connect("postgresql://...")
131 """
133 def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
134 @wraps(func)
135 async def wrapper(*args: Any, **kwargs: Any) -> Any:
136 async def _call() -> Any:
137 return await func(*args, **kwargs)
139 return await retry_with_backoff(
140 _call,
141 max_retries=max_retries,
142 initial_delay=initial_delay,
143 max_delay=max_delay,
144 exponential_base=exponential_base,
145 jitter=jitter,
146 max_timeout=max_timeout,
147 retryable_exceptions=retryable_exceptions,
148 )
150 return wrapper
152 return decorator