Coverage for src / mcp_server_langgraph / compliance / gdpr / storage.py: 90%

221 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 00:43 +0000

1""" 

2Storage Backend Interfaces for Compliance Data 

3 

4Provides abstract interfaces for storing compliance-related data: 

5- User profiles 

6- Conversations 

7- Preferences 

8- Audit logs 

9- Consent records 

10 

11Implementations can be backed by: 

12- PostgreSQL 

13- MongoDB 

14- Redis 

15- File system 

16- In-memory (for testing) 

17""" 

18 

19from abc import ABC, abstractmethod 

20from datetime import datetime, UTC 

21from typing import Any 

22 

23from pydantic import BaseModel, ConfigDict, Field 

24import contextlib 

25 

26# ============================================================================ 

27# Data Models 

28# ============================================================================ 

29 

30 

class UserProfile(BaseModel):
    """User profile data model"""

    # The one-line docstring above is left untouched on purpose: pydantic
    # uses a model's docstring as the description in its generated JSON schema.

    # Required identity and contact fields.
    user_id: str = Field(..., description="Unique user identifier")
    username: str = Field(..., description="Username")
    email: str = Field(..., description="Email address")

    # Optional display name; defaults to None when omitted.
    full_name: str | None = Field(default=None, description="Full name")

    # Timestamps are plain strings. The descriptions ask for ISO format, but
    # no format validation is declared on these fields.
    created_at: str = Field(..., description="Account creation timestamp (ISO format)")
    last_updated: str = Field(..., description="Last update timestamp (ISO format)")

    # Free-form extras; default_factory builds a fresh dict per instance.
    metadata: dict[str, Any] = Field(default_factory=dict, description="Additional profile data")

    # Sample payload attached to the JSON schema.
    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "user_id": "user:alice",
                "username": "alice",
                "email": "alice@acme.com",
                "full_name": "Alice Smith",
                "created_at": "2025-01-01T00:00:00Z",
                "last_updated": "2025-01-01T00:00:00Z",
                "metadata": {"department": "Engineering"},
            },
        },
    )

55 

56 

class Conversation(BaseModel):
    """Conversation data model"""

    # The one-line docstring above is left untouched on purpose: pydantic
    # uses a model's docstring as the description in its generated JSON schema.

    # Required identifiers: the conversation itself and the user owning it.
    conversation_id: str = Field(..., description="Unique conversation identifier")
    user_id: str = Field(..., description="User who owns this conversation")

    # Optional title; defaults to None when omitted.
    title: str | None = Field(default=None, description="Conversation title")

    # Messages are untyped dicts; default_factory builds a fresh list per instance.
    messages: list[dict[str, Any]] = Field(default_factory=list, description="List of messages")

    # Timestamps are plain strings. The descriptions ask for ISO format, but
    # no format validation is declared on these fields.
    created_at: str = Field(..., description="Creation timestamp (ISO format)")
    last_message_at: str = Field(..., description="Last message timestamp (ISO format)")

    # Conversations start out not archived.
    archived: bool = Field(False, description="Whether conversation is archived")

    # Free-form extras; default_factory builds a fresh dict per instance.
    metadata: dict[str, Any] = Field(default_factory=dict, description="Additional metadata")

    # Sample payload attached to the JSON schema.
    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "conversation_id": "conv_123",
                "user_id": "user:alice",
                "title": "Project Discussion",
                "messages": [
                    {"role": "user", "content": "Hello"},
                ],
                "created_at": "2025-01-01T00:00:00Z",
                "last_message_at": "2025-01-01T00:05:00Z",
                "archived": False,
                "metadata": {},
            },
        },
    )

83 

84 

class UserPreferences(BaseModel):
    """User preferences data model"""

    # The one-line docstring above is left untouched on purpose: pydantic
    # uses a model's docstring as the description in its generated JSON schema.

    # Owner of this preference set.
    user_id: str = Field(..., description="User identifier")

    # Arbitrary key/value preferences; default_factory builds a fresh dict per instance.
    preferences: dict[str, Any] = Field(default_factory=dict, description="User preferences")

    # Plain string; the description asks for ISO format but nothing validates it here.
    updated_at: str = Field(..., description="Last update timestamp (ISO format)")

    # Sample payload attached to the JSON schema.
    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "user_id": "user:alice",
                "preferences": {
                    "theme": "dark",
                    "language": "en",
                    "notifications": {"email": True, "sms": False},
                },
                "updated_at": "2025-01-01T00:00:00Z",
            },
        },
    )

101 

102 

class AuditLogEntry(BaseModel):
    """Audit log entry data model"""

    # The one-line docstring above is left untouched on purpose: pydantic
    # uses a model's docstring as the description in its generated JSON schema.

    # Required: which entry, who acted, and what they did.
    log_id: str = Field(..., description="Unique log entry identifier")
    user_id: str = Field(..., description="User who performed the action")
    action: str = Field(..., description="Action performed")

    # The kind of resource is required; its identifier is optional.
    resource_type: str = Field(..., description="Type of resource affected")
    resource_id: str | None = Field(default=None, description="Identifier of resource affected")

    # Plain string; the description asks for ISO format but nothing validates it here.
    timestamp: str = Field(..., description="Action timestamp (ISO format)")

    # Optional request details; both default to None.
    ip_address: str | None = Field(default=None, description="IP address of request")
    user_agent: str | None = Field(default=None, description="User agent string")

    # Free-form extras; default_factory builds a fresh dict per instance.
    metadata: dict[str, Any] = Field(default_factory=dict, description="Additional context")

    # Sample payload attached to the JSON schema.
    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "log_id": "log_123",
                "user_id": "user:alice",
                "action": "profile.update",
                "resource_type": "user_profile",
                "resource_id": "user:alice",
                "timestamp": "2025-01-01T00:00:00Z",
                "ip_address": "192.168.1.1",
                "user_agent": "Mozilla/5.0...",
                "metadata": {"fields_updated": ["email"]},
            },
        },
    )

131 

132 

class ConsentRecord(BaseModel):
    """Consent record data model"""

    # The one-line docstring above is left untouched on purpose: pydantic
    # uses a model's docstring as the description in its generated JSON schema.

    # Required: which record, whose consent, what kind, and the decision itself.
    consent_id: str = Field(..., description="Unique consent record identifier")
    user_id: str = Field(..., description="User identifier")
    consent_type: str = Field(..., description="Type of consent (analytics, marketing, etc.)")
    granted: bool = Field(..., description="Whether consent is granted")

    # Plain string; the description asks for ISO format but nothing validates it here.
    timestamp: str = Field(..., description="Consent timestamp (ISO format)")

    # Optional request details captured alongside the decision; both default to None.
    ip_address: str | None = Field(default=None, description="IP address when consent was given")
    user_agent: str | None = Field(default=None, description="User agent when consent was given")

    # Free-form extras; default_factory builds a fresh dict per instance.
    metadata: dict[str, Any] = Field(default_factory=dict, description="Additional context")

    # Sample payload attached to the JSON schema.
    model_config = ConfigDict(
        json_schema_extra={
            "example": {
                "consent_id": "consent_123",
                "user_id": "user:alice",
                "consent_type": "analytics",
                "granted": True,
                "timestamp": "2025-01-01T00:00:00Z",
                "ip_address": "192.168.1.1",
                "user_agent": "Mozilla/5.0...",
                "metadata": {},
            },
        },
    )

159 

160 

161# ============================================================================ 

162# Storage Interfaces 

163# ============================================================================ 

164 

165 

class UserProfileStore(ABC):
    """Storage contract for :class:`UserProfile` records.

    Every method is an abstract coroutine, so a concrete backend must
    implement all four before it can be instantiated.
    """

    @abstractmethod
    async def create(self, profile: UserProfile) -> bool:
        """Store a new user profile.

        Args:
            profile: The profile to store.

        Returns:
            Boolean outcome of the operation.
        """

    @abstractmethod
    async def get(self, user_id: str) -> UserProfile | None:
        """Look up a profile by user ID.

        Args:
            user_id: Identifier of the profile to fetch.

        Returns:
            The stored profile, or None.
        """

    @abstractmethod
    async def update(self, user_id: str, updates: dict[str, Any]) -> bool:
        """Apply changes to an existing profile.

        Args:
            user_id: Identifier of the profile to change.
            updates: Mapping of field name to new value.

        Returns:
            Boolean outcome of the operation.
        """

    @abstractmethod
    async def delete(self, user_id: str) -> bool:
        """Remove a profile.

        Args:
            user_id: Identifier of the profile to remove.

        Returns:
            Boolean outcome of the operation.
        """

184 

185 

class ConversationStore(ABC):
    """Storage contract for :class:`Conversation` records.

    Every method is an abstract coroutine, so a concrete backend must
    implement all six before it can be instantiated.
    """

    @abstractmethod
    async def create(self, conversation: Conversation) -> str:
        """Store a new conversation.

        Args:
            conversation: The conversation to store.

        Returns:
            The conversation's ID.
        """

    @abstractmethod
    async def get(self, conversation_id: str) -> Conversation | None:
        """Look up a conversation by ID.

        Args:
            conversation_id: Identifier of the conversation to fetch.

        Returns:
            The stored conversation, or None.
        """

    @abstractmethod
    async def list_user_conversations(self, user_id: str, archived: bool | None = None) -> list[Conversation]:
        """List the conversations belonging to one user.

        Args:
            user_id: Owner whose conversations are listed.
            archived: Optional archived-state filter; defaults to None.

        Returns:
            A list of conversations.
        """

    @abstractmethod
    async def update(self, conversation_id: str, updates: dict[str, Any]) -> bool:
        """Apply changes to an existing conversation.

        Args:
            conversation_id: Identifier of the conversation to change.
            updates: Mapping of field name to new value.

        Returns:
            Boolean outcome of the operation.
        """

    @abstractmethod
    async def delete(self, conversation_id: str) -> bool:
        """Remove a single conversation.

        Args:
            conversation_id: Identifier of the conversation to remove.

        Returns:
            Boolean outcome of the operation.
        """

    @abstractmethod
    async def delete_user_conversations(self, user_id: str) -> int:
        """Remove every conversation belonging to one user.

        Args:
            user_id: Owner whose conversations are removed.

        Returns:
            An integer count for the operation.
        """

212 

213 

class PreferencesStore(ABC):
    """Storage contract for :class:`UserPreferences` records.

    Every method is an abstract coroutine, so a concrete backend must
    implement all four before it can be instantiated.
    """

    @abstractmethod
    async def get(self, user_id: str) -> UserPreferences | None:
        """Look up the preferences stored for a user.

        Args:
            user_id: Owner of the preferences.

        Returns:
            The stored preferences, or None.
        """

    @abstractmethod
    async def set(self, user_id: str, preferences: dict[str, Any]) -> bool:
        """Set a user's preferences.

        Args:
            user_id: Owner of the preferences.
            preferences: Preference mapping to store.

        Returns:
            Boolean outcome of the operation.
        """

    @abstractmethod
    async def update(self, user_id: str, updates: dict[str, Any]) -> bool:
        """Change specific preferences for a user.

        Args:
            user_id: Owner of the preferences.
            updates: Preference keys and their new values.

        Returns:
            Boolean outcome of the operation.
        """

    @abstractmethod
    async def delete(self, user_id: str) -> bool:
        """Remove a user's preferences.

        Args:
            user_id: Owner of the preferences.

        Returns:
            Boolean outcome of the operation.
        """

232 

233 

class AuditLogStore(ABC):
    """Storage contract for :class:`AuditLogEntry` records.

    Every method is an abstract coroutine, so a concrete backend must
    implement all four before it can be instantiated.
    """

    @abstractmethod
    async def log(self, entry: AuditLogEntry) -> str:
        """Record an audit entry.

        Args:
            entry: The entry to record.

        Returns:
            The entry's ID.
        """

    @abstractmethod
    async def get(self, log_id: str) -> AuditLogEntry | None:
        """Look up an audit entry by ID.

        Args:
            log_id: Identifier of the entry to fetch.

        Returns:
            The stored entry, or None.
        """

    @abstractmethod
    async def list_user_logs(
        self, user_id: str, start_date: datetime | None = None, end_date: datetime | None = None, limit: int = 100
    ) -> list[AuditLogEntry]:
        """List audit entries for one user.

        Args:
            user_id: User whose entries are listed.
            start_date: Optional start of the date range; defaults to None.
            end_date: Optional end of the date range; defaults to None.
            limit: Integer limit for the listing; defaults to 100.

        Returns:
            A list of audit entries.
        """

    @abstractmethod
    async def anonymize_user_logs(self, user_id: str) -> int:
        """Anonymize the audit entries of one user (GDPR compliance).

        Args:
            user_id: User whose entries are anonymized.

        Returns:
            An integer count for the operation.
        """

254 

255 

class ConsentStore(ABC):
    """Storage contract for :class:`ConsentRecord` records.

    Every method is an abstract coroutine, so a concrete backend must
    implement all four before it can be instantiated.
    """

    @abstractmethod
    async def create(self, record: ConsentRecord) -> str:
        """Store a consent record.

        Args:
            record: The record to store.

        Returns:
            The record's ID.
        """

    @abstractmethod
    async def get_user_consents(self, user_id: str) -> list[ConsentRecord]:
        """List every consent record stored for a user.

        Args:
            user_id: User whose records are listed.

        Returns:
            A list of consent records.
        """

    @abstractmethod
    async def get_latest_consent(self, user_id: str, consent_type: str) -> ConsentRecord | None:
        """Fetch the latest record of one consent type for a user.

        Args:
            user_id: User whose records are searched.
            consent_type: Consent type to look for.

        Returns:
            The latest record of that type, or None.
        """

    @abstractmethod
    async def delete_user_consents(self, user_id: str) -> int:
        """Remove every consent record stored for a user.

        Args:
            user_id: User whose records are removed.

        Returns:
            An integer count for the operation.
        """

274 

275 

276# ============================================================================ 

277# In-Memory Implementations (for testing/development) 

278# ============================================================================ 

279 

280 

class InMemoryUserProfileStore(UserProfileStore):
    """In-memory implementation of user profile storage.

    Profiles live in a plain dict keyed by ``user_id``; nothing is persisted.
    """

    def __init__(self) -> None:
        # user_id -> profile
        self.profiles: dict[str, UserProfile] = {}

    async def create(self, profile: UserProfile) -> bool:
        """Store ``profile`` unless its ``user_id`` is already taken.

        Returns:
            True when the profile was stored, False when a profile with the
            same ``user_id`` already exists (the existing one is kept).
        """
        if profile.user_id in self.profiles:
            return False
        self.profiles[profile.user_id] = profile
        return True

    async def get(self, user_id: str) -> UserProfile | None:
        """Return the profile stored under ``user_id``, or None."""
        return self.profiles.get(user_id)

    async def update(self, user_id: str, updates: dict[str, Any]) -> bool:
        """Apply ``updates`` to the stored profile and refresh ``last_updated``.

        Keys that are not declared fields of the profile model are ignored.

        Args:
            user_id: Identifier of the profile to change.
            updates: Mapping of field name to new value.

        Returns:
            True when the profile exists, False otherwise.
        """
        profile = self.profiles.get(user_id)
        if profile is None:
            return False

        # Check against the model's declared fields rather than hasattr():
        # hasattr() is also true for methods and class variables (for example
        # "model_dump" or "model_config"), and assigning to those makes
        # pydantic raise instead of the key being skipped.
        declared_fields = type(profile).model_fields
        for key, value in updates.items():
            if key in declared_fields:
                setattr(profile, key, value)

        # Always stamped last, so a "last_updated" passed in updates is overwritten.
        profile.last_updated = datetime.now(UTC).isoformat().replace("+00:00", "Z")
        return True

    async def delete(self, user_id: str) -> bool:
        """Remove the profile stored under ``user_id``.

        Returns:
            True when a profile was removed, False when none existed.
        """
        if user_id in self.profiles:
            del self.profiles[user_id]
            return True
        return False

313 

314 

class InMemoryConversationStore(ConversationStore):
    """In-memory implementation of conversation storage.

    Conversations live in a dict keyed by ``conversation_id``; a second dict
    maps each ``user_id`` to the IDs of the conversations that user owns.
    Nothing is persisted.
    """

    def __init__(self) -> None:
        # conversation_id -> conversation
        self.conversations: dict[str, Conversation] = {}
        # user_id -> conversation IDs owned by that user, in insertion order
        self.user_conversations: dict[str, list[str]] = {}

    def _unindex(self, user_id: str, conversation_id: str) -> None:
        """Drop ``conversation_id`` from ``user_id``'s index entry, if present."""
        with contextlib.suppress(KeyError, ValueError):
            self.user_conversations[user_id].remove(conversation_id)

    async def create(self, conversation: Conversation) -> str:
        """Store ``conversation`` and index it under its owner.

        Creating with an ID that already exists replaces the stored
        conversation. The old index entry is removed first, so the ID is never
        listed twice and never stays listed under a previous owner.

        Returns:
            The conversation's ID.
        """
        conversation_id = conversation.conversation_id

        replaced = self.conversations.get(conversation_id)
        if replaced is not None:
            self._unindex(replaced.user_id, conversation_id)

        self.conversations[conversation_id] = conversation
        self.user_conversations.setdefault(conversation.user_id, []).append(conversation_id)
        return conversation_id

    async def get(self, conversation_id: str) -> Conversation | None:
        """Return the conversation stored under ``conversation_id``, or None."""
        return self.conversations.get(conversation_id)

    async def list_user_conversations(self, user_id: str, archived: bool | None = None) -> list[Conversation]:
        """List a user's conversations in index order.

        Args:
            user_id: Owner whose conversations are listed.
            archived: When None, no filtering is applied; otherwise only
                conversations whose ``archived`` flag equals this value are
                returned.

        Returns:
            The matching conversations; empty when the user has no index entry.
        """
        matches: list[Conversation] = []
        for conversation_id in self.user_conversations.get(user_id, ()):
            conversation = self.conversations.get(conversation_id)
            if conversation is None:
                continue
            if archived is None or conversation.archived == archived:
                matches.append(conversation)
        return matches

    async def update(self, conversation_id: str, updates: dict[str, Any]) -> bool:
        """Apply ``updates`` to a stored conversation.

        Keys that are not declared fields of the conversation model are
        ignored. If the update changes ``user_id``, the per-user index is
        moved to the new owner.

        Args:
            conversation_id: Identifier of the conversation to change.
            updates: Mapping of field name to new value.

        Returns:
            True when the conversation exists, False otherwise.
        """
        conversation = self.conversations.get(conversation_id)
        if conversation is None:
            return False

        previous_owner = conversation.user_id

        # Check against the model's declared fields rather than hasattr():
        # hasattr() is also true for methods and class variables, and
        # assigning to those makes pydantic raise instead of skipping the key.
        declared_fields = type(conversation).model_fields
        for key, value in updates.items():
            if key in declared_fields:
                setattr(conversation, key, value)

        # Keep the index in step with ownership; otherwise the old owner would
        # still list (and could bulk-delete) a conversation they no longer own.
        if conversation.user_id != previous_owner:
            self._unindex(previous_owner, conversation_id)
            self.user_conversations.setdefault(conversation.user_id, []).append(conversation_id)

        return True

    async def delete(self, conversation_id: str) -> bool:
        """Remove one conversation and its index entry.

        Returns:
            True when a conversation was removed, False when none existed.
        """
        removed = self.conversations.pop(conversation_id, None)
        if removed is None:
            return False
        self._unindex(removed.user_id, conversation_id)
        return True

    async def delete_user_conversations(self, user_id: str) -> int:
        """Remove every conversation indexed under ``user_id``.

        Returns:
            The number of conversations actually removed.
        """
        # Iterate over a copy: delete() mutates the list being walked.
        pending = list(self.user_conversations.get(user_id, ()))

        removed = 0
        for conversation_id in pending:
            if await self.delete(conversation_id):
                removed += 1
        return removed

378 

379 

class InMemoryPreferencesStore(PreferencesStore):
    """In-memory implementation of preferences storage.

    One :class:`UserPreferences` record is kept per ``user_id`` in a plain
    dict; nothing is persisted.
    """

    def __init__(self) -> None:
        # user_id -> preferences record
        self.preferences: dict[str, UserPreferences] = {}

    async def get(self, user_id: str) -> UserPreferences | None:
        """Return the record stored for ``user_id``, or None."""
        return self.preferences.get(user_id)

    async def set(self, user_id: str, preferences: dict[str, Any]) -> bool:
        """Replace the user's record with a new one built from ``preferences``.

        The new record is stamped with the current UTC time.

        Returns:
            Always True.
        """
        stamp = datetime.now(UTC).isoformat().replace("+00:00", "Z")
        record = UserPreferences(user_id=user_id, preferences=preferences, updated_at=stamp)
        self.preferences[user_id] = record
        return True

    async def update(self, user_id: str, updates: dict[str, Any]) -> bool:
        """Merge ``updates`` into the user's preferences.

        When the user has no record yet, one is created from ``updates``.

        Returns:
            True for a merged record, or the result of :meth:`set` when a new
            record had to be created.
        """
        if user_id in self.preferences:
            record = self.preferences[user_id]
            record.preferences.update(updates)
            record.updated_at = datetime.now(UTC).isoformat().replace("+00:00", "Z")
            return True

        # Nothing stored yet: updating behaves like setting.
        return await self.set(user_id, updates)

    async def delete(self, user_id: str) -> bool:
        """Remove the record stored for ``user_id``.

        Returns:
            True when a record was removed, False when none existed.
        """
        try:
            del self.preferences[user_id]
        except KeyError:
            return False
        return True

410 

411 

class InMemoryAuditLogStore(AuditLogStore):
    """In-memory implementation of audit log storage.

    Entries live in a dict keyed by ``log_id``; a second dict maps each
    ``user_id`` to the IDs of that user's entries. Nothing is persisted.
    """

    def __init__(self) -> None:
        # log_id -> entry
        self.logs: dict[str, AuditLogEntry] = {}
        # user_id -> log IDs recorded for that user, in insertion order
        self.user_logs: dict[str, list[str]] = {}

    @staticmethod
    def _as_utc(moment: datetime) -> datetime:
        """Return ``moment`` as an aware datetime, reading naive values as UTC."""
        if moment.utcoffset() is None:
            return moment.replace(tzinfo=UTC)
        return moment

    async def log(self, entry: AuditLogEntry) -> str:
        """Store ``entry`` and index it under its user.

        Logging with an ID that already exists replaces the stored entry. The
        old index entry is removed first, so the ID is never listed twice.

        Returns:
            The entry's ID.
        """
        replaced = self.logs.get(entry.log_id)
        if replaced is not None:
            with contextlib.suppress(KeyError, ValueError):
                self.user_logs[replaced.user_id].remove(entry.log_id)

        self.logs[entry.log_id] = entry
        self.user_logs.setdefault(entry.user_id, []).append(entry.log_id)
        return entry.log_id

    async def get(self, log_id: str) -> AuditLogEntry | None:
        """Return the entry stored under ``log_id``, or None."""
        return self.logs.get(log_id)

    async def list_user_logs(
        self, user_id: str, start_date: datetime | None = None, end_date: datetime | None = None, limit: int = 100
    ) -> list[AuditLogEntry]:
        """List a user's entries in index order, optionally bounded by date.

        Entry timestamps and both bounds are compared as timezone-aware
        values; any of them that is naive is read as UTC. Naive and aware
        datetimes can therefore be mixed without raising ``TypeError``.

        Args:
            user_id: User whose entries are listed.
            start_date: Inclusive lower bound, or None for no lower bound.
            end_date: Inclusive upper bound, or None for no upper bound.
            limit: Maximum number of entries returned; values of zero or
                less yield an empty list.

        Returns:
            Up to ``limit`` matching entries.

        Raises:
            ValueError: If a stored timestamp is not a valid ISO 8601 string.
        """
        if limit <= 0:
            return []

        lower = None if start_date is None else self._as_utc(start_date)
        upper = None if end_date is None else self._as_utc(end_date)

        matches: list[AuditLogEntry] = []
        for log_id in self.user_logs.get(user_id, ()):
            entry = self.logs.get(log_id)
            if entry is None:
                continue

            # Map a trailing "Z" to an explicit offset instead of stripping it,
            # so the parsed value keeps its timezone.
            logged_at = self._as_utc(datetime.fromisoformat(entry.timestamp.replace("Z", "+00:00")))

            if lower is not None and logged_at < lower:
                continue
            if upper is not None and logged_at > upper:
                continue

            matches.append(entry)
            if len(matches) >= limit:
                break

        return matches

    async def anonymize_user_logs(self, user_id: str) -> int:
        """Replace ``user_id`` with a hash-derived ID in that user's entries.

        The entries are re-indexed under the anonymized ID. If that ID already
        has an index entry (the same user was anonymized before and logged
        again since), the IDs are appended to it rather than replacing it.

        Returns:
            The number of entries rewritten; 0 when the user has no index entry.
        """
        log_ids = self.user_logs.pop(user_id, None)
        if log_ids is None:
            return 0

        # NOTE(review): built-in hash() of a str is salted per interpreter
        # process unless PYTHONHASHSEED is fixed, so this ID is not stable
        # across restarts and may be negative. Confirm that is acceptable.
        anonymized_id = f"anonymized_{hash(user_id)}"

        rewritten = 0
        for log_id in log_ids:
            entry = self.logs.get(log_id)
            if entry is not None:
                entry.user_id = anonymized_id
                rewritten += 1

        # Extend instead of assign: assignment would discard the index of
        # entries anonymized by an earlier call for the same user.
        self.user_logs.setdefault(anonymized_id, []).extend(log_ids)

        return rewritten

473 

474 

class InMemoryConsentStore(ConsentStore):
    """In-memory implementation of consent storage.

    Records live in a dict keyed by ``consent_id``; a second dict maps each
    ``user_id`` to the IDs of that user's records. Nothing is persisted.
    """

    def __init__(self) -> None:
        # consent_id -> record
        self.consents: dict[str, ConsentRecord] = {}
        # user_id -> consent IDs recorded for that user, in insertion order
        self.user_consents: dict[str, list[str]] = {}

    @staticmethod
    def _chronological_key(timestamp: str) -> tuple[int, datetime, str]:
        """Build a sort key that orders timestamps by the instant they denote.

        Parseable ISO timestamps are compared as aware datetimes (naive ones
        are read as UTC). Unparseable strings rank below every parseable one
        and are ordered among themselves by plain string comparison.
        """
        try:
            moment = datetime.fromisoformat(timestamp.replace("Z", "+00:00"))
        except ValueError:
            return (0, datetime.min.replace(tzinfo=UTC), timestamp)
        if moment.utcoffset() is None:
            moment = moment.replace(tzinfo=UTC)
        return (1, moment, timestamp)

    async def create(self, record: ConsentRecord) -> str:
        """Store ``record`` and index it under its user.

        Creating with an ID that already exists replaces the stored record.
        The old index entry is removed first, so the ID is never listed twice.

        Returns:
            The record's ID.
        """
        replaced = self.consents.get(record.consent_id)
        if replaced is not None:
            with contextlib.suppress(KeyError, ValueError):
                self.user_consents[replaced.user_id].remove(record.consent_id)

        self.consents[record.consent_id] = record
        self.user_consents.setdefault(record.user_id, []).append(record.consent_id)
        return record.consent_id

    async def get_user_consents(self, user_id: str) -> list[ConsentRecord]:
        """Return the user's records in index order; empty when none exist."""
        records: list[ConsentRecord] = []
        for consent_id in self.user_consents.get(user_id, ()):
            record = self.consents.get(consent_id)
            if record:
                records.append(record)
        return records

    async def get_latest_consent(self, user_id: str, consent_type: str) -> ConsentRecord | None:
        """Return the user's most recent record of ``consent_type``.

        Records are ordered by the instant their timestamp denotes, not by
        the raw string. Raw string order is wrong for fractional seconds and
        for differing UTC offsets. When several records share the latest
        instant, the one stored last wins.

        Returns:
            The latest matching record, or None when there is none.
        """
        latest: ConsentRecord | None = None
        latest_key: tuple[int, datetime, str] | None = None

        for record in await self.get_user_consents(user_id):
            if record.consent_type != consent_type:
                continue
            key = self._chronological_key(record.timestamp)
            # ">=" lets a later-stored record replace an equally-timed earlier one.
            if latest_key is None or key >= latest_key:
                latest, latest_key = record, key

        return latest

    async def delete_user_consents(self, user_id: str) -> int:
        """Remove every record indexed under ``user_id``.

        Returns:
            The number of records removed; 0 when the user has no index entry.
        """
        consent_ids = self.user_consents.pop(user_id, None)
        if consent_ids is None:
            return 0

        removed = 0
        for consent_id in consent_ids:
            if self.consents.pop(consent_id, None) is not None:
                removed += 1
        return removed