Coverage for src / mcp_server_langgraph / compliance / gdpr / data_deletion.py: 77%

142 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 00:43 +0000

1""" 

2GDPR Data Deletion Service - Article 17 (Right to Erasure) 

3""" 

4 

5from datetime import datetime, UTC 

6from typing import Any 

7 

8from pydantic import BaseModel, Field 

9 

10from mcp_server_langgraph.auth.openfga import OpenFGAClient 

11from mcp_server_langgraph.auth.session import SessionStore 

12from mcp_server_langgraph.compliance.gdpr.factory import GDPRStorage 

13from mcp_server_langgraph.observability.telemetry import logger, tracer 

14 

15 

16class DeletionResult(BaseModel): 

17 """ 

18 Result of user data deletion operation 

19 

20 Tracks what data was deleted and any errors encountered. 

21 """ 

22 

23 success: bool = Field(..., description="Whether deletion completed successfully") 

24 user_id: str = Field(..., description="User identifier") 

25 deletion_timestamp: str = Field(..., description="ISO timestamp of deletion") 

26 deleted_items: dict[str, int] = Field(default_factory=dict, description="Count of items deleted by type") 

27 anonymized_items: dict[str, int] = Field(default_factory=dict, description="Count of items anonymized") 

28 errors: list[str] = Field(default_factory=list, description="Any errors encountered") 

29 audit_record_id: str | None = Field(None, description="Anonymized audit record ID") 

30 

31 

32class DataDeletionService: 

33 """ 

34 Service for deleting user data for GDPR compliance 

35 

36 Implements Article 17 (Right to Erasure / Right to be Forgotten). 

37 

38 This service handles complete user data deletion including: 

39 - User profile and account 

40 - All sessions 

41 - Conversations and messages 

42 - Preferences and settings 

43 - Authorization tuples (OpenFGA) 

44 - Audit logs (anonymized, not deleted) 

45 """ 

46 

47 def __init__( 

48 self, 

49 session_store: SessionStore | None = None, 

50 gdpr_storage: GDPRStorage | None = None, 

51 openfga_client: OpenFGAClient | None = None, 

52 ): 

53 """ 

54 Initialize data deletion service 

55 

56 Args: 

57 session_store: Session storage backend 

58 gdpr_storage: GDPR storage backend (user profiles, conversations, consents, etc.) 

59 openfga_client: OpenFGA authorization client 

60 """ 

61 self.session_store = session_store 

62 self.gdpr_storage = gdpr_storage 

63 self.openfga_client = openfga_client 

64 

65 async def _safe_delete(self, operation_name: str, delete_func, user_id: str, deleted_items: dict, errors: list) -> None: # type: ignore[no-untyped-def,type-arg] 

66 """ 

67 Safely execute a deletion operation with error handling 

68 

69 Args: 

70 operation_name: Name of the operation (for logging) 

71 delete_func: Async function to execute deletion 

72 user_id: User identifier 

73 deleted_items: Dict to store deletion counts 

74 errors: List to append errors to 

75 """ 

76 try: 

77 count = await delete_func(user_id) 

78 deleted_items[operation_name] = count 

79 logger.info(f"Deleted {count} {operation_name}", extra={"user_id": user_id}) 

80 except Exception as e: 

81 error_msg = f"Failed to delete {operation_name}: {e!s}" 

82 errors.append(error_msg) 

83 logger.error(error_msg, exc_info=True) 

84 

85 async def _safe_anonymize( # type: ignore[no-untyped-def] 

86 self, 

87 operation_name: str, 

88 anonymize_func, 

89 user_id: str, 

90 anonymized_items: dict[str, Any], 

91 errors: list[str], 

92 ) -> None: 

93 """ 

94 Safely execute an anonymization operation with error handling 

95 

96 Args: 

97 operation_name: Name of the operation (for logging) 

98 anonymize_func: Async function to execute anonymization 

99 user_id: User identifier 

100 anonymized_items: Dict to store anonymization counts 

101 errors: List to append errors to 

102 """ 

103 try: 

104 count = await anonymize_func(user_id) 

105 anonymized_items[operation_name] = count 

106 logger.info(f"Anonymized {count} {operation_name}", extra={"user_id": user_id}) 

107 except Exception as e: 

108 error_msg = f"Failed to anonymize {operation_name}: {e!s}" 

109 errors.append(error_msg) 

110 logger.error(error_msg, exc_info=True) 

111 

112 async def delete_user_account(self, user_id: str, username: str, reason: str = "user_request") -> DeletionResult: 

113 """ 

114 Delete all user data (GDPR Article 17) 

115 

116 This is an irreversible operation that removes all user data. 

117 Audit logs are anonymized but retained for compliance. 

118 

119 Args: 

120 user_id: User identifier 

121 username: Username 

122 reason: Reason for deletion 

123 

124 Returns: 

125 DeletionResult with deletion details 

126 """ 

127 with tracer.start_as_current_span("data_deletion.delete_user_account") as span: 

128 span.set_attribute("user_id", user_id) 

129 span.set_attribute("reason", reason) 

130 

131 logger.warning( 

132 "User account deletion requested", 

133 extra={"user_id": user_id, "username": username, "reason": reason}, 

134 ) 

135 

136 deleted_items: dict[str, int] = {} 

137 anonymized_items: dict[str, int] = {} 

138 errors: list[str] = [] 

139 

140 # 1. Delete sessions 

141 await self._safe_delete("sessions", self._delete_user_sessions, user_id, deleted_items, errors) 

142 

143 # 2. Delete conversations 

144 await self._safe_delete("conversations", self._delete_user_conversations, user_id, deleted_items, errors) 

145 

146 # 3. Delete preferences 

147 await self._safe_delete("preferences", self._delete_user_preferences, user_id, deleted_items, errors) 

148 

149 # 4. Delete OpenFGA tuples 

150 if self.openfga_client: 150 ↛ 151line 150 didn't jump to line 151 because the condition on line 150 was never true

151 await self._safe_delete( 

152 "authorization_tuples", self._delete_user_authorization_tuples, user_id, deleted_items, errors 

153 ) 

154 

155 # 5. Delete consent records 

156 if self.gdpr_storage: 

157 await self._safe_delete("consents", self._delete_user_consents, user_id, deleted_items, errors) 

158 

159 # 6. Anonymize audit logs (don't delete for compliance) 

160 await self._safe_anonymize("audit_logs", self._anonymize_user_audit_logs, user_id, anonymized_items, errors) 

161 

162 # 7. Delete user profile/account 

163 try: 

164 count = await self._delete_user_profile(user_id) 

165 deleted_items["user_profile"] = count 

166 logger.info("User profile deleted", extra={"user_id": user_id}) 

167 except Exception as e: 

168 error_msg = f"Failed to delete user profile: {e!s}" 

169 errors.append(error_msg) 

170 logger.error(error_msg, exc_info=True) 

171 

172 # 8. Create final audit record (anonymized) 

173 audit_record_id = await self._create_deletion_audit_record( 

174 user_id=user_id, username=username, reason=reason, deleted_items=deleted_items, errors=errors 

175 ) 

176 

177 success = len(errors) == 0 

178 deletion_timestamp = datetime.now(UTC).isoformat().replace("+00:00", "Z") 

179 

180 result = DeletionResult( 

181 success=success, 

182 user_id=user_id, 

183 deletion_timestamp=deletion_timestamp, 

184 deleted_items=deleted_items, 

185 anonymized_items=anonymized_items, 

186 errors=errors, 

187 audit_record_id=audit_record_id, 

188 ) 

189 

190 if success: 

191 logger.warning( 

192 "User account deletion completed successfully", 

193 extra={ 

194 "user_id": user_id, 

195 "deleted_items": deleted_items, 

196 "anonymized_items": anonymized_items, 

197 }, 

198 ) 

199 else: 

200 logger.error( 

201 "User account deletion completed with errors", 

202 extra={"user_id": user_id, "errors": errors}, 

203 ) 

204 

205 return result 

206 

207 async def _delete_user_sessions(self, user_id: str) -> int: 

208 """Delete all user sessions""" 

209 if not self.session_store: 

210 return 0 

211 

212 count = await self.session_store.delete_user_sessions(user_id) 

213 return count 

214 

215 async def _delete_user_conversations(self, user_id: str) -> int: 

216 """Delete all user conversations""" 

217 if not self.gdpr_storage: 

218 return 0 

219 

220 try: 

221 count = await self.gdpr_storage.conversations.delete_user_conversations(user_id) 

222 return count 

223 except Exception as e: 

224 logger.error(f"Failed to delete user conversations: {e}", exc_info=True) 

225 raise 

226 

227 async def _delete_user_preferences(self, user_id: str) -> int: 

228 """Delete all user preferences""" 

229 if not self.gdpr_storage: 

230 return 0 

231 

232 try: 

233 deleted = await self.gdpr_storage.preferences.delete(user_id) 

234 return 1 if deleted else 0 

235 except Exception as e: 

236 logger.error(f"Failed to delete user preferences: {e}", exc_info=True) 

237 raise 

238 

239 async def _delete_user_authorization_tuples(self, user_id: str) -> int: 

240 """Delete all OpenFGA authorization tuples for user""" 

241 if not self.openfga_client: 

242 return 0 

243 

244 # Delete all tuples where user is the subject 

245 # Note: This is a simplified implementation 

246 # In production, you'd need to query all tuples for the user first 

247 try: 

248 # Example: Delete all tuples for this user 

249 # await self.openfga_client.delete_tuples([ 

250 # {"user": user_id, "relation": "*", "object": "*"} 

251 # ]) 

252 # For now, return 0 as we need to implement the query logic 

253 return 0 

254 except Exception as e: 

255 logger.error(f"Failed to delete authorization tuples: {e}", exc_info=True) 

256 raise 

257 

258 async def _anonymize_user_audit_logs(self, user_id: str) -> int: 

259 """ 

260 Anonymize audit logs (don't delete for compliance) 

261 

262 Replace user_id with pseudonymized identifier. 

263 """ 

264 if not self.gdpr_storage: 

265 return 0 

266 

267 try: 

268 count = await self.gdpr_storage.audit_logs.anonymize_user_logs(user_id) 

269 return count 

270 except Exception as e: 

271 logger.error(f"Failed to anonymize user audit logs: {e}", exc_info=True) 

272 raise 

273 

274 async def _delete_user_consents(self, user_id: str) -> int: 

275 """Delete all user consent records""" 

276 if not self.gdpr_storage: 276 ↛ 277line 276 didn't jump to line 277 because the condition on line 276 was never true

277 return 0 

278 

279 try: 

280 count = await self.gdpr_storage.consents.delete_user_consents(user_id) 

281 return count 

282 except Exception as e: 

283 logger.error(f"Failed to delete user consents: {e}", exc_info=True) 

284 raise 

285 

286 async def _delete_user_profile(self, user_id: str) -> int: 

287 """Delete user profile/account""" 

288 if not self.gdpr_storage: 

289 # If no profile store, assume profile was deleted elsewhere 

290 return 1 

291 

292 try: 

293 deleted = await self.gdpr_storage.user_profiles.delete(user_id) 

294 return 1 if deleted else 0 

295 except Exception as e: 

296 logger.error(f"Failed to delete user profile: {e}", exc_info=True) 

297 raise 

298 

299 async def _create_deletion_audit_record( 

300 self, 

301 user_id: str, 

302 username: str, 

303 reason: str, 

304 deleted_items: dict[str, int], 

305 errors: list[str], 

306 ) -> str: 

307 """ 

308 Create audit record for deletion (anonymized) 

309 

310 This record is kept for compliance purposes. 

311 """ 

312 from mcp_server_langgraph.compliance.gdpr.storage import AuditLogEntry 

313 

314 timestamp = datetime.now(UTC).isoformat().replace("+00:00", "Z") 

315 audit_record_id = f"deletion_{datetime.now(UTC).strftime('%Y%m%d%H%M%S%f')}" 

316 

317 # Create anonymized audit log entry 

318 audit_entry = AuditLogEntry( 

319 log_id=audit_record_id, 

320 user_id="DELETED", # Anonymized for GDPR compliance 

321 action="account_deletion", 

322 resource_type="user_account", 

323 resource_id=f"hash_{abs(hash(user_id))}", # Hash for correlation if needed 

324 timestamp=timestamp, 

325 ip_address=None, # Not applicable for system action 

326 user_agent=None, # Not applicable for system action 

327 metadata={ 

328 "original_username_hash": abs(hash(username)), # Hash for potential correlation 

329 "deletion_reason": reason, 

330 "deleted_items": deleted_items, 

331 "errors_count": len(errors), 

332 "errors": errors if errors else [], 

333 "compliance_note": "User data deleted per GDPR Article 17 (Right to Erasure)", 

334 }, 

335 ) 

336 

337 # Store audit record if GDPR storage is configured 

338 if self.gdpr_storage: 

339 try: 

340 stored_id = await self.gdpr_storage.audit_logs.log(audit_entry) 

341 logger.info( 

342 "User account deletion audit record stored", 

343 extra={ 

344 "audit_record_id": stored_id, 

345 "deleted_items_count": sum(deleted_items.values()), 

346 "errors_count": len(errors), 

347 }, 

348 ) 

349 return stored_id 

350 except Exception as e: 

351 logger.error( 

352 f"Failed to store deletion audit record: {e}", 

353 exc_info=True, 

354 extra={"audit_record_id": audit_record_id}, 

355 ) 

356 # Fall through to log-only mode 

357 

358 # Fallback: Log to application logs if audit store not configured 

359 logger.warning( 

360 "User account deletion audit record (log-only, audit store not configured)", 

361 extra={ 

362 "audit_record_id": audit_record_id, 

363 "action": audit_entry.action, 

364 "resource_type": audit_entry.resource_type, 

365 "resource_id": audit_entry.resource_id, 

366 "metadata": audit_entry.metadata, 

367 "timestamp": timestamp, 

368 }, 

369 ) 

370 

371 return audit_record_id