Coverage for src / mcp_server_langgraph / monitoring / sla.py: 85%
198 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
1"""
2SLA Monitoring and Tracking
4Implements Service Level Agreement monitoring for:
5- Uptime percentage (99.9% target)
6- Response time percentiles (p50, p95, p99)
7- Error rate thresholds
8- Automated alerting on SLA breaches
10SOC 2 A1.2 - System Availability Monitoring
11"""
13from datetime import datetime, timedelta, UTC
14from enum import Enum
15from typing import Any
17from pydantic import BaseModel, Field
19from mcp_server_langgraph.integrations.alerting import Alert, AlertingService, AlertSeverity
20from mcp_server_langgraph.monitoring.prometheus_client import get_prometheus_client
21from mcp_server_langgraph.observability.telemetry import logger, metrics, tracer
class SLAStatus(str, Enum):
    """Compliance state of an SLA metric or an overall report.

    Members:
        MEETING: the measured value satisfies the configured target.
        AT_RISK: close to a breach (within 10% of the target).
        BREACH: the configured target has been violated.
    """

    MEETING = "meeting"
    AT_RISK = "at_risk"
    BREACH = "breach"
class SLAMetric(str, Enum):
    """Kinds of service-level metrics this module can track.

    Members:
        UPTIME: system uptime percentage.
        RESPONSE_TIME: API response time.
        ERROR_RATE: error rate percentage.
        THROUGHPUT: requests per second.
    """

    UPTIME = "uptime"
    RESPONSE_TIME = "response_time"
    ERROR_RATE = "error_rate"
    THROUGHPUT = "throughput"
class SLATarget(BaseModel):
    """SLA target definition"""

    # Which metric this target constrains (uptime, response_time, error_rate, throughput).
    metric: SLAMetric
    target_value: float = Field(..., description="Target value (e.g., 99.9 for uptime)")
    # NOTE(review): `comparison` is stored but SLAMonitor._determine_status decides
    # direction via its own is_higher_better flag rather than reading this field —
    # verify before relying on it.
    comparison: str = Field(default=">=", description="Comparison operator: >=, <=, ==, >, <")
    unit: str = Field(..., description="Unit of measurement (%, ms, rps)")
    # Thresholds are expressed in the metric's own unit (e.g. 99.5 for uptime %,
    # 600 for response-time ms), despite the "(% of target)" wording — see the
    # defaults built in SLAMonitor._default_sla_targets.
    warning_threshold: float = Field(..., description="Threshold for warning alerts (% of target)")
    critical_threshold: float = Field(..., description="Threshold for critical alerts (% of target)")
class SLAMeasurement(BaseModel):
    """SLA measurement result"""

    # Metric that was measured.
    metric: SLAMetric
    # Observed value, in `unit` (e.g. uptime %, response-time ms).
    measured_value: float
    # Target the observation is compared against.
    target_value: float
    unit: str
    # MEETING / AT_RISK / BREACH, as decided by SLAMonitor._determine_status.
    status: SLAStatus
    compliance_percentage: float = Field(..., description="Percentage of target achieved")
    # Timestamps are ISO-8601 strings with a trailing "Z" (see the measure_* methods).
    timestamp: str
    period_start: str
    period_end: str
    # Populated only when status == BREACH; keys vary per metric (shortfall/overage etc.).
    breach_details: dict[str, Any] | None = None
class SLAReport(BaseModel):
    """SLA compliance report"""

    # Identifier of the form "sla_YYYYMMDD_HHMMSS" (see SLAMonitor.generate_sla_report).
    report_id: str
    # ISO-8601 "Z"-suffixed timestamps.
    generated_at: str
    period_start: str
    period_end: str
    # One measurement per configured metric (uptime, response time, error rate).
    measurements: list[SLAMeasurement] = Field(default_factory=list)
    # Worst status across all measurements: any breach -> BREACH, else any warning -> AT_RISK.
    overall_status: SLAStatus
    breaches: int = Field(default=0, description="Number of SLA breaches")
    warnings: int = Field(default=0, description="Number of warnings")
    # Mean of the per-measurement compliance percentages; may exceed 100 when
    # a metric outperforms its target.
    compliance_score: float = Field(..., ge=0.0, description="Overall SLA compliance score (can exceed 100%)")
    # Includes "all_slas_met", a "breaches" list, and per-metric measured values.
    summary: dict[str, Any] = Field(default_factory=dict)
class SLAMonitor:
    """
    SLA monitoring and tracking service

    Monitors system SLAs including uptime, response times, error rates.
    Provides automated alerting on SLA breaches and trend analysis.

    SOC 2 A1.2 - System Availability Monitoring.
    """

    def __init__(self, sla_targets: list[SLATarget] | None = None) -> None:
        """
        Initialize SLA monitor

        Args:
            sla_targets: List of SLA targets to monitor (if None, uses defaults; if [], uses no targets)
        """
        self.sla_targets = sla_targets if sla_targets is not None else self._default_sla_targets()

        logger.info(
            "SLA monitor initialized",
            extra={"target_count": len(self.sla_targets)},
        )

    @staticmethod
    def _isoformat_z(dt: datetime) -> str:
        """Render an aware datetime as ISO-8601 with a 'Z' suffix instead of '+00:00'."""
        return dt.isoformat().replace("+00:00", "Z")

    def _default_sla_targets(self) -> list[SLATarget]:
        """
        Get default SLA targets

        Returns:
            List of default SLA targets: 99.9% uptime, 500ms p95 response
            time, and 1% error rate.
        """
        return [
            SLATarget(
                metric=SLAMetric.UPTIME,
                target_value=99.9,
                comparison=">=",
                unit="%",
                warning_threshold=99.5,  # Warning at 99.5%
                critical_threshold=99.0,  # Critical below 99%
            ),
            SLATarget(
                metric=SLAMetric.RESPONSE_TIME,
                target_value=500,  # 500ms p95
                comparison="<=",
                unit="ms",
                warning_threshold=600,  # Warning at 600ms
                critical_threshold=1000,  # Critical above 1000ms
            ),
            SLATarget(
                metric=SLAMetric.ERROR_RATE,
                target_value=1.0,  # 1% error rate
                comparison="<=",
                unit="%",
                warning_threshold=2.0,  # Warning at 2%
                critical_threshold=5.0,  # Critical above 5%
            ),
        ]

    async def measure_uptime(self, start_time: datetime, end_time: datetime) -> SLAMeasurement:
        """
        Measure uptime SLA

        Args:
            start_time: Start of measurement period
            end_time: End of measurement period

        Returns:
            SLAMeasurement for uptime

        Raises:
            ValueError: If no uptime SLA target is configured
        """
        with tracer.start_as_current_span("sla.measure_uptime") as span:
            # Get uptime target
            uptime_target = next((t for t in self.sla_targets if t.metric == SLAMetric.UPTIME), None)

            if not uptime_target:
                msg = "No uptime SLA target configured"
                raise ValueError(msg)

            # Calculate total time in period
            total_seconds = (end_time - start_time).total_seconds()

            # Query Prometheus for actual downtime
            try:
                prometheus = await get_prometheus_client()
                # BUGFIX: clamp to at least 1 day. int(total_seconds / 86400) is 0 for
                # sub-day periods, which produced an invalid "0d" Prometheus range;
                # the sibling measure_* methods already clamp their ranges this way.
                timerange = f"{max(1, int(total_seconds / 86400))}d"
                downtime_seconds = await prometheus.query_downtime(timerange=timerange)
            except Exception as e:
                logger.warning(f"Failed to query Prometheus for downtime: {e}")
                downtime_seconds = 0  # Fallback to zero if query fails

            # Calculate uptime percentage (guard against a zero-length period)
            uptime_seconds = total_seconds - downtime_seconds
            uptime_percentage = (uptime_seconds / total_seconds * 100) if total_seconds > 0 else 0

            # Calculate compliance percentage (actual as a % of target)
            compliance_percentage = (
                (uptime_percentage / uptime_target.target_value * 100) if uptime_target.target_value > 0 else 0
            )

            # Determine status (for uptime, higher is better)
            status = self._determine_status(uptime_percentage, uptime_target, is_higher_better=True)

            # Breach details, only populated on an actual breach
            breach_details = None
            if status == SLAStatus.BREACH:
                breach_details = {
                    "target": uptime_target.target_value,
                    "actual": uptime_percentage,
                    "shortfall": uptime_target.target_value - uptime_percentage,
                    "downtime_seconds": downtime_seconds,
                    "downtime_minutes": downtime_seconds / 60,
                }

            measurement = SLAMeasurement(
                metric=SLAMetric.UPTIME,
                measured_value=uptime_percentage,
                target_value=uptime_target.target_value,
                unit=uptime_target.unit,
                status=status,
                compliance_percentage=compliance_percentage,
                timestamp=self._isoformat_z(datetime.now(UTC)),
                period_start=self._isoformat_z(start_time),
                period_end=self._isoformat_z(end_time),
                breach_details=breach_details,
            )

            span.set_attribute("uptime_percentage", uptime_percentage)
            span.set_attribute("status", status.value)

            logger.info(
                "Uptime SLA measured",
                extra={
                    "uptime_percentage": uptime_percentage,
                    "target": uptime_target.target_value,
                    "status": status.value,
                },
            )

            return measurement

    async def measure_response_time(self, start_time: datetime, end_time: datetime, percentile: int = 95) -> SLAMeasurement:
        """
        Measure response time SLA

        Args:
            start_time: Start of measurement period
            end_time: End of measurement period
            percentile: Percentile to measure (50, 95, 99)

        Returns:
            SLAMeasurement for response time

        Raises:
            ValueError: If no response time SLA target is configured
        """
        with tracer.start_as_current_span("sla.measure_response_time") as span:
            span.set_attribute("percentile", percentile)

            # Get response time target
            rt_target = next(
                (t for t in self.sla_targets if t.metric == SLAMetric.RESPONSE_TIME),
                None,
            )

            if not rt_target:
                msg = "No response time SLA target configured"
                raise ValueError(msg)

            # Query Prometheus for actual response times
            try:
                prometheus = await get_prometheus_client()
                timerange_hours = int((end_time - start_time).total_seconds() / 3600)
                timerange = f"{max(1, timerange_hours)}h"  # At least 1 hour
                percentiles = await prometheus.query_percentiles(
                    metric="http_request_duration_seconds", percentiles=[percentile], timerange=timerange
                )
                response_time_ms = percentiles.get(percentile, 0) * 1000  # Convert seconds to milliseconds
            except Exception as e:
                logger.warning(f"Failed to query Prometheus for response times: {e}")
                response_time_ms = 350  # Fallback to conservative estimate

            # Calculate compliance percentage (target as a % of actual: lower actual is better)
            compliance_percentage = (rt_target.target_value / response_time_ms * 100) if response_time_ms > 0 else 100

            # Determine status (for response time, lower is better)
            status = self._determine_status(response_time_ms, rt_target, is_higher_better=False)

            # Breach details, only populated on an actual breach
            breach_details = None
            if status == SLAStatus.BREACH:
                breach_details = {
                    "target": rt_target.target_value,
                    "actual": response_time_ms,
                    "overage": response_time_ms - rt_target.target_value,
                    "percentile": f"p{percentile}",
                }

            measurement = SLAMeasurement(
                metric=SLAMetric.RESPONSE_TIME,
                measured_value=response_time_ms,
                target_value=rt_target.target_value,
                unit=rt_target.unit,
                status=status,
                compliance_percentage=compliance_percentage,
                timestamp=self._isoformat_z(datetime.now(UTC)),
                period_start=self._isoformat_z(start_time),
                period_end=self._isoformat_z(end_time),
                breach_details=breach_details,
            )

            span.set_attribute("response_time_ms", response_time_ms)
            span.set_attribute("status", status.value)

            logger.info(
                f"Response time SLA measured (p{percentile})",
                extra={
                    "response_time_ms": response_time_ms,
                    "target": rt_target.target_value,
                    "status": status.value,
                },
            )

            return measurement

    async def measure_error_rate(self, start_time: datetime, end_time: datetime) -> SLAMeasurement:
        """
        Measure error rate SLA

        Args:
            start_time: Start of measurement period
            end_time: End of measurement period

        Returns:
            SLAMeasurement for error rate

        Raises:
            ValueError: If no error rate SLA target is configured
        """
        with tracer.start_as_current_span("sla.measure_error_rate") as span:
            # Get error rate target
            error_target = next((t for t in self.sla_targets if t.metric == SLAMetric.ERROR_RATE), None)

            if not error_target:
                msg = "No error rate SLA target configured"
                raise ValueError(msg)

            # Query Prometheus for actual error rate
            try:
                prometheus = await get_prometheus_client()
                timerange_mins = int((end_time - start_time).total_seconds() / 60)
                timerange = f"{max(5, timerange_mins)}m"  # At least 5 minutes
                error_rate_percentage = await prometheus.query_error_rate(timerange=timerange)
            except Exception as e:
                logger.warning(f"Failed to query Prometheus for error rate: {e}")
                error_rate_percentage = 0.5  # Fallback to conservative estimate

            # Calculate compliance percentage (target as a % of actual: lower actual is better)
            compliance_percentage = (
                (error_target.target_value / error_rate_percentage * 100) if error_rate_percentage > 0 else 100
            )

            # Determine status (for error rate, lower is better)
            status = self._determine_status(error_rate_percentage, error_target, is_higher_better=False)

            # Breach details, only populated on an actual breach
            breach_details = None
            if status == SLAStatus.BREACH:
                breach_details = {
                    "target": error_target.target_value,
                    "actual": error_rate_percentage,
                    "overage": error_rate_percentage - error_target.target_value,
                }

            measurement = SLAMeasurement(
                metric=SLAMetric.ERROR_RATE,
                measured_value=error_rate_percentage,
                target_value=error_target.target_value,
                unit=error_target.unit,
                status=status,
                compliance_percentage=compliance_percentage,
                timestamp=self._isoformat_z(datetime.now(UTC)),
                period_start=self._isoformat_z(start_time),
                period_end=self._isoformat_z(end_time),
                breach_details=breach_details,
            )

            span.set_attribute("error_rate_percentage", error_rate_percentage)
            span.set_attribute("status", status.value)

            logger.info(
                "Error rate SLA measured",
                extra={
                    "error_rate_percentage": error_rate_percentage,
                    "target": error_target.target_value,
                    "status": status.value,
                },
            )

            return measurement

    def _determine_status(self, measured_value: float, target: SLATarget, is_higher_better: bool) -> SLAStatus:
        """
        Determine SLA status based on measured value and target

        Only ``target_value`` and ``warning_threshold`` participate in the
        decision; ``critical_threshold`` is carried on the target but not
        consulted here (alert severity is decided elsewhere).

        Args:
            measured_value: Measured value
            target: SLA target
            is_higher_better: True if higher values are better (uptime), False otherwise

        Returns:
            SLAStatus
        """
        if is_higher_better:
            # Higher is better (e.g., uptime)
            if measured_value >= target.target_value:
                return SLAStatus.MEETING
            elif measured_value >= target.warning_threshold:
                return SLAStatus.AT_RISK
            else:
                return SLAStatus.BREACH
        else:
            # Lower is better (e.g., response time, error rate)
            if measured_value <= target.target_value:
                return SLAStatus.MEETING
            elif measured_value <= target.warning_threshold:
                return SLAStatus.AT_RISK
            else:
                return SLAStatus.BREACH

    async def generate_sla_report(self, period_days: int = 30) -> SLAReport:
        """
        Generate comprehensive SLA report

        Measures every configured metric over the trailing period, rolls the
        results into an overall status and compliance score, and sends a
        critical alert when any SLA is breached.

        Args:
            period_days: Number of days to report on

        Returns:
            SLAReport with all measurements
        """
        with tracer.start_as_current_span("sla.generate_report") as span:
            span.set_attribute("period_days", period_days)

            end_time = datetime.now(UTC)
            start_time = end_time - timedelta(days=period_days)

            measurements = []

            # Measure only metrics that have configured targets
            configured_metrics = {t.metric for t in self.sla_targets}

            # Measure uptime if configured
            uptime = None
            if SLAMetric.UPTIME in configured_metrics:
                uptime = await self.measure_uptime(start_time, end_time)
                measurements.append(uptime)

            # Measure response time if configured
            response_time = None
            if SLAMetric.RESPONSE_TIME in configured_metrics:
                response_time = await self.measure_response_time(start_time, end_time)
                measurements.append(response_time)

            # Measure error rate if configured
            error_rate = None
            if SLAMetric.ERROR_RATE in configured_metrics:
                error_rate = await self.measure_error_rate(start_time, end_time)
                measurements.append(error_rate)

            # Count breaches and warnings
            breaches = sum(1 for m in measurements if m.status == SLAStatus.BREACH)
            warnings = sum(1 for m in measurements if m.status == SLAStatus.AT_RISK)

            # Determine overall status: any breach dominates, then any warning
            if breaches > 0:
                overall_status = SLAStatus.BREACH
            elif warnings > 0:
                overall_status = SLAStatus.AT_RISK
            else:
                overall_status = SLAStatus.MEETING

            # Calculate overall compliance score (average of all measurements)
            compliance_score = sum(m.compliance_percentage for m in measurements) / len(measurements) if measurements else 0.0

            # Generate summary (only include measured metrics)
            summary = {
                "all_slas_met": overall_status == SLAStatus.MEETING,
                "breaches": [
                    {
                        "metric": m.metric.value,
                        "target": m.target_value,
                        "actual": m.measured_value,
                        "details": m.breach_details,
                    }
                    for m in measurements
                    if m.status == SLAStatus.BREACH
                ],
            }

            # Add measured values to summary
            if uptime is not None:
                summary["uptime_percentage"] = uptime.measured_value
            if response_time is not None:
                summary["response_time_p95_ms"] = response_time.measured_value
            if error_rate is not None:
                summary["error_rate_percentage"] = error_rate.measured_value

            report = SLAReport(
                report_id=f"sla_{datetime.now(UTC).strftime('%Y%m%d_%H%M%S')}",
                generated_at=self._isoformat_z(datetime.now(UTC)),
                period_start=self._isoformat_z(start_time),
                period_end=self._isoformat_z(end_time),
                measurements=measurements,
                overall_status=overall_status,
                breaches=breaches,
                warnings=warnings,
                compliance_score=compliance_score,
                summary=summary,
            )

            logger.info(
                "SLA report generated",
                extra={
                    "report_id": report.report_id,
                    "overall_status": overall_status.value,
                    "compliance_score": compliance_score,
                    "breaches": breaches,
                },
            )

            # Track metrics
            metrics.successful_calls.add(1, {"operation": "sla_report_generation"})

            # Alert on breaches
            if overall_status == SLAStatus.BREACH:
                await self._send_sla_alert(
                    severity="critical",
                    message=f"SLA breach detected: {breaches} metric(s) breached",
                    details=summary,
                )

            return report

    async def _send_sla_alert(self, severity: str, message: str, details: dict[str, Any]) -> None:
        """
        Send SLA alert

        Logs the alert, then best-effort forwards it to the alerting system
        (PagerDuty, Slack, email); delivery failures are logged, not raised.

        Args:
            severity: Alert severity (warning, critical)
            message: Alert message
            details: Alert details
        """
        logger.warning(
            f"SLA Alert [{severity.upper()}]: {message}",
            extra={"severity": severity, "details": details},
        )

        # Send to alerting system (PagerDuty, Slack, email)
        try:
            alerting_service = AlertingService()
            await alerting_service.initialize()

            # Map severity string to AlertSeverity enum
            alert_severity = AlertSeverity.CRITICAL if severity == "critical" else AlertSeverity.WARNING  # type: ignore[attr-defined]

            # Local import avoids a module-level import cycle with the alerting package
            from mcp_server_langgraph.integrations.alerting import AlertCategory

            alert = Alert(
                title=f"SLA {severity.upper()}: {message}",
                description=message,
                severity=alert_severity,
                category=AlertCategory.SLA,
                source="sla_monitor",
                metadata=details,
            )

            await alerting_service.send_alert(alert)
            logger.info("SLA alert sent successfully", extra={"alert_id": alert.alert_id})

        except Exception as e:
            logger.error(f"Failed to send SLA alert: {e}", exc_info=True)
# Global SLA monitor instance
# Lazily created by get_sla_monitor(); replaceable via set_sla_monitor() (e.g. for tests).
_sla_monitor: SLAMonitor | None = None
def get_sla_monitor() -> SLAMonitor:
    """Return the shared module-level SLAMonitor, constructing a default one on first access.

    Returns:
        SLAMonitor instance
    """
    global _sla_monitor

    monitor = _sla_monitor
    if monitor is None:
        monitor = SLAMonitor()
        _sla_monitor = monitor
    return monitor
def set_sla_monitor(monitor: SLAMonitor) -> None:
    """Install *monitor* as the process-wide SLA monitor (dependency injection hook).

    Args:
        monitor: SLAMonitor instance to use globally
    """
    global _sla_monitor

    _sla_monitor = monitor