Coverage for src / mcp_server_langgraph / compliance / gdpr / storage.py: 90%
221 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
1"""
2Storage Backend Interfaces for Compliance Data
4Provides abstract interfaces for storing compliance-related data:
5- User profiles
6- Conversations
7- Preferences
8- Audit logs
9- Consent records
11Implementations can be backed by:
12- PostgreSQL
13- MongoDB
14- Redis
15- File system
16- In-memory (for testing)
17"""
19from abc import ABC, abstractmethod
20from datetime import datetime, UTC
21from typing import Any
23from pydantic import BaseModel, ConfigDict, Field
24import contextlib
26# ============================================================================
27# Data Models
28# ============================================================================
31class UserProfile(BaseModel):
32 """User profile data model"""
34 user_id: str = Field(..., description="Unique user identifier")
35 username: str = Field(..., description="Username")
36 email: str = Field(..., description="Email address")
37 full_name: str | None = Field(None, description="Full name")
38 created_at: str = Field(..., description="Account creation timestamp (ISO format)")
39 last_updated: str = Field(..., description="Last update timestamp (ISO format)")
40 metadata: dict[str, Any] = Field(default_factory=dict, description="Additional profile data")
42 model_config = ConfigDict(
43 json_schema_extra={
44 "example": {
45 "user_id": "user:alice",
46 "username": "alice",
47 "email": "alice@acme.com",
48 "full_name": "Alice Smith",
49 "created_at": "2025-01-01T00:00:00Z",
50 "last_updated": "2025-01-01T00:00:00Z",
51 "metadata": {"department": "Engineering"},
52 }
53 }
54 )
57class Conversation(BaseModel):
58 """Conversation data model"""
60 conversation_id: str = Field(..., description="Unique conversation identifier")
61 user_id: str = Field(..., description="User who owns this conversation")
62 title: str | None = Field(None, description="Conversation title")
63 messages: list[dict[str, Any]] = Field(default_factory=list, description="List of messages")
64 created_at: str = Field(..., description="Creation timestamp (ISO format)")
65 last_message_at: str = Field(..., description="Last message timestamp (ISO format)")
66 archived: bool = Field(default=False, description="Whether conversation is archived")
67 metadata: dict[str, Any] = Field(default_factory=dict, description="Additional metadata")
69 model_config = ConfigDict(
70 json_schema_extra={
71 "example": {
72 "conversation_id": "conv_123",
73 "user_id": "user:alice",
74 "title": "Project Discussion",
75 "messages": [{"role": "user", "content": "Hello"}],
76 "created_at": "2025-01-01T00:00:00Z",
77 "last_message_at": "2025-01-01T00:05:00Z",
78 "archived": False,
79 "metadata": {},
80 }
81 }
82 )
85class UserPreferences(BaseModel):
86 """User preferences data model"""
88 user_id: str = Field(..., description="User identifier")
89 preferences: dict[str, Any] = Field(default_factory=dict, description="User preferences")
90 updated_at: str = Field(..., description="Last update timestamp (ISO format)")
92 model_config = ConfigDict(
93 json_schema_extra={
94 "example": {
95 "user_id": "user:alice",
96 "preferences": {"theme": "dark", "language": "en", "notifications": {"email": True, "sms": False}},
97 "updated_at": "2025-01-01T00:00:00Z",
98 }
99 }
100 )
103class AuditLogEntry(BaseModel):
104 """Audit log entry data model"""
106 log_id: str = Field(..., description="Unique log entry identifier")
107 user_id: str = Field(..., description="User who performed the action")
108 action: str = Field(..., description="Action performed")
109 resource_type: str = Field(..., description="Type of resource affected")
110 resource_id: str | None = Field(None, description="Identifier of resource affected")
111 timestamp: str = Field(..., description="Action timestamp (ISO format)")
112 ip_address: str | None = Field(None, description="IP address of request")
113 user_agent: str | None = Field(None, description="User agent string")
114 metadata: dict[str, Any] = Field(default_factory=dict, description="Additional context")
116 model_config = ConfigDict(
117 json_schema_extra={
118 "example": {
119 "log_id": "log_123",
120 "user_id": "user:alice",
121 "action": "profile.update",
122 "resource_type": "user_profile",
123 "resource_id": "user:alice",
124 "timestamp": "2025-01-01T00:00:00Z",
125 "ip_address": "192.168.1.1",
126 "user_agent": "Mozilla/5.0...",
127 "metadata": {"fields_updated": ["email"]},
128 }
129 }
130 )
133class ConsentRecord(BaseModel):
134 """Consent record data model"""
136 consent_id: str = Field(..., description="Unique consent record identifier")
137 user_id: str = Field(..., description="User identifier")
138 consent_type: str = Field(..., description="Type of consent (analytics, marketing, etc.)")
139 granted: bool = Field(..., description="Whether consent is granted")
140 timestamp: str = Field(..., description="Consent timestamp (ISO format)")
141 ip_address: str | None = Field(None, description="IP address when consent was given")
142 user_agent: str | None = Field(None, description="User agent when consent was given")
143 metadata: dict[str, Any] = Field(default_factory=dict, description="Additional context")
145 model_config = ConfigDict(
146 json_schema_extra={
147 "example": {
148 "consent_id": "consent_123",
149 "user_id": "user:alice",
150 "consent_type": "analytics",
151 "granted": True,
152 "timestamp": "2025-01-01T00:00:00Z",
153 "ip_address": "192.168.1.1",
154 "user_agent": "Mozilla/5.0...",
155 "metadata": {},
156 }
157 }
158 )
161# ============================================================================
162# Storage Interfaces
163# ============================================================================
166class UserProfileStore(ABC):
167 """Abstract interface for user profile storage"""
169 @abstractmethod
170 async def create(self, profile: UserProfile) -> bool:
171 """Create a new user profile"""
173 @abstractmethod
174 async def get(self, user_id: str) -> UserProfile | None:
175 """Get user profile by ID"""
177 @abstractmethod
178 async def update(self, user_id: str, updates: dict[str, Any]) -> bool:
179 """Update user profile"""
181 @abstractmethod
182 async def delete(self, user_id: str) -> bool:
183 """Delete user profile"""
186class ConversationStore(ABC):
187 """Abstract interface for conversation storage"""
189 @abstractmethod
190 async def create(self, conversation: Conversation) -> str:
191 """Create a new conversation and return its ID"""
193 @abstractmethod
194 async def get(self, conversation_id: str) -> Conversation | None:
195 """Get conversation by ID"""
197 @abstractmethod
198 async def list_user_conversations(self, user_id: str, archived: bool | None = None) -> list[Conversation]:
199 """List all conversations for a user"""
201 @abstractmethod
202 async def update(self, conversation_id: str, updates: dict[str, Any]) -> bool:
203 """Update conversation"""
205 @abstractmethod
206 async def delete(self, conversation_id: str) -> bool:
207 """Delete conversation"""
209 @abstractmethod
210 async def delete_user_conversations(self, user_id: str) -> int:
211 """Delete all conversations for a user"""
214class PreferencesStore(ABC):
215 """Abstract interface for user preferences storage"""
217 @abstractmethod
218 async def get(self, user_id: str) -> UserPreferences | None:
219 """Get user preferences"""
221 @abstractmethod
222 async def set(self, user_id: str, preferences: dict[str, Any]) -> bool:
223 """Set user preferences"""
225 @abstractmethod
226 async def update(self, user_id: str, updates: dict[str, Any]) -> bool:
227 """Update specific preferences"""
229 @abstractmethod
230 async def delete(self, user_id: str) -> bool:
231 """Delete user preferences"""
234class AuditLogStore(ABC):
235 """Abstract interface for audit log storage"""
237 @abstractmethod
238 async def log(self, entry: AuditLogEntry) -> str:
239 """Log an audit entry and return its ID"""
241 @abstractmethod
242 async def get(self, log_id: str) -> AuditLogEntry | None:
243 """Get audit log entry by ID"""
245 @abstractmethod
246 async def list_user_logs(
247 self, user_id: str, start_date: datetime | None = None, end_date: datetime | None = None, limit: int = 100
248 ) -> list[AuditLogEntry]:
249 """List audit logs for a user"""
251 @abstractmethod
252 async def anonymize_user_logs(self, user_id: str) -> int:
253 """Anonymize audit logs for a user (GDPR compliance)"""
256class ConsentStore(ABC):
257 """Abstract interface for consent record storage"""
259 @abstractmethod
260 async def create(self, record: ConsentRecord) -> str:
261 """Create a consent record and return its ID"""
263 @abstractmethod
264 async def get_user_consents(self, user_id: str) -> list[ConsentRecord]:
265 """Get all consent records for a user"""
267 @abstractmethod
268 async def get_latest_consent(self, user_id: str, consent_type: str) -> ConsentRecord | None:
269 """Get the latest consent record for a specific type"""
271 @abstractmethod
272 async def delete_user_consents(self, user_id: str) -> int:
273 """Delete all consent records for a user"""
276# ============================================================================
277# In-Memory Implementations (for testing/development)
278# ============================================================================
281class InMemoryUserProfileStore(UserProfileStore):
282 """In-memory implementation of user profile storage"""
284 def __init__(self) -> None:
285 self.profiles: dict[str, UserProfile] = {}
287 async def create(self, profile: UserProfile) -> bool:
288 if profile.user_id in self.profiles:
289 return False
290 self.profiles[profile.user_id] = profile
291 return True
293 async def get(self, user_id: str) -> UserProfile | None:
294 return self.profiles.get(user_id)
296 async def update(self, user_id: str, updates: dict[str, Any]) -> bool:
297 if user_id not in self.profiles: 297 ↛ 298line 297 didn't jump to line 298 because the condition on line 297 was never true
298 return False
300 profile = self.profiles[user_id]
301 for key, value in updates.items():
302 if hasattr(profile, key): 302 ↛ 301line 302 didn't jump to line 301 because the condition on line 302 was always true
303 setattr(profile, key, value)
305 profile.last_updated = datetime.now(UTC).isoformat().replace("+00:00", "Z")
306 return True
308 async def delete(self, user_id: str) -> bool:
309 if user_id in self.profiles:
310 del self.profiles[user_id]
311 return True
312 return False
315class InMemoryConversationStore(ConversationStore):
316 """In-memory implementation of conversation storage"""
318 def __init__(self) -> None:
319 self.conversations: dict[str, Conversation] = {}
320 self.user_conversations: dict[str, list[str]] = {}
322 async def create(self, conversation: Conversation) -> str:
323 self.conversations[conversation.conversation_id] = conversation
325 if conversation.user_id not in self.user_conversations:
326 self.user_conversations[conversation.user_id] = []
327 self.user_conversations[conversation.user_id].append(conversation.conversation_id)
329 return conversation.conversation_id
331 async def get(self, conversation_id: str) -> Conversation | None:
332 return self.conversations.get(conversation_id)
334 async def list_user_conversations(self, user_id: str, archived: bool | None = None) -> list[Conversation]:
335 if user_id not in self.user_conversations:
336 return []
338 conversations = []
339 for conv_id in self.user_conversations[user_id]:
340 conv = self.conversations.get(conv_id)
341 if conv and (archived is None or conv.archived == archived):
342 conversations.append(conv)
344 return conversations
346 async def update(self, conversation_id: str, updates: dict[str, Any]) -> bool:
347 if conversation_id not in self.conversations:
348 return False
350 conversation = self.conversations[conversation_id]
351 for key, value in updates.items():
352 if hasattr(conversation, key):
353 setattr(conversation, key, value)
355 return True
357 async def delete(self, conversation_id: str) -> bool:
358 if conversation_id in self.conversations: 358 ↛ 364line 358 didn't jump to line 364 because the condition on line 358 was always true
359 conv = self.conversations.pop(conversation_id)
360 if conv.user_id in self.user_conversations: 360 ↛ 363line 360 didn't jump to line 363 because the condition on line 360 was always true
361 with contextlib.suppress(ValueError):
362 self.user_conversations[conv.user_id].remove(conversation_id)
363 return True
364 return False
366 async def delete_user_conversations(self, user_id: str) -> int:
367 if user_id not in self.user_conversations:
368 return 0
370 conv_ids = self.user_conversations[user_id][:]
371 count = 0
373 for conv_id in conv_ids:
374 if await self.delete(conv_id): 374 ↛ 373line 374 didn't jump to line 373 because the condition on line 374 was always true
375 count += 1
377 return count
380class InMemoryPreferencesStore(PreferencesStore):
381 """In-memory implementation of preferences storage"""
383 def __init__(self) -> None:
384 self.preferences: dict[str, UserPreferences] = {}
386 async def get(self, user_id: str) -> UserPreferences | None:
387 return self.preferences.get(user_id)
389 async def set(self, user_id: str, preferences: dict[str, Any]) -> bool:
390 self.preferences[user_id] = UserPreferences(
391 user_id=user_id, preferences=preferences, updated_at=datetime.now(UTC).isoformat().replace("+00:00", "Z")
392 )
393 return True
395 async def update(self, user_id: str, updates: dict[str, Any]) -> bool:
396 if user_id not in self.preferences:
397 # Create new preferences if they don't exist
398 return await self.set(user_id, updates)
400 prefs = self.preferences[user_id]
401 prefs.preferences.update(updates)
402 prefs.updated_at = datetime.now(UTC).isoformat().replace("+00:00", "Z")
403 return True
405 async def delete(self, user_id: str) -> bool:
406 if user_id in self.preferences:
407 del self.preferences[user_id]
408 return True
409 return False
412class InMemoryAuditLogStore(AuditLogStore):
413 """In-memory implementation of audit log storage"""
415 def __init__(self) -> None:
416 self.logs: dict[str, AuditLogEntry] = {}
417 self.user_logs: dict[str, list[str]] = {}
419 async def log(self, entry: AuditLogEntry) -> str:
420 self.logs[entry.log_id] = entry
422 if entry.user_id not in self.user_logs:
423 self.user_logs[entry.user_id] = []
424 self.user_logs[entry.user_id].append(entry.log_id)
426 return entry.log_id
428 async def get(self, log_id: str) -> AuditLogEntry | None:
429 return self.logs.get(log_id)
431 async def list_user_logs(
432 self, user_id: str, start_date: datetime | None = None, end_date: datetime | None = None, limit: int = 100
433 ) -> list[AuditLogEntry]:
434 if user_id not in self.user_logs:
435 return []
437 logs = []
438 for log_id in self.user_logs[user_id]:
439 log = self.logs.get(log_id)
440 if log: 440 ↛ 438line 440 didn't jump to line 438 because the condition on line 440 was always true
441 # Filter by date if specified
442 log_timestamp = datetime.fromisoformat(log.timestamp.replace("Z", ""))
444 if start_date and log_timestamp < start_date:
445 continue
446 if end_date and log_timestamp > end_date: 446 ↛ 447line 446 didn't jump to line 447 because the condition on line 446 was never true
447 continue
449 logs.append(log)
451 if len(logs) >= limit: 451 ↛ 452line 451 didn't jump to line 452 because the condition on line 451 was never true
452 break
454 return logs
456 async def anonymize_user_logs(self, user_id: str) -> int:
457 """Replace user_id with hash in audit logs"""
458 if user_id not in self.user_logs:
459 return 0
461 count = 0
462 anonymized_id = f"anonymized_{hash(user_id)}"
464 for log_id in self.user_logs[user_id]:
465 if log_id in self.logs: 465 ↛ 464line 465 didn't jump to line 464 because the condition on line 465 was always true
466 self.logs[log_id].user_id = anonymized_id
467 count += 1
469 # Move to anonymized tracking
470 self.user_logs[anonymized_id] = self.user_logs.pop(user_id)
472 return count
475class InMemoryConsentStore(ConsentStore):
476 """In-memory implementation of consent storage"""
478 def __init__(self) -> None:
479 self.consents: dict[str, ConsentRecord] = {}
480 self.user_consents: dict[str, list[str]] = {}
482 async def create(self, record: ConsentRecord) -> str:
483 self.consents[record.consent_id] = record
485 if record.user_id not in self.user_consents:
486 self.user_consents[record.user_id] = []
487 self.user_consents[record.user_id].append(record.consent_id)
489 return record.consent_id
491 async def get_user_consents(self, user_id: str) -> list[ConsentRecord]:
492 if user_id not in self.user_consents:
493 return []
495 consents = []
496 for consent_id in self.user_consents[user_id]:
497 consent = self.consents.get(consent_id)
498 if consent: 498 ↛ 496line 498 didn't jump to line 496 because the condition on line 498 was always true
499 consents.append(consent)
501 return consents
503 async def get_latest_consent(self, user_id: str, consent_type: str) -> ConsentRecord | None:
504 consents = await self.get_user_consents(user_id)
506 # Filter by type
507 type_consents = [c for c in consents if c.consent_type == consent_type]
509 if not type_consents: 509 ↛ 510line 509 didn't jump to line 510 because the condition on line 509 was never true
510 return None
512 # Sort by timestamp descending and return latest
513 type_consents.sort(key=lambda c: c.timestamp, reverse=True)
514 return type_consents[0]
516 async def delete_user_consents(self, user_id: str) -> int:
517 if user_id not in self.user_consents:
518 return 0
520 consent_ids = self.user_consents[user_id][:]
521 count = 0
523 for consent_id in consent_ids:
524 if consent_id in self.consents: 524 ↛ 523line 524 didn't jump to line 523 because the condition on line 524 was always true
525 del self.consents[consent_id]
526 count += 1
528 del self.user_consents[user_id]
529 return count