Coverage for src/mcp_server_langgraph/compliance/retention.py: 68% (170 statements)
"""
Data Retention Service - GDPR Article 5(1)(e) Storage Limitation

Implements automated data retention policies with configurable cleanup schedules.
Ensures compliance with GDPR data minimization and SOC 2 storage requirements.
"""

from datetime import datetime, timedelta, UTC
from pathlib import Path
from typing import Any

import yaml
from pydantic import BaseModel, Field

from mcp_server_langgraph.auth.session import SessionStore
from mcp_server_langgraph.compliance.gdpr.storage import AuditLogStore, ConversationStore
from mcp_server_langgraph.observability.telemetry import logger, metrics, tracer


class RetentionPolicy(BaseModel):
    """Data retention policy configuration"""

    data_type: str = Field(..., description="Type of data (sessions, conversations, etc.)")
    retention_days: int = Field(..., description="Number of days to retain data")
    enabled: bool = Field(default=True, description="Whether policy is enabled")
    soft_delete: bool = Field(default=False, description="Soft delete before permanent deletion")
    notify_before: bool = Field(default=False, description="Notify before deletion")


class RetentionResult(BaseModel):
    """Result of retention policy execution"""

    policy_name: str
    execution_timestamp: str
    deleted_count: int = 0
    archived_count: int = 0
    errors: list[str] = Field(default_factory=list)
    dry_run: bool = False


class DataRetentionService:
    """
    Service for enforcing data retention policies

    Features:
    - Configurable retention periods per data type
    - Scheduled cleanup execution
    - Dry-run mode for testing
    - Audit logging of all deletions
    - Metrics tracking
    """

    def __init__(
        self,
        config_path: str = "config/retention_policies.yaml",
        session_store: SessionStore | None = None,
        conversation_store: ConversationStore | None = None,
        audit_log_store: AuditLogStore | None = None,
        dry_run: bool = False,
    ):
        """
        Initialize data retention service

        Args:
            config_path: Path to retention policies YAML file
            session_store: Session storage backend
            conversation_store: Conversation storage backend
            audit_log_store: Audit log storage backend
            dry_run: If True, only simulate deletions without actually deleting
        """
        self.config_path = Path(config_path)
        self.session_store = session_store
        self.conversation_store = conversation_store
        self.audit_log_store = audit_log_store
        self.dry_run = dry_run
        self.config = self._load_config()

    def _load_config(self) -> dict[str, Any]:
        """Load retention policies from YAML configuration"""
        try:
            if not self.config_path.exists():
                logger.warning(f"Retention config not found: {self.config_path}, using defaults")
                return self._default_config()

            with open(self.config_path) as f:
                config = yaml.safe_load(f)

            logger.info(f"Loaded retention policies from {self.config_path}")
            return config  # type: ignore[no-any-return]

        except Exception as e:
            logger.error(f"Failed to load retention config: {e}", exc_info=True)
            return self._default_config()

    def _default_config(self) -> dict[str, Any]:
        """Default retention configuration"""
        return {
            "global": {"enabled": True, "dry_run": False},
            "retention_periods": {
                "user_sessions": {"inactive": 90},
                "conversations": {"archived": 90},
                "audit_logs": {"standard": 2555},
            },
        }
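
    # An illustrative retention_policies.yaml matching the keys this module reads.
    # The values mirror _default_config above; "description" is optional and only
    # used by get_retention_summary, and the cleanup_actions toggles are read in
    # run_all_cleanups. A sketch, not a shipped config file:
    #
    #   global:
    #     enabled: true
    #     dry_run: false
    #   retention_periods:
    #     user_sessions:
    #       inactive: 90
    #       description: "Sessions idle for more than 90 days"
    #     conversations:
    #       archived: 90
    #     audit_logs:
    #       standard: 2555  # ~7 years, for compliance
    #   cleanup_actions:
    #     sessions: {enabled: true}
    #     conversations: {enabled: true}
    #     audit_logs: {enabled: true}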

    async def cleanup_sessions(self) -> RetentionResult:
        """
        Clean up old sessions based on retention policy

        Returns:
            RetentionResult with deletion counts
        """
        with tracer.start_as_current_span("retention.cleanup_sessions") as span:
            policy_config = self.config.get("retention_periods", {}).get("user_sessions", {})
            retention_days = policy_config.get("inactive", 90)

            result = RetentionResult(
                policy_name="user_sessions",
                execution_timestamp=datetime.now(UTC).isoformat().replace("+00:00", "Z"),
                dry_run=self.dry_run,
            )

            if not self.session_store:
                logger.warning("Session store not available for retention cleanup")
                result.errors.append("Session store not configured")
                return result

            try:
                cutoff_date = datetime.now(UTC) - timedelta(days=retention_days)
                logger.info(
                    f"Cleaning up sessions inactive since {cutoff_date.isoformat()}",
                    extra={"retention_days": retention_days, "dry_run": self.dry_run},
                )

                # Delegate to the internal helper, which queries the SessionStore
                # for sessions last accessed before the cutoff
                deleted_count = await self._cleanup_inactive_sessions(cutoff_date)

                result.deleted_count = deleted_count
                span.set_attribute("deleted_count", deleted_count)

                # Track metrics
                if not self.dry_run:
                    metrics.successful_calls.add(1, {"operation": "session_cleanup"})

                logger.info(
                    "Session cleanup completed",
                    extra={
                        "deleted_count": deleted_count,
                        "retention_days": retention_days,
                        "dry_run": self.dry_run,
                    },
                )

            except Exception as e:
                error_msg = f"Session cleanup failed: {e!s}"
                result.errors.append(error_msg)
                logger.error(error_msg, exc_info=True)
                span.record_exception(e)

            return result

    async def cleanup_conversations(self) -> RetentionResult:
        """
        Clean up old conversations based on retention policy

        Returns:
            RetentionResult with deletion counts
        """
        with tracer.start_as_current_span("retention.cleanup_conversations"):
            policy_config = self.config.get("retention_periods", {}).get("conversations", {})
            archived_retention = policy_config.get("archived", 90)

            result = RetentionResult(
                policy_name="conversations",
                execution_timestamp=datetime.now(UTC).isoformat().replace("+00:00", "Z"),
                dry_run=self.dry_run,
            )

            try:
                cutoff_date = datetime.now(UTC) - timedelta(days=archived_retention)
                logger.info(
                    f"Cleaning up archived conversations older than {cutoff_date.isoformat()}",
                    extra={"retention_days": archived_retention, "dry_run": self.dry_run},
                )

                # Call internal cleanup method
                deleted_count = await self._cleanup_old_conversations(cutoff_date)
                result.deleted_count = deleted_count

                logger.info(
                    "Conversation cleanup completed",
                    extra={"deleted_count": deleted_count, "dry_run": self.dry_run},
                )

            except Exception as e:
                error_msg = f"Conversation cleanup failed: {e!s}"
                result.errors.append(error_msg)
                logger.error(error_msg, exc_info=True)

            return result

    async def cleanup_audit_logs(self) -> RetentionResult:
        """
        Clean up old audit logs based on retention policy

        Note: Most audit logs are retained for 7 years (2555 days) for compliance.

        Returns:
            RetentionResult with deletion/archive counts
        """
        with tracer.start_as_current_span("retention.cleanup_audit_logs"):
            policy_config = self.config.get("retention_periods", {}).get("audit_logs", {})
            retention_days = policy_config.get("standard", 2555)

            result = RetentionResult(
                policy_name="audit_logs",
                execution_timestamp=datetime.now(UTC).isoformat().replace("+00:00", "Z"),
                dry_run=self.dry_run,
            )

            try:
                cutoff_date = datetime.now(UTC) - timedelta(days=retention_days)
                logger.info(
                    f"Archiving audit logs older than {cutoff_date.isoformat()}",
                    extra={"retention_days": retention_days, "dry_run": self.dry_run},
                )

                # Call internal cleanup method
                archived_count = await self._cleanup_old_audit_logs(cutoff_date)
                result.archived_count = archived_count

                logger.info(
                    "Audit log cleanup completed",
                    extra={"archived_count": archived_count, "dry_run": self.dry_run},
                )

            except Exception as e:
                error_msg = f"Audit log cleanup failed: {e!s}"
                result.errors.append(error_msg)
                logger.error(error_msg, exc_info=True)

            return result

    async def run_all_cleanups(self) -> list[RetentionResult]:
        """
        Run all configured retention policies

        Returns:
            List of RetentionResults for each policy
        """
        with tracer.start_as_current_span("retention.run_all_cleanups") as span:
            logger.info("Starting retention policy cleanup", extra={"dry_run": self.dry_run})

            results: list[RetentionResult] = []

            # Check if retention is globally enabled
            if not self.config.get("global", {}).get("enabled", True):
                logger.warning("Data retention globally disabled in configuration")
                return results

            # Run session cleanup
            if self.config.get("cleanup_actions", {}).get("sessions", {}).get("enabled", True):
                result = await self.cleanup_sessions()
                results.append(result)

            # Run conversation cleanup
            if self.config.get("cleanup_actions", {}).get("conversations", {}).get("enabled", True):
                result = await self.cleanup_conversations()
                results.append(result)

            # Run audit log cleanup
            if self.config.get("cleanup_actions", {}).get("audit_logs", {}).get("enabled", True):
                result = await self.cleanup_audit_logs()
                results.append(result)

            # Calculate totals
            total_deleted = sum(r.deleted_count for r in results)
            total_archived = sum(r.archived_count for r in results)
            total_errors = sum(len(r.errors) for r in results)

            span.set_attribute("total_deleted", total_deleted)
            span.set_attribute("total_archived", total_archived)
            span.set_attribute("total_errors", total_errors)

            logger.info(
                "Retention policy cleanup completed",
                extra={
                    "total_deleted": total_deleted,
                    "total_archived": total_archived,
                    "total_errors": total_errors,
                    "policies_run": len(results),
                    "dry_run": self.dry_run,
                },
            )

            return results
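
    # A minimal usage sketch (run from inside an async function). How the store
    # instances are constructed is an assumption about the wider codebase, not a
    # documented API; a dry run first is the safe pattern:
    #
    #   service = DataRetentionService(
    #       config_path="config/retention_policies.yaml",
    #       session_store=session_store,  # however SessionStore is built here
    #       dry_run=True,
    #   )
    #   results = await service.run_all_cleanups()
    #   for r in results:
    #       print(r.policy_name, r.deleted_count, r.archived_count, r.errors)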

    async def _cleanup_inactive_sessions(self, cutoff_date: datetime) -> int:
        """
        Delete sessions not accessed since cutoff date

        Args:
            cutoff_date: Delete sessions last accessed before this date

        Returns:
            Number of sessions deleted
        """
        if not self.session_store:
            return 0

        if self.dry_run:
            # In dry-run mode, just count inactive sessions without deleting
            inactive_sessions = await self.session_store.get_inactive_sessions(cutoff_date)
            return len(inactive_sessions)
        else:
            # Actually delete inactive sessions
            return await self.session_store.delete_inactive_sessions(cutoff_date)
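
    # The two store calls above imply roughly this interface on SessionStore
    # (a sketch of the expected shape, not the actual auth.session API;
    # Protocol comes from typing):
    #
    #   class SessionStoreProtocol(Protocol):
    #       async def get_inactive_sessions(self, cutoff: datetime) -> list[Any]:
    #           """Return sessions last accessed before `cutoff`."""
    #
    #       async def delete_inactive_sessions(self, cutoff: datetime) -> int:
    #           """Delete those sessions and return the number removed."""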

    async def _cleanup_old_conversations(self, cutoff_date: datetime) -> int:
        """
        Delete or archive old conversations

        Args:
            cutoff_date: Delete/archive conversations last updated before this date

        Returns:
            Number of conversations deleted
        """
        if not self.conversation_store:
            logger.warning("Conversation store not configured for retention cleanup")
            return 0

        try:
            # Get all conversations (we'll filter archived ones older than cutoff)
            # Note: This implementation assumes ConversationStore supports querying by user.
            # For a full implementation, we'd need a method like list_all_conversations()
            # or delete_old_archived(cutoff_date)

            deleted_count = 0

            if self.dry_run:
                # In dry-run mode, count conversations that would be deleted
                # This would require a count_old_archived method on the store
                logger.debug(f"DRY RUN: Would delete archived conversations older than {cutoff_date.isoformat()}")
                return 0

            # Production implementation: Delete old archived conversations.
            # This assumes the ConversationStore has a method to delete by date criteria.
            # In a real implementation, you might need to:
            # 1. Query all users (if multi-tenant)
            # 2. For each user, list archived conversations
            # 3. Filter by last_message_at < cutoff_date
            # 4. Delete each conversation
            #
            # For now, we'll log a warning that this needs database integration
            logger.warning(
                "Conversation cleanup requires database query support. "
                "Implement list_old_archived(cutoff_date) method on ConversationStore."
            )

            # Placeholder: In production, this would be:
            # deleted_count = await self.conversation_store.delete_old_archived(cutoff_date)

            return deleted_count

        except Exception as e:
            logger.error(f"Failed to cleanup old conversations: {e}", exc_info=True)
            raise
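
    # A sketch of the delete_old_archived method the warning above asks for,
    # assuming a SQL-backed ConversationStore with an asyncpg connection pool
    # (table and column names are hypothetical):
    #
    #   async def delete_old_archived(self, cutoff_date: datetime) -> int:
    #       status = await self.pool.execute(
    #           "DELETE FROM conversations"
    #           " WHERE archived = TRUE AND last_message_at < $1",
    #           cutoff_date,
    #       )
    #       # asyncpg returns a status string such as "DELETE 42"
    #       return int(status.split()[-1])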

    async def _cleanup_old_audit_logs(self, cutoff_date: datetime) -> int:
        """
        Archive old audit logs to cold storage

        Args:
            cutoff_date: Archive logs older than this date

        Returns:
            Number of logs archived
        """
        if not self.audit_log_store:
            logger.warning("Audit log store not configured for retention cleanup")
            return 0

        try:
            if self.dry_run:
                # In dry-run mode, count logs that would be archived
                logger.debug(f"DRY RUN: Would archive audit logs older than {cutoff_date.isoformat()}")
                return 0

            # Production implementation: Archive old logs to cold storage.
            # This integrates with the AuditLogStore to:
            # 1. Query all logs older than cutoff_date
            # 2. Export to cold storage (S3/GCS/local archive)
            # 3. Optionally delete from hot storage after successful archive
            #
            # The actual implementation depends on the cold storage backend
            # configured (S3, GCS, Azure Blob, or local filesystem)

            archived_count = 0

            # Get cold storage configuration from settings
            # This will be added to config.py as part of this feature
            from mcp_server_langgraph.core.config import settings

            cold_storage_backend = getattr(settings, "audit_log_cold_storage_backend", None)

            if not cold_storage_backend:
                logger.warning(
                    "Cold storage backend not configured. "
                    "Set AUDIT_LOG_COLD_STORAGE_BACKEND (s3, gcs, azure, local) in settings."
                )
                return 0

            # In production, this would export logs to cold storage
            # For example:
            # if cold_storage_backend == "s3":
            #     archived_count = await self._archive_to_s3(cutoff_date)
            # elif cold_storage_backend == "gcs":
            #     archived_count = await self._archive_to_gcs(cutoff_date)
            # elif cold_storage_backend == "local":
            #     archived_count = await self._archive_to_local(cutoff_date)

            logger.info(
                f"Audit log archival configured with backend: {cold_storage_backend}. "
                "Implement _archive_to_{backend} method for actual archival."
            )

            return archived_count

        except Exception as e:
            logger.error(f"Failed to archive old audit logs: {e}", exc_info=True)
            raise
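
    # A sketch of the "local" variant of the _archive_to_* methods named above.
    # query_before/delete_before on AuditLogStore and the archive_dir setting are
    # assumptions (and a real version would need `import json`):
    #
    #   async def _archive_to_local(self, cutoff_date: datetime) -> int:
    #       archive_dir = Path(settings.audit_log_archive_dir)  # hypothetical setting
    #       archive_dir.mkdir(parents=True, exist_ok=True)
    #       logs = await self.audit_log_store.query_before(cutoff_date)
    #       out_path = archive_dir / f"audit-{cutoff_date:%Y%m%d}.jsonl"
    #       with open(out_path, "w") as f:
    #           for entry in logs:
    #               f.write(json.dumps(entry) + "\n")
    #       # Only drop from hot storage once the export has succeeded
    #       await self.audit_log_store.delete_before(cutoff_date)
    #       return len(logs)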

    def get_retention_summary(self) -> dict[str, Any]:
        """
        Get summary of retention policies

        Returns:
            Dictionary with retention policy configuration
        """
        return {
            "enabled": self.config.get("global", {}).get("enabled", True),
            "dry_run": self.dry_run,
            "policies": {
                name: {
                    # Fall back through the period keys the policies above use;
                    # "archived" is included so the conversations policy does not
                    # incorrectly report 0 days
                    "retention_days": config.get(
                        "active",
                        config.get("inactive", config.get("archived", config.get("standard", 0))),
                    ),
                    "description": config.get("description", ""),
                }
                for name, config in self.config.get("retention_periods", {}).items()
            },
            "next_cleanup": self._get_next_cleanup_time(),
        }
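
    # With the defaults from _default_config, the summary comes out as:
    #
    #   {
    #       "enabled": True,
    #       "dry_run": False,
    #       "policies": {
    #           "user_sessions": {"retention_days": 90, "description": ""},
    #           "conversations": {"retention_days": 90, "description": ""},
    #           "audit_logs": {"retention_days": 2555, "description": ""},
    #       },
    #       "next_cleanup": "2025-12-03T03:00:00Z",  # example timestamp
    #   }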

    def _get_next_cleanup_time(self) -> str:
        """Calculate next scheduled cleanup time"""
        # Calculate next run at 3 AM UTC (schedule: 0 3 * * *)
        # Note: Full cron parsing could be added using croniter if needed
        next_run = datetime.now(UTC).replace(hour=3, minute=0, second=0, microsecond=0)
        if next_run < datetime.now(UTC):
            next_run += timedelta(days=1)
        return next_run.isoformat().replace("+00:00", "Z")
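
    # If arbitrary cron schedules are needed, the croniter package (third party,
    # `pip install croniter`) can replace the fixed 3 AM arithmetic; the
    # "schedule" config key here is a hypothetical addition:
    #
    #   from croniter import croniter
    #
    #   def _get_next_cleanup_time(self) -> str:
    #       schedule = self.config.get("global", {}).get("schedule", "0 3 * * *")
    #       next_run = croniter(schedule, datetime.now(UTC)).get_next(datetime)
    #       return next_run.isoformat().replace("+00:00", "Z")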