Coverage for src/mcp_server_langgraph/schedulers/compliance.py: 80%

180 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 00:43 +0000

1""" 

2SOC 2 Compliance Scheduler 

3 

4Automated scheduling for: 

5- Daily compliance checks 

6- Weekly access reviews 

7- Monthly compliance reports 

8 

9Uses APScheduler for cron-based job scheduling. 

10""" 

11 

12import asyncio 

13from datetime import datetime, timedelta, UTC 

14from pathlib import Path 

15from typing import Any 

16 

17from apscheduler.schedulers.asyncio import AsyncIOScheduler 

18from apscheduler.triggers.cron import CronTrigger 

19from pydantic import BaseModel, Field 

20 

21from mcp_server_langgraph.auth.session import SessionStore 

22from mcp_server_langgraph.compliance.soc2.evidence import EvidenceCollector 

23from mcp_server_langgraph.integrations.alerting import Alert, AlertCategory, AlertingService, AlertSeverity 

24from mcp_server_langgraph.observability.telemetry import logger, metrics, tracer 

25 

26 

class AccessReviewItem(BaseModel):
    """
    Access review item for a single user.

    One entry in an ``AccessReviewReport.users_reviewed`` list. Status fields are
    plain strings: their descriptions list the expected values, but any string is
    accepted (no validation is performed here).
    """

    user_id: str
    username: str
    roles: list[str]
    # Number of active sessions for this user, as supplied by the caller.
    active_sessions: int
    # Last login timestamp as a string, or None when unknown (format not enforced).
    last_login: str | None = None
    # Required field; expected values are listed in the description.
    account_status: str = Field(..., description="active, inactive, locked")
    # Reviewer decision; every item starts out "pending".
    review_status: str = Field(default="pending", description="approved, revoked, pending")
    review_notes: str = Field(default="", description="Review notes")

38 

39 

class AccessReviewReport(BaseModel):
    """
    Weekly access review report.

    Built by the compliance scheduler's weekly access review and serialized with
    ``model_dump_json`` into the evidence directory.
    """

    # Identifier; the scheduler uses "access_review_<YYYYMMDD>".
    review_id: str
    # Timestamps are strings; the scheduler fills them with ISO-8601 UTC values ending in "Z".
    generated_at: str
    period_start: str
    period_end: str
    total_users: int
    active_users: int
    inactive_users: int
    users_reviewed: list[AccessReviewItem] = Field(default_factory=list)
    # Free-text guidance and follow-up actions for the reviewers.
    recommendations: list[str] = Field(default_factory=list)
    actions_required: list[str] = Field(default_factory=list)

53 

54 

class ComplianceScheduler:
    """
    SOC 2 compliance automation scheduler

    Schedules:
    - Daily compliance checks (6 AM UTC)
    - Weekly access reviews (Monday 9 AM UTC)
    - Monthly compliance reports (1st day of month, 9 AM UTC)

    Every job can also be run on demand through the ``trigger_*`` methods.
    """

    # A daily compliance score (percentage) below this value raises a "high" alert.
    COMPLIANCE_SCORE_ALERT_THRESHOLD: float = 80.0

    def __init__(
        self,
        evidence_collector: EvidenceCollector | None = None,
        session_store: SessionStore | None = None,
        evidence_dir: Path | None = None,
        enabled: bool = True,
    ):
        """
        Initialize compliance scheduler

        Args:
            evidence_collector: Evidence collector service. When omitted, a new
                EvidenceCollector is built from ``session_store`` and ``evidence_dir``.
            session_store: Session storage backend
            evidence_dir: Directory for evidence files (only used when
                ``evidence_collector`` is omitted)
            enabled: Enable/disable scheduler; when False, ``start()`` registers no jobs
        """
        if evidence_collector is None:
            evidence_collector = EvidenceCollector(
                session_store=session_store,
                evidence_dir=evidence_dir,
            )
        self.evidence_collector = evidence_collector
        self.session_store = session_store
        self.enabled = enabled

        self.scheduler = AsyncIOScheduler()

        logger.info(f"Compliance scheduler initialized (enabled: {enabled})")

    async def start(self) -> None:
        """
        Start the compliance scheduler

        Registers the three cron jobs (replacing previously registered copies) and
        starts the underlying APScheduler. Calling this while the scheduler is
        already running is a no-op rather than an error.
        """
        if not self.enabled:
            logger.info("Compliance scheduler disabled, skipping startup")
            return

        if self.scheduler.running:
            # APScheduler raises SchedulerAlreadyRunningError on a second start().
            logger.info("Compliance scheduler already running, skipping startup")
            return

        with tracer.start_as_current_span("compliance_scheduler.start"):
            jobs = (
                # Daily compliance checks (6 AM UTC)
                (
                    "daily_compliance_check",
                    self._run_daily_compliance_check,
                    CronTrigger(hour=6, minute=0, timezone="UTC"),
                ),
                # Weekly access reviews (Monday 9 AM UTC)
                (
                    "weekly_access_review",
                    self._run_weekly_access_review,
                    CronTrigger(day_of_week="mon", hour=9, minute=0, timezone="UTC"),
                ),
                # Monthly compliance reports (1st day of month, 9 AM UTC)
                (
                    "monthly_compliance_report",
                    self._run_monthly_compliance_report,
                    CronTrigger(day=1, hour=9, minute=0, timezone="UTC"),
                ),
            )

            for job_id, job_func, trigger in jobs:
                # max_instances=1 prevents overlapping runs of the same job.
                self.scheduler.add_job(
                    job_func,
                    trigger=trigger,
                    id=job_id,
                    max_instances=1,
                    replace_existing=True,
                )

            self.scheduler.start()

            logger.info(
                "Compliance scheduler started",
                extra={"jobs": [job_id for job_id, _, _ in jobs]},
            )

    async def stop(self) -> None:
        """
        Stop the compliance scheduler

        Does nothing when the scheduler is not running.
        """
        if not self.scheduler.running:
            return

        # wait=False: do not block the event loop waiting for in-flight jobs.
        self.scheduler.shutdown(wait=False)

        # Give the scheduler a moment to update its state.
        await asyncio.sleep(0.1)

        logger.info("Compliance scheduler stopped")

    async def trigger_daily_check(self) -> dict[str, Any]:
        """
        Manually trigger daily compliance check

        Returns:
            Report summary (or an error dict with ``status == "failed"``)
        """
        return await self._run_daily_compliance_check()

    async def trigger_weekly_review(self) -> AccessReviewReport:
        """
        Manually trigger weekly access review

        Returns:
            Access review report
        """
        return await self._run_weekly_access_review()

    async def trigger_monthly_report(self) -> dict[str, Any]:
        """
        Manually trigger monthly compliance report

        Returns:
            Compliance report summary (or an error dict with ``status == "failed"``)
        """
        return await self._run_monthly_compliance_report()

    # --- Helpers ---

    @staticmethod
    def _utc_isoformat(moment: datetime) -> str:
        """Render a UTC datetime as ISO-8601 with a trailing ``Z`` instead of ``+00:00``."""
        return moment.isoformat().replace("+00:00", "Z")

    @staticmethod
    def _resolve_alert_severity(severity: str) -> AlertSeverity:
        """
        Map a severity string such as ``"high"`` onto an ``AlertSeverity`` member.

        Looks up the member named ``severity.upper()``. If the enum has no such
        member, falls back to ``WARNING`` when it exists and to ``CRITICAL``
        otherwise, so an unknown level never prevents the alert from being built.
        """
        member = getattr(AlertSeverity, severity.upper(), None)
        if member is None:
            member = getattr(AlertSeverity, "WARNING", AlertSeverity.CRITICAL)
        return member

    # --- Scheduled Jobs ---

    async def _run_daily_compliance_check(self) -> dict[str, Any]:
        """
        Run daily compliance check

        Collects evidence for all SOC 2 controls and generates a summary. The score
        counts each "success" as 1 and each "partial" as 0.5, as a percentage of all
        evidence items (0 when there is no evidence).

        Returns:
            Summary dict, or ``{"error": ..., "status": "failed"}`` on failure.
            Failures are logged and alerted on, not raised.
        """
        with tracer.start_as_current_span("compliance.daily_check") as span:
            start_time = datetime.now(UTC)

            try:
                logger.info("Starting daily compliance check")

                evidence_items = await self.evidence_collector.collect_all_evidence()

                # Calculate pass/fail rates
                passed = sum(1 for e in evidence_items if e.status.value == "success")
                failed = sum(1 for e in evidence_items if e.status.value == "failure")
                partial = sum(1 for e in evidence_items if e.status.value == "partial")
                total = len(evidence_items)

                compliance_score = (passed + (partial * 0.5)) / total * 100 if total > 0 else 0.0

                findings = [finding for evidence in evidence_items for finding in evidence.findings]

                summary = {
                    "date": start_time.strftime("%Y-%m-%d"),
                    "evidence_collected": total,
                    "passed_controls": passed,
                    "failed_controls": failed,
                    "partial_controls": partial,
                    "compliance_score": f"{compliance_score:.1f}%",
                    "total_findings": len(findings),
                    "findings": findings,
                }

                logger.info(
                    "Daily compliance check completed",
                    extra=summary,
                )

                metrics.successful_calls.add(1, {"operation": "daily_compliance_check"})

                if compliance_score < self.COMPLIANCE_SCORE_ALERT_THRESHOLD:
                    await self._send_compliance_alert(
                        severity="high",
                        message=f"Daily compliance score below threshold: {compliance_score:.1f}%",
                        details=summary,
                    )

                span.set_attribute("compliance_score", compliance_score)
                span.set_attribute("total_findings", len(findings))

                return summary

            except Exception as e:
                logger.error(f"Daily compliance check failed: {e}", exc_info=True)
                metrics.failed_calls.add(1, {"operation": "daily_compliance_check"})

                await self._send_compliance_alert(
                    severity="critical",
                    message=f"Daily compliance check failed: {e!s}",
                    details={"error": str(e)},
                )

                return {"error": str(e), "status": "failed"}

    async def _run_weekly_access_review(self) -> AccessReviewReport:
        """
        Run weekly access review

        Builds an access review report covering the 7 days ending now, writes it to
        ``<evidence_dir>/<review_id>.json`` and notifies the security team.

        NOTE: user enumeration and session analysis are not implemented yet, so the
        user counts in the report are currently always zero.

        Raises:
            Exception: any failure is logged and alerted on, then re-raised.
        """
        with tracer.start_as_current_span("compliance.weekly_access_review") as span:
            start_time = datetime.now(UTC)
            end_time = start_time
            start_of_period = start_time - timedelta(days=7)

            try:
                logger.info("Starting weekly access review")

                users_reviewed: list[AccessReviewItem] = []
                recommendations: list[str] = []
                actions_required: list[str] = []

                # TODO: populate from a UserProvider injected via the constructor
                # (e.g. users = await self.user_provider.list_users()) and from
                # self.session_store: last login per user, inactive accounts
                # (no login > 90 days), concurrent sessions, suspicious patterns.
                total_users = 0
                active_users = 0
                inactive_users = 0

                if inactive_users > 0:
                    recommendations.append(f"Review {inactive_users} inactive user accounts (no login > 90 days)")
                    actions_required.append("Disable or delete inactive user accounts")

                report = AccessReviewReport(
                    review_id=f"access_review_{start_time.strftime('%Y%m%d')}",
                    generated_at=self._utc_isoformat(start_time),
                    period_start=self._utc_isoformat(start_of_period),
                    period_end=self._utc_isoformat(end_time),
                    total_users=total_users,
                    active_users=active_users,
                    inactive_users=inactive_users,
                    users_reviewed=users_reviewed,
                    recommendations=recommendations,
                    actions_required=actions_required,
                )

                # Persist the report as evidence; explicit UTF-8 so the output does
                # not depend on the platform's default encoding.
                report_file = self.evidence_collector.evidence_dir / f"{report.review_id}.json"
                report_file.parent.mkdir(parents=True, exist_ok=True)
                report_file.write_text(report.model_dump_json(indent=2), encoding="utf-8")

                logger.info(
                    "Weekly access review completed",
                    extra={
                        "review_id": report.review_id,
                        "total_users": total_users,
                        "inactive_users": inactive_users,
                    },
                )

                await self._send_access_review_notification(report)

                metrics.successful_calls.add(1, {"operation": "weekly_access_review"})

                span.set_attribute("total_users", total_users)
                span.set_attribute("inactive_users", inactive_users)

                return report

            except Exception as e:
                logger.error(f"Weekly access review failed: {e}", exc_info=True)
                metrics.failed_calls.add(1, {"operation": "weekly_access_review"})

                await self._send_compliance_alert(
                    severity="high",
                    message=f"Weekly access review failed: {e!s}",
                    details={"error": str(e)},
                )

                raise

    async def _run_monthly_compliance_report(self) -> dict[str, Any]:
        """
        Run monthly compliance report

        Generates a SOC 2 compliance report over the last 30 days via the evidence
        collector and notifies the compliance team.

        Returns:
            Summary dict, or ``{"error": ..., "status": "failed"}`` on failure.
            Failures are logged and alerted on, not raised.
        """
        with tracer.start_as_current_span("compliance.monthly_report") as span:
            try:
                logger.info("Starting monthly compliance report generation")

                report = await self.evidence_collector.generate_compliance_report(report_type="monthly", period_days=30)

                summary = {
                    "report_id": report.report_id,
                    "compliance_score": f"{report.compliance_score:.1f}%",
                    "total_controls": report.total_controls,
                    "passed_controls": report.passed_controls,
                    "failed_controls": report.failed_controls,
                    "total_findings": report.summary.get("findings_summary", {}).get("total_findings", 0),
                }

                logger.info(
                    "Monthly compliance report generated",
                    extra=summary,
                )

                await self._send_monthly_report_notification(report)

                metrics.successful_calls.add(1, {"operation": "monthly_compliance_report"})

                span.set_attribute("compliance_score", report.compliance_score)
                span.set_attribute("total_controls", report.total_controls)

                return summary

            except Exception as e:
                logger.error(f"Monthly compliance report failed: {e}", exc_info=True)
                metrics.failed_calls.add(1, {"operation": "monthly_compliance_report"})

                await self._send_compliance_alert(
                    severity="critical",
                    message=f"Monthly compliance report failed: {e!s}",
                    details={"error": str(e)},
                )

                return {"error": str(e), "status": "failed"}

    # --- Notification Helpers ---

    async def _send_compliance_alert(self, severity: str, message: str, details: dict[str, Any]) -> None:
        """
        Send compliance alert

        Always logs the alert locally, then tries to deliver it through the
        AlertingService. Delivery failures are logged and never propagated.

        Args:
            severity: Alert severity (low, medium, high, critical)
            message: Alert message
            details: Alert details (attached as alert metadata)
        """
        logger.warning(
            f"Compliance Alert [{severity.upper()}]: {message}",
            extra={"severity": severity, "details": details},
        )

        # Send to alerting system (PagerDuty, Slack, email)
        try:
            alerting_service = AlertingService()
            await alerting_service.initialize()

            alert = Alert(
                title=f"Compliance {severity.upper()}: {message}",
                description=message,
                severity=self._resolve_alert_severity(severity),
                category=AlertCategory.COMPLIANCE,
                source="compliance_scheduler",
                metadata=details,
            )

            await alerting_service.send_alert(alert)
            logger.info("Compliance alert sent successfully", extra={"alert_id": alert.alert_id})

        except Exception as e:
            logger.error(f"Failed to send compliance alert: {e}", exc_info=True)

    async def _send_access_review_notification(self, report: AccessReviewReport) -> None:
        """
        Send access review notification

        Delivery failures are logged and never propagated.

        Args:
            report: Access review report
        """
        logger.info(
            "Sending access review notification",
            extra={"review_id": report.review_id},
        )

        # Send notification to security team
        try:
            alerting_service = AlertingService()
            await alerting_service.initialize()

            alert = Alert(
                title="Weekly Access Review Ready",
                description=f"Access review {report.review_id} has been generated",
                severity=AlertSeverity.INFO,
                category=AlertCategory.COMPLIANCE,
                source="compliance_scheduler",
                # Only reference fields that AccessReviewReport actually defines.
                metadata={
                    "review_id": report.review_id,
                    "total_users": report.total_users,
                    "inactive_users": report.inactive_users,
                    "actions_required": len(report.actions_required),
                },
            )

            await alerting_service.send_alert(alert)
            logger.info("Access review notification sent", extra={"alert_id": alert.alert_id})

        except Exception as e:
            logger.error(f"Failed to send access review notification: {e}", exc_info=True)

    async def _send_monthly_report_notification(self, report: Any) -> None:
        """
        Send monthly compliance report notification

        Delivery failures are logged and never propagated.

        Args:
            report: Compliance report; must expose ``report_id``, ``generated_at``,
                ``period_start`` and ``period_end``.
        """
        logger.info(
            "Sending monthly compliance report notification",
            extra={"report_id": report.report_id},
        )

        # Send notification to compliance team
        try:
            alerting_service = AlertingService()
            await alerting_service.initialize()

            alert = Alert(
                title="Monthly SOC 2 Compliance Report Ready",
                description=f"Compliance report {report.report_id} has been generated",
                severity=AlertSeverity.INFO,
                category=AlertCategory.COMPLIANCE,
                source="compliance_scheduler",
                metadata={
                    "report_id": report.report_id,
                    "generated_at": report.generated_at,
                    "period_start": report.period_start,
                    "period_end": report.period_end,
                },
            )

            await alerting_service.send_alert(alert)
            logger.info("Monthly report notification sent", extra={"alert_id": alert.alert_id})

        except Exception as e:
            logger.error(f"Failed to send monthly report notification: {e}", exc_info=True)

520 

521 

# Process-wide scheduler instance, managed by start_compliance_scheduler() and
# stop_compliance_scheduler(); None before the first start and after a stop.
_compliance_scheduler: ComplianceScheduler | None = None

524 

525 

async def start_compliance_scheduler(
    session_store: SessionStore | None = None,
    evidence_dir: Path | None = None,
    enabled: bool = True,
) -> ComplianceScheduler:
    """
    Start global compliance scheduler

    Creates the process-wide ComplianceScheduler on the first call. Once an instance
    exists, the arguments are ignored and that instance is reused. It is only
    (re)started when it is not already running, because APScheduler refuses to
    start a scheduler that is already running.

    Args:
        session_store: Session storage backend (used only when creating the instance)
        evidence_dir: Directory for evidence files (used only when creating the instance)
        enabled: Enable/disable scheduler (used only when creating the instance)

    Returns:
        ComplianceScheduler instance
    """
    global _compliance_scheduler

    if _compliance_scheduler is None:
        evidence_collector = EvidenceCollector(session_store=session_store, evidence_dir=evidence_dir)

        _compliance_scheduler = ComplianceScheduler(
            evidence_collector=evidence_collector,
            session_store=session_store,
            evidence_dir=evidence_dir,
            enabled=enabled,
        )

    # Avoid SchedulerAlreadyRunningError on repeated calls.
    if not _compliance_scheduler.scheduler.running:
        await _compliance_scheduler.start()

    return _compliance_scheduler

556 

557 

async def stop_compliance_scheduler() -> None:
    """
    Shut down the global compliance scheduler and forget it.

    No-op when no global scheduler exists. The global reference is cleared only
    after the instance's ``stop()`` completes.
    """
    global _compliance_scheduler

    if _compliance_scheduler is None:
        return

    await _compliance_scheduler.stop()
    _compliance_scheduler = None

565 

566 

def get_compliance_scheduler() -> ComplianceScheduler | None:
    """
    Return the process-wide compliance scheduler.

    Returns:
        The instance created by start_compliance_scheduler(), or None when it has
        not been started (or has since been stopped).
    """
    current = _compliance_scheduler
    return current