Coverage for src / mcp_server_langgraph / resilience / bulkhead.py: 92%

111 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 00:43 +0000

1""" 

2Bulkhead isolation pattern for resource pool limits. 

3 

4Prevents resource exhaustion by limiting concurrent operations per resource type. 

5Uses asyncio.Semaphore for concurrency control. 

6 

7See ADR-0026 for design rationale. 

8""" 

9 

10import asyncio 

11import functools 

12import logging 

13from collections.abc import Callable 

14from typing import Any, ParamSpec, TypeVar 

15 

16from opentelemetry import trace 

17 

18from mcp_server_langgraph.observability.telemetry import bulkhead_active_operations_gauge, bulkhead_rejected_counter 

19from mcp_server_langgraph.resilience.config import get_resilience_config 

20 

# Module-level logger; bulkhead creation, rejection and reset events go here.
logger = logging.getLogger(__name__)

# OpenTelemetry tracer used to open a span around each guarded call.
tracer = trace.get_tracer(__name__)

# Typing helpers: P captures the decorated callable's parameters and T its
# return type, so decorators in this module can be annotated precisely.
P = ParamSpec("P")
T = TypeVar("T")

26 

27 

class BulkheadConfig:
    """Concurrency limits for each resource type guarded by a bulkhead.

    Attributes are plain integers named ``<resource>_limit``; callers look them
    up by that naming pattern.
    """

    def __init__(
        self,
        llm_limit: int = 10,
        openfga_limit: int = 50,
        redis_limit: int = 100,
        db_limit: int = 20,
    ) -> None:
        """Store the per-resource limits.

        Args:
            llm_limit: Limit for the ``llm`` resource type.
            openfga_limit: Limit for the ``openfga`` resource type.
            redis_limit: Limit for the ``redis`` resource type.
            db_limit: Limit for the ``db`` resource type.
        """
        (
            self.llm_limit,
            self.openfga_limit,
            self.redis_limit,
            self.db_limit,
        ) = (llm_limit, openfga_limit, redis_limit, db_limit)

    @classmethod
    def from_resilience_config(cls) -> "BulkheadConfig":
        """Build an instance from the ``bulkhead`` section of the global resilience config."""
        bulkhead_settings = get_resilience_config().bulkhead
        field_names = ("llm_limit", "openfga_limit", "redis_limit", "db_limit")
        return cls(**{name: getattr(bulkhead_settings, name) for name in field_names})

53 

54 

55# Global semaphores for resource types 

56_bulkhead_semaphores: dict[str, asyncio.Semaphore] = {} 

57 

58 

59def get_bulkhead(resource_type: str, limit: int | None = None) -> asyncio.Semaphore: 

60 """ 

61 Get or create a bulkhead semaphore for a resource type. 

62 

63 Args: 

64 resource_type: Type of resource (llm, openfga, redis, db, custom) 

65 limit: Concurrency limit (optional override) 

66 

67 Returns: 

68 asyncio.Semaphore for the resource type 

69 """ 

70 if resource_type in _bulkhead_semaphores: 

71 return _bulkhead_semaphores[resource_type] 

72 

73 # Determine limit 

74 if limit is None: 

75 config = BulkheadConfig.from_resilience_config() 

76 limit = getattr(config, f"{resource_type}_limit", 10) # Default to 10 

77 

78 # Create semaphore 

79 semaphore = asyncio.Semaphore(limit) 

80 _bulkhead_semaphores[resource_type] = semaphore 

81 

82 logger.info( 

83 f"Created bulkhead for {resource_type}", 

84 extra={"resource_type": resource_type, "limit": limit}, 

85 ) 

86 

87 return semaphore 

88 

89 

90def with_bulkhead( 

91 resource_type: str, 

92 limit: int | None = None, 

93 wait: bool = True, 

94) -> Callable[[Callable[P, T]], Callable[P, T]]: 

95 """ 

96 Decorator to limit concurrent executions of a function. 

97 

98 Args: 

99 resource_type: Type of resource (llm, openfga, redis, db, custom) 

100 limit: Concurrency limit (overrides config) 

101 wait: If True, wait for slot. If False, reject immediately if no slots available. 

102 

103 Usage: 

104 # Limit to 10 concurrent LLM calls 

105 @with_bulkhead(resource_type="llm", limit=10) 

106 async def call_llm(prompt: str) -> str: 

107 async with httpx.AsyncClient() as client: 

108 response = await client.post(...) 

109 return response.json() 

110 

111 # Reject immediately if no slots (fail-fast) 

112 @with_bulkhead(resource_type="openfga", wait=False) 

113 async def check_permission(user: str, resource: str) -> bool: 

114 return await openfga_client.check(user, resource) 

115 

116 # Combine with other resilience patterns 

117 @circuit_breaker(name="llm") 

118 @retry_with_backoff(max_attempts=3) 

119 @with_timeout(operation_type="llm") 

120 @with_bulkhead(resource_type="llm") 

121 async def call_llm_with_resilience(prompt: str) -> str: 

122 return await llm_client.generate(prompt) 

123 """ 

124 

125 def decorator(func: Callable[P, T]) -> Callable[P, T]: 

126 # Get or create bulkhead semaphore 

127 semaphore = get_bulkhead(resource_type, limit) 

128 

129 @functools.wraps(func) 

130 async def async_wrapper(*args: P.args, **kwargs: P.kwargs) -> T: 

131 """Async wrapper with bulkhead isolation""" 

132 # Get waiters count safely (attribute may not exist in all Python versions) 

133 try: 

134 waiters_count = len(semaphore._waiters) if hasattr(semaphore, "_waiters") and semaphore._waiters else 0 

135 except (AttributeError, TypeError): 

136 waiters_count = 0 

137 

138 with tracer.start_as_current_span( 

139 f"bulkhead.{resource_type}", 

140 attributes={ 

141 "bulkhead.resource_type": resource_type, 

142 "bulkhead.limit": limit or 10, 

143 "bulkhead.available": semaphore._value if hasattr(semaphore, "_value") else 0, 

144 }, 

145 ) as span: 

146 # Check if slots are available (semaphore._value == 0 means no slots) 

147 slots_available = semaphore._value if hasattr(semaphore, "_value") else 1 

148 if not wait and slots_available == 0: 

149 # No slots available, reject immediately 

150 span.set_attribute("bulkhead.rejected", True) 

151 

152 logger.warning( 

153 f"Bulkhead rejected request: {resource_type}", 

154 extra={ 

155 "resource_type": resource_type, 

156 "function": func.__name__, 

157 "reason": "no_available_slots", 

158 }, 

159 ) 

160 

161 # Emit metric 

162 try: 

163 bulkhead_rejected_counter.add( 

164 1, 

165 attributes={ 

166 "resource_type": resource_type, 

167 "function": func.__name__, 

168 }, 

169 ) 

170 except RuntimeError: 

171 # Observability not initialized (can happen in tests or during shutdown) 

172 pass 

173 

174 # Raise our custom exception 

175 from mcp_server_langgraph.core.exceptions import BulkheadRejectedError 

176 

177 raise BulkheadRejectedError( 

178 message=f"Bulkhead rejected request for {resource_type} (no available slots)", 

179 metadata={ 

180 "resource_type": resource_type, 

181 "function": func.__name__, 

182 "limit": limit or 10, 

183 }, 

184 ) 

185 

186 # Acquire semaphore (wait if necessary) 

187 async with semaphore: 

188 span.set_attribute("bulkhead.rejected", False) 

189 span.set_attribute("bulkhead.active", waiters_count + 1) 

190 

191 # Emit metric for active operations 

192 # Get active count safely (since we're inside the semaphore, use the value we calculated earlier) 

193 active_count = waiters_count + 1 

194 try: 

195 bulkhead_active_operations_gauge.set( 

196 active_count, 

197 attributes={"resource_type": resource_type}, 

198 ) 

199 except RuntimeError: 

200 # Observability not initialized (can happen in tests or during shutdown) 

201 pass 

202 

203 # Execute function 

204 result = await func(*args, **kwargs) # type: ignore[misc] 

205 

206 return result # type: ignore[no-any-return] 

207 

208 # Only works with async functions 

209 import asyncio as aio 

210 

211 if not aio.iscoroutinefunction(func): 

212 msg = f"@with_bulkhead can only be applied to async functions, got {func.__name__}" 

213 raise TypeError(msg) 

214 

215 return async_wrapper # type: ignore 

216 

217 return decorator 

218 

219 

220class BulkheadContext: 

221 """ 

222 Context manager for bulkhead isolation (alternative to decorator). 

223 

224 Usage: 

225 async with BulkheadContext(resource_type="llm"): 

226 result = await call_llm(prompt) 

227 """ 

228 

229 def __init__( 

230 self, 

231 resource_type: str, 

232 limit: int | None = None, 

233 wait: bool = True, 

234 ): 

235 self.resource_type = resource_type 

236 self.semaphore = get_bulkhead(resource_type, limit) 

237 self.wait = wait 

238 

239 async def __aenter__(self) -> None: 

240 """Acquire bulkhead slot""" 

241 # Check if slots are available (semaphore._value == 0 means no slots) 

242 slots_available = self.semaphore._value if hasattr(self.semaphore, "_value") else 1 

243 if not self.wait and slots_available == 0: 

244 # Reject immediately if no slots 

245 from mcp_server_langgraph.core.exceptions import BulkheadRejectedError 

246 

247 # Calculate total limit safely 

248 waiters_count = ( 

249 len(self.semaphore._waiters) if hasattr(self.semaphore, "_waiters") and self.semaphore._waiters else 0 

250 ) 

251 total_limit = self.semaphore._value + waiters_count if hasattr(self.semaphore, "_value") else 10 

252 

253 raise BulkheadRejectedError( 

254 message=f"Bulkhead rejected request for {self.resource_type}", 

255 metadata={ 

256 "resource_type": self.resource_type, 

257 "limit": total_limit, 

258 }, 

259 ) 

260 

261 await self.semaphore.acquire() 

262 return self # type: ignore[return-value] 

263 

264 async def __aexit__(self, exc_type, exc_val, exc_tb: Any) -> None: # type: ignore[no-untyped-def] 

265 """Release bulkhead slot""" 

266 self.semaphore.release() 

267 return None # Don't suppress exceptions 

268 

269 

270def get_bulkhead_stats(resource_type: str | None = None) -> dict[str, dict[str, int]]: 

271 """ 

272 Get statistics for bulkheads. 

273 

274 Args: 

275 resource_type: Specific resource type, or None for all 

276 

277 Returns: 

278 Dictionary with bulkhead statistics 

279 

280 Usage: 

281 stats = get_bulkhead_stats() 

282 # { 

283 # "llm": {"limit": 10, "available": 3, "active": 7, "waiting": 2}, 

284 # "openfga": {"limit": 50, "available": 45, "active": 5, "waiting": 0}, 

285 # } 

286 """ 

287 stats = {} 

288 

289 resource_types = [resource_type] if resource_type else _bulkhead_semaphores.keys() 

290 

291 for res_type in resource_types: 

292 if res_type not in _bulkhead_semaphores: 

293 continue 

294 

295 semaphore = _bulkhead_semaphores[res_type] 

296 

297 # Safely get semaphore attributes 

298 try: 

299 available = semaphore._value if hasattr(semaphore, "_value") else 0 

300 waiters = semaphore._waiters if hasattr(semaphore, "_waiters") and semaphore._waiters else [] 

301 active = len(waiters) 

302 waiting = len([w for w in waiters if not w.done()]) if waiters else 0 

303 except (AttributeError, TypeError): 

304 available = 0 

305 active = 0 

306 waiting = 0 

307 

308 stats[res_type] = { 

309 "limit": available + active, 

310 "available": available, 

311 "active": active, 

312 "waiting": waiting, 

313 } 

314 

315 return stats 

316 

317 

def reset_bulkhead(resource_type: str) -> None:
    """
    Drop the registered bulkhead for one resource type (for testing).

    Does nothing when no bulkhead is registered under that name. Code that
    already holds a reference to the removed semaphore keeps using it.

    Args:
        resource_type: Resource type to reset

    Warning: Only use this for testing! In production, bulkheads should not be reset.
    """
    try:
        del _bulkhead_semaphores[resource_type]
    except KeyError:
        # Nothing registered under this name: nothing to remove or log.
        return

    logger.warning(f"Bulkhead reset for {resource_type} (testing only)")

330 

331 

def reset_all_bulkheads() -> None:
    """
    Drop every registered bulkhead (for testing).

    The registry dict is emptied in place, so it stays the same object for any
    code that holds a reference to it.

    Warning: Only use this for testing! In production, bulkheads should not be reset.
    """
    # Clear in place rather than rebinding the module-level name.
    _bulkhead_semaphores.clear()
    logger.warning("All bulkheads reset (testing only)")