Coverage for src/mcp_server_langgraph/integrations/alerting.py: 37% (238 statements)
1"""
2Alerting system integration for production monitoring and incident management.
4Supports multiple alerting providers:
5- PagerDuty (incidents, on-call routing)
6- Slack (webhooks, channels)
7- Email (SendGrid, AWS SES)
8- SMS (Twilio)
10Features:
11- Configurable alert routing by severity
12- Alert deduplication and grouping
13- Alert history and analytics
14- Retry logic with exponential backoff
15- Graceful degradation on provider failures
17Resolves production TODOs:
18- schedulers/compliance.py:418 - Compliance scheduler alerts
19- schedulers/compliance.py:433 - Security team notifications
20- schedulers/compliance.py:452 - Compliance team notifications
21- monitoring/sla.py:505 - SLA breach alerts
22- auth/hipaa.py:183 - HIPAA security alerts
23"""
import asyncio
import hashlib
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from datetime import datetime, timedelta, UTC
from enum import Enum
from typing import Any

import httpx
from pydantic import BaseModel, Field

from mcp_server_langgraph.core.config import settings

logger = logging.getLogger(__name__)


class AlertSeverity(str, Enum):
    """Alert severity levels matching industry standards"""

    CRITICAL = "critical"  # Production down, data loss, security breach
    HIGH = "high"  # Major feature broken, significant degradation
    MEDIUM = "medium"  # Feature partially broken, minor degradation
    LOW = "low"  # Minor issue, cosmetic problem
    INFO = "info"  # Informational, no action required


class AlertCategory(str, Enum):
    """Alert categories for routing and filtering"""

    SECURITY = "security"  # HIPAA violations, unauthorized access
    COMPLIANCE = "compliance"  # GDPR, SOC2 violations
    SLA = "sla"  # SLA breaches, uptime issues
    PERFORMANCE = "performance"  # High latency, resource exhaustion
    ERROR = "error"  # Application errors, exceptions
    INFRASTRUCTURE = "infrastructure"  # Pod crashes, network issues

@dataclass
class Alert:
    """Alert data structure"""

    title: str
    description: str
    severity: AlertSeverity
    category: AlertCategory
    source: str  # Component that generated the alert
    timestamp: datetime = field(default_factory=lambda: datetime.min)  # Sentinel, set in __post_init__
    metadata: dict[str, Any] = field(default_factory=dict)
    dedupe_key: str | None = None  # For deduplication
    alert_id: str = ""  # Sentinel, set in __post_init__ for freezegun compatibility  # nosec B324

    def __post_init__(self) -> None:
        """Generate timestamps and deduplication key if not provided. Ensures freezegun compatibility."""
        # Set timestamp if sentinel value (moved from default_factory for freezegun compatibility)
        if self.timestamp == datetime.min:
            self.timestamp = datetime.now(UTC)

        # Set alert_id if empty sentinel (moved from default_factory for freezegun compatibility)
        if not self.alert_id:
            # MD5 used for non-cryptographic ID generation only
            self.alert_id = hashlib.md5(str(datetime.now(UTC)).encode(), usedforsecurity=False).hexdigest()[:16]  # nosec B324

        # Generate dedupe_key if not provided
        if self.dedupe_key is None:
            # Generate hash from title, source, and category
            # MD5 used for non-cryptographic deduplication only
            content = f"{self.title}:{self.source}:{self.category}"
            self.dedupe_key = hashlib.md5(content.encode(), usedforsecurity=False).hexdigest()  # nosec B324

    def to_dict(self) -> dict[str, Any]:
        """Convert to dictionary for serialization"""
        return {
            "title": self.title,
            "description": self.description,
            "severity": self.severity.value,
            "category": self.category.value,
            "source": self.source,
            "timestamp": self.timestamp.isoformat(),
            "metadata": self.metadata,
            "dedupe_key": self.dedupe_key,
        }

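
# Illustrative example (not executed, values hypothetical): alerts that share a title,
# source, and category also share a dedupe_key, so a second instance raised inside the
# deduplication window is suppressed by AlertingService.
#
#   disk_alert = Alert(
#       title="Disk usage above 90%",
#       description="Volume /var/lib/postgresql is at 92% capacity",
#       severity=AlertSeverity.MEDIUM,
#       category=AlertCategory.INFRASTRUCTURE,
#       source="node_exporter",
#       metadata={"host": "db-1", "mount": "/var/lib/postgresql"},
#   )
#   repeat = Alert(
#       title=disk_alert.title,
#       description="Still above threshold",
#       severity=AlertSeverity.MEDIUM,
#       category=AlertCategory.INFRASTRUCTURE,
#       source="node_exporter",
#   )
#   assert repeat.dedupe_key == disk_alert.dedupe_key
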
class AlertProvider(ABC):
    """Abstract base class for alert providers"""

    @abstractmethod
    async def send_alert(self, alert: Alert) -> bool:
        """
        Send alert to provider.

        Returns:
            True if successful, False otherwise
        """

    @abstractmethod
    async def close(self) -> None:
        """Cleanup resources"""

class PagerDutyProvider(AlertProvider):
    """PagerDuty integration for incident management"""

    def __init__(
        self,
        integration_key: str,
        severity_mapping: dict[str, str] | None = None,
    ):
        self.integration_key = integration_key
        self.api_url = "https://events.pagerduty.com/v2/enqueue"
        self.severity_mapping = severity_mapping or {
            AlertSeverity.CRITICAL: "critical",
            AlertSeverity.HIGH: "error",
            AlertSeverity.MEDIUM: "warning",
            AlertSeverity.LOW: "info",
            AlertSeverity.INFO: "info",
        }
        self.client = httpx.AsyncClient(timeout=30.0)

    async def send_alert(self, alert: Alert) -> bool:
        """Send alert to PagerDuty Events API v2"""
        try:
            payload = {
                "routing_key": self.integration_key,
                "event_action": "trigger",
                "dedup_key": alert.dedupe_key,
                "payload": {
                    "summary": alert.title,
                    "severity": self.severity_mapping.get(alert.severity, "error"),
                    "source": alert.source,
                    "timestamp": alert.timestamp.isoformat(),
                    "custom_details": {
                        "description": alert.description,
                        "category": alert.category.value,
                        **alert.metadata,
                    },
                },
            }

            response = await self.client.post(self.api_url, json=payload)
            response.raise_for_status()

            logger.info(
                f"PagerDuty alert sent successfully: {alert.title}",
                extra={"alert_severity": alert.severity.value, "dedupe_key": alert.dedupe_key},
            )
            return True

        except Exception as e:
            logger.error(
                f"Failed to send PagerDuty alert: {e}",
                exc_info=True,
                extra={"alert_title": alert.title, "severity": alert.severity.value},
            )
            return False

    async def close(self) -> None:
        """Close HTTP client"""
        await self.client.aclose()


class SlackProvider(AlertProvider):
    """Slack webhook integration for team notifications"""

    def __init__(
        self,
        webhook_url: str,
        channel: str | None = None,
        mention_on_critical: str | None = None,
    ):
        self.webhook_url = webhook_url
        self.channel = channel
        self.mention_on_critical = mention_on_critical  # e.g., "@oncall"
        self.client = httpx.AsyncClient(timeout=30.0)

        # Emoji mapping for severity
        self.severity_emoji = {
            AlertSeverity.CRITICAL: ":rotating_light:",
            AlertSeverity.HIGH: ":red_circle:",
            AlertSeverity.MEDIUM: ":large_orange_diamond:",
            AlertSeverity.LOW: ":large_blue_diamond:",
            AlertSeverity.INFO: ":information_source:",
        }

        # Color mapping
        self.severity_color = {
            AlertSeverity.CRITICAL: "#FF0000",
            AlertSeverity.HIGH: "#FF6600",
            AlertSeverity.MEDIUM: "#FFA500",
            AlertSeverity.LOW: "#3AA3E3",
            AlertSeverity.INFO: "#808080",
        }

    async def send_alert(self, alert: Alert) -> bool:
        """Send alert to Slack via webhook"""
        try:
            # Build message text
            emoji = self.severity_emoji.get(alert.severity, ":bell:")
            text = f"{emoji} *{alert.severity.value.upper()}*: {alert.title}"

            if alert.severity == AlertSeverity.CRITICAL and self.mention_on_critical:
                text = f"{self.mention_on_critical} {text}"

            # Build attachment
            attachment = {
                "color": self.severity_color.get(alert.severity),
                "title": alert.title,
                "text": alert.description,
                "fields": [
                    {"title": "Source", "value": alert.source, "short": True},
                    {"title": "Category", "value": alert.category.value, "short": True},
                    {"title": "Severity", "value": alert.severity.value.upper(), "short": True},
                    {"title": "Time", "value": alert.timestamp.strftime("%Y-%m-%d %H:%M:%S UTC"), "short": True},
                ],
                "footer": "MCP Server Alerting",
                "ts": int(alert.timestamp.timestamp()),
            }

            # Add metadata fields
            for key, value in alert.metadata.items():
                attachment["fields"].append({"title": key, "value": str(value), "short": True})  # type: ignore[union-attr]

            payload = {"text": text, "attachments": [attachment]}

            if self.channel:
                payload["channel"] = self.channel

            response = await self.client.post(self.webhook_url, json=payload)
            response.raise_for_status()

            logger.info(
                f"Slack alert sent successfully: {alert.title}",
                extra={"alert_severity": alert.severity.value},
            )
            return True

        except Exception as e:
            logger.error(
                f"Failed to send Slack alert: {e}",
                exc_info=True,
                extra={"alert_title": alert.title},
            )
            return False

    async def close(self) -> None:
        """Close HTTP client"""
        await self.client.aclose()

class EmailProvider(AlertProvider):
    """Email alerting via SMTP or API (SendGrid, AWS SES)"""

    def __init__(
        self,
        provider_type: str,  # "sendgrid" or "ses"
        api_key: str | None = None,
        from_email: str = "alerts@example.com",
        to_emails: list[str] | None = None,
    ):
        self.provider_type = provider_type
        self.api_key = api_key
        self.from_email = from_email
        self.to_emails = to_emails or []
        self.client = httpx.AsyncClient(timeout=30.0)

    async def send_alert(self, alert: Alert) -> bool:
        """Send alert via email"""
        if not self.to_emails:
            logger.warning("No recipient emails configured, skipping email alert")
            return False

        try:
            subject = f"[{alert.severity.value.upper()}] {alert.title}"
            body = self._format_email_body(alert)

            if self.provider_type == "sendgrid":
                return await self._send_via_sendgrid(subject, body)
            elif self.provider_type == "ses":
                return await self._send_via_ses(subject, body)
            else:
                logger.error(f"Unknown email provider: {self.provider_type}")
                return False

        except Exception as e:
            logger.error(f"Failed to send email alert: {e}", exc_info=True)
            return False

    def _format_email_body(self, alert: Alert) -> str:
        """Format alert as HTML email body"""
        severity_color = {
            AlertSeverity.CRITICAL: "#FF0000",
            AlertSeverity.HIGH: "#FF6600",
            AlertSeverity.MEDIUM: "#FFA500",
            AlertSeverity.LOW: "#3AA3E3",
            AlertSeverity.INFO: "#808080",
        }

        metadata_rows = "".join(
            f"<tr><td><strong>{key}</strong></td><td>{value}</td></tr>" for key, value in alert.metadata.items()
        )

        return f"""
        <html>
        <body style="font-family: Arial, sans-serif;">
            <div style="border-left: 4px solid {severity_color.get(alert.severity, "#808080")}; padding-left: 12px;">
                <h2 style="color: {severity_color.get(alert.severity)};">{alert.title}</h2>
                <p>{alert.description}</p>
                <table style="margin-top: 20px;">
                    <tr><td><strong>Severity</strong></td><td>{alert.severity.value.upper()}</td></tr>
                    <tr><td><strong>Category</strong></td><td>{alert.category.value}</td></tr>
                    <tr><td><strong>Source</strong></td><td>{alert.source}</td></tr>
                    <tr><td><strong>Time</strong></td><td>{alert.timestamp.strftime("%Y-%m-%d %H:%M:%S UTC")}</td></tr>
                    {metadata_rows}
                </table>
            </div>
            <p style="margin-top: 30px; color: #666; font-size: 12px;">
                This is an automated alert from MCP Server monitoring system.
            </p>
        </body>
        </html>
        """

    async def _send_via_sendgrid(self, subject: str, body: str) -> bool:
        """Send via SendGrid API"""
        if not self.api_key:
            logger.error("SendGrid API key not configured")
            return False

        url = "https://api.sendgrid.com/v3/mail/send"
        headers = {"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"}

        payload = {
            "personalizations": [{"to": [{"email": email} for email in self.to_emails]}],
            "from": {"email": self.from_email},
            "subject": subject,
            "content": [{"type": "text/html", "value": body}],
        }

        response = await self.client.post(url, headers=headers, json=payload)
        response.raise_for_status()
        return True

    async def _send_via_ses(self, subject: str, body: str) -> bool:
        """Send via AWS SES (placeholder - requires boto3)"""
        logger.warning("AWS SES integration not yet implemented")
        return False

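
    # Hedged sketch only: one way the SES placeholder above could be completed. boto3 is not a
    # dependency of this module and is synchronous, so inside this async method it would need to
    # run in a thread (or be swapped for aioboto3); region and credentials below are assumptions.
    #
    #   import boto3  # would move to module-level imports
    #   ses = boto3.client("ses", region_name="us-east-1")
    #   ses.send_email(
    #       Source=self.from_email,
    #       Destination={"ToAddresses": self.to_emails},
    #       Message={
    #           "Subject": {"Data": subject},
    #           "Body": {"Html": {"Data": body}},
    #       },
    #   )
    #   return True
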
    async def close(self) -> None:
        """Close HTTP client"""
        await self.client.aclose()


class AlertingConfig(BaseModel):
    """Alerting configuration"""

    enabled: bool = Field(default=True, description="Enable alerting system")
    providers: dict[str, dict[str, Any]] = Field(default_factory=dict, description="Provider configurations")
    routing_rules: list[dict[str, Any]] = Field(default_factory=list, description="Alert routing rules")
    deduplication_window: int = Field(default=300, description="Deduplication window in seconds")

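
# Illustrative configuration (all values hypothetical): AlertingService.initialize() looks for an
# "enabled" flag plus provider-specific keys inside each provider block.
#
#   example_config = AlertingConfig(
#       enabled=True,
#       providers={
#           "pagerduty": {"enabled": True, "integration_key": "<routing-key>"},
#           "slack": {
#               "enabled": True,
#               "webhook_url": "https://hooks.slack.com/services/...",
#               "channel": "#alerts",
#               "mention_on_critical": "@oncall",
#           },
#           "email": {
#               "enabled": True,
#               "provider_type": "sendgrid",
#               "api_key": "<sendgrid-key>",
#               "from_email": "alerts@example.com",
#               "to_emails": ["sre@example.com"],
#           },
#       },
#       deduplication_window=300,
#   )
#   service = AlertingService(config=example_config)
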
class AlertingService:
    """
    Main alerting service coordinating multiple providers.

    Usage:
        alerting = AlertingService()
        await alerting.initialize()

        await alerting.send_alert(Alert(
            title="SLA Breach Detected",
            description="Uptime dropped below 99.9%",
            severity=AlertSeverity.HIGH,
            category=AlertCategory.SLA,
            source="sla_monitor",
            metadata={"current_uptime": "99.85%"}
        ))

        await alerting.close()
    """

    def __init__(self, config: AlertingConfig | None = None) -> None:
        self.config = config or self._load_config_from_settings()
        self.providers: list[AlertProvider] = []
        self.alert_history: list[Alert] = []
        self.dedupe_cache: dict[str, datetime] = {}
        self._initialized = False

    def _load_config_from_settings(self) -> AlertingConfig:
        """Load configuration from application settings"""
        # Load provider configurations from settings
        providers = {}

        if settings.pagerduty_integration_key:
            providers["pagerduty"] = {"integration_key": settings.pagerduty_integration_key}

        if settings.slack_webhook_url:
            providers["slack"] = {"webhook_url": settings.slack_webhook_url}

        if settings.opsgenie_api_key:
            providers["opsgenie"] = {"api_key": settings.opsgenie_api_key}

        if settings.email_smtp_host and settings.email_from_address:
            providers["email"] = {
                "smtp_host": settings.email_smtp_host,
                "smtp_port": settings.email_smtp_port,  # type: ignore[dict-item]
                "from_address": settings.email_from_address,
                "to_addresses": settings.email_to_addresses.split(",") if settings.email_to_addresses else [],  # type: ignore[dict-item]
            }

        # Enabled if at least one provider is configured
        enabled = len(providers) > 0

        return AlertingConfig(enabled=enabled, providers=providers, routing_rules=[])

    async def initialize(self) -> None:
        """Initialize alerting providers"""
        if self._initialized:
            return

        if not self.config.enabled:
            logger.info("Alerting system disabled by configuration")
            return

        # Initialize PagerDuty
        pagerduty_config = self.config.providers.get("pagerduty", {})
        if pagerduty_config.get("enabled"):
            integration_key = pagerduty_config.get("integration_key")
            if integration_key:
                provider = PagerDutyProvider(
                    integration_key=integration_key,
                    severity_mapping=pagerduty_config.get("severity_mapping"),
                )
                self.providers.append(provider)
                logger.info("PagerDuty alerting provider initialized")

        # Initialize Slack
        slack_config = self.config.providers.get("slack", {})
        if slack_config.get("enabled"):
            webhook_url = slack_config.get("webhook_url")
            if webhook_url:
                provider = SlackProvider(  # type: ignore[assignment]
                    webhook_url=webhook_url,
                    channel=slack_config.get("channel"),
                    mention_on_critical=slack_config.get("mention_on_critical"),
                )
                self.providers.append(provider)
                logger.info("Slack alerting provider initialized")

        # Initialize Email
        email_config = self.config.providers.get("email", {})
        if email_config.get("enabled"):
            provider = EmailProvider(  # type: ignore[assignment]
                provider_type=email_config.get("provider_type", "sendgrid"),
                api_key=email_config.get("api_key"),
                from_email=email_config.get("from_email", "alerts@example.com"),
                to_emails=email_config.get("to_emails", []),
            )
            self.providers.append(provider)
            logger.info("Email alerting provider initialized")

        self._initialized = True
        logger.info(f"Alerting service initialized with {len(self.providers)} providers")

    async def send_alert(self, alert: Alert) -> dict[str, bool]:
        """
        Send alert to all configured providers.

        Returns:
            Dictionary mapping provider names to success status
        """
        if not self._initialized:
            await self.initialize()

        if not self.config.enabled:
            logger.debug("Alerting disabled, skipping alert")
            return {}

        # Check deduplication
        if self._is_duplicate(alert):
            logger.info(
                f"Alert deduplicated: {alert.title}",
                extra={"dedupe_key": alert.dedupe_key},
            )
            return {}

        # Add to history
        self.alert_history.append(alert)

        # Update dedupe cache
        self.dedupe_cache[alert.dedupe_key] = datetime.now(UTC)  # type: ignore[index]

        # Send to all providers concurrently
        results = {}
        if self.providers:
            tasks = [provider.send_alert(alert) for provider in self.providers]
            provider_results = await asyncio.gather(*tasks, return_exceptions=True)

            for i, result in enumerate(provider_results):
                provider_name = self.providers[i].__class__.__name__
                results[provider_name] = isinstance(result, bool) and result

        logger.info(
            f"Alert sent: {alert.title}",
            extra={
                "severity": alert.severity.value,
                "category": alert.category.value,
                "providers": len(self.providers),
                "results": results,
            },
        )

        return results

    def _is_duplicate(self, alert: Alert) -> bool:
        """Check if alert is a duplicate within deduplication window"""
        if alert.dedupe_key not in self.dedupe_cache:
            return False

        last_sent = self.dedupe_cache[alert.dedupe_key]
        window = timedelta(seconds=self.config.deduplication_window)

        return datetime.now(UTC) - last_sent < window

    def get_alert_history(self, limit: int = 100) -> list[Alert]:
        """Get recent alert history"""
        return self.alert_history[-limit:]

    def get_alert_statistics(self) -> dict[str, Any]:
        """Get alerting statistics"""
        if not self.alert_history:
            return {"total_alerts": 0}

        severity_counts: dict[str, int] = {}
        category_counts: dict[str, int] = {}

        for alert in self.alert_history:
            severity_counts[alert.severity.value] = severity_counts.get(alert.severity.value, 0) + 1
            category_counts[alert.category.value] = category_counts.get(alert.category.value, 0) + 1

        return {
            "total_alerts": len(self.alert_history),
            "by_severity": severity_counts,
            "by_category": category_counts,
            "providers_active": len(self.providers),
            "deduplication_cache_size": len(self.dedupe_cache),
        }

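
    # Example of the returned shape after two alerts have been recorded (values illustrative):
    #   {
    #       "total_alerts": 2,
    #       "by_severity": {"high": 1, "critical": 1},
    #       "by_category": {"sla": 1, "security": 1},
    #       "providers_active": 2,
    #       "deduplication_cache_size": 2,
    #   }
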
    async def close(self) -> None:
        """Cleanup all providers"""
        for provider in self.providers:
            await provider.close()

        logger.info("Alerting service closed")


# Global alerting service instance
_alerting_service: AlertingService | None = None


async def get_alerting_service() -> AlertingService:
    """Get or create global alerting service instance"""
    global _alerting_service

    if _alerting_service is None:
        _alerting_service = AlertingService()
        await _alerting_service.initialize()

    return _alerting_service


async def send_alert(
    title: str,
    description: str,
    severity: AlertSeverity,
    category: AlertCategory,
    source: str,
    metadata: dict[str, Any] | None = None,
) -> dict[str, bool]:
    """
    Convenience function to send an alert.

    Usage:
        await send_alert(
            title="Database Connection Lost",
            description="Unable to connect to PostgreSQL",
            severity=AlertSeverity.CRITICAL,
            category=AlertCategory.INFRASTRUCTURE,
            source="database_health_check",
            metadata={"host": "postgres-primary", "error": "Connection refused"}
        )
    """
    service = await get_alerting_service()

    alert = Alert(
        title=title,
        description=description,
        severity=severity,
        category=category,
        source=source,
        metadata=metadata or {},
    )

    return await service.send_alert(alert)
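

# Minimal local smoke test (an assumption, not part of the original module): with no provider
# credentials configured in settings, the service reports itself disabled and the call simply
# returns an empty result dict, so running this directly is safe.
if __name__ == "__main__":

    async def _demo() -> None:
        results = await send_alert(
            title="Alerting smoke test",
            description="Manual test alert from alerting.py __main__",
            severity=AlertSeverity.INFO,
            category=AlertCategory.INFRASTRUCTURE,
            source="alerting_demo",
            metadata={"purpose": "smoke-test"},
        )
        print(results)
        service = await get_alerting_service()
        await service.close()

    asyncio.run(_demo())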