Coverage for src / mcp_server_langgraph / compliance / gdpr / data_export.py: 73%

163 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 00:43 +0000

1""" 

2GDPR Data Export Service - Article 15 (Right to Access) & Article 20 (Data Portability) 

3""" 

4 

5import csv 

6import io 

7from datetime import datetime, UTC 

8from typing import Any 

9 

10from pydantic import BaseModel, ConfigDict, Field 

11 

12from mcp_server_langgraph.auth.session import SessionStore 

13from mcp_server_langgraph.compliance.gdpr.factory import GDPRStorage 

14from mcp_server_langgraph.observability.telemetry import logger, tracer 

15 

16 

class UserDataExport(BaseModel):
    """
    Complete user data export for GDPR compliance

    Includes all personal data associated with a user.
    """

    # NOTE: the docstring above is kept verbatim on purpose; schema tooling may
    # surface it as the model description.

    # --- Identity of this export ---------------------------------------------
    export_id: str = Field(..., description="Unique export identifier")
    export_timestamp: str = Field(..., description="ISO timestamp of export")

    # --- Identity of the data subject (all required) -------------------------
    user_id: str = Field(..., description="User identifier")
    username: str = Field(..., description="Username")
    email: str = Field(..., description="User email address")

    # --- Exported data sections (each defaults to an empty container) --------
    profile: dict[str, Any] = Field(default_factory=dict, description="User profile data")
    sessions: list[dict[str, Any]] = Field(
        default_factory=list,
        description="Active and recent sessions",
    )
    conversations: list[dict[str, Any]] = Field(
        default_factory=list,
        description="Conversation history",
    )
    preferences: dict[str, Any] = Field(
        default_factory=dict,
        description="User preferences and settings",
    )
    audit_log: list[dict[str, Any]] = Field(
        default_factory=list,
        description="User activity audit log",
    )
    consents: list[dict[str, Any]] = Field(default_factory=list, description="Consent records")
    metadata: dict[str, Any] = Field(default_factory=dict, description="Additional metadata")

    # Example payload attached to the generated JSON schema.
    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "export_id": "exp_20250101120000_user123",
                "export_timestamp": "2025-01-01T12:00:00Z",
                "user_id": "user:alice",
                "username": "alice",
                "email": "alice@acme.com",
                "profile": {"name": "Alice", "created_at": "2024-01-01"},
                "sessions": [
                    {"session_id": "sess_123", "created_at": "2025-01-01T10:00:00Z"},
                ],
                "conversations": [],
                "preferences": {"theme": "dark"},
                "audit_log": [],
                "consents": [],
            }
        }
    )

54 

55 

56class DataExportService: 

57 """ 

58 Service for exporting user data for GDPR compliance 

59 

60 Implements Article 15 (Right to Access) and Article 20 (Data Portability). 

61 """ 

62 

63 def __init__( 

64 self, 

65 session_store: SessionStore | None = None, 

66 gdpr_storage: GDPRStorage | None = None, 

67 ): 

68 """ 

69 Initialize data export service 

70 

71 Args: 

72 session_store: Session storage backend 

73 gdpr_storage: GDPR storage backend (user profiles, conversations, consents, etc.) 

74 """ 

75 self.session_store = session_store 

76 self.gdpr_storage = gdpr_storage 

77 

78 async def export_user_data(self, user_id: str, username: str, email: str) -> UserDataExport: 

79 """ 

80 Export all data for a user (GDPR Article 15) 

81 

82 Args: 

83 user_id: User identifier 

84 username: Username 

85 email: User email 

86 

87 Returns: 

88 Complete user data export 

89 """ 

90 with tracer.start_as_current_span("data_export.export_user_data") as span: 

91 span.set_attribute("user_id", user_id) 

92 

93 export_id = f"exp_{datetime.now(UTC).strftime('%Y%m%d%H%M%S')}_{user_id.replace(':', '_')}" 

94 

95 logger.info("Starting user data export", extra={"user_id": user_id, "export_id": export_id}) 

96 

97 # Gather all user data 

98 profile = await self._get_user_profile(user_id) 

99 sessions = await self._get_user_sessions(user_id) 

100 conversations = await self._get_user_conversations(user_id) 

101 preferences = await self._get_user_preferences(user_id) 

102 audit_log = await self._get_user_audit_log(user_id) 

103 consents = await self._get_user_consents(user_id) 

104 

105 export = UserDataExport( 

106 export_id=export_id, 

107 export_timestamp=datetime.now(UTC).isoformat().replace("+00:00", "Z"), 

108 user_id=user_id, 

109 username=username, 

110 email=email, 

111 profile=profile, 

112 sessions=sessions, 

113 conversations=conversations, 

114 preferences=preferences, 

115 audit_log=audit_log, 

116 consents=consents, 

117 metadata={"export_reason": "user_request", "gdpr_article": "15"}, 

118 ) 

119 

120 logger.info( 

121 "User data export completed", 

122 extra={ 

123 "user_id": user_id, 

124 "export_id": export_id, 

125 "sessions_count": len(sessions), 

126 "conversations_count": len(conversations), 

127 }, 

128 ) 

129 

130 return export 

131 

132 async def export_user_data_portable( 

133 self, user_id: str, username: str, email: str, format: str = "json" 

134 ) -> tuple[bytes, str]: 

135 """ 

136 Export user data in portable format (GDPR Article 20) 

137 

138 Args: 

139 user_id: User identifier 

140 username: Username 

141 email: User email 

142 format: Export format ('json' or 'csv') 

143 

144 Returns: 

145 Tuple of (data_bytes, content_type) 

146 """ 

147 with tracer.start_as_current_span("data_export.export_portable") as span: 

148 span.set_attribute("user_id", user_id) 

149 span.set_attribute("format", format) 

150 

151 export = await self.export_user_data(user_id, username, email) 

152 

153 if format == "json": 

154 # JSON export (machine-readable) 

155 data = export.model_dump_json(indent=2).encode("utf-8") 

156 content_type = "application/json" 

157 

158 elif format == "csv": 

159 # CSV export (human-readable) 

160 data = self._convert_to_csv(export) 

161 content_type = "text/csv" 

162 

163 else: 

164 msg = f"Unsupported export format: {format}" 

165 raise ValueError(msg) 

166 

167 logger.info( 

168 "Portable data export completed", 

169 extra={"user_id": user_id, "format": format, "size_bytes": len(data)}, 

170 ) 

171 

172 return data, content_type 

173 

174 def _convert_to_csv(self, export: UserDataExport) -> bytes: 

175 """Convert export data to CSV format""" 

176 output = io.StringIO() 

177 writer = csv.writer(output) 

178 

179 # Write header 

180 writer.writerow(["Export Metadata"]) 

181 writer.writerow(["Export ID", export.export_id]) 

182 writer.writerow(["Export Timestamp", export.export_timestamp]) 

183 writer.writerow(["User ID", export.user_id]) 

184 writer.writerow(["Username", export.username]) 

185 writer.writerow(["Email", export.email]) 

186 writer.writerow([]) 

187 

188 # Write profile 

189 writer.writerow(["Profile"]) 

190 writer.writerow(["Key", "Value"]) 

191 for key, value in export.profile.items(): 

192 writer.writerow([key, str(value)]) 

193 writer.writerow([]) 

194 

195 # Write sessions 

196 writer.writerow(["Sessions"]) 

197 if export.sessions: 197 ↛ 199line 197 didn't jump to line 199 because the condition on line 197 was never true

198 # Get all unique keys from sessions 

199 keys = set() # type: ignore 

200 for session in export.sessions: 

201 keys.update(session.keys()) 

202 writer.writerow(list(keys)) 

203 for session in export.sessions: 

204 writer.writerow([session.get(key, "") for key in keys]) 

205 else: 

206 writer.writerow(["No sessions found"]) 

207 writer.writerow([]) 

208 

209 # Write conversations 

210 writer.writerow(["Conversations"]) 

211 if export.conversations: 211 ↛ 212line 211 didn't jump to line 212 because the condition on line 211 was never true

212 keys = set() 

213 for conv in export.conversations: 

214 keys.update(conv.keys()) 

215 writer.writerow(list(keys)) 

216 for conv in export.conversations: 

217 writer.writerow([conv.get(key, "") for key in keys]) 

218 else: 

219 writer.writerow(["No conversations found"]) 

220 writer.writerow([]) 

221 

222 # Write preferences 

223 writer.writerow(["Preferences"]) 

224 writer.writerow(["Key", "Value"]) 

225 for key, value in export.preferences.items(): 225 ↛ 226line 225 didn't jump to line 226 because the loop on line 225 never started

226 writer.writerow([key, str(value)]) 

227 writer.writerow([]) 

228 

229 # Write consents 

230 writer.writerow(["Consents"]) 

231 if export.consents: 231 ↛ 232line 231 didn't jump to line 232 because the condition on line 231 was never true

232 keys = set() 

233 for consent in export.consents: 

234 keys.update(consent.keys()) 

235 writer.writerow(list(keys)) 

236 for consent in export.consents: 

237 writer.writerow([consent.get(key, "") for key in keys]) 

238 else: 

239 writer.writerow(["No consent records found"]) 

240 

241 return output.getvalue().encode("utf-8") 

242 

243 async def _get_user_profile(self, user_id: str) -> dict[str, Any]: 

244 """Get user profile data""" 

245 if not self.gdpr_storage: 

246 # Return minimal data if no storage configured 

247 return { 

248 "user_id": user_id, 

249 "created_at": datetime.now(UTC).isoformat().replace("+00:00", "Z"), 

250 "last_updated": datetime.now(UTC).isoformat().replace("+00:00", "Z"), 

251 } 

252 

253 try: 

254 profile = await self.gdpr_storage.user_profiles.get(user_id) 

255 if profile: 255 ↛ 256line 255 didn't jump to line 256 because the condition on line 255 was never true

256 return profile.model_dump() 

257 else: 

258 # User exists but no profile data 

259 return { 

260 "user_id": user_id, 

261 "created_at": datetime.now(UTC).isoformat().replace("+00:00", "Z"), 

262 "last_updated": datetime.now(UTC).isoformat().replace("+00:00", "Z"), 

263 } 

264 except Exception as e: 

265 logger.error(f"Failed to retrieve user profile: {e}", exc_info=True) 

266 return {"user_id": user_id, "error": "Failed to retrieve profile"} 

267 

268 async def _get_user_sessions(self, user_id: str) -> list[dict[str, Any]]: 

269 """Get all user sessions""" 

270 if not self.session_store: 270 ↛ 271line 270 didn't jump to line 271 because the condition on line 270 was never true

271 return [] 

272 

273 try: 

274 sessions = await self.session_store.list_user_sessions(user_id) 

275 return [ 

276 { 

277 "session_id": s.session_id, 

278 "username": s.username, 

279 "roles": s.roles, 

280 "created_at": s.created_at, 

281 "last_accessed": s.last_accessed, 

282 "expires_at": s.expires_at, 

283 "metadata": s.metadata, 

284 } 

285 for s in sessions 

286 ] 

287 except Exception as e: 

288 logger.error(f"Failed to retrieve user sessions: {e}", exc_info=True) 

289 return [] 

290 

291 async def _get_user_conversations(self, user_id: str) -> list[dict[str, Any]]: 

292 """Get user conversation history""" 

293 if not self.gdpr_storage: 

294 return [] 

295 

296 try: 

297 conversations = await self.gdpr_storage.conversations.list_user_conversations(user_id) 

298 return [conv.model_dump() for conv in conversations] 

299 except Exception as e: 

300 logger.error(f"Failed to retrieve user conversations: {e}", exc_info=True) 

301 return [] 

302 

303 async def _get_user_preferences(self, user_id: str) -> dict[str, Any]: 

304 """Get user preferences""" 

305 if not self.gdpr_storage: 

306 return {} 

307 

308 try: 

309 preferences = await self.gdpr_storage.preferences.get(user_id) 

310 if preferences: 310 ↛ 311line 310 didn't jump to line 311 because the condition on line 310 was never true

311 return preferences.preferences 

312 return {} 

313 except Exception as e: 

314 logger.error(f"Failed to retrieve user preferences: {e}", exc_info=True) 

315 return {} 

316 

317 async def _get_user_audit_log(self, user_id: str) -> list[dict[str, Any]]: 

318 """Get user audit log entries""" 

319 if not self.gdpr_storage: 

320 return [] 

321 

322 try: 

323 logs = await self.gdpr_storage.audit_logs.list_user_logs(user_id, limit=1000) 

324 return [log.model_dump() for log in logs] 

325 except Exception as e: 

326 logger.error(f"Failed to retrieve user audit logs: {e}", exc_info=True) 

327 return [] 

328 

329 async def _get_user_consents(self, user_id: str) -> list[dict[str, Any]]: 

330 """Get user consent records""" 

331 if not self.gdpr_storage: 

332 return [] 

333 

334 try: 

335 consents = await self.gdpr_storage.consents.get_user_consents(user_id) 

336 return [consent.model_dump() for consent in consents] 

337 except Exception as e: 

338 logger.error(f"Failed to retrieve user consents: {e}", exc_info=True) 

339 return []