Coverage for src/mcp_server_langgraph/compliance/retention.py: 68%

170 statements  

coverage.py v7.12.0, created at 2025-12-03 00:43 +0000

1""" 

2Data Retention Service - GDPR Article 5(1)(e) Storage Limitation 

3 

4Implements automated data retention policies with configurable cleanup schedules. 

5Ensures compliance with GDPR data minimization and SOC 2 storage requirements. 

6""" 

7 

8 from datetime import datetime, timedelta, UTC

9 from pathlib import Path

10 from typing import Any

11

12 import yaml

13 from pydantic import BaseModel, Field

14

15 from mcp_server_langgraph.auth.session import SessionStore

16 from mcp_server_langgraph.compliance.gdpr.storage import AuditLogStore, ConversationStore

17 from mcp_server_langgraph.observability.telemetry import logger, metrics, tracer

18 

19 

20 class RetentionPolicy(BaseModel):

21 """Data retention policy configuration""" 

22 

23 data_type: str = Field(..., description="Type of data (sessions, conversations, etc.)") 

24 retention_days: int = Field(..., description="Number of days to retain data") 

25 enabled: bool = Field(default=True, description="Whether policy is enabled") 

26 soft_delete: bool = Field(default=False, description="Soft delete before permanent deletion") 

27 notify_before: bool = Field(default=False, description="Notify before deletion") 

28 

29 

30 class RetentionResult(BaseModel):

31 """Result of retention policy execution""" 

32 

33 policy_name: str 

34 execution_timestamp: str 

35 deleted_count: int = 0 

36 archived_count: int = 0 

37 errors: list[str] = Field(default_factory=list) 

38 dry_run: bool = False 
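
A hypothetical illustration of the two models side by side (all field values
are made up; model_dump_json() assumes pydantic v2):

    policy = RetentionPolicy(data_type="sessions", retention_days=90, soft_delete=True)
    result = RetentionResult(
        policy_name=policy.data_type,
        execution_timestamp="2025-12-03T03:00:00Z",
        deleted_count=12,
        dry_run=True,
    )
    print(result.model_dump_json())
    # {"policy_name":"sessions","execution_timestamp":"2025-12-03T03:00:00Z",
    #  "deleted_count":12,"archived_count":0,"errors":[],"dry_run":true}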

39 

40 

41 class DataRetentionService:

42 """ 

43 Service for enforcing data retention policies 

44 

45 Features: 

46 - Configurable retention periods per data type 

47 - Scheduled cleanup execution 

48 - Dry-run mode for testing 

49 - Audit logging of all deletions 

50 - Metrics tracking 
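
Example (a minimal dry-run sketch from an async context; the three
stores are assumed to be already-constructed instances of the
imported store types):

    service = DataRetentionService(
        config_path="config/retention_policies.yaml",
        session_store=session_store,
        conversation_store=conversation_store,
        audit_log_store=audit_log_store,
        dry_run=True,  # count what would be removed; delete nothing
    )
    results = await service.run_all_cleanups()
    for r in results:
        print(r.policy_name, r.deleted_count, r.archived_count, r.errors)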

51 """ 

52 

53 def __init__( 

54 self, 

55 config_path: str = "config/retention_policies.yaml", 

56 session_store: SessionStore | None = None, 

57 conversation_store: ConversationStore | None = None, 

58 audit_log_store: AuditLogStore | None = None, 

59 dry_run: bool = False, 

60 ): 

61 """ 

62 Initialize data retention service 

63 

64 Args: 

65 config_path: Path to retention policies YAML file 

66 session_store: Session storage backend 

67 conversation_store: Conversation storage backend 

68 audit_log_store: Audit log storage backend 

69 dry_run: If True, only simulate deletions without actually deleting 

70 """ 

71 self.config_path = Path(config_path) 

72 self.session_store = session_store 

73 self.conversation_store = conversation_store 

74 self.audit_log_store = audit_log_store 

75 self.dry_run = dry_run 

76 self.config = self._load_config() 

77 

78 def _load_config(self) -> dict[str, Any]: 

79 """Load retention policies from YAML configuration""" 

80 try: 

81 if not self.config_path.exists(): 

82 logger.warning(f"Retention config not found: {self.config_path}, using defaults") 

83 return self._default_config() 

84 

85 with open(self.config_path) as f: 

86 config = yaml.safe_load(f) 

87 

88 logger.info(f"Loaded retention policies from {self.config_path}") 

89 return config or self._default_config()  # type: ignore[no-any-return]  # yaml.safe_load returns None for an empty file

90 

91 except Exception as e: 

92 logger.error(f"Failed to load retention config: {e}", exc_info=True) 

93 return self._default_config() 

94 

95 def _default_config(self) -> dict[str, Any]: 

96 """Default retention configuration""" 

97 return { 

98 "global": {"enabled": True, "dry_run": False}, 

99 "retention_periods": { 

100 "user_sessions": {"inactive": 90}, 

101 "conversations": {"archived": 90}, 

102 "audit_logs": {"standard": 2555}, 

103 }, 

104 } 

105 

106 async def cleanup_sessions(self) -> RetentionResult: 

107 """ 

108 Clean up old sessions based on retention policy 

109 

110 Returns: 

111 RetentionResult with deletion counts 

112 """ 

113 with tracer.start_as_current_span("retention.cleanup_sessions") as span: 

114 policy_config = self.config.get("retention_periods", {}).get("user_sessions", {}) 

115 retention_days = policy_config.get("inactive", 90) 

116 

117 result = RetentionResult( 

118 policy_name="user_sessions", 

119 execution_timestamp=datetime.now(UTC).isoformat().replace("+00:00", "Z"), 

120 dry_run=self.dry_run, 

121 ) 

122 

123 if not self.session_store: 

124 logger.warning("Session store not available for retention cleanup") 

125 result.errors.append("Session store not configured") 

126 return result 

127 

128 try: 

129 cutoff_date = datetime.now(UTC) - timedelta(days=retention_days) 

130 logger.info( 

131 f"Cleaning up sessions inactive since {cutoff_date.isoformat()}", 

132 extra={"retention_days": retention_days, "dry_run": self.dry_run}, 

133 ) 

134 

135 # Delegate to the store-backed helper; how sessions are queried depends on

136 # the SessionStore interface (see _cleanup_inactive_sessions below)

137 deleted_count = await self._cleanup_inactive_sessions(cutoff_date) 

138 

139 result.deleted_count = deleted_count 

140 span.set_attribute("deleted_count", deleted_count) 

141 

142 # Track metrics 

143 if not self.dry_run: 

144 metrics.successful_calls.add(1, {"operation": "session_cleanup"}) 

145 

146 logger.info( 

147 "Session cleanup completed", 

148 extra={ 

149 "deleted_count": deleted_count, 

150 "retention_days": retention_days, 

151 "dry_run": self.dry_run, 

152 }, 

153 ) 

154 

155 except Exception as e: 

156 error_msg = f"Session cleanup failed: {e!s}" 

157 result.errors.append(error_msg) 

158 logger.error(error_msg, exc_info=True) 

159 span.record_exception(e) 

160 

161 return result 

162 

163 async def cleanup_conversations(self) -> RetentionResult: 

164 """ 

165 Clean up old conversations based on retention policy 

166 

167 Returns: 

168 RetentionResult with deletion counts 

169 """ 

170 with tracer.start_as_current_span("retention.cleanup_conversations"): 

171 policy_config = self.config.get("retention_periods", {}).get("conversations", {}) 

172 archived_retention = policy_config.get("archived", 90) 

173 

174 result = RetentionResult( 

175 policy_name="conversations", 

176 execution_timestamp=datetime.now(UTC).isoformat().replace("+00:00", "Z"), 

177 dry_run=self.dry_run, 

178 ) 

179 

180 try: 

181 cutoff_date = datetime.now(UTC) - timedelta(days=archived_retention) 

182 logger.info( 

183 f"Cleaning up archived conversations older than {cutoff_date.isoformat()}", 

184 extra={"retention_days": archived_retention, "dry_run": self.dry_run}, 

185 ) 

186 

187 # Call internal cleanup method 

188 deleted_count = await self._cleanup_old_conversations(cutoff_date) 

189 result.deleted_count = deleted_count 

190 

191 logger.info( 

192 "Conversation cleanup completed", 

193 extra={"deleted_count": deleted_count, "dry_run": self.dry_run}, 

194 ) 

195 

196 except Exception as e: 

197 error_msg = f"Conversation cleanup failed: {e!s}" 

198 result.errors.append(error_msg) 

199 logger.error(error_msg, exc_info=True) 

200 

201 return result 

202 

203 async def cleanup_audit_logs(self) -> RetentionResult: 

204 """ 

205 Clean up old audit logs based on retention policy 

206 

207 Note: Most audit logs are retained for 7 years (2555 days) for compliance. 

208 

209 Returns: 

210 RetentionResult with deletion/archive counts 

211 """ 

212 with tracer.start_as_current_span("retention.cleanup_audit_logs"): 

213 policy_config = self.config.get("retention_periods", {}).get("audit_logs", {}) 

214 retention_days = policy_config.get("standard", 2555) 

215 

216 result = RetentionResult( 

217 policy_name="audit_logs", 

218 execution_timestamp=datetime.now(UTC).isoformat().replace("+00:00", "Z"), 

219 dry_run=self.dry_run, 

220 ) 

221 

222 try: 

223 cutoff_date = datetime.now(UTC) - timedelta(days=retention_days) 

224 logger.info( 

225 f"Archiving audit logs older than {cutoff_date.isoformat()}", 

226 extra={"retention_days": retention_days, "dry_run": self.dry_run}, 

227 ) 

228 

229 # Call internal cleanup method 

230 archived_count = await self._cleanup_old_audit_logs(cutoff_date) 

231 result.archived_count = archived_count 

232 

233 logger.info( 

234 "Audit log cleanup completed", 

235 extra={"archived_count": archived_count, "dry_run": self.dry_run}, 

236 ) 

237 

238 except Exception as e: 

239 error_msg = f"Audit log cleanup failed: {e!s}" 

240 result.errors.append(error_msg) 

241 logger.error(error_msg, exc_info=True) 

242 

243 return result 

244 

245 async def run_all_cleanups(self) -> list[RetentionResult]: 

246 """ 

247 Run all configured retention policies 

248 

249 Returns: 

250 List of RetentionResults for each policy 

251 """ 

252 with tracer.start_as_current_span("retention.run_all_cleanups") as span: 

253 logger.info("Starting retention policy cleanup", extra={"dry_run": self.dry_run}) 

254 

255 results: list[RetentionResult] = []

256 

257 # Check if retention is globally enabled 

258 if not self.config.get("global", {}).get("enabled", True):  # partial branch: 258 ↛ 259 (line 258 didn't jump to line 259 because the condition was never true)

259 logger.warning("Data retention globally disabled in configuration") 

260 return results 

261 

262 # Run session cleanup 

263 if self.config.get("cleanup_actions", {}).get("sessions", {}).get("enabled", True):  # partial branch: 263 ↛ 268 (line 263 didn't jump to line 268 because the condition was always true)

264 result = await self.cleanup_sessions() 

265 results.append(result) 

266 

267 # Run conversation cleanup 

268 if self.config.get("cleanup_actions", {}).get("conversations", {}).get("enabled", True):  # partial branch: 268 ↛ 273 (line 268 didn't jump to line 273 because the condition was always true)

269 result = await self.cleanup_conversations() 

270 results.append(result) 

271 

272 # Run audit log cleanup 

273 if self.config.get("cleanup_actions", {}).get("audit_logs", {}).get("enabled", True):  # partial branch: 273 ↛ 278 (line 273 didn't jump to line 278 because the condition was always true)

274 result = await self.cleanup_audit_logs() 

275 results.append(result) 

276 

277 # Calculate totals 

278 total_deleted = sum(r.deleted_count for r in results) 

279 total_archived = sum(r.archived_count for r in results) 

280 total_errors = sum(len(r.errors) for r in results) 

281 

282 span.set_attribute("total_deleted", total_deleted) 

283 span.set_attribute("total_archived", total_archived) 

284 span.set_attribute("total_errors", total_errors) 

285 

286 logger.info( 

287 "Retention policy cleanup completed", 

288 extra={ 

289 "total_deleted": total_deleted, 

290 "total_archived": total_archived, 

291 "total_errors": total_errors, 

292 "policies_run": len(results), 

293 "dry_run": self.dry_run, 

294 }, 

295 ) 

296 

297 return results 

298 

299 async def _cleanup_inactive_sessions(self, cutoff_date: datetime) -> int: 

300 """ 

301 Delete sessions not accessed since cutoff date 

302 

303 Args: 

304 cutoff_date: Delete sessions last accessed before this date 

305 

306 Returns: 

307 Number of sessions deleted 

308 """ 

309 if not self.session_store: 

310 return 0 

311 

312 if self.dry_run: 

313 # In dry-run mode, just count inactive sessions without deleting 

314 inactive_sessions = await self.session_store.get_inactive_sessions(cutoff_date) 

315 return len(inactive_sessions) 

316 else: 

317 # Actually delete inactive sessions 

318 return await self.session_store.delete_inactive_sessions(cutoff_date) 
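
A hedged sketch of the two SessionStore methods this helper relies on, shown
for an imaginary in-memory backend (the method names come from the calls
above; the _sessions mapping and its last-accessed semantics are assumptions,
not the real SessionStore API):

    class InMemorySessionStore:
        """Toy backend mapping session_id -> last_accessed (UTC)."""

        def __init__(self) -> None:
            self._sessions: dict[str, datetime] = {}

        async def get_inactive_sessions(self, cutoff: datetime) -> list[str]:
            # Sessions whose last access predates the cutoff
            return [sid for sid, seen in self._sessions.items() if seen < cutoff]

        async def delete_inactive_sessions(self, cutoff: datetime) -> int:
            stale = await self.get_inactive_sessions(cutoff)
            for sid in stale:
                del self._sessions[sid]
            return len(stale)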

319 

320 async def _cleanup_old_conversations(self, cutoff_date: datetime) -> int: 

321 """ 

322 Delete or archive old conversations 

323 

324 Args: 

325 cutoff_date: Delete/archive conversations last updated before this date 

326 

327 Returns: 

328 Number of conversations deleted 

329 """ 

330 if not self.conversation_store: 

331 logger.warning("Conversation store not configured for retention cleanup") 

332 return 0 

333 

334 try: 

335 # Get all conversations (we'll filter archived ones older than cutoff) 

336 # Note: This implementation assumes ConversationStore supports querying by user 

337 # For a full implementation, we'd need a method like list_all_conversations() 

338 # or delete_old_archived(cutoff_date); a sketch of the latter follows this method

339 

340 deleted_count = 0 

341 

342 if self.dry_run: 

343 # In dry-run mode, count conversations that would be deleted 

344 # This would require a count_old_archived method on the store 

345 logger.debug(f"DRY RUN: Would delete archived conversations older than {cutoff_date.isoformat()}") 

346 return 0 

347 

348 # Production implementation: Delete old archived conversations 

349 # This assumes the ConversationStore has a method to delete by date criteria 

350 # In a real implementation, you might need to: 

351 # 1. Query all users (if multi-tenant) 

352 # 2. For each user, list archived conversations 

353 # 3. Filter by last_message_at < cutoff_date 

354 # 4. Delete each conversation 

355 # 

356 # For now, we'll log a warning that this needs database integration 

357 logger.warning( 

358 "Conversation cleanup requires database query support. " 

359 "Implement list_old_archived(cutoff_date) method on ConversationStore." 

360 ) 

361 

362 # Placeholder: In production, this would be: 

363 # deleted_count = await self.conversation_store.delete_old_archived(cutoff_date) 

364 

365 return deleted_count 

366 

367 except Exception as e: 

368 logger.error(f"Failed to cleanup old conversations: {e}", exc_info=True) 

369 raise 
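
A hedged sketch of the delete_old_archived(cutoff_date) method suggested in
the comments above, following the four numbered steps (every name here,
list_users, list_archived_conversations, last_message_at, and delete, is an
assumed interface, not the real ConversationStore API):

    async def delete_old_archived(store: ConversationStore, cutoff_date: datetime) -> int:
        deleted = 0
        for user_id in await store.list_users():                          # 1. query all users
            for conv in await store.list_archived_conversations(user_id): # 2. list archived
                if conv.last_message_at < cutoff_date:                    # 3. filter by date
                    await store.delete(user_id, conv.id)                  # 4. delete
                    deleted += 1
        return deleted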

370 

371 async def _cleanup_old_audit_logs(self, cutoff_date: datetime) -> int: 

372 """ 

373 Archive old audit logs to cold storage 

374 

375 Args: 

376 cutoff_date: Archive logs older than this date 

377 

378 Returns: 

379 Number of logs archived 

380 """ 

381 if not self.audit_log_store:  # partial branch: 381 ↛ 385 (line 381 didn't jump to line 385 because the condition was always true)

382 logger.warning("Audit log store not configured for retention cleanup") 

383 return 0 

384 

385 try: 

386 if self.dry_run: 

387 # In dry-run mode, count logs that would be archived 

388 logger.debug(f"DRY RUN: Would archive audit logs older than {cutoff_date.isoformat()}") 

389 return 0 

390 

391 # Production implementation: Archive old logs to cold storage 

392 # This integrates with the AuditLogStore to: 

393 # 1. Query all logs older than cutoff_date 

394 # 2. Export to cold storage (S3/GCS/local archive) 

395 # 3. Optionally delete from hot storage after successful archive 

396 # 

397 # The actual implementation depends on the cold storage backend 

398 # configured (S3, GCS, Azure Blob, or local filesystem) 

399 

400 archived_count = 0 

401 

402 # Get cold storage configuration from settings; the setting may not exist

403 # yet, so it is read defensively via getattr below

404 from mcp_server_langgraph.core.config import settings 

405 

406 cold_storage_backend = getattr(settings, "audit_log_cold_storage_backend", None) 

407 

408 if not cold_storage_backend: 

409 logger.warning( 

410 "Cold storage backend not configured. " 

411 "Set AUDIT_LOG_COLD_STORAGE_BACKEND (s3, gcs, azure, local) in settings." 

412 ) 

413 return 0 

414 

415 # In production, this would export logs to cold storage 

416 # For example: 

417 # if cold_storage_backend == "s3": 

418 # archived_count = await self._archive_to_s3(cutoff_date) 

419 # elif cold_storage_backend == "gcs": 

420 # archived_count = await self._archive_to_gcs(cutoff_date) 

421 # elif cold_storage_backend == "local": 

422 # archived_count = await self._archive_to_local(cutoff_date) (sketched after this method)

423 

424 logger.info( 

425 f"Audit log archival configured with backend: {cold_storage_backend}. " 

426 "Implement _archive_to_{backend} method for actual archival." 

427 ) 

428 

429 return archived_count 

430 

431 except Exception as e: 

432 logger.error(f"Failed to archive old audit logs: {e}", exc_info=True) 

433 raise 
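
Of the backends named in the dispatch comments above, the local variant is the
simplest to illustrate. A hedged sketch (query_older_than and the JSONL layout
are assumptions about AuditLogStore and the archive format; only the
_archive_to_local name comes from the comments):

    import json

    async def _archive_to_local(self, cutoff_date: datetime) -> int:
        archive_dir = Path("archive/audit_logs")
        archive_dir.mkdir(parents=True, exist_ok=True)
        entries = await self.audit_log_store.query_older_than(cutoff_date)  # assumed API
        out_file = archive_dir / f"audit-before-{cutoff_date.date().isoformat()}.jsonl"
        with open(out_file, "w") as f:
            for entry in entries:
                f.write(json.dumps(entry) + "\n")  # one JSON object per line
        return len(entries)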

434 

435 def get_retention_summary(self) -> dict[str, Any]: 

436 """ 

437 Get summary of retention policies 

438 

439 Returns: 

440 Dictionary with retention policy configuration 

441 """ 

442 return { 

443 "enabled": self.config.get("global", {}).get("enabled", True), 

444 "dry_run": self.dry_run, 

445 "policies": { 

446 name: { 

447 "retention_days": config.get("active", config.get("inactive", config.get("standard", 0))), 

448 "description": config.get("description", ""), 

449 } 

450 for name, config in self.config.get("retention_periods", {}).items() 

451 }, 

452 "next_cleanup": self._get_next_cleanup_time(), 

453 } 
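
With the defaults from _default_config(), the summary comes out roughly as
follows (the next_cleanup timestamp is illustrative):

    {
        "enabled": True,
        "dry_run": False,
        "policies": {
            "user_sessions": {"retention_days": 90, "description": ""},
            "conversations": {"retention_days": 90, "description": ""},
            "audit_logs": {"retention_days": 2555, "description": ""},
        },
        "next_cleanup": "2025-12-04T03:00:00Z",
    }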

454 

455 def _get_next_cleanup_time(self) -> str: 

456 """Calculate next scheduled cleanup time""" 

457 # Calculate next run at 3 AM (schedule: 0 3 * * *) 

458 # Note: Full cron parsing could be added using croniter if needed 

459 next_run = datetime.now(UTC).replace(hour=3, minute=0, second=0, microsecond=0) 

460 if next_run < datetime.now(UTC): 

461 next_run += timedelta(days=1) 

462 return next_run.isoformat().replace("+00:00", "Z")
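
If full cron parsing is ever needed, the croniter package noted in the comment
above could replace the fixed 3 AM arithmetic. A hedged sketch (croniter is a
third-party dependency, not currently imported by this module):

    from croniter import croniter

    def _get_next_cleanup_time_cron(self, schedule: str = "0 3 * * *") -> str:
        next_run = croniter(schedule, datetime.now(UTC)).get_next(datetime)
        return next_run.isoformat().replace("+00:00", "Z")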