Coverage for src / mcp_server_langgraph / compliance / gdpr / data_export.py: 73%
163 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
1"""
2GDPR Data Export Service - Article 15 (Right to Access) & Article 20 (Data Portability)
3"""
5import csv
6import io
7from datetime import datetime, UTC
8from typing import Any
10from pydantic import BaseModel, ConfigDict, Field
12from mcp_server_langgraph.auth.session import SessionStore
13from mcp_server_langgraph.compliance.gdpr.factory import GDPRStorage
14from mcp_server_langgraph.observability.telemetry import logger, tracer
class UserDataExport(BaseModel):
    """
    Snapshot of the personal data gathered for one user's GDPR export

    The identifying fields (export id, timestamp, user id, username, email)
    are required. Every collection field falls back to an empty dict or list
    when no value is supplied.
    """

    # Required: identify the export itself and the user it belongs to.
    export_id: str = Field(description="Unique export identifier")
    export_timestamp: str = Field(description="ISO timestamp of export")
    user_id: str = Field(description="User identifier")
    username: str = Field(description="Username")
    email: str = Field(description="User email address")

    # Optional: each gets a fresh empty container per instance.
    profile: dict[str, Any] = Field(
        default_factory=dict,
        description="User profile data",
    )
    sessions: list[dict[str, Any]] = Field(
        default_factory=list,
        description="Active and recent sessions",
    )
    conversations: list[dict[str, Any]] = Field(
        default_factory=list,
        description="Conversation history",
    )
    preferences: dict[str, Any] = Field(
        default_factory=dict,
        description="User preferences and settings",
    )
    audit_log: list[dict[str, Any]] = Field(
        default_factory=list,
        description="User activity audit log",
    )
    consents: list[dict[str, Any]] = Field(
        default_factory=list,
        description="Consent records",
    )
    metadata: dict[str, Any] = Field(
        default_factory=dict,
        description="Additional metadata",
    )

    # Sample payload attached to the generated JSON schema.
    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "export_id": "exp_20250101120000_user123",
                "export_timestamp": "2025-01-01T12:00:00Z",
                "user_id": "user:alice",
                "username": "alice",
                "email": "alice@acme.com",
                "profile": {"name": "Alice", "created_at": "2024-01-01"},
                "sessions": [
                    {"session_id": "sess_123", "created_at": "2025-01-01T10:00:00Z"},
                ],
                "conversations": [],
                "preferences": {"theme": "dark"},
                "audit_log": [],
                "consents": [],
            },
        },
    )
56class DataExportService:
57 """
58 Service for exporting user data for GDPR compliance
60 Implements Article 15 (Right to Access) and Article 20 (Data Portability).
61 """
63 def __init__(
64 self,
65 session_store: SessionStore | None = None,
66 gdpr_storage: GDPRStorage | None = None,
67 ):
68 """
69 Initialize data export service
71 Args:
72 session_store: Session storage backend
73 gdpr_storage: GDPR storage backend (user profiles, conversations, consents, etc.)
74 """
75 self.session_store = session_store
76 self.gdpr_storage = gdpr_storage
78 async def export_user_data(self, user_id: str, username: str, email: str) -> UserDataExport:
79 """
80 Export all data for a user (GDPR Article 15)
82 Args:
83 user_id: User identifier
84 username: Username
85 email: User email
87 Returns:
88 Complete user data export
89 """
90 with tracer.start_as_current_span("data_export.export_user_data") as span:
91 span.set_attribute("user_id", user_id)
93 export_id = f"exp_{datetime.now(UTC).strftime('%Y%m%d%H%M%S')}_{user_id.replace(':', '_')}"
95 logger.info("Starting user data export", extra={"user_id": user_id, "export_id": export_id})
97 # Gather all user data
98 profile = await self._get_user_profile(user_id)
99 sessions = await self._get_user_sessions(user_id)
100 conversations = await self._get_user_conversations(user_id)
101 preferences = await self._get_user_preferences(user_id)
102 audit_log = await self._get_user_audit_log(user_id)
103 consents = await self._get_user_consents(user_id)
105 export = UserDataExport(
106 export_id=export_id,
107 export_timestamp=datetime.now(UTC).isoformat().replace("+00:00", "Z"),
108 user_id=user_id,
109 username=username,
110 email=email,
111 profile=profile,
112 sessions=sessions,
113 conversations=conversations,
114 preferences=preferences,
115 audit_log=audit_log,
116 consents=consents,
117 metadata={"export_reason": "user_request", "gdpr_article": "15"},
118 )
120 logger.info(
121 "User data export completed",
122 extra={
123 "user_id": user_id,
124 "export_id": export_id,
125 "sessions_count": len(sessions),
126 "conversations_count": len(conversations),
127 },
128 )
130 return export
132 async def export_user_data_portable(
133 self, user_id: str, username: str, email: str, format: str = "json"
134 ) -> tuple[bytes, str]:
135 """
136 Export user data in portable format (GDPR Article 20)
138 Args:
139 user_id: User identifier
140 username: Username
141 email: User email
142 format: Export format ('json' or 'csv')
144 Returns:
145 Tuple of (data_bytes, content_type)
146 """
147 with tracer.start_as_current_span("data_export.export_portable") as span:
148 span.set_attribute("user_id", user_id)
149 span.set_attribute("format", format)
151 export = await self.export_user_data(user_id, username, email)
153 if format == "json":
154 # JSON export (machine-readable)
155 data = export.model_dump_json(indent=2).encode("utf-8")
156 content_type = "application/json"
158 elif format == "csv":
159 # CSV export (human-readable)
160 data = self._convert_to_csv(export)
161 content_type = "text/csv"
163 else:
164 msg = f"Unsupported export format: {format}"
165 raise ValueError(msg)
167 logger.info(
168 "Portable data export completed",
169 extra={"user_id": user_id, "format": format, "size_bytes": len(data)},
170 )
172 return data, content_type
174 def _convert_to_csv(self, export: UserDataExport) -> bytes:
175 """Convert export data to CSV format"""
176 output = io.StringIO()
177 writer = csv.writer(output)
179 # Write header
180 writer.writerow(["Export Metadata"])
181 writer.writerow(["Export ID", export.export_id])
182 writer.writerow(["Export Timestamp", export.export_timestamp])
183 writer.writerow(["User ID", export.user_id])
184 writer.writerow(["Username", export.username])
185 writer.writerow(["Email", export.email])
186 writer.writerow([])
188 # Write profile
189 writer.writerow(["Profile"])
190 writer.writerow(["Key", "Value"])
191 for key, value in export.profile.items():
192 writer.writerow([key, str(value)])
193 writer.writerow([])
195 # Write sessions
196 writer.writerow(["Sessions"])
197 if export.sessions: 197 ↛ 199line 197 didn't jump to line 199 because the condition on line 197 was never true
198 # Get all unique keys from sessions
199 keys = set() # type: ignore
200 for session in export.sessions:
201 keys.update(session.keys())
202 writer.writerow(list(keys))
203 for session in export.sessions:
204 writer.writerow([session.get(key, "") for key in keys])
205 else:
206 writer.writerow(["No sessions found"])
207 writer.writerow([])
209 # Write conversations
210 writer.writerow(["Conversations"])
211 if export.conversations: 211 ↛ 212line 211 didn't jump to line 212 because the condition on line 211 was never true
212 keys = set()
213 for conv in export.conversations:
214 keys.update(conv.keys())
215 writer.writerow(list(keys))
216 for conv in export.conversations:
217 writer.writerow([conv.get(key, "") for key in keys])
218 else:
219 writer.writerow(["No conversations found"])
220 writer.writerow([])
222 # Write preferences
223 writer.writerow(["Preferences"])
224 writer.writerow(["Key", "Value"])
225 for key, value in export.preferences.items(): 225 ↛ 226line 225 didn't jump to line 226 because the loop on line 225 never started
226 writer.writerow([key, str(value)])
227 writer.writerow([])
229 # Write consents
230 writer.writerow(["Consents"])
231 if export.consents: 231 ↛ 232line 231 didn't jump to line 232 because the condition on line 231 was never true
232 keys = set()
233 for consent in export.consents:
234 keys.update(consent.keys())
235 writer.writerow(list(keys))
236 for consent in export.consents:
237 writer.writerow([consent.get(key, "") for key in keys])
238 else:
239 writer.writerow(["No consent records found"])
241 return output.getvalue().encode("utf-8")
243 async def _get_user_profile(self, user_id: str) -> dict[str, Any]:
244 """Get user profile data"""
245 if not self.gdpr_storage:
246 # Return minimal data if no storage configured
247 return {
248 "user_id": user_id,
249 "created_at": datetime.now(UTC).isoformat().replace("+00:00", "Z"),
250 "last_updated": datetime.now(UTC).isoformat().replace("+00:00", "Z"),
251 }
253 try:
254 profile = await self.gdpr_storage.user_profiles.get(user_id)
255 if profile: 255 ↛ 256line 255 didn't jump to line 256 because the condition on line 255 was never true
256 return profile.model_dump()
257 else:
258 # User exists but no profile data
259 return {
260 "user_id": user_id,
261 "created_at": datetime.now(UTC).isoformat().replace("+00:00", "Z"),
262 "last_updated": datetime.now(UTC).isoformat().replace("+00:00", "Z"),
263 }
264 except Exception as e:
265 logger.error(f"Failed to retrieve user profile: {e}", exc_info=True)
266 return {"user_id": user_id, "error": "Failed to retrieve profile"}
268 async def _get_user_sessions(self, user_id: str) -> list[dict[str, Any]]:
269 """Get all user sessions"""
270 if not self.session_store: 270 ↛ 271line 270 didn't jump to line 271 because the condition on line 270 was never true
271 return []
273 try:
274 sessions = await self.session_store.list_user_sessions(user_id)
275 return [
276 {
277 "session_id": s.session_id,
278 "username": s.username,
279 "roles": s.roles,
280 "created_at": s.created_at,
281 "last_accessed": s.last_accessed,
282 "expires_at": s.expires_at,
283 "metadata": s.metadata,
284 }
285 for s in sessions
286 ]
287 except Exception as e:
288 logger.error(f"Failed to retrieve user sessions: {e}", exc_info=True)
289 return []
291 async def _get_user_conversations(self, user_id: str) -> list[dict[str, Any]]:
292 """Get user conversation history"""
293 if not self.gdpr_storage:
294 return []
296 try:
297 conversations = await self.gdpr_storage.conversations.list_user_conversations(user_id)
298 return [conv.model_dump() for conv in conversations]
299 except Exception as e:
300 logger.error(f"Failed to retrieve user conversations: {e}", exc_info=True)
301 return []
303 async def _get_user_preferences(self, user_id: str) -> dict[str, Any]:
304 """Get user preferences"""
305 if not self.gdpr_storage:
306 return {}
308 try:
309 preferences = await self.gdpr_storage.preferences.get(user_id)
310 if preferences: 310 ↛ 311line 310 didn't jump to line 311 because the condition on line 310 was never true
311 return preferences.preferences
312 return {}
313 except Exception as e:
314 logger.error(f"Failed to retrieve user preferences: {e}", exc_info=True)
315 return {}
317 async def _get_user_audit_log(self, user_id: str) -> list[dict[str, Any]]:
318 """Get user audit log entries"""
319 if not self.gdpr_storage:
320 return []
322 try:
323 logs = await self.gdpr_storage.audit_logs.list_user_logs(user_id, limit=1000)
324 return [log.model_dump() for log in logs]
325 except Exception as e:
326 logger.error(f"Failed to retrieve user audit logs: {e}", exc_info=True)
327 return []
329 async def _get_user_consents(self, user_id: str) -> list[dict[str, Any]]:
330 """Get user consent records"""
331 if not self.gdpr_storage:
332 return []
334 try:
335 consents = await self.gdpr_storage.consents.get_user_consents(user_id)
336 return [consent.model_dump() for consent in consents]
337 except Exception as e:
338 logger.error(f"Failed to retrieve user consents: {e}", exc_info=True)
339 return []