Coverage for src/mcp_server_langgraph/resilience/bulkhead.py: 90%
127 statements
coverage.py v7.12.0, created at 2025-12-08 06:31 +0000
1"""
2Bulkhead isolation pattern for resource pool limits.
4Prevents resource exhaustion by limiting concurrent operations per resource type.
5Uses asyncio.Semaphore for concurrency control.
7See ADR-0026 for design rationale.
8"""
10import asyncio
11import functools
12import logging
13from collections.abc import Callable
14from typing import Any, ParamSpec, TypeVar
16from opentelemetry import trace
18from mcp_server_langgraph.observability.telemetry import bulkhead_active_operations_gauge, bulkhead_rejected_counter
19from mcp_server_langgraph.resilience.config import get_provider_limit, get_resilience_config
21logger = logging.getLogger(__name__)
22tracer = trace.get_tracer(__name__)
24P = ParamSpec("P")
25T = TypeVar("T")
28# Provider-specific bulkhead semaphores (separate from resource-type bulkheads)
29_provider_bulkhead_semaphores: dict[str, asyncio.Semaphore] = {}
32def get_provider_bulkhead(provider: str) -> asyncio.Semaphore:
33 """
34 Get or create a bulkhead semaphore for a specific LLM provider.
36 Provider bulkheads are separate from resource-type bulkheads and use
37 provider-specific limits based on upstream rate limits.
39 Args:
40 provider: LLM provider name (e.g., "anthropic", "openai", "vertex_ai")
42 Returns:
43 asyncio.Semaphore configured for the provider's concurrency limit
45 Example:
46 >>> semaphore = get_provider_bulkhead("anthropic")
47 >>> async with semaphore:
48 ... await call_anthropic_api()
49 """
50 if provider in _provider_bulkhead_semaphores:
51 return _provider_bulkhead_semaphores[provider]
53 # Get provider-specific limit from config
54 limit = get_provider_limit(provider)
56 # Create semaphore with provider limit
57 semaphore = asyncio.Semaphore(limit)
58 _provider_bulkhead_semaphores[provider] = semaphore
60 logger.info(
61 f"Created provider bulkhead for {provider}",
62 extra={"provider": provider, "limit": limit},
63 )
65 return semaphore
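

# Illustrative sketch (not part of the original module): acquiring a provider
# bulkhead around a call. asyncio.sleep stands in for the real provider request,
# and the "anthropic" provider name is only an example.
async def _example_provider_call(prompt: str) -> str:
    semaphore = get_provider_bulkhead("anthropic")
    async with semaphore:  # waits until the provider's concurrency limit allows it
        await asyncio.sleep(0)  # stand-in for the real provider request
        return f"response for {prompt!r}"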


def reset_provider_bulkhead(provider: str) -> None:
    """
    Reset a provider bulkhead (for testing).

    Args:
        provider: Provider name to reset

    Warning: Only use this for testing! In production, bulkheads should not be reset.
    """
    if provider in _provider_bulkhead_semaphores:
        del _provider_bulkhead_semaphores[provider]
        logger.warning(f"Provider bulkhead reset for {provider} (testing only)")


def reset_all_provider_bulkheads() -> None:
    """
    Reset all provider bulkheads (for testing).

    Warning: Only use this for testing! In production, bulkheads should not be reset.
    """
    _provider_bulkhead_semaphores.clear()
    logger.warning("All provider bulkheads reset (testing only)")


class BulkheadConfig:
    """Bulkhead configuration for different resource types"""

    def __init__(
        self,
        llm_limit: int = 10,
        openfga_limit: int = 50,
        redis_limit: int = 100,
        db_limit: int = 20,
    ):
        self.llm_limit = llm_limit
        self.openfga_limit = openfga_limit
        self.redis_limit = redis_limit
        self.db_limit = db_limit

    @classmethod
    def from_resilience_config(cls) -> "BulkheadConfig":
        """Load from global resilience configuration"""
        config = get_resilience_config()
        return cls(
            llm_limit=config.bulkhead.llm_limit,
            openfga_limit=config.bulkhead.openfga_limit,
            redis_limit=config.bulkhead.redis_limit,
            db_limit=config.bulkhead.db_limit,
        )
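

# Illustrative sketch: limits can be loaded from the global resilience config via
# BulkheadConfig.from_resilience_config(), or overridden explicitly, e.g. for a
# deployment with a stricter LLM quota. The numbers below are placeholders, not
# recommended settings.
def _example_custom_config() -> BulkheadConfig:
    return BulkheadConfig(llm_limit=5, openfga_limit=25, redis_limit=50, db_limit=10)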


# Global semaphores for resource types
_bulkhead_semaphores: dict[str, asyncio.Semaphore] = {}


def get_bulkhead(resource_type: str, limit: int | None = None) -> asyncio.Semaphore:
    """
    Get or create a bulkhead semaphore for a resource type.

    Args:
        resource_type: Type of resource (llm, openfga, redis, db, custom)
        limit: Concurrency limit (optional override)

    Returns:
        asyncio.Semaphore for the resource type
    """
    if resource_type in _bulkhead_semaphores:
        return _bulkhead_semaphores[resource_type]

    # Determine limit
    if limit is None:
        config = BulkheadConfig.from_resilience_config()
        limit = getattr(config, f"{resource_type}_limit", 10)  # Default to 10

    # Create semaphore
    semaphore = asyncio.Semaphore(limit)
    _bulkhead_semaphores[resource_type] = semaphore

    logger.info(
        f"Created bulkhead for {resource_type}",
        extra={"resource_type": resource_type, "limit": limit},
    )

    return semaphore
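

# Illustrative sketch: using the semaphore from get_bulkhead() directly, without
# the decorator or BulkheadContext. The "db" resource type and limit of 20 are
# example values; asyncio.sleep stands in for a real database query.
async def _example_raw_bulkhead_usage() -> None:
    semaphore = get_bulkhead("db", limit=20)
    async with semaphore:  # at most 20 of these blocks run concurrently
        await asyncio.sleep(0)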


def with_bulkhead(
    resource_type: str,
    limit: int | None = None,
    wait: bool = True,
) -> Callable[[Callable[P, T]], Callable[P, T]]:
    """
    Decorator to limit concurrent executions of a function.

    Args:
        resource_type: Type of resource (llm, openfga, redis, db, custom)
        limit: Concurrency limit (overrides config)
        wait: If True, wait for a slot. If False, reject immediately when no slots are available.

    Usage:
        # Limit to 10 concurrent LLM calls (default)
        @with_bulkhead(resource_type="llm", limit=10)
        async def call_llm(prompt: str) -> str:
            async with httpx.AsyncClient() as client:
                response = await client.post(...)
                return response.json()

        # Reject immediately if no slots (fail-fast)
        @with_bulkhead(resource_type="openfga", wait=False)
        async def check_permission(user: str, resource: str) -> bool:
            return await openfga_client.check(user, resource)

        # Combine with other resilience patterns
        @circuit_breaker(name="llm")
        @retry_with_backoff(max_attempts=3)
        @with_timeout(operation_type="llm")
        @with_bulkhead(resource_type="llm")
        async def call_llm_with_resilience(prompt: str) -> str:
            return await llm_client.generate(prompt)
    """

    def decorator(func: Callable[P, T]) -> Callable[P, T]:
        # Get or create bulkhead semaphore
        semaphore = get_bulkhead(resource_type, limit)

        @functools.wraps(func)
        async def async_wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
            """Async wrapper with bulkhead isolation"""
            # Get waiters count safely (attribute may not exist in all Python versions)
            try:
                waiters_count = len(semaphore._waiters) if hasattr(semaphore, "_waiters") and semaphore._waiters else 0
            except (AttributeError, TypeError):
                waiters_count = 0

            with tracer.start_as_current_span(
                f"bulkhead.{resource_type}",
                attributes={
                    "bulkhead.resource_type": resource_type,
                    "bulkhead.limit": limit or 10,
                    "bulkhead.available": semaphore._value if hasattr(semaphore, "_value") else 0,
                },
            ) as span:
                # Check if slots are available (semaphore._value == 0 means no slots)
                slots_available = semaphore._value if hasattr(semaphore, "_value") else 1
                if not wait and slots_available == 0:
                    # No slots available, reject immediately
                    span.set_attribute("bulkhead.rejected", True)

                    logger.warning(
                        f"Bulkhead rejected request: {resource_type}",
                        extra={
                            "resource_type": resource_type,
                            "function": func.__name__,
                            "reason": "no_available_slots",
                        },
                    )

                    # Emit metric
                    try:
                        bulkhead_rejected_counter.add(
                            1,
                            attributes={
                                "resource_type": resource_type,
                                "function": func.__name__,
                            },
                        )
                    except RuntimeError:
                        # Observability not initialized (can happen in tests or during shutdown)
                        pass

                    # Raise our custom exception
                    from mcp_server_langgraph.core.exceptions import BulkheadRejectedError

                    raise BulkheadRejectedError(
                        message=f"Bulkhead rejected request for {resource_type} (no available slots)",
                        metadata={
                            "resource_type": resource_type,
                            "function": func.__name__,
                            "limit": limit or 10,
                        },
                    )

                # Acquire semaphore (wait if necessary)
                async with semaphore:
                    span.set_attribute("bulkhead.rejected", False)
                    span.set_attribute("bulkhead.active", waiters_count + 1)

                    # Emit metric for active operations
                    # Get active count safely (since we're inside the semaphore, use the value we calculated earlier)
                    active_count = waiters_count + 1
                    try:
                        bulkhead_active_operations_gauge.set(
                            active_count,
                            attributes={"resource_type": resource_type},
                        )
                    except RuntimeError:
                        # Observability not initialized (can happen in tests or during shutdown)
                        pass

                    # Execute function
                    result = await func(*args, **kwargs)  # type: ignore[misc]

                    return result  # type: ignore[no-any-return]

        # Only works with async functions
        if not asyncio.iscoroutinefunction(func):
            msg = f"@with_bulkhead can only be applied to async functions, got {func.__name__}"
            raise TypeError(msg)

        return async_wrapper  # type: ignore

    return decorator
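

# Illustrative sketch: a caller of a wait=False bulkhead should be prepared to
# handle BulkheadRejectedError and degrade gracefully. The decorated `check`
# coroutine below is a stand-in, not a real permission check.
async def _example_fail_fast_caller(user: str, resource: str) -> bool:
    from mcp_server_langgraph.core.exceptions import BulkheadRejectedError

    @with_bulkhead(resource_type="openfga", wait=False)
    async def check(u: str, r: str) -> bool:
        await asyncio.sleep(0)  # stand-in for the real permission check
        return True

    try:
        return await check(user, resource)
    except BulkheadRejectedError:
        return False  # fail closed when the pool is saturated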


class BulkheadContext:
    """
    Context manager for bulkhead isolation (alternative to decorator).

    Usage:
        async with BulkheadContext(resource_type="llm"):
            result = await call_llm(prompt)
    """

    def __init__(
        self,
        resource_type: str,
        limit: int | None = None,
        wait: bool = True,
    ):
        self.resource_type = resource_type
        self.semaphore = get_bulkhead(resource_type, limit)
        self.wait = wait

    async def __aenter__(self) -> "BulkheadContext":
        """Acquire bulkhead slot"""
        # Check if slots are available (semaphore._value == 0 means no slots)
        slots_available = self.semaphore._value if hasattr(self.semaphore, "_value") else 1
        if not self.wait and slots_available == 0:
            # Reject immediately if no slots
            from mcp_server_langgraph.core.exceptions import BulkheadRejectedError

            # Calculate total limit safely
            waiters_count = (
                len(self.semaphore._waiters) if hasattr(self.semaphore, "_waiters") and self.semaphore._waiters else 0
            )
            total_limit = self.semaphore._value + waiters_count if hasattr(self.semaphore, "_value") else 10

            raise BulkheadRejectedError(
                message=f"Bulkhead rejected request for {self.resource_type}",
                metadata={
                    "resource_type": self.resource_type,
                    "limit": total_limit,
                },
            )

        await self.semaphore.acquire()
        return self

    async def __aexit__(self, exc_type: Any, exc_val: Any, exc_tb: Any) -> None:
        """Release bulkhead slot"""
        self.semaphore.release()
        return None  # Don't suppress exceptions
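

# Illustrative sketch: with wait=False, BulkheadContext raises
# BulkheadRejectedError from __aenter__, so callers can shed load instead of
# queueing. The "custom" resource type is only an example.
async def _example_context_usage() -> str:
    from mcp_server_langgraph.core.exceptions import BulkheadRejectedError

    try:
        async with BulkheadContext(resource_type="custom", wait=False):
            await asyncio.sleep(0)  # stand-in for the protected operation
            return "ok"
    except BulkheadRejectedError:
        return "rejected"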


def get_bulkhead_stats(resource_type: str | None = None) -> dict[str, dict[str, int]]:
    """
    Get statistics for bulkheads.

    Args:
        resource_type: Specific resource type, or None for all

    Returns:
        Dictionary with bulkhead statistics

    Usage:
        stats = get_bulkhead_stats()
        # {
        #     "llm": {"limit": 10, "available": 3, "active": 7, "waiting": 2},
        #     "openfga": {"limit": 50, "available": 45, "active": 5, "waiting": 0},
        # }
    """
    stats = {}

    resource_types = [resource_type] if resource_type else _bulkhead_semaphores.keys()

    for res_type in resource_types:
        if res_type not in _bulkhead_semaphores:
            continue

        semaphore = _bulkhead_semaphores[res_type]

        # Safely get semaphore attributes
        try:
            available = semaphore._value if hasattr(semaphore, "_value") else 0
            waiters = semaphore._waiters if hasattr(semaphore, "_waiters") and semaphore._waiters else []
            active = len(waiters)
            waiting = len([w for w in waiters if not w.done()]) if waiters else 0
        except (AttributeError, TypeError):
            available = 0
            active = 0
            waiting = 0

        stats[res_type] = {
            "limit": available + active,
            "available": available,
            "active": active,
            "waiting": waiting,
        }

    return stats
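

# Illustrative sketch: periodically logging bulkhead saturation, e.g. from a
# background task. The 30-second interval is an arbitrary example value.
async def _example_log_bulkhead_stats(interval_seconds: float = 30.0) -> None:
    while True:
        for res_type, stat in get_bulkhead_stats().items():
            logger.info("Bulkhead stats", extra={"resource_type": res_type, **stat})
        await asyncio.sleep(interval_seconds)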


def reset_bulkhead(resource_type: str) -> None:
    """
    Reset a bulkhead (for testing).

    Args:
        resource_type: Resource type to reset

    Warning: Only use this for testing! In production, bulkheads should not be reset.
    """
    if resource_type in _bulkhead_semaphores:
        del _bulkhead_semaphores[resource_type]
        logger.warning(f"Bulkhead reset for {resource_type} (testing only)")


def reset_all_bulkheads() -> None:
    """
    Reset all bulkheads (for testing).

    Warning: Only use this for testing! In production, bulkheads should not be reset.
    """
    _bulkhead_semaphores.clear()
    logger.warning("All bulkheads reset (testing only)")
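

# Illustrative sketch: a test helper that restores clean bulkhead state between
# tests. In a pytest suite this would typically be called from an autouse
# fixture; pytest itself is assumed, not imported here.
def _example_reset_all_for_tests() -> None:
    reset_all_bulkheads()
    reset_all_provider_bulkheads()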