Coverage for src/mcp_server_langgraph/integrations/alerting.py: 37%

238 statements  

coverage.py v7.12.0, created at 2025-12-03 00:43 +0000

1""" 

2Alerting system integration for production monitoring and incident management. 

3 

4Supports multiple alerting providers: 

5- PagerDuty (incidents, on-call routing) 

6- Slack (webhooks, channels) 

7- Email (SendGrid, AWS SES) 

8- SMS (Twilio) 

9 

10Features: 

11- Configurable alert routing by severity 

12- Alert deduplication and grouping 

13- Alert history and analytics 

14- Retry logic with exponential backoff 

15- Graceful degradation on provider failures 

16 

17Resolves production TODOs: 

18- schedulers/compliance.py:418 - Compliance scheduler alerts 

19- schedulers/compliance.py:433 - Security team notifications 

20- schedulers/compliance.py:452 - Compliance team notifications 

21- monitoring/sla.py:505 - SLA breach alerts 

22- auth/hipaa.py:183 - HIPAA security alerts 

23""" 

24 
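The feature list above mentions retry logic with exponential backoff, but the retry path itself does not appear in this listing. A minimal sketch of what backoff around a provider call could look like, using a hypothetical helper wrapped around AlertProvider.send_alert (the attempt count and delays are illustrative, not taken from the module):

import asyncio

async def send_with_backoff(provider, alert, attempts=3, base_delay=1.0):
    # Retry provider.send_alert, sleeping 1s, 2s, 4s, ... between failed attempts.
    for attempt in range(attempts):
        if await provider.send_alert(alert):
            return True
        await asyncio.sleep(base_delay * (2**attempt))
    return False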

25import asyncio 

26import hashlib 

27import logging 

28from abc import ABC, abstractmethod 

29from dataclasses import dataclass, field 

30from datetime import datetime, timedelta, UTC 

31from enum import Enum 

32from typing import Any 

33 

34import httpx 

35from pydantic import BaseModel, Field 

36 

37from mcp_server_langgraph.core.config import settings 

38 

39logger = logging.getLogger(__name__) 

40 

41 

42class AlertSeverity(str, Enum): 

43 """Alert severity levels matching industry standards""" 

44 

45 CRITICAL = "critical" # Production down, data loss, security breach 

46 HIGH = "high" # Major feature broken, significant degradation 

47 MEDIUM = "medium" # Feature partially broken, minor degradation 

48 LOW = "low" # Minor issue, cosmetic problem 

49 INFO = "info" # Informational, no action required 

50 

51 

52class AlertCategory(str, Enum): 

53 """Alert categories for routing and filtering""" 

54 

55 SECURITY = "security" # HIPAA violations, unauthorized access 

56 COMPLIANCE = "compliance" # GDPR, SOC2 violations 

57 SLA = "sla" # SLA breaches, uptime issues 

58 PERFORMANCE = "performance" # High latency, resource exhaustion 

59 ERROR = "error" # Application errors, exceptions 

60 INFRASTRUCTURE = "infrastructure" # Pod crashes, network issues 

61 

62 

63@dataclass 

64class Alert: 

65 """Alert data structure""" 

66 

67 title: str 

68 description: str 

69 severity: AlertSeverity 

70 category: AlertCategory 

71 source: str # Component that generated the alert 

72 timestamp: datetime = field(default_factory=lambda: datetime.min) # Sentinel, set in __post_init__ 

73 metadata: dict[str, Any] = field(default_factory=dict) 

74 dedupe_key: str | None = None # For deduplication 

75 alert_id: str = "" # Sentinel, set in __post_init__ for freezegun compatibility # nosec B324 

76 

77 def __post_init__(self) -> None: 

78 """Generate timestamps and deduplication key if not provided. Ensures freezegun compatibility.""" 

79 # Set timestamp if sentinel value (moved from default_factory for freezegun compatibility) 

80 if self.timestamp == datetime.min:    [80 ↛ 84] line 80 didn't jump to line 84 because the condition on line 80 was always true

81 self.timestamp = datetime.now(UTC) 

82 

83 # Set alert_id if empty sentinel (moved from default_factory for freezegun compatibility) 

84 if not self.alert_id:    [84 ↛ 89] line 84 didn't jump to line 89 because the condition on line 84 was always true

85 # MD5 used for non-cryptographic ID generation only 

86 self.alert_id = hashlib.md5(str(datetime.now(UTC)).encode(), usedforsecurity=False).hexdigest()[:16] # nosec B324 

87 

88 # Generate dedupe_key if not provided 

89 if self.dedupe_key is None:    [89 ↛ exit] line 89 didn't return from function '__post_init__' because the condition on line 89 was always true

90 # Generate hash from title, source, and category 

91 # MD5 used for non-cryptographic deduplication only 

92 content = f"{self.title}:{self.source}:{self.category}" 

93 self.dedupe_key = hashlib.md5(content.encode(), usedforsecurity=False).hexdigest() # nosec B324 

94 

95 def to_dict(self) -> dict[str, Any]: 

96 """Convert to dictionary for serialization""" 

97 return { 

98 "title": self.title, 

99 "description": self.description, 

100 "severity": self.severity.value, 

101 "category": self.category.value, 

102 "source": self.source, 

103 "timestamp": self.timestamp.isoformat(), 

104 "metadata": self.metadata, 

105 "dedupe_key": self.dedupe_key, 

106 } 

107 
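Because __post_init__ derives dedupe_key from title, source, and category only, two alerts describing the same condition collapse onto one key, which the deduplication window in AlertingService below relies on. A small sketch (field values borrowed from the usage example further down):

a = Alert(
    title="SLA Breach Detected",
    description="Uptime dropped below 99.9%",
    severity=AlertSeverity.HIGH,
    category=AlertCategory.SLA,
    source="sla_monitor",
)
b = Alert(
    title="SLA Breach Detected",
    description="Still below target an hour later",
    severity=AlertSeverity.HIGH,
    category=AlertCategory.SLA,
    source="sla_monitor",
)
# Same title/source/category -> same dedupe_key; the differing descriptions don't matter.
assert a.dedupe_key == b.dedupe_key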

108 

109class AlertProvider(ABC): 

110 """Abstract base class for alert providers""" 

111 

112 @abstractmethod 

113 async def send_alert(self, alert: Alert) -> bool: 

114 """ 

115 Send alert to provider. 

116 

117 Returns: 

118 True if successful, False otherwise 

119 """ 

120 

121 @abstractmethod 

122 async def close(self) -> None: 

123 """Cleanup resources""" 

124 

125 

126class PagerDutyProvider(AlertProvider): 

127 """PagerDuty integration for incident management""" 

128 

129 def __init__( 

130 self, 

131 integration_key: str, 

132 severity_mapping: dict[str, str] | None = None, 

133 ): 

134 self.integration_key = integration_key 

135 self.api_url = "https://events.pagerduty.com/v2/enqueue" 

136 self.severity_mapping = severity_mapping or { 

137 AlertSeverity.CRITICAL: "critical", 

138 AlertSeverity.HIGH: "error", 

139 AlertSeverity.MEDIUM: "warning", 

140 AlertSeverity.LOW: "info", 

141 AlertSeverity.INFO: "info", 

142 } 

143 self.client = httpx.AsyncClient(timeout=30.0) 

144 

145 async def send_alert(self, alert: Alert) -> bool: 

146 """Send alert to PagerDuty Events API v2""" 

147 try: 

148 payload = { 

149 "routing_key": self.integration_key, 

150 "event_action": "trigger", 

151 "dedup_key": alert.dedupe_key, 

152 "payload": { 

153 "summary": alert.title, 

154 "severity": self.severity_mapping.get(alert.severity, "error"), 

155 "source": alert.source, 

156 "timestamp": alert.timestamp.isoformat(), 

157 "custom_details": { 

158 "description": alert.description, 

159 "category": alert.category.value, 

160 **alert.metadata, 

161 }, 

162 }, 

163 } 

164 

165 response = await self.client.post(self.api_url, json=payload) 

166 response.raise_for_status() 

167 

168 logger.info( 

169 f"PagerDuty alert sent successfully: {alert.title}", 

170 extra={"alert_severity": alert.severity.value, "dedupe_key": alert.dedupe_key}, 

171 ) 

172 return True 

173 

174 except Exception as e: 

175 logger.error( 

176 f"Failed to send PagerDuty alert: {e}", 

177 exc_info=True, 

178 extra={"alert_title": alert.title, "severity": alert.severity.value}, 

179 ) 

180 return False 

181 

182 async def close(self) -> None: 

183 """Close HTTP client""" 

184 await self.client.aclose() 

185 

186 

187class SlackProvider(AlertProvider): 

188 """Slack webhook integration for team notifications""" 

189 

190 def __init__( 

191 self, 

192 webhook_url: str, 

193 channel: str | None = None, 

194 mention_on_critical: str | None = None, 

195 ): 

196 self.webhook_url = webhook_url 

197 self.channel = channel 

198 self.mention_on_critical = mention_on_critical # e.g., "@oncall" 

199 self.client = httpx.AsyncClient(timeout=30.0) 

200 

201 # Emoji mapping for severity 

202 self.severity_emoji = { 

203 AlertSeverity.CRITICAL: ":rotating_light:", 

204 AlertSeverity.HIGH: ":red_circle:", 

205 AlertSeverity.MEDIUM: ":large_orange_diamond:", 

206 AlertSeverity.LOW: ":large_blue_diamond:", 

207 AlertSeverity.INFO: ":information_source:", 

208 } 

209 

210 # Color mapping 

211 self.severity_color = { 

212 AlertSeverity.CRITICAL: "#FF0000", 

213 AlertSeverity.HIGH: "#FF6600", 

214 AlertSeverity.MEDIUM: "#FFA500", 

215 AlertSeverity.LOW: "#3AA3E3", 

216 AlertSeverity.INFO: "#808080", 

217 } 

218 

219 async def send_alert(self, alert: Alert) -> bool: 

220 """Send alert to Slack via webhook""" 

221 try: 

222 # Build message text 

223 emoji = self.severity_emoji.get(alert.severity, ":bell:") 

224 text = f"{emoji} *{alert.severity.value.upper()}*: {alert.title}" 

225 

226 if alert.severity == AlertSeverity.CRITICAL and self.mention_on_critical: 

227 text = f"{self.mention_on_critical} {text}" 

228 

229 # Build attachment 

230 attachment = { 

231 "color": self.severity_color.get(alert.severity), 

232 "title": alert.title, 

233 "text": alert.description, 

234 "fields": [ 

235 {"title": "Source", "value": alert.source, "short": True}, 

236 {"title": "Category", "value": alert.category.value, "short": True}, 

237 {"title": "Severity", "value": alert.severity.value.upper(), "short": True}, 

238 {"title": "Time", "value": alert.timestamp.strftime("%Y-%m-%d %H:%M:%S UTC"), "short": True}, 

239 ], 

240 "footer": "MCP Server Alerting", 

241 "ts": int(alert.timestamp.timestamp()), 

242 } 

243 

244 # Add metadata fields 

245 for key, value in alert.metadata.items(): 

246 attachment["fields"].append({"title": key, "value": str(value), "short": True}) # type: ignore[union-attr] 

247 

248 payload = {"text": text, "attachments": [attachment]} 

249 

250 if self.channel: 

251 payload["channel"] = self.channel 

252 

253 response = await self.client.post(self.webhook_url, json=payload) 

254 response.raise_for_status() 

255 

256 logger.info( 

257 f"Slack alert sent successfully: {alert.title}", 

258 extra={"alert_severity": alert.severity.value}, 

259 ) 

260 return True 

261 

262 except Exception as e: 

263 logger.error( 

264 f"Failed to send Slack alert: {e}", 

265 exc_info=True, 

266 extra={"alert_title": alert.title}, 

267 ) 

268 return False 

269 

270 async def close(self) -> None: 

271 """Close HTTP client""" 

272 await self.client.aclose() 

273 

274 

275class EmailProvider(AlertProvider): 

276 """Email alerting via SMTP or API (SendGrid, AWS SES)""" 

277 

278 def __init__( 

279 self, 

280 provider_type: str, # "sendgrid" or "ses" 

281 api_key: str | None = None, 

282 from_email: str = "alerts@example.com", 

283 to_emails: list[str] | None = None, 

284 ): 

285 self.provider_type = provider_type 

286 self.api_key = api_key 

287 self.from_email = from_email 

288 self.to_emails = to_emails or [] 

289 self.client = httpx.AsyncClient(timeout=30.0) 

290 

291 async def send_alert(self, alert: Alert) -> bool: 

292 """Send alert via email""" 

293 if not self.to_emails: 

294 logger.warning("No recipient emails configured, skipping email alert") 

295 return False 

296 

297 try: 

298 subject = f"[{alert.severity.value.upper()}] {alert.title}" 

299 body = self._format_email_body(alert) 

300 

301 if self.provider_type == "sendgrid": 

302 return await self._send_via_sendgrid(subject, body) 

303 elif self.provider_type == "ses": 

304 return await self._send_via_ses(subject, body) 

305 else: 

306 logger.error(f"Unknown email provider: {self.provider_type}") 

307 return False 

308 

309 except Exception as e: 

310 logger.error(f"Failed to send email alert: {e}", exc_info=True) 

311 return False 

312 

313 def _format_email_body(self, alert: Alert) -> str: 

314 """Format alert as HTML email body""" 

315 severity_color = { 

316 AlertSeverity.CRITICAL: "#FF0000", 

317 AlertSeverity.HIGH: "#FF6600", 

318 AlertSeverity.MEDIUM: "#FFA500", 

319 AlertSeverity.LOW: "#3AA3E3", 

320 AlertSeverity.INFO: "#808080", 

321 } 

322 

323 metadata_rows = "".join( 

324 f"<tr><td><strong>{key}</strong></td><td>{value}</td></tr>" for key, value in alert.metadata.items() 

325 ) 

326 

327 return f""" 

328 <html> 

329 <body style="font-family: Arial, sans-serif;"> 

330 <div style="border-left: 4px solid {severity_color.get(alert.severity, "#808080")}; padding-left: 12px;"> 

331 <h2 style="color: {severity_color.get(alert.severity)};">{alert.title}</h2> 

332 <p>{alert.description}</p> 

333 <table style="margin-top: 20px;"> 

334 <tr><td><strong>Severity</strong></td><td>{alert.severity.value.upper()}</td></tr> 

335 <tr><td><strong>Category</strong></td><td>{alert.category.value}</td></tr> 

336 <tr><td><strong>Source</strong></td><td>{alert.source}</td></tr> 

337 <tr><td><strong>Time</strong></td><td>{alert.timestamp.strftime("%Y-%m-%d %H:%M:%S UTC")}</td></tr> 

338 {metadata_rows} 

339 </table> 

340 </div> 

341 <p style="margin-top: 30px; color: #666; font-size: 12px;"> 

342 This is an automated alert from MCP Server monitoring system. 

343 </p> 

344 </body> 

345 </html> 

346 """ 

347 

348 async def _send_via_sendgrid(self, subject: str, body: str) -> bool: 

349 """Send via SendGrid API""" 

350 if not self.api_key: 

351 logger.error("SendGrid API key not configured") 

352 return False 

353 

354 url = "https://api.sendgrid.com/v3/mail/send" 

355 headers = {"Authorization": f"Bearer {self.api_key}", "Content-Type": "application/json"} 

356 

357 payload = { 

358 "personalizations": [{"to": [{"email": email} for email in self.to_emails]}], 

359 "from": {"email": self.from_email}, 

360 "subject": subject, 

361 "content": [{"type": "text/html", "value": body}], 

362 } 

363 

364 response = await self.client.post(url, headers=headers, json=payload) 

365 response.raise_for_status() 

366 return True 

367 

368 async def _send_via_ses(self, subject: str, body: str) -> bool: 

369 """Send via AWS SES (placeholder - requires boto3)""" 

370 logger.warning("AWS SES integration not yet implemented") 

371 return False 

372 

373 async def close(self) -> None: 

374 """Close HTTP client""" 

375 await self.client.aclose() 

376 

377 

378class AlertingConfig(BaseModel): 

379 """Alerting configuration""" 

380 

381 enabled: bool = Field(default=True, description="Enable alerting system") 

382 providers: dict[str, dict[str, Any]] = Field(default_factory=dict, description="Provider configurations") 

383 routing_rules: list[dict[str, Any]] = Field(default_factory=list, description="Alert routing rules") 

384 deduplication_window: int = Field(default=300, description="Deduplication window in seconds") 

385 
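For reference, a configuration that initialize() below would actually pick up might look like the following; note that initialize() creates a provider only when its entry carries a truthy "enabled" flag, and the keys and URLs here are placeholders:

config = AlertingConfig(
    enabled=True,
    providers={
        "pagerduty": {"enabled": True, "integration_key": "PLACEHOLDER_ROUTING_KEY"},
        "slack": {
            "enabled": True,
            "webhook_url": "https://hooks.slack.com/services/PLACEHOLDER",
            "channel": "#alerts",
            "mention_on_critical": "@oncall",
        },
        "email": {
            "enabled": True,
            "provider_type": "sendgrid",
            "api_key": "PLACEHOLDER_API_KEY",
            "from_email": "alerts@example.com",
            "to_emails": ["oncall@example.com"],  # placeholder recipient
        },
    },
    deduplication_window=300,  # seconds
)
service = AlertingService(config=config)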

386 

387class AlertingService: 

388 """ 

389 Main alerting service coordinating multiple providers. 

390 

391 Usage: 

392 alerting = AlertingService() 

393 await alerting.initialize() 

394 

395 await alerting.send_alert(Alert( 

396 title="SLA Breach Detected", 

397 description="Uptime dropped below 99.9%", 

398 severity=AlertSeverity.HIGH, 

399 category=AlertCategory.SLA, 

400 source="sla_monitor", 

401 metadata={"current_uptime": "99.85%"} 

402 )) 

403 

404 await alerting.close() 

405 """ 

406 

407 def __init__(self, config: AlertingConfig | None = None) -> None: 

408 self.config = config or self._load_config_from_settings() 

409 self.providers: list[AlertProvider] = [] 

410 self.alert_history: list[Alert] = [] 

411 self.dedupe_cache: dict[str, datetime] = {} 

412 self._initialized = False 

413 

414 def _load_config_from_settings(self) -> AlertingConfig: 

415 """Load configuration from application settings""" 

416 # Load provider configurations from settings 

417 providers = {} 

418 

419 if settings.pagerduty_integration_key:    [419 ↛ 420] line 419 didn't jump to line 420 because the condition on line 419 was never true

420 providers["pagerduty"] = {"integration_key": settings.pagerduty_integration_key} 

421 

422 if settings.slack_webhook_url:    [422 ↛ 423] line 422 didn't jump to line 423 because the condition on line 422 was never true

423 providers["slack"] = {"webhook_url": settings.slack_webhook_url} 

424 

425 if settings.opsgenie_api_key:    [425 ↛ 426] line 425 didn't jump to line 426 because the condition on line 425 was never true

426 providers["opsgenie"] = {"api_key": settings.opsgenie_api_key} 

427 

428 if settings.email_smtp_host and settings.email_from_address:    [428 ↛ 429] line 428 didn't jump to line 429 because the condition on line 428 was never true

429 providers["email"] = { 

430 "smtp_host": settings.email_smtp_host, 

431 "smtp_port": settings.email_smtp_port, # type: ignore[dict-item] 

432 "from_address": settings.email_from_address, 

433 "to_addresses": settings.email_to_addresses.split(",") if settings.email_to_addresses else [], # type: ignore[dict-item] 

434 } 

435 

436 # Enabled if at least one provider is configured 

437 enabled = len(providers) > 0 

438 

439 return AlertingConfig(enabled=enabled, providers=providers, routing_rules=[]) 

440 

441 async def initialize(self) -> None: 

442 """Initialize alerting providers""" 

443 if self._initialized:    [443 ↛ 444] line 443 didn't jump to line 444 because the condition on line 443 was never true

444 return 

445 

446 if not self.config.enabled:    [446 ↛ 451] line 446 didn't jump to line 451 because the condition on line 446 was always true

447 logger.info("Alerting system disabled by configuration") 

448 return 

449 

450 # Initialize PagerDuty 

451 pagerduty_config = self.config.providers.get("pagerduty", {}) 

452 if pagerduty_config.get("enabled"): 

453 integration_key = pagerduty_config.get("integration_key") 

454 if integration_key: 

455 provider = PagerDutyProvider( 

456 integration_key=integration_key, 

457 severity_mapping=pagerduty_config.get("severity_mapping"), 

458 ) 

459 self.providers.append(provider) 

460 logger.info("PagerDuty alerting provider initialized") 

461 

462 # Initialize Slack 

463 slack_config = self.config.providers.get("slack", {}) 

464 if slack_config.get("enabled"): 

465 webhook_url = slack_config.get("webhook_url") 

466 if webhook_url: 

467 provider = SlackProvider( # type: ignore[assignment] 

468 webhook_url=webhook_url, 

469 channel=slack_config.get("channel"), 

470 mention_on_critical=slack_config.get("mention_on_critical"), 

471 ) 

472 self.providers.append(provider) 

473 logger.info("Slack alerting provider initialized") 

474 

475 # Initialize Email 

476 email_config = self.config.providers.get("email", {}) 

477 if email_config.get("enabled"): 

478 provider = EmailProvider( # type: ignore[assignment] 

479 provider_type=email_config.get("provider_type", "sendgrid"), 

480 api_key=email_config.get("api_key"), 

481 from_email=email_config.get("from_email", "alerts@example.com"), 

482 to_emails=email_config.get("to_emails", []), 

483 ) 

484 self.providers.append(provider) 

485 logger.info("Email alerting provider initialized") 

486 

487 self._initialized = True 

488 logger.info(f"Alerting service initialized with {len(self.providers)} providers") 

489 

490 async def send_alert(self, alert: Alert) -> dict[str, bool]: 

491 """ 

492 Send alert to all configured providers. 

493 

494 Returns: 

495 Dictionary mapping provider names to success status 

496 """ 

497 if not self._initialized:    [497 ↛ 500] line 497 didn't jump to line 500 because the condition on line 497 was always true

498 await self.initialize() 

499 

500 if not self.config.enabled:    [500 ↛ 505] line 500 didn't jump to line 505 because the condition on line 500 was always true

501 logger.debug("Alerting disabled, skipping alert") 

502 return {} 

503 

504 # Check deduplication 

505 if self._is_duplicate(alert): 

506 logger.info( 

507 f"Alert deduplicated: {alert.title}", 

508 extra={"dedupe_key": alert.dedupe_key}, 

509 ) 

510 return {} 

511 

512 # Add to history 

513 self.alert_history.append(alert) 

514 

515 # Update dedupe cache 

516 self.dedupe_cache[alert.dedupe_key] = datetime.now(UTC) # type: ignore[index] 

517 

518 # Send to all providers concurrently 

519 results = {} 

520 if self.providers: 

521 tasks = [provider.send_alert(alert) for provider in self.providers] 

522 provider_results = await asyncio.gather(*tasks, return_exceptions=True) 

523 

524 for i, result in enumerate(provider_results): 

525 provider_name = self.providers[i].__class__.__name__ 

526 results[provider_name] = isinstance(result, bool) and result 

527 

528 logger.info( 

529 f"Alert sent: {alert.title}", 

530 extra={ 

531 "severity": alert.severity.value, 

532 "category": alert.category.value, 

533 "providers": len(self.providers), 

534 "results": results, 

535 }, 

536 ) 

537 

538 return results 

539 

540 def _is_duplicate(self, alert: Alert) -> bool: 

541 """Check if alert is a duplicate within deduplication window""" 

542 if alert.dedupe_key not in self.dedupe_cache: 

543 return False 

544 

545 last_sent = self.dedupe_cache[alert.dedupe_key] 

546 window = timedelta(seconds=self.config.deduplication_window) 

547 

548 return datetime.now(UTC) - last_sent < window 

549 

550 def get_alert_history(self, limit: int = 100) -> list[Alert]: 

551 """Get recent alert history""" 

552 return self.alert_history[-limit:] 

553 

554 def get_alert_statistics(self) -> dict[str, Any]: 

555 """Get alerting statistics""" 

556 if not self.alert_history: 

557 return {"total_alerts": 0} 

558 

559 severity_counts = {} # type: ignore[var-annotated] 

560 category_counts = {} # type: ignore[var-annotated] 

561 

562 for alert in self.alert_history: 

563 severity_counts[alert.severity.value] = severity_counts.get(alert.severity.value, 0) + 1 

564 category_counts[alert.category.value] = category_counts.get(alert.category.value, 0) + 1 

565 

566 return { 

567 "total_alerts": len(self.alert_history), 

568 "by_severity": severity_counts, 

569 "by_category": category_counts, 

570 "providers_active": len(self.providers), 

571 "deduplication_cache_size": len(self.dedupe_cache), 

572 } 

573 

574 async def close(self) -> None: 

575 """Cleanup all providers""" 

576 for provider in self.providers: 

577 await provider.close() 

578 

579 logger.info("Alerting service closed") 

580 

581 

582# Global alerting service instance 

583_alerting_service: AlertingService | None = None 

584 

585 

586async def get_alerting_service() -> AlertingService: 

587 """Get or create global alerting service instance""" 

588 global _alerting_service 

589 

590 if _alerting_service is None: 

591 _alerting_service = AlertingService() 

592 await _alerting_service.initialize() 

593 

594 return _alerting_service 

595 

596 

597async def send_alert( 

598 title: str, 

599 description: str, 

600 severity: AlertSeverity, 

601 category: AlertCategory, 

602 source: str, 

603 metadata: dict[str, Any] | None = None, 

604) -> dict[str, bool]: 

605 """ 

606 Convenience function to send an alert. 

607 

608 Usage: 

609 await send_alert( 

610 title="Database Connection Lost", 

611 description="Unable to connect to PostgreSQL", 

612 severity=AlertSeverity.CRITICAL, 

613 category=AlertCategory.INFRASTRUCTURE, 

614 source="database_health_check", 

615 metadata={"host": "postgres-primary", "error": "Connection refused"} 

616 ) 

617 """ 

618 service = await get_alerting_service() 

619 

620 alert = Alert( 

621 title=title, 

622 description=description, 

623 severity=severity, 

624 category=category, 

625 source=source, 

626 metadata=metadata or {}, 

627 ) 

628 

629 return await service.send_alert(alert)
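
Running the docstring example above against a service with, say, only the Slack provider configured comes back as a per-provider status map keyed by provider class name; history and counters are then available from the shared service:

results = await send_alert(
    title="Database Connection Lost",
    description="Unable to connect to PostgreSQL",
    severity=AlertSeverity.CRITICAL,
    category=AlertCategory.INFRASTRUCTURE,
    source="database_health_check",
)
# {"SlackProvider": True} on success, {"SlackProvider": False} if the webhook call failed,
# and {} if the alert was deduplicated or alerting is disabled
stats = (await get_alerting_service()).get_alert_statistics()
# e.g. {"total_alerts": 1, "by_severity": {"critical": 1}, "by_category": {"infrastructure": 1}, ...}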