Coverage for src/mcp_server_langgraph/resilience/metrics.py: 98% (62 statements)
1"""
2Resilience metrics for observability.
4Provides OpenTelemetry metrics for all resilience patterns:
5- Circuit breaker state changes and failures
6- Retry attempts and exhaustion
7- Timeout violations
8- Bulkhead rejections and active operations
9- Fallback usage
11These metrics integrate with the existing observability stack.
12"""
14from typing import Any
16from opentelemetry import metrics
18# Get meter from observability stack
19meter = metrics.get_meter(__name__)
# ==============================================================================
# Circuit Breaker Metrics
# ==============================================================================

circuit_breaker_state_gauge = meter.create_gauge(
    name="circuit_breaker.state",
    description="Circuit breaker state (0=closed, 1=open, 0.5=half-open)",
    unit="1",
)

circuit_breaker_failure_counter = meter.create_counter(
    name="circuit_breaker.failures",
    description="Total circuit breaker failures",
    unit="1",
)

circuit_breaker_success_counter = meter.create_counter(
    name="circuit_breaker.successes",
    description="Total circuit breaker successes",
    unit="1",
)

circuit_breaker_state_change_counter = meter.create_counter(
    name="circuit_breaker.state_changes",
    description="Total circuit breaker state changes",
    unit="1",
)
# ==============================================================================
# Retry Metrics
# ==============================================================================

retry_attempt_counter = meter.create_counter(
    name="retry.attempts",
    description="Total retry attempts",
    unit="1",
)

retry_exhausted_counter = meter.create_counter(
    name="retry.exhausted",
    description="Total retry exhaustion events (all attempts failed)",
    unit="1",
)

retry_success_after_retry_counter = meter.create_counter(
    name="retry.success_after_retry",
    description="Total successful retries (succeeded on attempt > 1)",
    unit="1",
)
# ==============================================================================
# Timeout Metrics
# ==============================================================================

timeout_exceeded_counter = meter.create_counter(
    name="timeout.exceeded",
    description="Total timeout violations",
    unit="1",
)

timeout_duration_histogram = meter.create_histogram(
    name="timeout.duration",
    description="Timeout duration in seconds",
    unit="s",
)
# ==============================================================================
# Bulkhead Metrics
# ==============================================================================

bulkhead_rejected_counter = meter.create_counter(
    name="bulkhead.rejections",
    description="Total bulkhead rejections (no available slots)",
    unit="1",
)

bulkhead_active_operations_gauge = meter.create_gauge(
    name="bulkhead.active_operations",
    description="Current number of active operations in bulkhead",
    unit="1",
)

bulkhead_queue_depth_gauge = meter.create_gauge(
    name="bulkhead.queue_depth",
    description="Current number of operations waiting for bulkhead slot",
    unit="1",
)
# ==============================================================================
# Fallback Metrics
# ==============================================================================

fallback_used_counter = meter.create_counter(
    name="fallback.used",
    description="Total fallback invocations",
    unit="1",
)

fallback_cache_hits_counter = meter.create_counter(
    name="fallback.cache_hits",
    description="Total fallback cache hits (stale data returned)",
    unit="1",
)
# ==============================================================================
# Aggregate Resilience Metrics
# ==============================================================================

resilience_pattern_invocations_counter = meter.create_counter(
    name="resilience.pattern_invocations",
    description="Total resilience pattern invocations",
    unit="1",
)

resilience_pattern_effectiveness_gauge = meter.create_gauge(
    name="resilience.pattern_effectiveness",
    description="Resilience pattern effectiveness (0-1, 1=all requests succeeded)",
    unit="1",
)
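# Illustrative sketch only: this module does not itself record to the two
# aggregate instruments above. A caller might update them roughly like this;
# the attribute names ("pattern", "service") and the success-ratio calculation
# are assumptions, not something this module prescribes.
def _example_record_pattern_outcome(pattern: str, service: str, succeeded: int, total: int) -> None:
    attributes = {"pattern": pattern, "service": service}
    resilience_pattern_invocations_counter.add(1, attributes)
    if total > 0:
        resilience_pattern_effectiveness_gauge.set(succeeded / total, attributes)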
# ==============================================================================
# Helper Functions
# ==============================================================================


def record_circuit_breaker_event(
    service: str,
    event_type: str,
    exception_type: str | None = None,
) -> None:
    """
    Record a circuit breaker event.

    Args:
        service: Service name (llm, openfga, redis, etc.)
        event_type: Event type (success, failure, state_change)
        exception_type: Exception type (if failure)
    """
    attributes = {"service": service}

    if event_type == "success":
        circuit_breaker_success_counter.add(1, attributes)
    elif event_type == "failure":
        if exception_type:
            attributes["exception_type"] = exception_type
        circuit_breaker_failure_counter.add(1, attributes)
    elif event_type == "state_change":  # coverage: partial branch, the implicit "else" path was never taken in tests
        circuit_breaker_state_change_counter.add(1, attributes)
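# Illustrative usage sketch: how a circuit breaker wrapper might report events
# through the helper above. The service label "llm" and the exception type are
# assumed example values, not names defined by this module.
def _example_circuit_breaker_usage() -> None:
    record_circuit_breaker_event(service="llm", event_type="success")
    record_circuit_breaker_event(
        service="llm",
        event_type="failure",
        exception_type="TimeoutError",
    )
    # Emitted when the breaker transitions, e.g. from closed to open:
    record_circuit_breaker_event(service="llm", event_type="state_change")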
def record_retry_event(
    function: str,
    event_type: str,
    attempt_number: int | None = None,
    exception_type: str | None = None,
) -> None:
    """
    Record a retry event.

    Args:
        function: Function name
        event_type: Event type (attempt, exhausted, success_after_retry)
        attempt_number: Retry attempt number
        exception_type: Exception type
    """
    attributes = {"function": function}

    if attempt_number:
        attributes["attempt_number"] = str(attempt_number)
    if exception_type:
        attributes["exception_type"] = exception_type

    if event_type == "attempt":
        retry_attempt_counter.add(1, attributes)
    elif event_type == "exhausted":
        retry_exhausted_counter.add(1, attributes)
    elif event_type == "success_after_retry":  # coverage: partial branch, the implicit "else" path was never taken in tests
        retry_success_after_retry_counter.add(1, attributes)
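# Illustrative usage sketch: a retry loop reporting each attempt and the final
# outcome. The function name "call_llm", attempt counts, and exception type are
# assumed example values.
def _example_retry_usage() -> None:
    # First attempt failed with a connection error:
    record_retry_event(
        function="call_llm",
        event_type="attempt",
        attempt_number=1,
        exception_type="ConnectionError",
    )
    # Second attempt succeeded:
    record_retry_event(function="call_llm", event_type="attempt", attempt_number=2)
    record_retry_event(function="call_llm", event_type="success_after_retry", attempt_number=2)
    # If every attempt had failed instead:
    record_retry_event(function="call_llm", event_type="exhausted", exception_type="ConnectionError")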
def record_timeout_event(
    function: str,
    operation_type: str,
    timeout_seconds: int,
) -> None:
    """
    Record a timeout violation.

    Args:
        function: Function name
        operation_type: Operation type (llm, auth, db, http, default)
        timeout_seconds: Timeout value in seconds
    """
    attributes = {
        "function": function,
        "operation_type": operation_type,
        "timeout_seconds": str(timeout_seconds),
    }

    timeout_exceeded_counter.add(1, attributes)
    timeout_duration_histogram.record(timeout_seconds, attributes)
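# Illustrative usage sketch: reporting a timeout violation from an asyncio-based
# caller. The wrapped coroutine, the "call_llm" label, and the 30-second budget
# are assumed example values.
async def _example_timeout_usage(coro: Any) -> None:
    import asyncio

    try:
        await asyncio.wait_for(coro, timeout=30)
    except asyncio.TimeoutError:
        record_timeout_event(function="call_llm", operation_type="llm", timeout_seconds=30)
        raise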
def record_bulkhead_event(
    resource_type: str,
    event_type: str,
    active_count: int | None = None,
    queue_depth: int | None = None,
) -> None:
    """
    Record a bulkhead event.

    Args:
        resource_type: Resource type (llm, openfga, redis, db)
        event_type: Event type (rejection, active, queued)
        active_count: Number of active operations
        queue_depth: Number of queued operations
    """
    attributes = {"resource_type": resource_type}

    if event_type == "rejection":
        bulkhead_rejected_counter.add(1, attributes)
    elif event_type == "active" and active_count is not None:
        bulkhead_active_operations_gauge.set(active_count, attributes)
    elif event_type == "queued" and queue_depth is not None:
        bulkhead_queue_depth_gauge.set(queue_depth, attributes)
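# Illustrative usage sketch: a semaphore-style bulkhead reporting its state.
# The resource label "llm" and the counts are assumed example values.
def _example_bulkhead_usage() -> None:
    # Slot acquired; report the new number of in-flight operations:
    record_bulkhead_event(resource_type="llm", event_type="active", active_count=3)
    # Caller parked waiting for a slot:
    record_bulkhead_event(resource_type="llm", event_type="queued", queue_depth=2)
    # No slot available:
    record_bulkhead_event(resource_type="llm", event_type="rejection")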
def record_fallback_event(
    function: str,
    exception_type: str,
    fallback_type: str = "default",
) -> None:
    """
    Record a fallback usage event.

    Args:
        function: Function name
        exception_type: Exception type that triggered fallback
        fallback_type: Type of fallback (default, function, strategy, cache)
    """
    attributes = {
        "function": function,
        "exception_type": exception_type,
        "fallback_type": fallback_type,
    }

    fallback_used_counter.add(1, attributes)

    if fallback_type == "cache":
        fallback_cache_hits_counter.add(1, attributes)
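# Illustrative usage sketch: recording that a cached (stale) value was served
# after the primary call failed. The function and exception names are assumed
# example values; fallback_type="cache" also increments the cache-hit counter.
def _example_fallback_usage() -> None:
    record_fallback_event(
        function="check_permission",
        exception_type="OpenFGAError",
        fallback_type="cache",
    )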
# ==============================================================================
# Metrics Export Functions
# ==============================================================================


def get_resilience_metrics_summary() -> dict[str, Any]:
    """
    Get summary of resilience metrics (for health checks, debugging).

    Returns:
        Dictionary with metric summaries

    Note: This is a snapshot, not real-time metrics.
    Use Prometheus/Grafana for real-time monitoring.
    """
    # This is a placeholder - in production, you'd query the metrics backend
    # For now, return a structure that shows what's available
    return {
        "circuit_breakers": {
            "total_failures": "circuit_breaker.failures (counter)",
            "total_successes": "circuit_breaker.successes (counter)",
            "state_changes": "circuit_breaker.state_changes (counter)",
            "current_state": "circuit_breaker.state (gauge)",
        },
        "retries": {
            "total_attempts": "retry.attempts (counter)",
            "exhausted": "retry.exhausted (counter)",
            "successes": "retry.success_after_retry (counter)",
        },
        "timeouts": {
            "total_exceeded": "timeout.exceeded (counter)",
            "duration_histogram": "timeout.duration (histogram)",
        },
        "bulkheads": {
            "total_rejections": "bulkhead.rejections (counter)",
            "active_operations": "bulkhead.active_operations (gauge)",
            "queue_depth": "bulkhead.queue_depth (gauge)",
        },
        "fallbacks": {
            "total_used": "fallback.used (counter)",
            "cache_hits": "fallback.cache_hits (counter)",
        },
    }
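# Illustrative usage sketch: a health or debug handler could simply return the
# summary structure and let the web framework serialize it. The handler name is
# an assumption; this module does not define any HTTP routes.
def _example_resilience_health_payload() -> dict[str, Any]:
    summary = get_resilience_metrics_summary()
    # e.g. summary["circuit_breakers"]["current_state"] == "circuit_breaker.state (gauge)"
    return summary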
def export_resilience_metrics_for_prometheus() -> str:
    """
    Export metrics in Prometheus exposition format.

    Returns:
        Prometheus-formatted metrics

    Note: This is typically handled by the OpenTelemetry exporter.
    This function is for manual export/debugging.
    """
    # Placeholder - in production, use OpenTelemetry Prometheus exporter
    return """
# HELP circuit_breaker_failures Total circuit breaker failures
# TYPE circuit_breaker_failures counter
circuit_breaker_failures{service="llm"} 5
circuit_breaker_failures{service="openfga"} 2

# HELP circuit_breaker_state Circuit breaker state (0=closed, 1=open)
# TYPE circuit_breaker_state gauge
circuit_breaker_state{service="llm"} 0
circuit_breaker_state{service="openfga"} 0

# HELP retry_attempts Total retry attempts
# TYPE retry_attempts counter
retry_attempts{function="call_llm",attempt_number="1"} 10
retry_attempts{function="call_llm",attempt_number="2"} 5

# HELP timeout_exceeded Total timeout violations
# TYPE timeout_exceeded counter
timeout_exceeded{function="call_llm",operation_type="llm"} 3

# HELP bulkhead_rejections Total bulkhead rejections
# TYPE bulkhead_rejections counter
bulkhead_rejections{resource_type="llm"} 2

# HELP fallback_used Total fallback invocations
# TYPE fallback_used counter
fallback_used{function="check_permission",exception_type="OpenFGAError"} 1
"""