Coverage for src/mcp_server_langgraph/resilience/bulkhead.py: 90%

127 statements  

coverage.py v7.12.0, created at 2025-12-08 06:31 +0000

1""" 

2Bulkhead isolation pattern for resource pool limits. 

3 

4Prevents resource exhaustion by limiting concurrent operations per resource type. 

5Uses asyncio.Semaphore for concurrency control. 

6 

7See ADR-0026 for design rationale. 

8""" 

9 

10import asyncio 

11import functools 

12import logging 

13from collections.abc import Callable 

14from typing import Any, ParamSpec, TypeVar 

15 

16from opentelemetry import trace 

17 

18from mcp_server_langgraph.observability.telemetry import bulkhead_active_operations_gauge, bulkhead_rejected_counter 

19from mcp_server_langgraph.resilience.config import get_provider_limit, get_resilience_config 

20 

21logger = logging.getLogger(__name__) 

22tracer = trace.get_tracer(__name__) 

23 

24P = ParamSpec("P") 

25T = TypeVar("T") 

26 

27 

28# Provider-specific bulkhead semaphores (separate from resource-type bulkheads) 

29_provider_bulkhead_semaphores: dict[str, asyncio.Semaphore] = {} 

30 

31 
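
# Usage sketch (illustrative, added for clarity): the essence of the bulkhead
# pattern described in the module docstring. A semaphore caps how many
# coroutines touch a resource at once, so bursts queue up instead of exhausting
# connections. `_bulkhead_pattern_demo` is a hypothetical name, not part of the
# public API.
async def _bulkhead_pattern_demo() -> list[int]:
    semaphore = asyncio.Semaphore(5)  # at most 5 operations in flight

    async def guarded_call(i: int) -> int:
        async with semaphore:
            await asyncio.sleep(0.01)  # stand-in for real I/O-bound work
            return i

    # 20 tasks are launched, but never more than 5 run concurrently
    return await asyncio.gather(*(guarded_call(i) for i in range(20)))
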

def get_provider_bulkhead(provider: str) -> asyncio.Semaphore:
    """
    Get or create a bulkhead semaphore for a specific LLM provider.

    Provider bulkheads are separate from resource-type bulkheads and use
    provider-specific limits based on upstream rate limits.

    Args:
        provider: LLM provider name (e.g., "anthropic", "openai", "vertex_ai")

    Returns:
        asyncio.Semaphore configured for the provider's concurrency limit

    Example:
        >>> semaphore = get_provider_bulkhead("anthropic")
        >>> async with semaphore:
        ...     await call_anthropic_api()
    """
    if provider in _provider_bulkhead_semaphores:
        return _provider_bulkhead_semaphores[provider]

    # Get provider-specific limit from config
    limit = get_provider_limit(provider)

    # Create semaphore with provider limit
    semaphore = asyncio.Semaphore(limit)
    _provider_bulkhead_semaphores[provider] = semaphore

    logger.info(
        f"Created provider bulkhead for {provider}",
        extra={"provider": provider, "limit": limit},
    )

    return semaphore


def reset_provider_bulkhead(provider: str) -> None:
    """
    Reset a provider bulkhead (for testing).

    Args:
        provider: Provider name to reset

    Warning: Only use this for testing! In production, bulkheads should not be reset.
    """
    if provider in _provider_bulkhead_semaphores:
        del _provider_bulkhead_semaphores[provider]
        logger.warning(f"Provider bulkhead reset for {provider} (testing only)")


def reset_all_provider_bulkheads() -> None:
    """
    Reset all provider bulkheads (for testing).

    Warning: Only use this for testing! In production, bulkheads should not be reset.
    """
    _provider_bulkhead_semaphores.clear()
    logger.warning("All provider bulkheads reset (testing only)")

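
# Usage sketch (illustrative): the reset helpers above exist so tests can start
# from a clean slate. A test can bracket the code under test like this; no test
# framework is assumed and `_example_isolated_provider_test` is a hypothetical name.
async def _example_isolated_provider_test() -> None:
    reset_all_provider_bulkheads()
    try:
        semaphore = get_provider_bulkhead("anthropic")  # fresh semaphore for this test
        async with semaphore:
            pass  # exercise the code under test here
    finally:
        reset_all_provider_bulkheads()  # leave no shared state for the next test
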

class BulkheadConfig:
    """Bulkhead configuration for different resource types"""

    def __init__(
        self,
        llm_limit: int = 10,
        openfga_limit: int = 50,
        redis_limit: int = 100,
        db_limit: int = 20,
    ):
        self.llm_limit = llm_limit
        self.openfga_limit = openfga_limit
        self.redis_limit = redis_limit
        self.db_limit = db_limit

    @classmethod
    def from_resilience_config(cls) -> "BulkheadConfig":
        """Load from global resilience configuration"""
        config = get_resilience_config()
        return cls(
            llm_limit=config.bulkhead.llm_limit,
            openfga_limit=config.bulkhead.openfga_limit,
            redis_limit=config.bulkhead.redis_limit,
            db_limit=config.bulkhead.db_limit,
        )


# Global semaphores for resource types
_bulkhead_semaphores: dict[str, asyncio.Semaphore] = {}

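
# Usage sketch (illustrative): the field names on BulkheadConfig matter because
# get_bulkhead() (below) resolves a resource type such as "llm" to the attribute
# "llm_limit" via getattr, falling back to 10 for unknown types. The numbers and
# the "vector_db" type are arbitrary examples, not recommended limits.
def _example_limit_lookup() -> tuple[int, int]:
    config = BulkheadConfig(llm_limit=5, openfga_limit=25, redis_limit=50, db_limit=10)
    known = getattr(config, "llm_limit", 10)  # -> 5, the field exists
    unknown = getattr(config, "vector_db_limit", 10)  # -> 10, fallback applies
    return known, unknown
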

def get_bulkhead(resource_type: str, limit: int | None = None) -> asyncio.Semaphore:
    """
    Get or create a bulkhead semaphore for a resource type.

    Args:
        resource_type: Type of resource (llm, openfga, redis, db, custom)
        limit: Concurrency limit (optional override)

    Returns:
        asyncio.Semaphore for the resource type
    """
    if resource_type in _bulkhead_semaphores:
        return _bulkhead_semaphores[resource_type]

    # Determine limit
    if limit is None:
        config = BulkheadConfig.from_resilience_config()
        limit = getattr(config, f"{resource_type}_limit", 10)  # Default to 10

    # Create semaphore
    semaphore = asyncio.Semaphore(limit)
    _bulkhead_semaphores[resource_type] = semaphore

    logger.info(
        f"Created bulkhead for {resource_type}",
        extra={"resource_type": resource_type, "limit": limit},
    )

    return semaphore

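
# Usage sketch (illustrative): get_bulkhead() also supports custom resource
# types. With an explicit `limit` no configuration lookup is needed; without
# one, the `<resource_type>_limit` attribute on BulkheadConfig (or 10) is used.
# "vector_db" and the sleep are stand-ins for a real downstream dependency.
async def _example_custom_resource(texts: list[str]) -> list[str]:
    semaphore = get_bulkhead("vector_db", limit=5)

    async def fetch_one(text: str) -> str:
        async with semaphore:
            await asyncio.sleep(0.01)  # stand-in for the real vector DB call
            return text

    return await asyncio.gather(*(fetch_one(t) for t in texts))
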

def with_bulkhead(
    resource_type: str,
    limit: int | None = None,
    wait: bool = True,
) -> Callable[[Callable[P, T]], Callable[P, T]]:
    """
    Decorator to limit concurrent executions of a function.

    Args:
        resource_type: Type of resource (llm, openfga, redis, db, custom)
        limit: Concurrency limit (overrides config)
        wait: If True, wait for a slot. If False, reject immediately when no slots are available.

    Usage:
        # Limit to 10 concurrent LLM calls (default)
        @with_bulkhead(resource_type="llm", limit=10)
        async def call_llm(prompt: str) -> str:
            async with httpx.AsyncClient() as client:
                response = await client.post(...)
                return response.json()

        # Reject immediately if no slots (fail-fast)
        @with_bulkhead(resource_type="openfga", wait=False)
        async def check_permission(user: str, resource: str) -> bool:
            return await openfga_client.check(user, resource)

        # Combine with other resilience patterns
        @circuit_breaker(name="llm")
        @retry_with_backoff(max_attempts=3)
        @with_timeout(operation_type="llm")
        @with_bulkhead(resource_type="llm")
        async def call_llm_with_resilience(prompt: str) -> str:
            return await llm_client.generate(prompt)
    """

    def decorator(func: Callable[P, T]) -> Callable[P, T]:
        # Get or create the bulkhead semaphore
        semaphore = get_bulkhead(resource_type, limit)

        @functools.wraps(func)
        async def async_wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
            """Async wrapper with bulkhead isolation"""
            # Get the waiter count safely (private attribute may not exist in all Python versions)
            try:
                waiters_count = len(semaphore._waiters) if hasattr(semaphore, "_waiters") and semaphore._waiters else 0
            except (AttributeError, TypeError):
                waiters_count = 0

            with tracer.start_as_current_span(
                f"bulkhead.{resource_type}",
                attributes={
                    "bulkhead.resource_type": resource_type,
                    "bulkhead.limit": limit or 10,
                    "bulkhead.available": semaphore._value if hasattr(semaphore, "_value") else 0,
                },
            ) as span:
                # Check whether slots are available (semaphore._value == 0 means none)
                slots_available = semaphore._value if hasattr(semaphore, "_value") else 1
                if not wait and slots_available == 0:
                    # No slots available, reject immediately
                    span.set_attribute("bulkhead.rejected", True)

                    logger.warning(
                        f"Bulkhead rejected request: {resource_type}",
                        extra={
                            "resource_type": resource_type,
                            "function": func.__name__,
                            "reason": "no_available_slots",
                        },
                    )

                    # Emit metric
                    try:
                        bulkhead_rejected_counter.add(
                            1,
                            attributes={
                                "resource_type": resource_type,
                                "function": func.__name__,
                            },
                        )
                    except RuntimeError:
                        # Observability not initialized (can happen in tests or during shutdown)
                        pass

                    # Raise our custom exception
                    from mcp_server_langgraph.core.exceptions import BulkheadRejectedError

                    raise BulkheadRejectedError(
                        message=f"Bulkhead rejected request for {resource_type} (no available slots)",
                        metadata={
                            "resource_type": resource_type,
                            "function": func.__name__,
                            "limit": limit or 10,
                        },
                    )

                # Acquire semaphore (wait if necessary)
                async with semaphore:
                    span.set_attribute("bulkhead.rejected", False)
                    span.set_attribute("bulkhead.active", waiters_count + 1)

                    # Emit metric for active operations
                    # (we are inside the semaphore, so reuse the waiter count captured above)
                    active_count = waiters_count + 1
                    try:
                        bulkhead_active_operations_gauge.set(
                            active_count,
                            attributes={"resource_type": resource_type},
                        )
                    except RuntimeError:
                        # Observability not initialized (can happen in tests or during shutdown)
                        pass

                    # Execute function
                    result = await func(*args, **kwargs)  # type: ignore[misc]

                    return result  # type: ignore[no-any-return]

        # Only works with async functions (asyncio is already imported at module level)
        if not asyncio.iscoroutinefunction(func):
            msg = f"@with_bulkhead can only be applied to async functions, got {func.__name__}"
            raise TypeError(msg)

        return async_wrapper  # type: ignore[return-value]

    return decorator

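
# Usage sketch (illustrative): callers of a fail-fast bulkhead (wait=False)
# should expect BulkheadRejectedError and degrade gracefully instead of queueing.
# The "authz_demo" resource type and the sleep are stand-ins for a real check.
async def _example_fail_fast_caller(user: str, resource: str) -> bool:
    from mcp_server_langgraph.core.exceptions import BulkheadRejectedError

    @with_bulkhead(resource_type="authz_demo", limit=2, wait=False)
    async def check(u: str, r: str) -> bool:
        await asyncio.sleep(0.01)  # stand-in for the real authorization call
        return True

    try:
        return await check(user, resource)
    except BulkheadRejectedError:
        # Bulkhead saturated: fail closed rather than pile up requests
        return False
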

class BulkheadContext:
    """
    Context manager for bulkhead isolation (alternative to the decorator).

    Usage:
        async with BulkheadContext(resource_type="llm"):
            result = await call_llm(prompt)
    """

    def __init__(
        self,
        resource_type: str,
        limit: int | None = None,
        wait: bool = True,
    ):
        self.resource_type = resource_type
        self.semaphore = get_bulkhead(resource_type, limit)
        self.wait = wait

    async def __aenter__(self) -> "BulkheadContext":
        """Acquire a bulkhead slot"""
        # Check whether slots are available (semaphore._value == 0 means none)
        slots_available = self.semaphore._value if hasattr(self.semaphore, "_value") else 1
        if not self.wait and slots_available == 0:
            # Reject immediately if no slots
            from mcp_server_langgraph.core.exceptions import BulkheadRejectedError

            # Estimate the total limit (asyncio.Semaphore does not expose its initial value)
            waiters_count = (
                len(self.semaphore._waiters) if hasattr(self.semaphore, "_waiters") and self.semaphore._waiters else 0
            )
            total_limit = self.semaphore._value + waiters_count if hasattr(self.semaphore, "_value") else 10

            raise BulkheadRejectedError(
                message=f"Bulkhead rejected request for {self.resource_type}",
                metadata={
                    "resource_type": self.resource_type,
                    "limit": total_limit,
                },
            )

        await self.semaphore.acquire()
        return self

    async def __aexit__(
        self, exc_type: type[BaseException] | None, exc_val: BaseException | None, exc_tb: Any
    ) -> None:
        """Release the bulkhead slot"""
        self.semaphore.release()
        return None  # Don't suppress exceptions

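
# Usage sketch (illustrative): the context-manager form is useful when only part
# of a function needs isolation, e.g. a single outbound call inside a larger
# workflow. With wait=False it raises BulkheadRejectedError just like the
# decorator. The "llm_demo" type and the sleep are stand-ins.
async def _example_context_manager(prompt: str) -> str:
    cleaned = prompt.strip()  # work that does not touch the limited resource

    async with BulkheadContext(resource_type="llm_demo", limit=3):
        await asyncio.sleep(0.01)  # stand-in for the real LLM call
        return f"echo: {cleaned}"
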

def get_bulkhead_stats(resource_type: str | None = None) -> dict[str, dict[str, int]]:
    """
    Get statistics for bulkheads.

    Args:
        resource_type: Specific resource type, or None for all

    Returns:
        Dictionary with bulkhead statistics (best-effort values derived from
        semaphore internals)

    Usage:
        stats = get_bulkhead_stats()
        # {
        #     "llm": {"limit": 10, "available": 3, "active": 7, "waiting": 2},
        #     "openfga": {"limit": 50, "available": 45, "active": 5, "waiting": 0},
        # }
    """
    stats = {}

    resource_types = [resource_type] if resource_type else _bulkhead_semaphores.keys()

    for res_type in resource_types:
        if res_type not in _bulkhead_semaphores:
            continue

        semaphore = _bulkhead_semaphores[res_type]

        # Safely read private semaphore attributes. asyncio.Semaphore does not
        # expose its initial limit, so these values are best-effort approximations.
        try:
            available = semaphore._value if hasattr(semaphore, "_value") else 0
            waiters = semaphore._waiters if hasattr(semaphore, "_waiters") and semaphore._waiters else []
            active = len(waiters)
            waiting = len([w for w in waiters if not w.done()]) if waiters else 0
        except (AttributeError, TypeError):
            available = 0
            active = 0
            waiting = 0

        stats[res_type] = {
            "limit": available + active,
            "available": available,
            "active": active,
            "waiting": waiting,
        }

    return stats

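
# Usage sketch (illustrative): get_bulkhead_stats() suits health or readiness
# endpoints, e.g. reporting a resource as degraded when its bulkhead has no
# available slots. The status labels are arbitrary.
def _example_health_snapshot() -> dict[str, str]:
    stats = get_bulkhead_stats()
    return {
        res_type: ("degraded" if s["available"] == 0 else "ok")
        for res_type, s in stats.items()
    }
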

def reset_bulkhead(resource_type: str) -> None:
    """
    Reset a bulkhead (for testing).

    Args:
        resource_type: Resource type to reset

    Warning: Only use this for testing! In production, bulkheads should not be reset.
    """
    if resource_type in _bulkhead_semaphores:
        del _bulkhead_semaphores[resource_type]
        logger.warning(f"Bulkhead reset for {resource_type} (testing only)")


def reset_all_bulkheads() -> None:
    """
    Reset all bulkheads (for testing).

    Warning: Only use this for testing! In production, bulkheads should not be reset.
    """
    _bulkhead_semaphores.clear()
    logger.warning("All bulkheads reset (testing only)")
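

# Usage sketch (illustrative): combining the pieces end to end. A resource-type
# bulkhead is applied via the decorator while the provider-specific semaphore
# caps calls to the upstream API, and stats are inspected afterwards. The
# "llm_demo2" type and the sleep are stand-ins; the "anthropic" limit comes from
# get_provider_limit() and therefore from the resilience configuration.
async def _example_end_to_end(prompts: list[str]) -> dict[str, dict[str, int]]:
    provider_semaphore = get_provider_bulkhead("anthropic")

    @with_bulkhead(resource_type="llm_demo2", limit=4)
    async def call_llm(prompt: str) -> str:
        async with provider_semaphore:  # per-provider cap nested inside the type cap
            await asyncio.sleep(0.01)
            return prompt

    await asyncio.gather(*(call_llm(p) for p in prompts))
    return get_bulkhead_stats("llm_demo2")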