Coverage for src/mcp_server_langgraph/resilience/bulkhead.py: 92%
111 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
"""
Bulkhead isolation pattern for resource pool limits.

Prevents resource exhaustion by limiting concurrent operations per resource type.
Concurrency is controlled with asyncio semaphores kept in a module-level registry
(one per resource type) and exposed through the ``with_bulkhead`` decorator and the
``BulkheadContext`` async context manager. Either can wait for a free slot or
reject immediately (fail-fast) when none is available.

See ADR-0026 for design rationale.
"""
10import asyncio
11import functools
12import logging
13from collections.abc import Callable
14from typing import Any, ParamSpec, TypeVar
16from opentelemetry import trace
18from mcp_server_langgraph.observability.telemetry import bulkhead_active_operations_gauge, bulkhead_rejected_counter
19from mcp_server_langgraph.resilience.config import get_resilience_config
# Type parameters used so decorators preserve the wrapped callable's signature.
P = ParamSpec("P")
T = TypeVar("T")

# Module-scoped logger and OpenTelemetry tracer.
logger: logging.Logger = logging.getLogger(__name__)
tracer = trace.get_tracer(__name__)
class BulkheadConfig:
    """
    Concurrency limits for each built-in bulkhead resource type.

    Attribute names follow the ``<resource_type>_limit`` pattern, which is how
    ``get_bulkhead`` looks up the limit for a resource type.

    Attributes:
        llm_limit: Maximum concurrent LLM operations.
        openfga_limit: Maximum concurrent OpenFGA operations.
        redis_limit: Maximum concurrent Redis operations.
        db_limit: Maximum concurrent database operations.
    """

    def __init__(
        self,
        llm_limit: int = 10,
        openfga_limit: int = 50,
        redis_limit: int = 100,
        db_limit: int = 20,
    ):
        self.llm_limit, self.openfga_limit = llm_limit, openfga_limit
        self.redis_limit, self.db_limit = redis_limit, db_limit

    @classmethod
    def from_resilience_config(cls) -> "BulkheadConfig":
        """Build an instance from the ``bulkhead`` section of the global resilience configuration."""
        bulkhead_section = get_resilience_config().bulkhead
        return cls(
            llm_limit=bulkhead_section.llm_limit,
            openfga_limit=bulkhead_section.openfga_limit,
            redis_limit=bulkhead_section.redis_limit,
            db_limit=bulkhead_section.db_limit,
        )
55# Global semaphores for resource types
56_bulkhead_semaphores: dict[str, asyncio.Semaphore] = {}
59def get_bulkhead(resource_type: str, limit: int | None = None) -> asyncio.Semaphore:
60 """
61 Get or create a bulkhead semaphore for a resource type.
63 Args:
64 resource_type: Type of resource (llm, openfga, redis, db, custom)
65 limit: Concurrency limit (optional override)
67 Returns:
68 asyncio.Semaphore for the resource type
69 """
70 if resource_type in _bulkhead_semaphores:
71 return _bulkhead_semaphores[resource_type]
73 # Determine limit
74 if limit is None:
75 config = BulkheadConfig.from_resilience_config()
76 limit = getattr(config, f"{resource_type}_limit", 10) # Default to 10
78 # Create semaphore
79 semaphore = asyncio.Semaphore(limit)
80 _bulkhead_semaphores[resource_type] = semaphore
82 logger.info(
83 f"Created bulkhead for {resource_type}",
84 extra={"resource_type": resource_type, "limit": limit},
85 )
87 return semaphore
def with_bulkhead(
    resource_type: str,
    limit: int | None = None,
    wait: bool = True,
) -> Callable[[Callable[P, T]], Callable[P, T]]:
    """
    Decorator to limit concurrent executions of a function.

    All functions decorated with the same ``resource_type`` share one semaphore,
    obtained from ``get_bulkhead``, so the limit applies across all of them.

    Args:
        resource_type: Type of resource (llm, openfga, redis, db, custom)
        limit: Concurrency limit (overrides config). Passed to ``get_bulkhead``, so
            it only takes effect if this call creates the bulkhead.
        wait: If True, wait for slot. If False, reject immediately if no slots available.

    Returns:
        A decorator that wraps an async function with bulkhead isolation.

    Raises:
        TypeError: At decoration time, if the decorated function is not async.
        BulkheadRejectedError: At call time, when ``wait`` is False and a slot
            cannot be acquired without blocking.

    Usage:
        # Limit to 10 concurrent LLM calls
        @with_bulkhead(resource_type="llm", limit=10)
        async def call_llm(prompt: str) -> str:
            async with httpx.AsyncClient() as client:
                response = await client.post(...)
                return response.json()

        # Reject immediately if no slots (fail-fast)
        @with_bulkhead(resource_type="openfga", wait=False)
        async def check_permission(user: str, resource: str) -> bool:
            return await openfga_client.check(user, resource)

        # Combine with other resilience patterns
        @circuit_breaker(name="llm")
        @retry_with_backoff(max_attempts=3)
        @with_timeout(operation_type="llm")
        @with_bulkhead(resource_type="llm")
        async def call_llm_with_resilience(prompt: str) -> str:
            return await llm_client.generate(prompt)
    """

    def decorator(func: Callable[P, T]) -> Callable[P, T]:
        # Validate first so that misuse does not register a bulkhead as a side effect.
        if not asyncio.iscoroutinefunction(func):
            msg = f"@with_bulkhead can only be applied to async functions, got {func.__name__}"
            raise TypeError(msg)

        # Get or create the shared bulkhead semaphore
        semaphore = get_bulkhead(resource_type, limit)

        def _available() -> int:
            """Return the number of free slots (asyncio's private counter; 0 if unreadable)."""
            value = getattr(semaphore, "_value", 0)
            return value if isinstance(value, int) else 0

        # Capacity of the shared semaphore. It may have been created earlier from config
        # or from another caller's explicit limit. BoundedSemaphore records its capacity
        # in ``_bound_value``. Otherwise use the requested limit, or the free-slot count
        # observed now (equal to the capacity while nothing holds a slot).
        bound = getattr(semaphore, "_bound_value", None)
        if isinstance(bound, int):
            capacity = bound
        elif limit is not None:
            capacity = limit
        else:
            capacity = _available()

        def _publish_active() -> int:
            """Compute slots currently held, push it to the gauge, and return it."""
            active = max(capacity - _available(), 0)
            try:
                bulkhead_active_operations_gauge.set(
                    active,
                    attributes={"resource_type": resource_type},
                )
            except RuntimeError:
                # Observability not initialized (can happen in tests or during shutdown)
                pass
            return active

        @functools.wraps(func)
        async def async_wrapper(*args: P.args, **kwargs: P.kwargs) -> T:
            """Async wrapper with bulkhead isolation"""
            with tracer.start_as_current_span(
                f"bulkhead.{resource_type}",
                attributes={
                    "bulkhead.resource_type": resource_type,
                    "bulkhead.limit": capacity,
                    "bulkhead.available": _available(),
                },
            ) as span:
                # ``locked()`` is asyncio's public "would acquire() block?" check. On
                # recent Python versions acquire() also queues behind pending waiters
                # even when ``_value > 0``, which a plain ``_value == 0`` test misses.
                if not wait and semaphore.locked():
                    # No slots available, reject immediately
                    span.set_attribute("bulkhead.rejected", True)

                    logger.warning(
                        f"Bulkhead rejected request: {resource_type}",
                        extra={
                            "resource_type": resource_type,
                            "function": func.__name__,
                            "reason": "no_available_slots",
                        },
                    )

                    try:
                        bulkhead_rejected_counter.add(
                            1,
                            attributes={
                                "resource_type": resource_type,
                                "function": func.__name__,
                            },
                        )
                    except RuntimeError:
                        # Observability not initialized (can happen in tests or during shutdown)
                        pass

                    from mcp_server_langgraph.core.exceptions import BulkheadRejectedError

                    raise BulkheadRejectedError(
                        message=f"Bulkhead rejected request for {resource_type} (no available slots)",
                        metadata={
                            "resource_type": resource_type,
                            "function": func.__name__,
                            "limit": capacity,
                        },
                    )

                try:
                    # Acquire semaphore (wait if necessary)
                    async with semaphore:
                        active_count = _publish_active()
                        span.set_attribute("bulkhead.rejected", False)
                        span.set_attribute("bulkhead.active", active_count)

                        result = await func(*args, **kwargs)  # type: ignore[misc]
                finally:
                    # Refresh after the slot is released so the gauge does not stay at
                    # its peak once work completes (or fails, or is cancelled).
                    _publish_active()

                return result  # type: ignore[no-any-return]

        return async_wrapper  # type: ignore

    return decorator
220class BulkheadContext:
221 """
222 Context manager for bulkhead isolation (alternative to decorator).
224 Usage:
225 async with BulkheadContext(resource_type="llm"):
226 result = await call_llm(prompt)
227 """
229 def __init__(
230 self,
231 resource_type: str,
232 limit: int | None = None,
233 wait: bool = True,
234 ):
235 self.resource_type = resource_type
236 self.semaphore = get_bulkhead(resource_type, limit)
237 self.wait = wait
239 async def __aenter__(self) -> None:
240 """Acquire bulkhead slot"""
241 # Check if slots are available (semaphore._value == 0 means no slots)
242 slots_available = self.semaphore._value if hasattr(self.semaphore, "_value") else 1
243 if not self.wait and slots_available == 0:
244 # Reject immediately if no slots
245 from mcp_server_langgraph.core.exceptions import BulkheadRejectedError
247 # Calculate total limit safely
248 waiters_count = (
249 len(self.semaphore._waiters) if hasattr(self.semaphore, "_waiters") and self.semaphore._waiters else 0
250 )
251 total_limit = self.semaphore._value + waiters_count if hasattr(self.semaphore, "_value") else 10
253 raise BulkheadRejectedError(
254 message=f"Bulkhead rejected request for {self.resource_type}",
255 metadata={
256 "resource_type": self.resource_type,
257 "limit": total_limit,
258 },
259 )
261 await self.semaphore.acquire()
262 return self # type: ignore[return-value]
264 async def __aexit__(self, exc_type, exc_val, exc_tb: Any) -> None: # type: ignore[no-untyped-def]
265 """Release bulkhead slot"""
266 self.semaphore.release()
267 return None # Don't suppress exceptions
270def get_bulkhead_stats(resource_type: str | None = None) -> dict[str, dict[str, int]]:
271 """
272 Get statistics for bulkheads.
274 Args:
275 resource_type: Specific resource type, or None for all
277 Returns:
278 Dictionary with bulkhead statistics
280 Usage:
281 stats = get_bulkhead_stats()
282 # {
283 # "llm": {"limit": 10, "available": 3, "active": 7, "waiting": 2},
284 # "openfga": {"limit": 50, "available": 45, "active": 5, "waiting": 0},
285 # }
286 """
287 stats = {}
289 resource_types = [resource_type] if resource_type else _bulkhead_semaphores.keys()
291 for res_type in resource_types:
292 if res_type not in _bulkhead_semaphores:
293 continue
295 semaphore = _bulkhead_semaphores[res_type]
297 # Safely get semaphore attributes
298 try:
299 available = semaphore._value if hasattr(semaphore, "_value") else 0
300 waiters = semaphore._waiters if hasattr(semaphore, "_waiters") and semaphore._waiters else []
301 active = len(waiters)
302 waiting = len([w for w in waiters if not w.done()]) if waiters else 0
303 except (AttributeError, TypeError):
304 available = 0
305 active = 0
306 waiting = 0
308 stats[res_type] = {
309 "limit": available + active,
310 "available": available,
311 "active": active,
312 "waiting": waiting,
313 }
315 return stats
def reset_bulkhead(resource_type: str) -> None:
    """
    Discard the bulkhead registered for one resource type (testing helper).

    A later ``get_bulkhead`` call for the same type creates a fresh semaphore.
    Unknown resource types are ignored silently.

    Args:
        resource_type: Resource type whose bulkhead should be discarded.

    Warning: Only use this for testing! In production, bulkheads should not be reset.
    """
    if resource_type not in _bulkhead_semaphores:
        return
    del _bulkhead_semaphores[resource_type]
    logger.warning(f"Bulkhead reset for {resource_type} (testing only)")


def reset_all_bulkheads() -> None:
    """
    Discard every registered bulkhead (testing helper).

    Subsequent ``get_bulkhead`` calls create fresh semaphores.

    Warning: Only use this for testing! In production, bulkheads should not be reset.
    """
    registry = _bulkhead_semaphores
    registry.clear()
    logger.warning("All bulkheads reset (testing only)")