Coverage for src/mcp_server_langgraph/monitoring/sla.py: 85%

198 statements  

coverage.py v7.12.0, created at 2025-12-03 00:43 +0000

1""" 

2SLA Monitoring and Tracking 

3 

4Implements Service Level Agreement monitoring for: 

5- Uptime percentage (99.9% target) 

6- Response time percentiles (p50, p95, p99) 

7- Error rate thresholds 

8- Automated alerting on SLA breaches 

9 

10SOC 2 A1.2 - System Availability Monitoring 

11""" 

12 

13from datetime import datetime, timedelta, UTC 

14from enum import Enum 

15from typing import Any 

16 

17from pydantic import BaseModel, Field 

18 

19from mcp_server_langgraph.integrations.alerting import Alert, AlertingService, AlertSeverity 

20from mcp_server_langgraph.monitoring.prometheus_client import get_prometheus_client 

21from mcp_server_langgraph.observability.telemetry import logger, metrics, tracer 

22 

23 

24class SLAStatus(str, Enum): 

25 """SLA compliance status""" 

26 

27 MEETING = "meeting" # Meeting SLA targets 

28 AT_RISK = "at_risk" # Close to breach (between target and warning threshold)

29 BREACH = "breach" # SLA target breached 

30 

31 

32class SLAMetric(str, Enum): 

33 """SLA metric types""" 

34 

35 UPTIME = "uptime" # System uptime percentage 

36 RESPONSE_TIME = "response_time" # API response time 

37 ERROR_RATE = "error_rate" # Error rate percentage 

38 THROUGHPUT = "throughput" # Requests per second 

39 

40 

41class SLATarget(BaseModel): 

42 """SLA target definition""" 

43 

44 metric: SLAMetric 

45 target_value: float = Field(..., description="Target value (e.g., 99.9 for uptime)") 

46 comparison: str = Field(default=">=", description="Comparison operator: >=, <=, ==, >, <") 

47 unit: str = Field(..., description="Unit of measurement (%, ms, rps)") 

48 warning_threshold: float = Field(..., description="Absolute value at which a warning alert fires (same unit as target_value)")

49 critical_threshold: float = Field(..., description="Absolute value at which a critical alert fires (same unit as target_value)")

50 

51 

52class SLAMeasurement(BaseModel): 

53 """SLA measurement result""" 

54 

55 metric: SLAMetric 

56 measured_value: float 

57 target_value: float 

58 unit: str 

59 status: SLAStatus 

60 compliance_percentage: float = Field(..., description="Percentage of target achieved") 

61 timestamp: str 

62 period_start: str 

63 period_end: str 

64 breach_details: dict[str, Any] | None = None 

65 

66 

67class SLAReport(BaseModel): 

68 """SLA compliance report""" 

69 

70 report_id: str 

71 generated_at: str 

72 period_start: str 

73 period_end: str 

74 measurements: list[SLAMeasurement] = Field(default_factory=list) 

75 overall_status: SLAStatus 

76 breaches: int = Field(default=0, description="Number of SLA breaches") 

77 warnings: int = Field(default=0, description="Number of warnings") 

78 compliance_score: float = Field(..., ge=0.0, description="Overall SLA compliance score (can exceed 100%)") 

79 summary: dict[str, Any] = Field(default_factory=dict) 

80 

81 

82class SLAMonitor: 

83 """ 

84 SLA monitoring and tracking service 

85 

86 Monitors system SLAs including uptime, response times, error rates. 

87 Provides automated alerting on SLA breaches and trend analysis. 

88 """ 

89 

90 def __init__(self, sla_targets: list[SLATarget] | None = None) -> None: 

91 """ 

92 Initialize SLA monitor 

93 

94 Args: 

95 sla_targets: List of SLA targets to monitor (if None, uses defaults; if [], uses no targets) 

96 """ 

97 self.sla_targets = sla_targets if sla_targets is not None else self._default_sla_targets() 

98 

99 logger.info( 

100 "SLA monitor initialized", 

101 extra={"target_count": len(self.sla_targets)}, 

102 ) 

103 

104 def _default_sla_targets(self) -> list[SLATarget]: 

105 """ 

106 Get default SLA targets 

107 

108 Returns: 

109 List of default SLA targets 

110 """ 

111 return [ 

112 SLATarget( 

113 metric=SLAMetric.UPTIME, 

114 target_value=99.9, 

115 comparison=">=", 

116 unit="%", 

117 warning_threshold=99.5, # Warning at 99.5% 

118 critical_threshold=99.0, # Critical below 99% 

119 ), 

120 SLATarget( 

121 metric=SLAMetric.RESPONSE_TIME, 

122 target_value=500, # 500ms p95 

123 comparison="<=", 

124 unit="ms", 

125 warning_threshold=600, # Warning at 600ms 

126 critical_threshold=1000, # Critical above 1000ms 

127 ), 

128 SLATarget( 

129 metric=SLAMetric.ERROR_RATE, 

130 target_value=1.0, # 1% error rate 

131 comparison="<=", 

132 unit="%", 

133 warning_threshold=2.0, # Warning at 2% 

134 critical_threshold=5.0, # Critical above 5% 

135 ), 

136 ] 

137 
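The defaults above can be replaced wholesale at construction time. A minimal sketch, not part of the measured file, assuming the import path shown in the report header; the target values are purely illustrative:

from mcp_server_langgraph.monitoring.sla import SLAMetric, SLAMonitor, SLATarget

# Hypothetical stricter targets; the numbers are examples, not recommendations.
strict_targets = [
    SLATarget(
        metric=SLAMetric.UPTIME,
        target_value=99.95,
        comparison=">=",
        unit="%",
        warning_threshold=99.9,    # absolute %, not a fraction of the target
        critical_threshold=99.5,
    ),
    SLATarget(
        metric=SLAMetric.RESPONSE_TIME,
        target_value=250,
        comparison="<=",
        unit="ms",
        warning_threshold=400,
        critical_threshold=800,
    ),
]

monitor = SLAMonitor(sla_targets=strict_targets)  # passing [] disables all checks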

138 async def measure_uptime(self, start_time: datetime, end_time: datetime) -> SLAMeasurement: 

139 """ 

140 Measure uptime SLA 

141 

142 Args: 

143 start_time: Start of measurement period 

144 end_time: End of measurement period 

145 

146 Returns: 

147 SLAMeasurement for uptime 

148 """ 

149 with tracer.start_as_current_span("sla.measure_uptime") as span: 

150 # Get uptime target 

151 uptime_target = next((t for t in self.sla_targets if t.metric == SLAMetric.UPTIME), None) 

152 

153 if not uptime_target: 

154 msg = "No uptime SLA target configured" 

155 raise ValueError(msg) 

156 

157 # Calculate total time in period 

158 total_seconds = (end_time - start_time).total_seconds() 

159 

160 # Query Prometheus for actual downtime 

161 try: 

162 prometheus = await get_prometheus_client() 

163 timerange = f"{max(1, int(total_seconds / 86400))}d" # Convert to days (at least 1 day)

164 downtime_seconds = await prometheus.query_downtime(timerange=timerange) 

165 except Exception as e: 

166 logger.warning(f"Failed to query Prometheus for downtime: {e}") 

167 downtime_seconds = 0 # Fallback to zero if query fails 

168 

169 # Calculate uptime percentage 

170 uptime_seconds = total_seconds - downtime_seconds 

171 uptime_percentage = (uptime_seconds / total_seconds * 100) if total_seconds > 0 else 0 

172 

173 # Calculate compliance percentage 

174 compliance_percentage = ( 

175 (uptime_percentage / uptime_target.target_value * 100) if uptime_target.target_value > 0 else 0 

176 ) 

177 

178 # Determine status 

179 status = self._determine_status(uptime_percentage, uptime_target, is_higher_better=True) 

180 

181 # Breach details 

182 breach_details = None 

183 if status == SLAStatus.BREACH: 

184 breach_details = { 

185 "target": uptime_target.target_value, 

186 "actual": uptime_percentage, 

187 "shortfall": uptime_target.target_value - uptime_percentage, 

188 "downtime_seconds": downtime_seconds, 

189 "downtime_minutes": downtime_seconds / 60, 

190 } 

191 

192 measurement = SLAMeasurement( 

193 metric=SLAMetric.UPTIME, 

194 measured_value=uptime_percentage, 

195 target_value=uptime_target.target_value, 

196 unit=uptime_target.unit, 

197 status=status, 

198 compliance_percentage=compliance_percentage, 

199 timestamp=datetime.now(UTC).isoformat().replace("+00:00", "Z"), 

200 period_start=start_time.isoformat().replace("+00:00", "Z"), 

201 period_end=end_time.isoformat().replace("+00:00", "Z"), 

202 breach_details=breach_details, 

203 ) 

204 

205 span.set_attribute("uptime_percentage", uptime_percentage) 

206 span.set_attribute("status", status.value) 

207 

208 logger.info( 

209 "Uptime SLA measured", 

210 extra={ 

211 "uptime_percentage": uptime_percentage, 

212 "target": uptime_target.target_value, 

213 "status": status.value, 

214 }, 

215 ) 

216 

217 return measurement 

218 
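As a worked example of the calculation above (numbers assumed, not measured): a 30-day window is 2,592,000 seconds, so roughly 43 minutes of downtime still clears the default 99.9% target.

total_seconds = 30 * 24 * 3600              # 2,592,000 s in a 30-day window
downtime_seconds = 43 * 60                  # 2,580 s of assumed downtime
uptime_percentage = (total_seconds - downtime_seconds) / total_seconds * 100
print(round(uptime_percentage, 4))          # 99.9005 -> MEETING against a 99.9 target
print(round(uptime_percentage / 99.9 * 100, 2))  # ~100.0, the compliance_percentage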

219 async def measure_response_time(self, start_time: datetime, end_time: datetime, percentile: int = 95) -> SLAMeasurement: 

220 """ 

221 Measure response time SLA 

222 

223 Args: 

224 start_time: Start of measurement period 

225 end_time: End of measurement period 

226 percentile: Percentile to measure (50, 95, 99) 

227 

228 Returns: 

229 SLAMeasurement for response time 

230 """ 

231 with tracer.start_as_current_span("sla.measure_response_time") as span: 

232 span.set_attribute("percentile", percentile) 

233 

234 # Get response time target 

235 rt_target = next( 

236 (t for t in self.sla_targets if t.metric == SLAMetric.RESPONSE_TIME), 

237 None, 

238 ) 

239 

240 if not rt_target:    [240 ↛ 241: condition on line 240 was never true]

241 msg = "No response time SLA target configured" 

242 raise ValueError(msg) 

243 

244 # Query Prometheus for actual response times 

245 try: 

246 prometheus = await get_prometheus_client() 

247 timerange_hours = int((end_time - start_time).total_seconds() / 3600) 

248 timerange = f"{max(1, timerange_hours)}h" # At least 1 hour 

249 percentiles = await prometheus.query_percentiles( 

250 metric="http_request_duration_seconds", percentiles=[percentile], timerange=timerange 

251 ) 

252 response_time_ms = percentiles.get(percentile, 0) * 1000 # Convert seconds to milliseconds 

253 except Exception as e: 

254 logger.warning(f"Failed to query Prometheus for response times: {e}") 

255 response_time_ms = 350 # Fallback estimate when Prometheus is unavailable

256 

257 # Calculate compliance percentage 

258 compliance_percentage = (rt_target.target_value / response_time_ms * 100) if response_time_ms > 0 else 100 

259 

260 # Determine status 

261 status = self._determine_status(response_time_ms, rt_target, is_higher_better=False) 

262 

263 # Breach details 

264 breach_details = None 

265 if status == SLAStatus.BREACH:    [265 ↛ 266: condition on line 265 was never true]

266 breach_details = { 

267 "target": rt_target.target_value, 

268 "actual": response_time_ms, 

269 "overage": response_time_ms - rt_target.target_value, 

270 "percentile": f"p{percentile}", 

271 } 

272 

273 measurement = SLAMeasurement( 

274 metric=SLAMetric.RESPONSE_TIME, 

275 measured_value=response_time_ms, 

276 target_value=rt_target.target_value, 

277 unit=rt_target.unit, 

278 status=status, 

279 compliance_percentage=compliance_percentage, 

280 timestamp=datetime.now(UTC).isoformat().replace("+00:00", "Z"), 

281 period_start=start_time.isoformat().replace("+00:00", "Z"), 

282 period_end=end_time.isoformat().replace("+00:00", "Z"), 

283 breach_details=breach_details, 

284 ) 

285 

286 span.set_attribute("response_time_ms", response_time_ms) 

287 span.set_attribute("status", status.value) 

288 

289 logger.info( 

290 f"Response time SLA measured (p{percentile})", 

291 extra={ 

292 "response_time_ms": response_time_ms, 

293 "target": rt_target.target_value, 

294 "status": status.value, 

295 }, 

296 ) 

297 

298 return measurement 

299 
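Worked numbers for the compliance formula above, using this method's fallback figure as the assumed measurement:

target_ms = 500.0                  # default p95 target from _default_sla_targets
measured_ms = 350.0                # assumed measured p95
print(round(target_ms / measured_ms * 100, 1))  # 142.9 -> faster than target, so compliance exceeds 100%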

300 async def measure_error_rate(self, start_time: datetime, end_time: datetime) -> SLAMeasurement: 

301 """ 

302 Measure error rate SLA 

303 

304 Args: 

305 start_time: Start of measurement period 

306 end_time: End of measurement period 

307 

308 Returns: 

309 SLAMeasurement for error rate 

310 """ 

311 with tracer.start_as_current_span("sla.measure_error_rate") as span: 

312 # Get error rate target 

313 error_target = next((t for t in self.sla_targets if t.metric == SLAMetric.ERROR_RATE), None) 

314 

315 if not error_target:    [315 ↛ 316: condition on line 315 was never true]

316 msg = "No error rate SLA target configured" 

317 raise ValueError(msg) 

318 

319 # Query Prometheus for actual error rate 

320 try: 

321 prometheus = await get_prometheus_client() 

322 timerange_mins = int((end_time - start_time).total_seconds() / 60) 

323 timerange = f"{max(5, timerange_mins)}m" # At least 5 minutes 

324 error_rate_percentage = await prometheus.query_error_rate(timerange=timerange) 

325 except Exception as e: 

326 logger.warning(f"Failed to query Prometheus for error rate: {e}") 

327 error_rate_percentage = 0.5 # Fallback estimate when Prometheus is unavailable

328 

329 # Calculate compliance percentage 

330 compliance_percentage = ( 

331 (error_target.target_value / error_rate_percentage * 100) if error_rate_percentage > 0 else 100 

332 ) 

333 

334 # Determine status 

335 status = self._determine_status(error_rate_percentage, error_target, is_higher_better=False) 

336 

337 # Breach details 

338 breach_details = None 

339 if status == SLAStatus.BREACH:    [339 ↛ 340: condition on line 339 was never true]

340 breach_details = { 

341 "target": error_target.target_value, 

342 "actual": error_rate_percentage, 

343 "overage": error_rate_percentage - error_target.target_value, 

344 } 

345 

346 measurement = SLAMeasurement( 

347 metric=SLAMetric.ERROR_RATE, 

348 measured_value=error_rate_percentage, 

349 target_value=error_target.target_value, 

350 unit=error_target.unit, 

351 status=status, 

352 compliance_percentage=compliance_percentage, 

353 timestamp=datetime.now(UTC).isoformat().replace("+00:00", "Z"), 

354 period_start=start_time.isoformat().replace("+00:00", "Z"), 

355 period_end=end_time.isoformat().replace("+00:00", "Z"), 

356 breach_details=breach_details, 

357 ) 

358 

359 span.set_attribute("error_rate_percentage", error_rate_percentage) 

360 span.set_attribute("status", status.value) 

361 

362 logger.info( 

363 "Error rate SLA measured", 

364 extra={ 

365 "error_rate_percentage": error_rate_percentage, 

366 "target": error_target.target_value, 

367 "status": status.value, 

368 }, 

369 ) 

370 

371 return measurement 

372 

373 def _determine_status(self, measured_value: float, target: SLATarget, is_higher_better: bool) -> SLAStatus: 

374 """ 

375 Determine SLA status based on measured value and target 

376 

377 Args: 

378 measured_value: Measured value 

379 target: SLA target 

380 is_higher_better: True if higher values are better (uptime), False otherwise 

381 

382 Returns: 

383 SLAStatus 

384 """ 

385 if is_higher_better: 

386 # Higher is better (e.g., uptime) 

387 if measured_value >= target.target_value: 

388 return SLAStatus.MEETING 

389 elif measured_value >= target.warning_threshold: 

390 return SLAStatus.AT_RISK 

391 else: 

392 return SLAStatus.BREACH 

393 else: 

394 # Lower is better (e.g., response time, error rate) 

395 if measured_value <= target.target_value: 

396 return SLAStatus.MEETING 

397 elif measured_value <= target.warning_threshold: 

398 return SLAStatus.AT_RISK 

399 else: 

400 return SLAStatus.BREACH 

401 
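For illustration only (calling the private helper directly), a few checks against the default uptime target of 99.9% with a 99.5% warning threshold:

from mcp_server_langgraph.monitoring.sla import SLAMonitor, SLAStatus

monitor = SLAMonitor()
uptime_target = monitor.sla_targets[0]   # default UPTIME target: 99.9 target, 99.5 warning

assert monitor._determine_status(99.95, uptime_target, is_higher_better=True) is SLAStatus.MEETING
assert monitor._determine_status(99.70, uptime_target, is_higher_better=True) is SLAStatus.AT_RISK
assert monitor._determine_status(99.20, uptime_target, is_higher_better=True) is SLAStatus.BREACH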

402 async def generate_sla_report(self, period_days: int = 30) -> SLAReport: 

403 """ 

404 Generate comprehensive SLA report 

405 

406 Args: 

407 period_days: Number of days to report on 

408 

409 Returns: 

410 SLAReport with all measurements 

411 """ 

412 with tracer.start_as_current_span("sla.generate_report") as span: 

413 span.set_attribute("period_days", period_days) 

414 

415 end_time = datetime.now(UTC) 

416 start_time = end_time - timedelta(days=period_days) 

417 

418 measurements = [] 

419 

420 # Measure only metrics that have configured targets 

421 # Check which metrics are configured 

422 configured_metrics = {t.metric for t in self.sla_targets} 

423 

424 # Measure uptime if configured 

425 uptime = None 

426 if SLAMetric.UPTIME in configured_metrics:    [426 ↛ 431: condition on line 426 was always true]

427 uptime = await self.measure_uptime(start_time, end_time) 

428 measurements.append(uptime) 

429 

430 # Measure response time if configured 

431 response_time = None 

432 if SLAMetric.RESPONSE_TIME in configured_metrics: 

433 response_time = await self.measure_response_time(start_time, end_time) 

434 measurements.append(response_time) 

435 

436 # Measure error rate if configured 

437 error_rate = None 

438 if SLAMetric.ERROR_RATE in configured_metrics: 

439 error_rate = await self.measure_error_rate(start_time, end_time) 

440 measurements.append(error_rate) 

441 

442 # Count breaches and warnings 

443 breaches = sum(1 for m in measurements if m.status == SLAStatus.BREACH) 

444 warnings = sum(1 for m in measurements if m.status == SLAStatus.AT_RISK) 

445 

446 # Determine overall status 

447 if breaches > 0:    [447 ↛ 449: condition on line 447 was always true]

448 overall_status = SLAStatus.BREACH 

449 elif warnings > 0: 

450 overall_status = SLAStatus.AT_RISK 

451 else: 

452 overall_status = SLAStatus.MEETING 

453 

454 # Calculate overall compliance score (average of all measurements) 

455 compliance_score = sum(m.compliance_percentage for m in measurements) / len(measurements) if measurements else 0.0 

456 

457 # Generate summary (only include measured metrics) 

458 summary = { 

459 "all_slas_met": overall_status == SLAStatus.MEETING, 

460 "breaches": [ 

461 { 

462 "metric": m.metric.value, 

463 "target": m.target_value, 

464 "actual": m.measured_value, 

465 "details": m.breach_details, 

466 } 

467 for m in measurements 

468 if m.status == SLAStatus.BREACH 

469 ], 

470 } 

471 

472 # Add measured values to summary 

473 if uptime is not None:    [473 ↛ 475: condition on line 473 was always true]

474 summary["uptime_percentage"] = uptime.measured_value 

475 if response_time is not None: 

476 summary["response_time_p95_ms"] = response_time.measured_value 

477 if error_rate is not None: 

478 summary["error_rate_percentage"] = error_rate.measured_value 

479 

480 report = SLAReport( 

481 report_id=f"sla_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}", 

482 generated_at=datetime.now(UTC).isoformat().replace("+00:00", "Z"), 

483 period_start=start_time.isoformat().replace("+00:00", "Z"), 

484 period_end=end_time.isoformat().replace("+00:00", "Z"), 

485 measurements=measurements, 

486 overall_status=overall_status, 

487 breaches=breaches, 

488 warnings=warnings, 

489 compliance_score=compliance_score, 

490 summary=summary, 

491 ) 

492 

493 logger.info( 

494 "SLA report generated", 

495 extra={ 

496 "report_id": report.report_id, 

497 "overall_status": overall_status.value, 

498 "compliance_score": compliance_score, 

499 "breaches": breaches, 

500 }, 

501 ) 

502 

503 # Track metrics 

504 metrics.successful_calls.add(1, {"operation": "sla_report_generation"}) 

505 

506 # Alert on breaches 

507 if overall_status == SLAStatus.BREACH:    [507 ↛ 514: condition on line 507 was always true]

508 await self._send_sla_alert( 

509 severity="critical", 

510 message=f"SLA breach detected: {breaches} metric(s) breached", 

511 details=summary, 

512 ) 

513 

514 return report 

515 
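A minimal sketch of driving the report end to end, not part of the measured file; it assumes reachable Prometheus and alerting integrations (otherwise the fallbacks above apply):

import asyncio

from mcp_server_langgraph.monitoring.sla import SLAMonitor


async def main() -> None:
    monitor = SLAMonitor()                     # default 99.9% / 500 ms / 1% targets
    report = await monitor.generate_sla_report(period_days=7)
    print(report.overall_status.value, round(report.compliance_score, 1))
    for m in report.measurements:
        print(m.metric.value, m.measured_value, m.unit, m.status.value)


asyncio.run(main())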

516 async def _send_sla_alert(self, severity: str, message: str, details: dict[str, Any]) -> None: 

517 """ 

518 Send SLA alert 

519 

520 Args: 

521 severity: Alert severity (warning, critical) 

522 message: Alert message 

523 details: Alert details 

524 """ 

525 logger.warning( 

526 f"SLA Alert [{severity.upper()}]: {message}", 

527 extra={"severity": severity, "details": details}, 

528 ) 

529 

530 # Send to alerting system (PagerDuty, Slack, email) 

531 try: 

532 alerting_service = AlertingService() 

533 await alerting_service.initialize() 

534 

535 # Map severity string to AlertSeverity enum 

536 alert_severity = AlertSeverity.CRITICAL if severity == "critical" else AlertSeverity.WARNING # type: ignore[attr-defined] 

537 

538 from mcp_server_langgraph.integrations.alerting import AlertCategory 

539 

540 alert = Alert( 

541 title=f"SLA {severity.upper()}: {message}", 

542 description=message, 

543 severity=alert_severity, 

544 category=AlertCategory.SLA, 

545 source="sla_monitor", 

546 metadata=details, 

547 ) 

548 

549 await alerting_service.send_alert(alert) 

550 logger.info("SLA alert sent successfully", extra={"alert_id": alert.alert_id}) 

551 

552 except Exception as e: 

553 logger.error(f"Failed to send SLA alert: {e}", exc_info=True) 

554 

555 

556# Global SLA monitor instance 

557_sla_monitor: SLAMonitor | None = None 

558 

559 

560def get_sla_monitor() -> SLAMonitor: 

561 """ 

562 Get or create global SLA monitor instance 

563 

564 Returns: 

565 SLAMonitor instance 

566 """ 

567 global _sla_monitor 

568 

569 if _sla_monitor is None: 

570 _sla_monitor = SLAMonitor() 

571 

572 return _sla_monitor 

573 

574 

575def set_sla_monitor(monitor: SLAMonitor) -> None: 

576 """ 

577 Set global SLA monitor instance 

578 

579 Args: 

580 monitor: SLAMonitor instance to use globally 

581 """ 

582 global _sla_monitor 

583 _sla_monitor = monitor
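A sketch of swapping the module-level singleton, e.g. in tests; the target values are placeholders, and the import path follows the report header:

from mcp_server_langgraph.monitoring.sla import (
    SLAMetric,
    SLAMonitor,
    SLATarget,
    get_sla_monitor,
    set_sla_monitor,
)

# Replace the global instance with one that only tracks error rate.
set_sla_monitor(
    SLAMonitor(
        sla_targets=[
            SLATarget(
                metric=SLAMetric.ERROR_RATE,
                target_value=0.5,
                comparison="<=",
                unit="%",
                warning_threshold=1.0,
                critical_threshold=2.0,
            )
        ]
    )
)

assert get_sla_monitor().sla_targets[0].metric is SLAMetric.ERROR_RATE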