Coverage for src / mcp_server_langgraph / resilience / metrics.py: 98%

62 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 00:43 +0000

1""" 

2Resilience metrics for observability. 

3 

4Provides OpenTelemetry metrics for all resilience patterns: 

5- Circuit breaker state changes and failures 

6- Retry attempts and exhaustion 

7- Timeout violations 

8- Bulkhead rejections and active operations 

9- Fallback usage 

10 

11These metrics integrate with the existing observability stack. 

12""" 

13 

14from typing import Any 

15 

16from opentelemetry import metrics 

17 

# Meter obtained from the globally configured OpenTelemetry provider;
# instruments below are created once at import time.
meter = metrics.get_meter(__name__)


# --- Circuit breaker instruments ---------------------------------------------

circuit_breaker_state_gauge = meter.create_gauge(
    name="circuit_breaker.state",
    unit="1",
    description="Circuit breaker state (0=closed, 1=open, 0.5=half-open)",
)

circuit_breaker_failure_counter = meter.create_counter(
    name="circuit_breaker.failures",
    unit="1",
    description="Total circuit breaker failures",
)

circuit_breaker_success_counter = meter.create_counter(
    name="circuit_breaker.successes",
    unit="1",
    description="Total circuit breaker successes",
)

circuit_breaker_state_change_counter = meter.create_counter(
    name="circuit_breaker.state_changes",
    unit="1",
    description="Total circuit breaker state changes",
)


# --- Retry instruments --------------------------------------------------------

retry_attempt_counter = meter.create_counter(
    name="retry.attempts",
    unit="1",
    description="Total retry attempts",
)

retry_exhausted_counter = meter.create_counter(
    name="retry.exhausted",
    unit="1",
    description="Total retry exhaustion events (all attempts failed)",
)

retry_success_after_retry_counter = meter.create_counter(
    name="retry.success_after_retry",
    unit="1",
    description="Total successful retries (succeeded on attempt > 1)",
)


# --- Timeout instruments ------------------------------------------------------

timeout_exceeded_counter = meter.create_counter(
    name="timeout.exceeded",
    unit="1",
    description="Total timeout violations",
)

timeout_duration_histogram = meter.create_histogram(
    name="timeout.duration",
    unit="s",
    description="Timeout duration in seconds",
)


# --- Bulkhead instruments -----------------------------------------------------

bulkhead_rejected_counter = meter.create_counter(
    name="bulkhead.rejections",
    unit="1",
    description="Total bulkhead rejections (no available slots)",
)

bulkhead_active_operations_gauge = meter.create_gauge(
    name="bulkhead.active_operations",
    unit="1",
    description="Current number of active operations in bulkhead",
)

bulkhead_queue_depth_gauge = meter.create_gauge(
    name="bulkhead.queue_depth",
    unit="1",
    description="Current number of operations waiting for bulkhead slot",
)


# --- Fallback instruments -----------------------------------------------------

fallback_used_counter = meter.create_counter(
    name="fallback.used",
    unit="1",
    description="Total fallback invocations",
)

fallback_cache_hits_counter = meter.create_counter(
    name="fallback.cache_hits",
    unit="1",
    description="Total fallback cache hits (stale data returned)",
)


# --- Aggregate resilience instruments -----------------------------------------

resilience_pattern_invocations_counter = meter.create_counter(
    name="resilience.pattern_invocations",
    unit="1",
    description="Total resilience pattern invocations",
)

resilience_pattern_effectiveness_gauge = meter.create_gauge(
    name="resilience.pattern_effectiveness",
    unit="1",
    description="Resilience pattern effectiveness (0-1, 1=all requests succeeded)",
)

146 

147 

148# ============================================================================== 

149# Helper Functions 

150# ============================================================================== 

151 

152 

def record_circuit_breaker_event(
    service: str,
    event_type: str,
    exception_type: str | None = None,
) -> None:
    """
    Record a circuit breaker event on the appropriate counter.

    Args:
        service: Service name (llm, openfga, redis, etc.)
        event_type: Event type (success, failure, state_change)
        exception_type: Exception type (only attached on failures)
    """
    labels = {"service": service}

    if event_type == "success":
        circuit_breaker_success_counter.add(1, labels)
        return
    if event_type == "failure":
        # Enrich failure counts with the triggering exception type when known.
        if exception_type:
            labels["exception_type"] = exception_type
        circuit_breaker_failure_counter.add(1, labels)
        return
    if event_type == "state_change":
        circuit_breaker_state_change_counter.add(1, labels)
    # Unknown event types are ignored, matching the original behavior.

176 

177 

def record_retry_event(
    function: str,
    event_type: str,
    attempt_number: int | None = None,
    exception_type: str | None = None,
) -> None:
    """
    Record a retry event.

    Args:
        function: Function name
        event_type: Event type (attempt, exhausted, success_after_retry)
        attempt_number: Retry attempt number
        exception_type: Exception type

    Note:
        Unknown event types are silently ignored.
    """
    attributes = {"function": function}

    # Explicit None check: the original truthiness test (`if attempt_number:`)
    # silently dropped the attribute for attempt number 0.
    if attempt_number is not None:
        attributes["attempt_number"] = str(attempt_number)
    if exception_type:
        attributes["exception_type"] = exception_type

    if event_type == "attempt":
        retry_attempt_counter.add(1, attributes)
    elif event_type == "exhausted":
        retry_exhausted_counter.add(1, attributes)
    elif event_type == "success_after_retry":
        retry_success_after_retry_counter.add(1, attributes)

206 

207 

def record_timeout_event(
    function: str,
    operation_type: str,
    timeout_seconds: float,
) -> None:
    """
    Record a timeout violation.

    Args:
        function: Function name
        operation_type: Operation type (llm, auth, db, http, default)
        timeout_seconds: Configured timeout value in seconds. Widened from
            `int` to `float` so sub-second timeouts can be recorded; int
            callers remain compatible.

    Note:
        The histogram records the *configured* timeout, not the measured
        elapsed time of the operation.
    """
    attributes = {
        "function": function,
        "operation_type": operation_type,
        "timeout_seconds": str(timeout_seconds),
    }

    timeout_exceeded_counter.add(1, attributes)
    timeout_duration_histogram.record(timeout_seconds, attributes)

229 

230 

def record_bulkhead_event(
    resource_type: str,
    event_type: str,
    active_count: int | None = None,
    queue_depth: int | None = None,
) -> None:
    """
    Record a bulkhead event.

    Args:
        resource_type: Resource type (llm, openfga, redis, db)
        event_type: Event type (rejection, active, queued)
        active_count: Number of active operations (required for "active")
        queue_depth: Number of queued operations (required for "queued")
    """
    labels = {"resource_type": resource_type}

    if event_type == "rejection":
        bulkhead_rejected_counter.add(1, labels)
        return
    # Gauge updates are skipped when the corresponding value was not supplied.
    if event_type == "active" and active_count is not None:
        bulkhead_active_operations_gauge.set(active_count, labels)
        return
    if event_type == "queued" and queue_depth is not None:
        bulkhead_queue_depth_gauge.set(queue_depth, labels)

254 

255 

def record_fallback_event(
    function: str,
    exception_type: str,
    fallback_type: str = "default",
) -> None:
    """
    Record a fallback usage event.

    Args:
        function: Function name
        exception_type: Exception type that triggered fallback
        fallback_type: Type of fallback (default, function, strategy, cache)
    """
    labels = {
        "function": function,
        "exception_type": exception_type,
        "fallback_type": fallback_type,
    }

    fallback_used_counter.add(1, labels)

    # Cache-backed fallbacks are additionally counted as stale-data hits.
    if fallback_type == "cache":
        fallback_cache_hits_counter.add(1, labels)

279 

280 

281# ============================================================================== 

282# Metrics Export Functions 

283# ============================================================================== 

284 

285 

def get_resilience_metrics_summary() -> dict[str, Any]:
    """
    Get summary of resilience metrics (for health checks, debugging).

    Returns:
        Dictionary mapping each resilience pattern to the names of its
        OpenTelemetry instruments.

    Note: This is a snapshot, not real-time metrics.
    Use Prometheus/Grafana for real-time monitoring.
    """
    # Placeholder: lists the instruments defined in this module. In
    # production the metrics backend would be queried for live values.
    circuit_breakers = {
        "total_failures": "circuit_breaker.failures (counter)",
        "total_successes": "circuit_breaker.successes (counter)",
        "state_changes": "circuit_breaker.state_changes (counter)",
        "current_state": "circuit_breaker.state (gauge)",
    }
    retries = {
        "total_attempts": "retry.attempts (counter)",
        "exhausted": "retry.exhausted (counter)",
        "successes": "retry.success_after_retry (counter)",
    }
    timeouts = {
        "total_exceeded": "timeout.exceeded (counter)",
        "duration_histogram": "timeout.duration (histogram)",
    }
    bulkheads = {
        "total_rejections": "bulkhead.rejections (counter)",
        "active_operations": "bulkhead.active_operations (gauge)",
        "queue_depth": "bulkhead.queue_depth (gauge)",
    }
    fallbacks = {
        "total_used": "fallback.used (counter)",
        "cache_hits": "fallback.cache_hits (counter)",
    }

    return {
        "circuit_breakers": circuit_breakers,
        "retries": retries,
        "timeouts": timeouts,
        "bulkheads": bulkheads,
        "fallbacks": fallbacks,
    }

324 

325 

def export_resilience_metrics_for_prometheus() -> str:
    """
    Export metrics in Prometheus exposition format.

    Returns:
        Prometheus-formatted metrics (static sample data).

    Note: This is typically handled by the OpenTelemetry exporter.
    This function is for manual export/debugging.
    """
    # Placeholder sample output - in production, use the OpenTelemetry
    # Prometheus exporter instead of this hand-written payload.
    sample = """
# HELP circuit_breaker_failures Total circuit breaker failures
# TYPE circuit_breaker_failures counter
circuit_breaker_failures{service="llm"} 5
circuit_breaker_failures{service="openfga"} 2

# HELP circuit_breaker_state Circuit breaker state (0=closed, 1=open)
# TYPE circuit_breaker_state gauge
circuit_breaker_state{service="llm"} 0
circuit_breaker_state{service="openfga"} 0

# HELP retry_attempts Total retry attempts
# TYPE retry_attempts counter
retry_attempts{function="call_llm",attempt_number="1"} 10
retry_attempts{function="call_llm",attempt_number="2"} 5

# HELP timeout_exceeded Total timeout violations
# TYPE timeout_exceeded counter
timeout_exceeded{function="call_llm",operation_type="llm"} 3

# HELP bulkhead_rejections Total bulkhead rejections
# TYPE bulkhead_rejections counter
bulkhead_rejections{resource_type="llm"} 2

# HELP fallback_used Total fallback invocations
# TYPE fallback_used counter
fallback_used{function="check_permission",exception_type="OpenFGAError"} 1
"""
    return sample