Coverage for src / mcp_server_langgraph / api / gdpr.py: 83%
141 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
1"""
2GDPR Compliance API Endpoints
4Implements data subject rights under GDPR:
5- Article 15: Right to Access
6- Article 16: Right to Rectification
7- Article 17: Right to Erasure
8- Article 20: Right to Data Portability
9- Article 21: Right to Object (Consent Management)
10"""
12from datetime import datetime, UTC
13from enum import Enum
14from typing import Any
16from fastapi import APIRouter, Depends, HTTPException, Query, Request, Response, status
17from pydantic import BaseModel, ConfigDict, Field
19from mcp_server_langgraph.auth.session import SessionStore, get_session_store
20from mcp_server_langgraph.compliance.gdpr.data_deletion import DataDeletionService
21from mcp_server_langgraph.compliance.gdpr.data_export import DataExportService, UserDataExport
22from mcp_server_langgraph.compliance.gdpr.factory import GDPRStorage, get_gdpr_storage_dependency
23from mcp_server_langgraph.compliance.gdpr.storage import ConsentRecord as GDPRConsentRecord
24from mcp_server_langgraph.core.security import sanitize_header_value
25from mcp_server_langgraph.observability.telemetry import logger, tracer
27router = APIRouter(prefix="/api/v1/users", tags=["GDPR Compliance"])
30# ==================== Models ====================
33class UserProfileUpdate(BaseModel):
34 """User profile update model (GDPR Article 16 - Right to Rectification)"""
36 name: str | None = Field(None, min_length=1, max_length=100, description="User's full name")
37 email: str | None = Field(None, description="User's email address")
38 preferences: dict[str, Any] | None = Field(None, description="User preferences")
40 model_config = ConfigDict(
41 json_schema_extra={
42 "example": {
43 "name": "Alice Smith",
44 "email": "alice.smith@acme.com",
45 "preferences": {"theme": "dark", "language": "en"},
46 }
47 }
48 )
51class ConsentType(str, Enum):
52 """Types of consent that can be granted or revoked"""
54 ANALYTICS = "analytics"
55 MARKETING = "marketing"
56 THIRD_PARTY = "third_party"
57 PROFILING = "profiling"
60class ConsentRecord(BaseModel):
61 """Consent record for GDPR Article 21"""
63 consent_type: ConsentType = Field(..., description="Type of consent")
64 granted: bool = Field(..., description="Whether consent is granted")
65 timestamp: str | None = Field(None, description="ISO timestamp (auto-generated)")
66 ip_address: str | None = Field(None, description="IP address (auto-captured)")
67 user_agent: str | None = Field(None, description="User agent (auto-captured)")
69 model_config = ConfigDict(
70 json_schema_extra={
71 "example": {
72 "consent_type": "analytics",
73 "granted": True,
74 }
75 }
76 )
79class ConsentResponse(BaseModel):
80 """Response for consent operations"""
82 user_id: str
83 consents: dict[str, dict[str, Any]] = Field(description="Current consent status for all types")
85 model_config = ConfigDict(
86 json_schema_extra={
87 "example": {
88 "user_id": "user:alice",
89 "consents": {
90 "analytics": {
91 "granted": True,
92 "timestamp": "2025-01-01T12:00:00Z",
93 "ip_address": "192.168.1.1",
94 },
95 "marketing": {"granted": False, "timestamp": "2025-01-01T12:00:00Z"},
96 },
97 }
98 }
99 )
102# PRODUCTION READINESS CHECK
103# Storage is now managed by factory (initialized on app startup)
104# Production guard enforced by factory configuration
106# ==================== Endpoints ====================
109@router.get("/me/data")
110async def get_user_data(
111 request: Request,
112 session_store: SessionStore = Depends(get_session_store),
113 gdpr_storage: GDPRStorage = Depends(get_gdpr_storage_dependency),
114) -> UserDataExport:
115 """
116 Export all user data (GDPR Article 15 - Right to Access)
118 Returns all personal data associated with the authenticated user.
120 **GDPR Article 15**: The data subject shall have the right to obtain from the
121 controller confirmation as to whether or not personal data concerning him or
122 her are being processed, and access to the personal data.
124 **Response**: Complete JSON export of all user data including:
125 - User profile
126 - Sessions
127 - Conversations
128 - Preferences
129 - Audit log
130 - Consents
131 """
132 with tracer.start_as_current_span("gdpr.get_user_data"):
133 # Get authenticated user from request state (set by AuthRequestMiddleware)
134 user = getattr(request.state, "user", None)
135 if not user:
136 raise HTTPException(
137 status_code=status.HTTP_401_UNAUTHORIZED,
138 detail="Authentication required",
139 headers={"WWW-Authenticate": "Bearer"},
140 )
142 user_id = user.get("user_id")
143 username = user.get("username")
145 if not user_id or not username: 145 ↛ 146line 145 didn't jump to line 146 because the condition on line 145 was never true
146 raise HTTPException(
147 status_code=status.HTTP_400_BAD_REQUEST,
148 detail="Missing user_id or username in token",
149 )
151 email = user.get("email", f"{username}@example.com")
153 # Create export service with storage backend
154 export_service = DataExportService(session_store=session_store, gdpr_storage=gdpr_storage)
156 # Export all user data
157 export = await export_service.export_user_data(user_id=user_id, username=username, email=email)
159 # Log the data access request (GDPR requirement)
160 logger.info(
161 "User data access request",
162 extra={
163 "user_id": user_id,
164 "export_id": export.export_id,
165 "gdpr_article": "15",
166 },
167 )
169 return export
172@router.get("/me/export", response_model=None)
173async def export_user_data(
174 request: Request,
175 format: str = Query("json", pattern="^(json|csv)$", description="Export format: json or csv"),
176 session_store: SessionStore = Depends(get_session_store),
177 gdpr_storage: GDPRStorage = Depends(get_gdpr_storage_dependency),
178) -> Response:
179 """
180 Export user data in portable format (GDPR Article 20 - Right to Data Portability)
182 **GDPR Article 20**: The data subject shall have the right to receive the personal
183 data concerning him or her in a structured, commonly used and machine-readable format.
185 **Query Parameters**:
186 - `format`: Export format (json or csv)
188 **Response**: File download in requested format
189 """
190 with tracer.start_as_current_span("gdpr.export_user_data"):
191 # Get authenticated user from request state (set by AuthRequestMiddleware)
192 user = getattr(request.state, "user", None)
193 if not user: 193 ↛ 194line 193 didn't jump to line 194 because the condition on line 193 was never true
194 raise HTTPException(
195 status_code=status.HTTP_401_UNAUTHORIZED,
196 detail="Authentication required",
197 headers={"WWW-Authenticate": "Bearer"},
198 )
200 user_id = str(user.get("user_id") or "")
201 username = str(user.get("username") or "")
202 email = str(user.get("email", f"{username}@example.com"))
204 # Create export service with storage backend
205 export_service = DataExportService(session_store=session_store, gdpr_storage=gdpr_storage)
207 # Export data in requested format
208 data_bytes, content_type = await export_service.export_user_data_portable(
209 user_id=user_id, username=username, email=email, format=format
210 )
212 # Log the export request
213 logger.info(
214 "User data export request",
215 extra={
216 "user_id": user_id,
217 "format": format,
218 "size_bytes": len(data_bytes),
219 "gdpr_article": "20",
220 },
221 )
223 # Return as downloadable file
224 # SECURITY: Sanitize username to prevent CWE-113 (HTTP Response Splitting)
225 # Username from JWT could contain CR/LF if IdP allows special characters
226 safe_username = sanitize_header_value(username)
227 filename = f"user_data_{safe_username}_{datetime.now(UTC).strftime('%Y%m%d')}.{format}"
228 headers = {"Content-Disposition": f'attachment; filename="{filename}"'}
230 return Response(content=data_bytes, media_type=content_type, headers=headers)
233@router.patch("/me")
234async def update_user_profile(
235 request: Request,
236 profile_update: UserProfileUpdate,
237) -> dict[str, Any]:
238 """
239 Update user profile (GDPR Article 16 - Right to Rectification)
241 **GDPR Article 16**: The data subject shall have the right to obtain from the
242 controller without undue delay the rectification of inaccurate personal data
243 concerning him or her.
245 **Request Body**: Profile fields to update (only provided fields are updated)
247 **Response**: Updated user profile
248 """
249 with tracer.start_as_current_span("gdpr.update_user_profile"):
250 # Get authenticated user from request state (set by AuthRequestMiddleware)
251 user = getattr(request.state, "user", None)
252 if not user: 252 ↛ 253line 252 didn't jump to line 253 because the condition on line 252 was never true
253 raise HTTPException(
254 status_code=status.HTTP_401_UNAUTHORIZED,
255 detail="Authentication required",
256 headers={"WWW-Authenticate": "Bearer"},
257 )
259 user_id = user.get("user_id")
260 username = user.get("username")
262 # Get fields to update (exclude unset fields)
263 update_data = profile_update.model_dump(exclude_unset=True)
265 if not update_data:
266 raise HTTPException(status_code=400, detail="No fields provided for update")
268 # Log the update request
269 logger.info(
270 "User profile update request",
271 extra={
272 "user_id": user_id,
273 "fields_updated": list(update_data.keys()),
274 "gdpr_article": "16",
275 },
276 )
278 # Integrate with user profile storage
279 # Note: User profiles can be stored in:
280 # - Redis (fast, session-like data)
281 # - PostgreSQL (persistent, relational)
282 # - User provider backend (if supported)
283 # For now, we validate the update and return confirmation
284 # Production: Integrate with your user storage backend
285 updated_profile = {
286 "user_id": user_id,
287 "username": username,
288 **update_data,
289 "updated_at": datetime.now(UTC).isoformat().replace("+00:00", "Z"),
290 "storage_note": "Configure user profile storage backend for persistence",
291 }
293 logger.info("User profile updated successfully", extra={"user_id": user_id})
295 return updated_profile
298@router.delete("/me")
299async def delete_user_account(
300 request: Request,
301 confirm: bool = Query(..., description="Must be true to confirm account deletion"),
302 session_store: SessionStore = Depends(get_session_store),
303 gdpr_storage: GDPRStorage = Depends(get_gdpr_storage_dependency),
304) -> dict[str, Any]:
305 """
306 Delete user account and all data (GDPR Article 17 - Right to Erasure)
308 **WARNING**: This is an irreversible operation that permanently deletes all user data.
310 **GDPR Article 17**: The data subject shall have the right to obtain from the
311 controller the erasure of personal data concerning him or her without undue delay.
313 **Query Parameters**:
314 - `confirm`: Must be set to `true` to confirm deletion
316 **What gets deleted**:
317 - User profile and account
318 - All sessions
319 - All conversations and messages
320 - All preferences and settings
321 - All authorization tuples
323 **What gets anonymized** (retained for compliance):
324 - Audit logs (user_id replaced with hash)
326 **Response**: Deletion result with details
327 """
328 with tracer.start_as_current_span("gdpr.delete_user_account"):
329 if not confirm:
330 raise HTTPException(
331 status_code=400,
332 detail="Account deletion requires confirmation. Set confirm=true to proceed.",
333 )
335 # Get authenticated user from request state (set by AuthRequestMiddleware)
336 user = getattr(request.state, "user", None)
337 if not user: 337 ↛ 338line 337 didn't jump to line 338 because the condition on line 337 was never true
338 raise HTTPException(
339 status_code=status.HTTP_401_UNAUTHORIZED,
340 detail="Authentication required",
341 headers={"WWW-Authenticate": "Bearer"},
342 )
344 user_id = str(user.get("user_id") or "")
345 username = str(user.get("username") or "")
347 # Log deletion request (before deletion)
348 logger.warning(
349 "User account deletion requested",
350 extra={
351 "user_id": user_id,
352 "username": username,
353 "gdpr_article": "17",
354 },
355 )
357 # Create deletion service with storage backend
358 # Note: OpenFGA client should be passed from FastAPI app state for proper lifecycle
359 # For production, add OpenFGA client to app startup and inject via Depends()
360 # Example: openfga_client = Depends(get_openfga_client)
361 deletion_service = DataDeletionService(
362 session_store=session_store,
363 gdpr_storage=gdpr_storage,
364 openfga_client=None, # Configured via dependency injection in production
365 )
367 # Delete all user data
368 result = await deletion_service.delete_user_account(
369 user_id=user_id, username=username, reason="user_request_gdpr_article_17"
370 )
372 if not result.success: 372 ↛ 373line 372 didn't jump to line 373 because the condition on line 372 was never true
373 logger.error(
374 "User account deletion failed",
375 extra={"user_id": user_id, "errors": result.errors},
376 )
377 raise HTTPException(
378 status_code=500,
379 detail=f"Account deletion completed with errors: {', '.join(result.errors)}",
380 )
382 logger.warning(
383 "User account deletion completed",
384 extra={
385 "user_id": user_id,
386 "deleted_items": result.deleted_items,
387 "anonymized_items": result.anonymized_items,
388 },
389 )
391 return {
392 "message": "Account deleted successfully",
393 "deletion_timestamp": result.deletion_timestamp,
394 "deleted_items": result.deleted_items,
395 "anonymized_items": result.anonymized_items,
396 "audit_record_id": result.audit_record_id,
397 }
400@router.post("/me/consent")
401async def update_consent(
402 request: Request,
403 consent: ConsentRecord,
404 gdpr_storage: GDPRStorage = Depends(get_gdpr_storage_dependency),
405) -> ConsentResponse:
406 """
407 Update user consent preferences (GDPR Article 21 - Right to Object)
409 **GDPR Article 21**: The data subject shall have the right to object at any time
410 to processing of personal data concerning him or her.
412 **Request Body**: Consent type and whether it's granted
414 **Response**: Current consent status for all types
415 """
416 with tracer.start_as_current_span("gdpr.update_consent"):
417 # Get authenticated user from request state (set by AuthRequestMiddleware)
418 user = getattr(request.state, "user", None)
419 if not user: 419 ↛ 420line 419 didn't jump to line 420 because the condition on line 419 was never true
420 raise HTTPException(
421 status_code=status.HTTP_401_UNAUTHORIZED,
422 detail="Authentication required",
423 headers={"WWW-Authenticate": "Bearer"},
424 )
426 user_id = user.get("user_id")
427 if not user_id: 427 ↛ 428line 427 didn't jump to line 428 because the condition on line 427 was never true
428 raise HTTPException(
429 status_code=status.HTTP_400_BAD_REQUEST,
430 detail="Missing user_id in token",
431 )
433 # Capture metadata
434 timestamp = datetime.now(UTC).isoformat().replace("+00:00", "Z")
435 ip_address = None # Could capture from X-Forwarded-For if needed
436 user_agent = None # Could capture from headers if needed
438 # Create consent record
439 consent_id = f"consent_{user_id}_{consent.consent_type}_{timestamp.replace(':', '').replace('-', '')}"
440 consent_record = GDPRConsentRecord(
441 consent_id=consent_id,
442 user_id=str(user_id),
443 consent_type=consent.consent_type.value,
444 granted=consent.granted,
445 timestamp=timestamp,
446 ip_address=ip_address,
447 user_agent=user_agent,
448 )
450 # Store consent in PostgreSQL (append-only audit trail)
451 await gdpr_storage.consents.create(consent_record)
453 # Log consent change
454 logger.info(
455 "User consent updated",
456 extra={
457 "user_id": user_id,
458 "consent_type": consent.consent_type,
459 "granted": consent.granted,
460 "ip_address": ip_address,
461 "gdpr_article": "21",
462 },
463 )
465 # Get all current consents for response
466 all_consents = await gdpr_storage.consents.get_user_consents(str(user_id))
467 consents_dict = {}
468 for c in all_consents:
469 # Get latest consent for each type
470 latest = await gdpr_storage.consents.get_latest_consent(str(user_id), c.consent_type)
471 if latest: 471 ↛ 468line 471 didn't jump to line 468 because the condition on line 471 was always true
472 consents_dict[c.consent_type] = {
473 "granted": latest.granted,
474 "timestamp": latest.timestamp,
475 "ip_address": latest.ip_address,
476 "user_agent": latest.user_agent,
477 }
479 return ConsentResponse(user_id=user_id, consents=consents_dict)
482@router.get("/me/consent")
483async def get_consent_status(
484 request: Request,
485 gdpr_storage: GDPRStorage = Depends(get_gdpr_storage_dependency),
486) -> ConsentResponse:
487 """
488 Get current consent status (GDPR Article 21 - Right to Object)
490 Returns all consent preferences for the authenticated user.
492 **Response**: Current consent status for all consent types
493 """
494 with tracer.start_as_current_span("gdpr.get_consent_status"):
495 # Get authenticated user from request state (set by AuthRequestMiddleware)
496 user = getattr(request.state, "user", None)
497 if not user: 497 ↛ 498line 497 didn't jump to line 498 because the condition on line 497 was never true
498 raise HTTPException(
499 status_code=status.HTTP_401_UNAUTHORIZED,
500 detail="Authentication required",
501 headers={"WWW-Authenticate": "Bearer"},
502 )
504 user_id = user.get("user_id")
505 if not user_id: 505 ↛ 506line 505 didn't jump to line 506 because the condition on line 505 was never true
506 raise HTTPException(
507 status_code=status.HTTP_400_BAD_REQUEST,
508 detail="Missing user_id in token",
509 )
511 # Get all consent records from PostgreSQL
512 all_consents = await gdpr_storage.consents.get_user_consents(str(user_id))
514 # Build consent status dict (latest consent for each type)
515 consents_dict = {}
516 consent_types_seen = set()
518 for consent_rec in all_consents: 518 ↛ 519line 518 didn't jump to line 519 because the loop on line 518 never started
519 if consent_rec.consent_type not in consent_types_seen:
520 consent_types_seen.add(consent_rec.consent_type)
521 # Get latest consent for this type
522 latest = await gdpr_storage.consents.get_latest_consent(str(user_id), consent_rec.consent_type)
523 if latest:
524 consents_dict[consent_rec.consent_type] = {
525 "granted": latest.granted,
526 "timestamp": latest.timestamp,
527 "ip_address": latest.ip_address,
528 "user_agent": latest.user_agent,
529 }
531 logger.info("User consent status retrieved", extra={"user_id": user_id})
533 return ConsentResponse(user_id=user_id, consents=consents_dict)