Coverage for src / mcp_server_langgraph / utils / retry.py: 88%

50 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 00:43 +0000

1""" 

2Retry utilities with exponential backoff and jitter. 

3 

4Provides decorators and functions for retrying operations that may fail 

5transiently, particularly useful for database and network operations. 

6""" 

7 

8import asyncio 

9import random 

10import time 

11from typing import Callable, TypeVar, Any 

12from functools import wraps 

13 

14from mcp_server_langgraph.observability.telemetry import logger 

15 

16T = TypeVar("T") 

17 

18 

19async def retry_with_backoff( 

20 func: Callable[..., Any], 

21 max_retries: int = 3, 

22 initial_delay: float = 1.0, 

23 max_delay: float = 32.0, 

24 exponential_base: float = 2.0, 

25 jitter: bool = True, 

26 max_timeout: float | None = 60.0, 

27 retryable_exceptions: tuple[type[Exception], ...] | None = None, 

28) -> Any: 

29 """ 

30 Retry an async function with exponential backoff and jitter. 

31 

32 Args: 

33 func: Async function to retry 

34 max_retries: Maximum number of retry attempts (default: 3) 

35 initial_delay: Initial delay in seconds (default: 1.0) 

36 max_delay: Maximum delay between retries (default: 32.0) 

37 exponential_base: Base for exponential backoff (default: 2.0) 

38 jitter: Add random jitter to avoid thundering herd (default: True) 

39 max_timeout: Maximum total time to spend retrying in seconds (default: 60.0) 

40 retryable_exceptions: Tuple of exception types to retry (default: all) 

41 

42 Returns: 

43 Result of the function call 

44 

45 Raises: 

46 Exception: Re-raises the last exception if all retries are exhausted 

47 

48 Example: 

49 async def connect_db(): 

50 return await asyncpg.connect("postgresql://...") 

51 

52 result = await retry_with_backoff( 

53 connect_db, 

54 max_retries=3, 

55 initial_delay=1.0 

56 ) 

57 """ 

58 start_time = time.time() 

59 last_exception: Exception | None = None 

60 

61 for attempt in range(max_retries + 1): # +1 for initial attempt 61 ↛ 103line 61 didn't jump to line 103 because the loop on line 61 didn't complete

62 try: 

63 result = await func() 

64 if attempt > 0: 

65 logger.info(f"Success after {attempt} retry attempts") 

66 return result 

67 

68 except Exception as e: 

69 last_exception = e 

70 

71 # Check if this exception type should be retried 

72 if retryable_exceptions and not isinstance(e, retryable_exceptions): 

73 logger.debug(f"Non-retryable exception: {type(e).__name__}") 

74 raise 

75 

76 # Check if we've exhausted retries 

77 if attempt >= max_retries: 

78 logger.error(f"Failed after {attempt + 1} attempts (including initial): {e}") 

79 msg = f"Failed after {attempt + 1} attempts: {e}" 

80 raise Exception(msg) from e 

81 

82 # Check if we've exceeded max timeout 

83 if max_timeout: 83 ↛ 91line 83 didn't jump to line 91 because the condition on line 83 was always true

84 elapsed = time.time() - start_time 

85 if elapsed >= max_timeout: 

86 logger.error(f"Retry timeout exceeded ({elapsed:.1f}s >= {max_timeout}s)") 

87 msg = f"Retry timeout exceeded after {elapsed:.1f}s (max: {max_timeout}s)" 

88 raise Exception(msg) from e 

89 

90 # Calculate delay with exponential backoff 

91 delay = min(initial_delay * (exponential_base**attempt), max_delay) 

92 

93 # Add jitter (±20% randomness) 

94 if jitter: 

95 jitter_factor = random.uniform(0.8, 1.2) 

96 delay = delay * jitter_factor 

97 

98 logger.warning(f"Attempt {attempt + 1}/{max_retries + 1} failed: {e}. Retrying in {delay:.2f}s...") 

99 

100 await asyncio.sleep(delay) 

101 

102 # Should never reach here, but just in case 

103 if last_exception: 

104 raise last_exception 

105 msg = "Retry logic error: no exception recorded" 

106 raise RuntimeError(msg) 

107 

108 

def async_retry(
    max_retries: int = 3,
    initial_delay: float = 1.0,
    max_delay: float = 32.0,
    exponential_base: float = 2.0,
    jitter: bool = True,
    max_timeout: float | None = 60.0,
    retryable_exceptions: tuple[type[Exception], ...] | None = None,
) -> Callable[[Callable[..., Any]], Callable[..., Any]]:
    """
    Build a decorator that adds retry-with-backoff behaviour to an async function.

    Every call to the decorated function is routed through
    retry_with_backoff() with the options given here.

    Args:
        Same as retry_with_backoff(), except that the function to retry is
        supplied by decoration rather than as an argument.

    Returns:
        A decorator; applying it yields an async wrapper that keeps the
        wrapped function's metadata (via functools.wraps).

    Example:
        @async_retry(max_retries=3, initial_delay=1.0)
        async def connect_to_database():
            return await asyncpg.connect("postgresql://...")
    """
    # Collect the retry settings once; they are forwarded verbatim on each call.
    retry_options: dict[str, Any] = {
        "max_retries": max_retries,
        "initial_delay": initial_delay,
        "max_delay": max_delay,
        "exponential_base": exponential_base,
        "jitter": jitter,
        "max_timeout": max_timeout,
        "retryable_exceptions": retryable_exceptions,
    }

    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        @wraps(func)
        async def wrapper(*args: Any, **kwargs: Any) -> Any:
            # retry_with_backoff() awaits a zero-argument callable, so bind
            # this call's arguments in a small closure.
            async def _invoke() -> Any:
                return await func(*args, **kwargs)

            return await retry_with_backoff(_invoke, **retry_options)

        return wrapper

    return decorator