Coverage for src / mcp_server_langgraph / compliance / gdpr / data_deletion.py: 77%
142 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
1"""
2GDPR Data Deletion Service - Article 17 (Right to Erasure)
3"""
5from datetime import datetime, UTC
6from typing import Any
8from pydantic import BaseModel, Field
10from mcp_server_langgraph.auth.openfga import OpenFGAClient
11from mcp_server_langgraph.auth.session import SessionStore
12from mcp_server_langgraph.compliance.gdpr.factory import GDPRStorage
13from mcp_server_langgraph.observability.telemetry import logger, tracer
16class DeletionResult(BaseModel):
17 """
18 Result of user data deletion operation
20 Tracks what data was deleted and any errors encountered.
21 """
23 success: bool = Field(..., description="Whether deletion completed successfully")
24 user_id: str = Field(..., description="User identifier")
25 deletion_timestamp: str = Field(..., description="ISO timestamp of deletion")
26 deleted_items: dict[str, int] = Field(default_factory=dict, description="Count of items deleted by type")
27 anonymized_items: dict[str, int] = Field(default_factory=dict, description="Count of items anonymized")
28 errors: list[str] = Field(default_factory=list, description="Any errors encountered")
29 audit_record_id: str | None = Field(None, description="Anonymized audit record ID")
32class DataDeletionService:
33 """
34 Service for deleting user data for GDPR compliance
36 Implements Article 17 (Right to Erasure / Right to be Forgotten).
38 This service handles complete user data deletion including:
39 - User profile and account
40 - All sessions
41 - Conversations and messages
42 - Preferences and settings
43 - Authorization tuples (OpenFGA)
44 - Audit logs (anonymized, not deleted)
45 """
47 def __init__(
48 self,
49 session_store: SessionStore | None = None,
50 gdpr_storage: GDPRStorage | None = None,
51 openfga_client: OpenFGAClient | None = None,
52 ):
53 """
54 Initialize data deletion service
56 Args:
57 session_store: Session storage backend
58 gdpr_storage: GDPR storage backend (user profiles, conversations, consents, etc.)
59 openfga_client: OpenFGA authorization client
60 """
61 self.session_store = session_store
62 self.gdpr_storage = gdpr_storage
63 self.openfga_client = openfga_client
65 async def _safe_delete(self, operation_name: str, delete_func, user_id: str, deleted_items: dict, errors: list) -> None: # type: ignore[no-untyped-def,type-arg]
66 """
67 Safely execute a deletion operation with error handling
69 Args:
70 operation_name: Name of the operation (for logging)
71 delete_func: Async function to execute deletion
72 user_id: User identifier
73 deleted_items: Dict to store deletion counts
74 errors: List to append errors to
75 """
76 try:
77 count = await delete_func(user_id)
78 deleted_items[operation_name] = count
79 logger.info(f"Deleted {count} {operation_name}", extra={"user_id": user_id})
80 except Exception as e:
81 error_msg = f"Failed to delete {operation_name}: {e!s}"
82 errors.append(error_msg)
83 logger.error(error_msg, exc_info=True)
85 async def _safe_anonymize( # type: ignore[no-untyped-def]
86 self,
87 operation_name: str,
88 anonymize_func,
89 user_id: str,
90 anonymized_items: dict[str, Any],
91 errors: list[str],
92 ) -> None:
93 """
94 Safely execute an anonymization operation with error handling
96 Args:
97 operation_name: Name of the operation (for logging)
98 anonymize_func: Async function to execute anonymization
99 user_id: User identifier
100 anonymized_items: Dict to store anonymization counts
101 errors: List to append errors to
102 """
103 try:
104 count = await anonymize_func(user_id)
105 anonymized_items[operation_name] = count
106 logger.info(f"Anonymized {count} {operation_name}", extra={"user_id": user_id})
107 except Exception as e:
108 error_msg = f"Failed to anonymize {operation_name}: {e!s}"
109 errors.append(error_msg)
110 logger.error(error_msg, exc_info=True)
112 async def delete_user_account(self, user_id: str, username: str, reason: str = "user_request") -> DeletionResult:
113 """
114 Delete all user data (GDPR Article 17)
116 This is an irreversible operation that removes all user data.
117 Audit logs are anonymized but retained for compliance.
119 Args:
120 user_id: User identifier
121 username: Username
122 reason: Reason for deletion
124 Returns:
125 DeletionResult with deletion details
126 """
127 with tracer.start_as_current_span("data_deletion.delete_user_account") as span:
128 span.set_attribute("user_id", user_id)
129 span.set_attribute("reason", reason)
131 logger.warning(
132 "User account deletion requested",
133 extra={"user_id": user_id, "username": username, "reason": reason},
134 )
136 deleted_items: dict[str, int] = {}
137 anonymized_items: dict[str, int] = {}
138 errors: list[str] = []
140 # 1. Delete sessions
141 await self._safe_delete("sessions", self._delete_user_sessions, user_id, deleted_items, errors)
143 # 2. Delete conversations
144 await self._safe_delete("conversations", self._delete_user_conversations, user_id, deleted_items, errors)
146 # 3. Delete preferences
147 await self._safe_delete("preferences", self._delete_user_preferences, user_id, deleted_items, errors)
149 # 4. Delete OpenFGA tuples
150 if self.openfga_client: 150 ↛ 151line 150 didn't jump to line 151 because the condition on line 150 was never true
151 await self._safe_delete(
152 "authorization_tuples", self._delete_user_authorization_tuples, user_id, deleted_items, errors
153 )
155 # 5. Delete consent records
156 if self.gdpr_storage:
157 await self._safe_delete("consents", self._delete_user_consents, user_id, deleted_items, errors)
159 # 6. Anonymize audit logs (don't delete for compliance)
160 await self._safe_anonymize("audit_logs", self._anonymize_user_audit_logs, user_id, anonymized_items, errors)
162 # 7. Delete user profile/account
163 try:
164 count = await self._delete_user_profile(user_id)
165 deleted_items["user_profile"] = count
166 logger.info("User profile deleted", extra={"user_id": user_id})
167 except Exception as e:
168 error_msg = f"Failed to delete user profile: {e!s}"
169 errors.append(error_msg)
170 logger.error(error_msg, exc_info=True)
172 # 8. Create final audit record (anonymized)
173 audit_record_id = await self._create_deletion_audit_record(
174 user_id=user_id, username=username, reason=reason, deleted_items=deleted_items, errors=errors
175 )
177 success = len(errors) == 0
178 deletion_timestamp = datetime.now(UTC).isoformat().replace("+00:00", "Z")
180 result = DeletionResult(
181 success=success,
182 user_id=user_id,
183 deletion_timestamp=deletion_timestamp,
184 deleted_items=deleted_items,
185 anonymized_items=anonymized_items,
186 errors=errors,
187 audit_record_id=audit_record_id,
188 )
190 if success:
191 logger.warning(
192 "User account deletion completed successfully",
193 extra={
194 "user_id": user_id,
195 "deleted_items": deleted_items,
196 "anonymized_items": anonymized_items,
197 },
198 )
199 else:
200 logger.error(
201 "User account deletion completed with errors",
202 extra={"user_id": user_id, "errors": errors},
203 )
205 return result
207 async def _delete_user_sessions(self, user_id: str) -> int:
208 """Delete all user sessions"""
209 if not self.session_store:
210 return 0
212 count = await self.session_store.delete_user_sessions(user_id)
213 return count
215 async def _delete_user_conversations(self, user_id: str) -> int:
216 """Delete all user conversations"""
217 if not self.gdpr_storage:
218 return 0
220 try:
221 count = await self.gdpr_storage.conversations.delete_user_conversations(user_id)
222 return count
223 except Exception as e:
224 logger.error(f"Failed to delete user conversations: {e}", exc_info=True)
225 raise
227 async def _delete_user_preferences(self, user_id: str) -> int:
228 """Delete all user preferences"""
229 if not self.gdpr_storage:
230 return 0
232 try:
233 deleted = await self.gdpr_storage.preferences.delete(user_id)
234 return 1 if deleted else 0
235 except Exception as e:
236 logger.error(f"Failed to delete user preferences: {e}", exc_info=True)
237 raise
239 async def _delete_user_authorization_tuples(self, user_id: str) -> int:
240 """Delete all OpenFGA authorization tuples for user"""
241 if not self.openfga_client:
242 return 0
244 # Delete all tuples where user is the subject
245 # Note: This is a simplified implementation
246 # In production, you'd need to query all tuples for the user first
247 try:
248 # Example: Delete all tuples for this user
249 # await self.openfga_client.delete_tuples([
250 # {"user": user_id, "relation": "*", "object": "*"}
251 # ])
252 # For now, return 0 as we need to implement the query logic
253 return 0
254 except Exception as e:
255 logger.error(f"Failed to delete authorization tuples: {e}", exc_info=True)
256 raise
258 async def _anonymize_user_audit_logs(self, user_id: str) -> int:
259 """
260 Anonymize audit logs (don't delete for compliance)
262 Replace user_id with pseudonymized identifier.
263 """
264 if not self.gdpr_storage:
265 return 0
267 try:
268 count = await self.gdpr_storage.audit_logs.anonymize_user_logs(user_id)
269 return count
270 except Exception as e:
271 logger.error(f"Failed to anonymize user audit logs: {e}", exc_info=True)
272 raise
274 async def _delete_user_consents(self, user_id: str) -> int:
275 """Delete all user consent records"""
276 if not self.gdpr_storage: 276 ↛ 277line 276 didn't jump to line 277 because the condition on line 276 was never true
277 return 0
279 try:
280 count = await self.gdpr_storage.consents.delete_user_consents(user_id)
281 return count
282 except Exception as e:
283 logger.error(f"Failed to delete user consents: {e}", exc_info=True)
284 raise
286 async def _delete_user_profile(self, user_id: str) -> int:
287 """Delete user profile/account"""
288 if not self.gdpr_storage:
289 # If no profile store, assume profile was deleted elsewhere
290 return 1
292 try:
293 deleted = await self.gdpr_storage.user_profiles.delete(user_id)
294 return 1 if deleted else 0
295 except Exception as e:
296 logger.error(f"Failed to delete user profile: {e}", exc_info=True)
297 raise
299 async def _create_deletion_audit_record(
300 self,
301 user_id: str,
302 username: str,
303 reason: str,
304 deleted_items: dict[str, int],
305 errors: list[str],
306 ) -> str:
307 """
308 Create audit record for deletion (anonymized)
310 This record is kept for compliance purposes.
311 """
312 from mcp_server_langgraph.compliance.gdpr.storage import AuditLogEntry
314 timestamp = datetime.now(UTC).isoformat().replace("+00:00", "Z")
315 audit_record_id = f"deletion_{datetime.now(UTC).strftime('%Y%m%d%H%M%S%f')}"
317 # Create anonymized audit log entry
318 audit_entry = AuditLogEntry(
319 log_id=audit_record_id,
320 user_id="DELETED", # Anonymized for GDPR compliance
321 action="account_deletion",
322 resource_type="user_account",
323 resource_id=f"hash_{abs(hash(user_id))}", # Hash for correlation if needed
324 timestamp=timestamp,
325 ip_address=None, # Not applicable for system action
326 user_agent=None, # Not applicable for system action
327 metadata={
328 "original_username_hash": abs(hash(username)), # Hash for potential correlation
329 "deletion_reason": reason,
330 "deleted_items": deleted_items,
331 "errors_count": len(errors),
332 "errors": errors if errors else [],
333 "compliance_note": "User data deleted per GDPR Article 17 (Right to Erasure)",
334 },
335 )
337 # Store audit record if GDPR storage is configured
338 if self.gdpr_storage:
339 try:
340 stored_id = await self.gdpr_storage.audit_logs.log(audit_entry)
341 logger.info(
342 "User account deletion audit record stored",
343 extra={
344 "audit_record_id": stored_id,
345 "deleted_items_count": sum(deleted_items.values()),
346 "errors_count": len(errors),
347 },
348 )
349 return stored_id
350 except Exception as e:
351 logger.error(
352 f"Failed to store deletion audit record: {e}",
353 exc_info=True,
354 extra={"audit_record_id": audit_record_id},
355 )
356 # Fall through to log-only mode
358 # Fallback: Log to application logs if audit store not configured
359 logger.warning(
360 "User account deletion audit record (log-only, audit store not configured)",
361 extra={
362 "audit_record_id": audit_record_id,
363 "action": audit_entry.action,
364 "resource_type": audit_entry.resource_type,
365 "resource_id": audit_entry.resource_id,
366 "metadata": audit_entry.metadata,
367 "timestamp": timestamp,
368 },
369 )
371 return audit_record_id