Coverage for src / mcp_server_langgraph / schedulers / compliance.py: 80%
180 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
1"""
2SOC 2 Compliance Scheduler
4Automated scheduling for:
5- Daily compliance checks
6- Weekly access reviews
7- Monthly compliance reports
9Uses APScheduler for cron-based job scheduling.
10"""
12import asyncio
13from datetime import datetime, timedelta, UTC
14from pathlib import Path
15from typing import Any
17from apscheduler.schedulers.asyncio import AsyncIOScheduler
18from apscheduler.triggers.cron import CronTrigger
19from pydantic import BaseModel, Field
21from mcp_server_langgraph.auth.session import SessionStore
22from mcp_server_langgraph.compliance.soc2.evidence import EvidenceCollector
23from mcp_server_langgraph.integrations.alerting import Alert, AlertCategory, AlertingService, AlertSeverity
24from mcp_server_langgraph.observability.telemetry import logger, metrics, tracer
class AccessReviewItem(BaseModel):
    """A single user's entry in an access review."""

    # Who the entry is about
    user_id: str
    username: str
    # What access the user holds and how actively it is used
    roles: list[str]
    active_sessions: int
    # presumably an ISO-8601 timestamp string; None when unknown
    last_login: str | None = Field(default=None)
    # State of the account itself (required, no default)
    account_status: str = Field(..., description="active, inactive, locked")
    # Reviewer decision and free-form notes; new entries start as "pending"
    review_status: str = Field("pending", description="approved, revoked, pending")
    review_notes: str = Field("", description="Review notes")
class AccessReviewReport(BaseModel):
    """Result of one weekly access review run."""

    # Identifier of the review run
    review_id: str
    # Timestamps are kept as strings (the scheduler writes ISO-8601 with a trailing "Z")
    generated_at: str
    period_start: str
    period_end: str
    # Aggregate user counts for the period
    total_users: int
    active_users: int
    inactive_users: int
    # Per-user detail plus the follow-up items derived from it; each list defaults to empty
    users_reviewed: list[AccessReviewItem] = Field(default_factory=list)
    recommendations: list[str] = Field(default_factory=list)
    actions_required: list[str] = Field(default_factory=list)
class ComplianceScheduler:
    """
    SOC 2 compliance automation scheduler

    Schedules:
    - Daily compliance checks (6 AM UTC)
    - Weekly access reviews (Monday 9 AM UTC)
    - Monthly compliance reports (1st day of month, 9 AM UTC)

    Each job can also be run on demand via the ``trigger_*`` methods.
    """

    # Daily compliance scores strictly below this percentage raise a "high" alert.
    COMPLIANCE_ALERT_THRESHOLD = 80.0

    def __init__(
        self,
        evidence_collector: EvidenceCollector | None = None,
        session_store: SessionStore | None = None,
        evidence_dir: Path | None = None,
        enabled: bool = True,
    ):
        """
        Initialize compliance scheduler

        Args:
            evidence_collector: Evidence collector service. When omitted, one is
                built from ``session_store`` and ``evidence_dir``.
            session_store: Session storage backend
            evidence_dir: Directory for evidence files (only used when
                ``evidence_collector`` is not supplied)
            enabled: Enable/disable scheduler
        """
        # Explicit ``is None`` check: ``or`` would also discard a supplied
        # collector that merely evaluates as falsy.
        if evidence_collector is None:
            evidence_collector = EvidenceCollector(
                session_store=session_store,
                evidence_dir=evidence_dir,
            )
        self.evidence_collector = evidence_collector
        self.session_store = session_store
        self.enabled = enabled

        self.scheduler = AsyncIOScheduler()

        logger.info(f"Compliance scheduler initialized (enabled: {enabled})")

    async def start(self) -> None:
        """
        Register the compliance jobs and start the scheduler.

        Does nothing when the scheduler is disabled or already running.
        APScheduler raises ``SchedulerAlreadyRunningError`` if ``start()`` is
        called on a running scheduler, so a repeated start is skipped instead.
        """
        if not self.enabled:
            logger.info("Compliance scheduler disabled, skipping startup")
            return

        if self.scheduler.running:
            logger.info("Compliance scheduler already running, skipping startup")
            return

        with tracer.start_as_current_span("compliance_scheduler.start"):
            jobs = (
                # Daily compliance checks (6 AM UTC)
                (
                    "daily_compliance_check",
                    self._run_daily_compliance_check,
                    CronTrigger(hour=6, minute=0, timezone="UTC"),
                ),
                # Weekly access reviews (Monday 9 AM UTC)
                (
                    "weekly_access_review",
                    self._run_weekly_access_review,
                    CronTrigger(day_of_week="mon", hour=9, minute=0, timezone="UTC"),
                ),
                # Monthly compliance reports (1st day of month, 9 AM UTC)
                (
                    "monthly_compliance_report",
                    self._run_monthly_compliance_report,
                    CronTrigger(day=1, hour=9, minute=0, timezone="UTC"),
                ),
            )

            for job_id, job_func, trigger in jobs:
                self.scheduler.add_job(
                    job_func,
                    trigger=trigger,
                    id=job_id,
                    # Never run two instances of the same job concurrently.
                    max_instances=1,
                    # A job already registered under this id is replaced, not duplicated.
                    replace_existing=True,
                )

            self.scheduler.start()

            logger.info(
                "Compliance scheduler started",
                extra={"jobs": [job_id for job_id, _, _ in jobs]},
            )

    async def stop(self) -> None:
        """Stop the compliance scheduler (no-op if it is not running)."""
        if self.scheduler.running:
            # Shutdown the scheduler. Use wait=False since we're in async context
            # The scheduler.running flag will be set to False immediately
            self.scheduler.shutdown(wait=False)

            # Give the scheduler a moment to update its state
            await asyncio.sleep(0.1)

            logger.info("Compliance scheduler stopped")

    async def trigger_daily_check(self) -> dict[str, Any]:
        """
        Manually trigger daily compliance check

        Returns:
            Report summary
        """
        return await self._run_daily_compliance_check()

    async def trigger_weekly_review(self) -> AccessReviewReport:
        """
        Manually trigger weekly access review

        Returns:
            Access review report
        """
        return await self._run_weekly_access_review()

    async def trigger_monthly_report(self) -> dict[str, Any]:
        """
        Manually trigger monthly compliance report

        Returns:
            Compliance report summary
        """
        return await self._run_monthly_compliance_report()

    # --- Scheduled Jobs ---

    async def _run_daily_compliance_check(self) -> dict[str, Any]:
        """
        Run daily compliance check

        Collects evidence for all SOC 2 controls, scores it (each "partial"
        result counts as half a pass) and sends an alert when the score is
        below ``COMPLIANCE_ALERT_THRESHOLD``.

        Returns:
            Summary dict; on failure ``{"error": ..., "status": "failed"}``
            (the error is alerted, not raised).
        """
        with tracer.start_as_current_span("compliance.daily_check") as span:
            start_time = datetime.now(UTC)

            try:
                logger.info("Starting daily compliance check")

                # Collect all evidence
                evidence_items = await self.evidence_collector.collect_all_evidence()

                # Calculate pass/fail rates
                statuses = [evidence.status.value for evidence in evidence_items]
                passed = statuses.count("success")
                failed = statuses.count("failure")
                partial = statuses.count("partial")
                total = len(statuses)

                # Partial evidence counts half; with no evidence the score is 0.
                compliance_score = (passed + partial * 0.5) / total * 100 if total > 0 else 0.0

                # Collect all findings
                findings = [finding for evidence in evidence_items for finding in evidence.findings]

                summary = {
                    "date": start_time.strftime("%Y-%m-%d"),
                    "evidence_collected": total,
                    "passed_controls": passed,
                    "failed_controls": failed,
                    "partial_controls": partial,
                    "compliance_score": f"{compliance_score:.1f}%",
                    "total_findings": len(findings),
                    "findings": findings,
                }

                logger.info(
                    "Daily compliance check completed",
                    extra=summary,
                )

                metrics.successful_calls.add(1, {"operation": "daily_compliance_check"})

                # Alert if compliance score is low
                if compliance_score < self.COMPLIANCE_ALERT_THRESHOLD:
                    await self._send_compliance_alert(
                        severity="high",
                        message=f"Daily compliance score below threshold: {compliance_score:.1f}%",
                        details=summary,
                    )

                span.set_attribute("compliance_score", compliance_score)
                span.set_attribute("total_findings", len(findings))

                return summary

            except Exception as e:
                logger.error(f"Daily compliance check failed: {e}", exc_info=True)
                metrics.failed_calls.add(1, {"operation": "daily_compliance_check"})

                await self._send_compliance_alert(
                    severity="critical",
                    message=f"Daily compliance check failed: {e!s}",
                    details={"error": str(e)},
                )

                return {"error": str(e), "status": "failed"}

    async def _run_weekly_access_review(self) -> AccessReviewReport:
        """
        Run weekly access review

        Reviews all user access, identifies inactive accounts, and generates
        access review report for security team. The report is saved as
        ``<evidence_dir>/<review_id>.json``.

        Raises:
            Exception: any failure is logged and alerted, then re-raised.
        """
        with tracer.start_as_current_span("compliance.weekly_access_review") as span:
            start_time = datetime.now(UTC)
            end_time = start_time
            start_of_period = start_time - timedelta(days=7)

            try:
                logger.info("Starting weekly access review")

                users_reviewed: list[AccessReviewItem] = []
                recommendations: list[str] = []
                actions_required: list[str] = []

                # TODO: query a UserProvider for all users. Requires a provider
                # injected via the constructor (e.g. self.user_provider), then
                # ``users = await self.user_provider.list_users()``.
                total_users = 0
                active_users = 0
                inactive_users = 0

                # TODO: when self.session_store is set, analyze sessions to
                # populate users_reviewed with AccessReviewItem entries:
                # - get all users with sessions
                # - check last login time from session metadata
                # - identify inactive accounts (no login > 90 days)
                # - detect concurrent sessions
                # - flag suspicious activity patterns

                # Generate recommendations
                if inactive_users > 0:
                    recommendations.append(f"Review {inactive_users} inactive user accounts (no login > 90 days)")
                    actions_required.append("Disable or delete inactive user accounts")

                report = AccessReviewReport(
                    review_id=f"access_review_{start_time.strftime('%Y%m%d')}",
                    generated_at=start_time.isoformat().replace("+00:00", "Z"),
                    period_start=start_of_period.isoformat().replace("+00:00", "Z"),
                    period_end=end_time.isoformat().replace("+00:00", "Z"),
                    total_users=total_users,
                    active_users=active_users,
                    inactive_users=inactive_users,
                    users_reviewed=users_reviewed,
                    recommendations=recommendations,
                    actions_required=actions_required,
                )

                # Save report; create the evidence directory if it is missing and
                # write with an explicit encoding rather than the locale default.
                report_file = self.evidence_collector.evidence_dir / f"{report.review_id}.json"
                report_file.parent.mkdir(parents=True, exist_ok=True)
                report_file.write_text(report.model_dump_json(indent=2), encoding="utf-8")

                logger.info(
                    "Weekly access review completed",
                    extra={
                        "review_id": report.review_id,
                        "total_users": total_users,
                        "inactive_users": inactive_users,
                    },
                )

                await self._send_access_review_notification(report)

                metrics.successful_calls.add(1, {"operation": "weekly_access_review"})

                span.set_attribute("total_users", total_users)
                span.set_attribute("inactive_users", inactive_users)

                return report

            except Exception as e:
                logger.error(f"Weekly access review failed: {e}", exc_info=True)
                metrics.failed_calls.add(1, {"operation": "weekly_access_review"})

                await self._send_compliance_alert(
                    severity="high",
                    message=f"Weekly access review failed: {e!s}",
                    details={"error": str(e)},
                )

                raise

    async def _run_monthly_compliance_report(self) -> dict[str, Any]:
        """
        Run monthly compliance report

        Generates comprehensive SOC 2 compliance report covering the last 30 days.

        Returns:
            Summary dict; on failure ``{"error": ..., "status": "failed"}``
            (the error is alerted, not raised).
        """
        with tracer.start_as_current_span("compliance.monthly_report") as span:
            try:
                logger.info("Starting monthly compliance report generation")

                report = await self.evidence_collector.generate_compliance_report(report_type="monthly", period_days=30)

                summary = {
                    "report_id": report.report_id,
                    "compliance_score": f"{report.compliance_score:.1f}%",
                    "total_controls": report.total_controls,
                    "passed_controls": report.passed_controls,
                    "failed_controls": report.failed_controls,
                    "total_findings": report.summary.get("findings_summary", {}).get("total_findings", 0),
                }

                logger.info(
                    "Monthly compliance report generated",
                    extra=summary,
                )

                # Send report to compliance team
                await self._send_monthly_report_notification(report)

                metrics.successful_calls.add(1, {"operation": "monthly_compliance_report"})

                span.set_attribute("compliance_score", report.compliance_score)
                span.set_attribute("total_controls", report.total_controls)

                return summary

            except Exception as e:
                logger.error(f"Monthly compliance report failed: {e}", exc_info=True)
                metrics.failed_calls.add(1, {"operation": "monthly_compliance_report"})

                await self._send_compliance_alert(
                    severity="critical",
                    message=f"Monthly compliance report failed: {e!s}",
                    details={"error": str(e)},
                )

                return {"error": str(e), "status": "failed"}

    # --- Notification Helpers ---

    @staticmethod
    def _resolve_alert_severity(severity: str) -> AlertSeverity:
        """
        Map a scheduler severity string onto an ``AlertSeverity`` member.

        The previous code used ``AlertSeverity.WARNING`` behind a
        ``type: ignore[attr-defined]``. If that member does not exist, the
        lookup raises ``AttributeError`` inside the alert ``try`` block, and
        every non-critical alert is silently dropped. Resolution order:
        "critical" -> CRITICAL, then a member with the upper-cased name (e.g.
        HIGH), then WARNING, and finally CRITICAL, so that a naming mismatch
        never loses a compliance alert.
        """
        if severity == "critical":
            return AlertSeverity.CRITICAL
        member = getattr(AlertSeverity, severity.upper(), None)
        if member is None:
            member = getattr(AlertSeverity, "WARNING", None)
        return member if member is not None else AlertSeverity.CRITICAL

    async def _deliver_alert(self, alert: Alert) -> None:
        """
        Send ``alert`` through a freshly initialized ``AlertingService``.

        Propagates any delivery error; callers wrap this in their own
        best-effort ``try`` so a delivery failure never aborts a job.
        """
        alerting_service = AlertingService()
        await alerting_service.initialize()
        await alerting_service.send_alert(alert)

    async def _send_compliance_alert(self, severity: str, message: str, details: dict[str, Any]) -> None:
        """
        Send compliance alert (best effort: delivery errors are logged, not raised)

        Args:
            severity: Alert severity (low, medium, high, critical)
            message: Alert message
            details: Alert details (attached as alert metadata)
        """
        logger.warning(
            f"Compliance Alert [{severity.upper()}]: {message}",
            extra={"severity": severity, "details": details},
        )

        # Send to alerting system (PagerDuty, Slack, email)
        try:
            alert = Alert(
                title=f"Compliance {severity.upper()}: {message}",
                description=message,
                severity=self._resolve_alert_severity(severity),
                category=AlertCategory.COMPLIANCE,
                source="compliance_scheduler",
                metadata=details,
            )

            await self._deliver_alert(alert)
            logger.info("Compliance alert sent successfully", extra={"alert_id": alert.alert_id})

        except Exception as e:
            logger.error(f"Failed to send compliance alert: {e}", exc_info=True)

    async def _send_access_review_notification(self, report: AccessReviewReport) -> None:
        """
        Send access review notification (best effort: errors are logged, not raised)

        Args:
            report: Access review report
        """
        logger.info(
            "Sending access review notification",
            extra={"review_id": report.review_id},
        )

        # Send notification to security team
        try:
            alert = Alert(
                title="Weekly Access Review Ready",
                description=f"Access review {report.review_id} has been generated",
                severity=AlertSeverity.INFO,
                category=AlertCategory.COMPLIANCE,
                source="compliance_scheduler",
                metadata={
                    "review_id": report.review_id,
                    "total_users": report.total_users,
                    "inactive_users": report.inactive_users,
                    # AccessReviewReport has no ``excessive_access`` field; reading it
                    # raised AttributeError, so this notification was never sent.
                    "actions_required": len(report.actions_required),
                },
            )

            await self._deliver_alert(alert)
            logger.info("Access review notification sent", extra={"alert_id": alert.alert_id})

        except Exception as e:
            logger.error(f"Failed to send access review notification: {e}", exc_info=True)

    async def _send_monthly_report_notification(self, report: Any) -> None:
        """
        Send monthly compliance report notification (best effort: errors are logged)

        Args:
            report: Compliance report; must expose ``report_id``, ``generated_at``,
                ``period_start`` and ``period_end``
        """
        logger.info(
            "Sending monthly compliance report notification",
            extra={"report_id": report.report_id},
        )

        # Send notification to compliance team
        try:
            alert = Alert(
                title="Monthly SOC 2 Compliance Report Ready",
                description=f"Compliance report {report.report_id} has been generated",
                severity=AlertSeverity.INFO,
                category=AlertCategory.COMPLIANCE,
                source="compliance_scheduler",
                metadata={
                    "report_id": report.report_id,
                    "generated_at": report.generated_at,
                    "period_start": report.period_start,
                    "period_end": report.period_end,
                },
            )

            await self._deliver_alert(alert)
            logger.info("Monthly report notification sent", extra={"alert_id": alert.alert_id})

        except Exception as e:
            logger.error(f"Failed to send monthly report notification: {e}", exc_info=True)
# Global scheduler instance: created by start_compliance_scheduler(), cleared by
# stop_compliance_scheduler(), and read by get_compliance_scheduler(). None while
# no scheduler has been started.
_compliance_scheduler: ComplianceScheduler | None = None
async def start_compliance_scheduler(
    session_store: SessionStore | None = None,
    evidence_dir: Path | None = None,
    enabled: bool = True,
) -> ComplianceScheduler:
    """
    Start global compliance scheduler

    The first call creates the process-wide instance. Later calls reuse it and
    ignore their arguments. Calling this while the scheduler is already running
    returns the running instance; it does not call ``start()`` again, which
    APScheduler would reject with ``SchedulerAlreadyRunningError``.

    Args:
        session_store: Session storage backend
        evidence_dir: Directory for evidence files
        enabled: Enable/disable scheduler

    Returns:
        ComplianceScheduler instance
    """
    global _compliance_scheduler

    if _compliance_scheduler is None:
        evidence_collector = EvidenceCollector(session_store=session_store, evidence_dir=evidence_dir)

        _compliance_scheduler = ComplianceScheduler(
            evidence_collector=evidence_collector,
            session_store=session_store,
            evidence_dir=evidence_dir,
            enabled=enabled,
        )

    # Only start a scheduler that is not already running.
    if not _compliance_scheduler.scheduler.running:
        await _compliance_scheduler.start()
    return _compliance_scheduler
async def stop_compliance_scheduler() -> None:
    """
    Stop the process-wide compliance scheduler and forget it.

    Does nothing when no scheduler has been started. The global is cleared only
    after ``stop()`` returns, so a failed stop leaves the instance in place.
    """
    global _compliance_scheduler

    if _compliance_scheduler is None:
        return

    await _compliance_scheduler.stop()
    _compliance_scheduler = None
def get_compliance_scheduler() -> ComplianceScheduler | None:
    """
    Return the process-wide compliance scheduler, if one exists.

    Returns:
        The instance created by ``start_compliance_scheduler``, or ``None`` if
        none was started or it was cleared by ``stop_compliance_scheduler``.
    """
    current = _compliance_scheduler
    return current