Coverage for src / mcp_server_langgraph / compliance / gdpr / postgres_storage.py: 28%
184 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
1"""
PostgreSQL Storage Implementation for GDPR/HIPAA/SOC2 Compliance

Implements persistent storage for:
- User profiles
- User preferences
- Consent records (7-year retention)
- Conversations (90-day retention)
- Audit logs (7-year retention)

Compliance Requirements:
- GDPR Articles 5, 15, 17 (retention, access, erasure)
- HIPAA §164.312(b), §164.316(b)(2)(i) (audit controls, 7-year retention)
- SOC2 CC6.6, PI1.4 (audit logs, data retention)

Design: Pure PostgreSQL (not hybrid) for:
- Simplicity (one source of truth)
- ACID guarantees (atomic GDPR operations)
- Cost efficiency (14x cheaper than Redis for 7-year data)
- Compliance auditing (single system to audit)
21"""
23import json
24from datetime import datetime, UTC
25from typing import Any
27import asyncpg
29from mcp_server_langgraph.compliance.gdpr.storage import (
30 AuditLogEntry,
31 AuditLogStore,
32 ConsentRecord,
33 ConsentStore,
34 Conversation,
35 ConversationStore,
36 PreferencesStore,
37 UserPreferences,
38 UserProfile,
39 UserProfileStore,
40)
# ============================================================================
# PostgreSQL User Profile Store
# ============================================================================
class PostgresUserProfileStore(UserProfileStore):
    """
    User profile persistence on top of the PostgreSQL ``user_profiles`` table

    GDPR Compliance:
    - Article 15: Right to access (see ``get``)
    - Article 17: Right to erasure (see ``delete``)
    - ``last_updated`` is expected to be refreshed by a database trigger
      (the trigger is part of the schema, not of this module)
    """

    def __init__(self, pool: asyncpg.Pool):
        """
        Bind the store to a connection pool

        Args:
            pool: asyncpg connection pool used for every query
        """
        self.pool = pool

    async def create(self, profile: UserProfile) -> bool:
        """
        Insert a new user profile row

        Args:
            profile: User profile to persist; ``created_at`` and
                ``last_updated`` are parsed with ``datetime.fromisoformat``

        Returns:
            True when the row was inserted, False when the insert raised a
            unique-constraint violation (the user already exists)
        """
        try:
            async with self.pool.acquire() as conn:
                await conn.execute(
                    """
                    INSERT INTO user_profiles
                    (user_id, username, email, full_name, created_at, last_updated, metadata)
                    VALUES ($1, $2, $3, $4, $5, $6, $7)
                    """,
                    profile.user_id,
                    profile.username,
                    profile.email,
                    profile.full_name,
                    # The profile carries ISO strings; asyncpg is handed datetime objects
                    datetime.fromisoformat(profile.created_at),
                    datetime.fromisoformat(profile.last_updated),
                    json.dumps(profile.metadata),
                )
        except asyncpg.UniqueViolationError:
            # A unique constraint fired: report "not created" instead of raising
            return False
        else:
            return True

    async def get(self, user_id: str) -> UserProfile | None:
        """
        Fetch one user profile (GDPR Article 15 - Right to access)

        Args:
            user_id: User identifier

        Returns:
            The stored profile, or None when no row matches
        """
        async with self.pool.acquire() as conn:
            record = await conn.fetchrow(
                """
                SELECT user_id, username, email, full_name, created_at, last_updated, metadata
                FROM user_profiles
                WHERE user_id = $1
                """,
                user_id,
            )

            if record is None:
                return None

            # Timestamps go back out as ISO strings with "+00:00" spelled as "Z"
            created_at, last_updated = (
                record[column].isoformat().replace("+00:00", "Z") for column in ("created_at", "last_updated")
            )
            stored_metadata = record["metadata"]

            return UserProfile(
                user_id=record["user_id"],
                username=record["username"],
                email=record["email"],
                full_name=record["full_name"],
                created_at=created_at,
                last_updated=last_updated,
                # Empty / NULL metadata is reported as an empty dict
                metadata=json.loads(stored_metadata) if stored_metadata else {},
            )

    async def update(self, user_id: str, updates: dict[str, Any]) -> bool:
        """
        Apply a partial update to a user profile

        Args:
            user_id: User identifier
            updates: Fields to change; only ``email``, ``full_name``,
                ``username`` and ``metadata`` are honoured, anything else is
                silently ignored

        Returns:
            True when exactly one row was updated, False when no supported
            field was supplied or no row matched

        Note:
            last_updated is not set here; it is expected to be maintained by
            a database trigger
        """
        assignments: list[str] = []
        bound: list[Any] = []

        for column, new_value in updates.items():
            if column == "metadata":
                # metadata is stored as serialized JSON
                new_value = json.dumps(new_value)
            elif column not in ("email", "full_name", "username"):
                # Not an updatable column: skip it
                continue
            bound.append(new_value)
            # Placeholder number == position of the value just bound
            assignments.append(f"{column} = ${len(bound)}")

        if not assignments:
            return False

        # user_id is always the last positional parameter
        bound.append(user_id)

        # SECURITY: Safe from SQL injection because:
        # 1. Only the column names hard-coded in the loop above reach the SET clause
        # 2. All values are passed as query parameters ($1, $2, etc.) via *bound
        # 3. Only SQL structure (not data) is constructed via f-string
        query = f"""
            UPDATE user_profiles
            SET {", ".join(assignments)}
            WHERE user_id = ${len(bound)}
        """  # nosec B608 - See security comment above

        async with self.pool.acquire() as conn:
            # SECURITY: Query is safe - column names come from the fixed set checked above,
            # values are parameterized. See nosec B608 comment on query string.
            # nosemgrep: python.sqlalchemy.security.sqlalchemy-execute-raw-query.sqlalchemy-execute-raw-query
            status = await conn.execute(query, *bound)
            # The status string looks like "UPDATE N"; its last token is the row count
            return status.split()[-1] == "1"

    async def delete(self, user_id: str) -> bool:
        """
        Remove a user profile (GDPR Article 17 - Right to erasure)

        Args:
            user_id: User identifier

        Returns:
            True when exactly one row was deleted, False otherwise

        Note:
            Dependent rows (preferences, conversations, consents) are expected
            to be removed by foreign-key cascades defined in the schema
        """
        async with self.pool.acquire() as conn:
            status = await conn.execute(
                "DELETE FROM user_profiles WHERE user_id = $1",
                user_id,
            )
            # The status string looks like "DELETE N"; its last token is the row count
            return status.split()[-1] == "1"
# ============================================================================
# PostgreSQL Preferences Store
# ============================================================================
class PostgresPreferencesStore(PreferencesStore):
    """
    User preferences persistence on top of the PostgreSQL ``user_preferences`` table

    Features:
    - UPSERT (INSERT ... ON CONFLICT UPDATE) for set operations
    - Partial updates merged in Python and written back through ``set``
    - ``updated_at`` is written explicitly by ``set``
    """

    def __init__(self, pool: asyncpg.Pool):
        """
        Bind the store to a connection pool

        Args:
            pool: asyncpg connection pool used for every query
        """
        self.pool = pool

    async def get(self, user_id: str) -> UserPreferences | None:
        """
        Fetch the preferences of one user

        Args:
            user_id: User identifier

        Returns:
            The stored preferences, or None when the user has no row
        """
        async with self.pool.acquire() as conn:
            record = await conn.fetchrow(
                """
                SELECT user_id, preferences, updated_at
                FROM user_preferences
                WHERE user_id = $1
                """,
                user_id,
            )

            if record is None:
                return None

            stored = record["preferences"]
            return UserPreferences(
                user_id=record["user_id"],
                # Empty / NULL preferences are reported as an empty dict
                preferences=json.loads(stored) if stored else {},
                # ISO string with "+00:00" spelled as "Z"
                updated_at=record["updated_at"].isoformat().replace("+00:00", "Z"),
            )

    async def set(self, user_id: str, preferences: dict[str, Any]) -> bool:
        """
        Replace the preferences of a user (UPSERT - insert or overwrite)

        Args:
            user_id: User identifier
            preferences: Complete preferences dictionary to store

        Returns:
            Always True; database errors propagate as exceptions
        """
        # Timestamp is taken before a connection is acquired
        stamped_at = datetime.now(UTC)

        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                INSERT INTO user_preferences (user_id, preferences, updated_at)
                VALUES ($1, $2, $3)
                ON CONFLICT (user_id)
                DO UPDATE SET
                    preferences = EXCLUDED.preferences,
                    updated_at = EXCLUDED.updated_at
                """,
                user_id,
                json.dumps(preferences),
                stamped_at,
            )
            return True

    async def update(self, user_id: str, updates: dict[str, Any]) -> bool:
        """
        Merge a partial update into the stored preferences

        Args:
            user_id: User identifier
            updates: Keys to add or overwrite (shallow merge)

        Returns:
            True if the merged result was stored, False if the user has no
            preferences row to merge into

        Note:
            This is a read followed by a separate write on different pool
            connections, so it is not atomic with respect to concurrent writers
        """
        current = await self.get(user_id)
        if current is None:
            return False

        # Shallow merge: keys from ``updates`` win over stored keys
        combined = dict(current.preferences)
        combined.update(updates)

        return await self.set(user_id, combined)

    async def delete(self, user_id: str) -> bool:
        """
        Remove the preferences of a user

        Args:
            user_id: User identifier

        Returns:
            True when exactly one row was deleted, False otherwise
        """
        async with self.pool.acquire() as conn:
            status = await conn.execute(
                "DELETE FROM user_preferences WHERE user_id = $1",
                user_id,
            )
            # Last token of the status string ("DELETE N") is the row count
            return status.split()[-1] == "1"
# ============================================================================
# PostgreSQL Consent Store
# ============================================================================
class PostgresConsentStore(ConsentStore):
    """
    Consent record persistence on top of the PostgreSQL ``consent_records`` table

    GDPR Article 7: Conditions for consent
    - Records are only ever inserted by this class (append-only audit trail);
      the sole removal path is ``delete_user_consents``
    - 7-year retention requirement (not enforced by this class)
    - The most recent record per consent type determines current status
    """

    def __init__(self, pool: asyncpg.Pool):
        """
        Bind the store to a connection pool

        Args:
            pool: asyncpg connection pool used for every query
        """
        self.pool = pool

    @staticmethod
    def _row_to_consent(row: asyncpg.Record) -> ConsentRecord:
        """Convert one ``consent_records`` row into a ConsentRecord"""
        return ConsentRecord(
            consent_id=row["consent_id"],
            user_id=row["user_id"],
            consent_type=row["consent_type"],
            granted=row["granted"],
            # ISO string with "+00:00" spelled as "Z"
            timestamp=row["timestamp"].isoformat().replace("+00:00", "Z"),
            ip_address=row["ip_address"],
            user_agent=row["user_agent"],
            # Empty / NULL metadata is reported as an empty dict
            metadata=json.loads(row["metadata"]) if row["metadata"] else {},
        )

    async def create(self, record: ConsentRecord) -> str:
        """
        Append a consent record to the audit trail

        Args:
            record: Consent record to insert; ``timestamp`` is parsed with
                ``datetime.fromisoformat``

        Returns:
            consent_id of the inserted record
        """
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                INSERT INTO consent_records
                (consent_id, user_id, consent_type, granted, timestamp, ip_address, user_agent, metadata)
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
                """,
                record.consent_id,
                record.user_id,
                record.consent_type,
                record.granted,
                # The record carries an ISO string; asyncpg is handed a datetime
                datetime.fromisoformat(record.timestamp),
                record.ip_address,
                record.user_agent,
                json.dumps(record.metadata),
            )
            return record.consent_id

    async def get_user_consents(self, user_id: str) -> list[ConsentRecord]:
        """
        Fetch the complete consent history of a user

        Args:
            user_id: User identifier

        Returns:
            Every consent record of the user, newest first
        """
        async with self.pool.acquire() as conn:
            found = await conn.fetch(
                """
                SELECT consent_id, user_id, consent_type, granted, timestamp, ip_address, user_agent, metadata
                FROM consent_records
                WHERE user_id = $1
                ORDER BY timestamp DESC
                """,
                user_id,
            )

            return [self._row_to_consent(item) for item in found]

    async def get_latest_consent(self, user_id: str, consent_type: str) -> ConsentRecord | None:
        """
        Fetch the most recent consent record of one type

        Args:
            user_id: User identifier
            consent_type: Type of consent (analytics, marketing, etc.)

        Returns:
            The newest matching record, or None when there is none
        """
        async with self.pool.acquire() as conn:
            newest = await conn.fetchrow(
                """
                SELECT consent_id, user_id, consent_type, granted, timestamp, ip_address, user_agent, metadata
                FROM consent_records
                WHERE user_id = $1 AND consent_type = $2
                ORDER BY timestamp DESC
                LIMIT 1
                """,
                user_id,
                consent_type,
            )

            return None if newest is None else self._row_to_consent(newest)

    async def delete_user_consents(self, user_id: str) -> int:
        """
        Remove every consent record of a user

        Note: Normally consents are kept for 7 years even after user deletion
        This is only for GDPR Right to Erasure in special cases

        Args:
            user_id: User identifier

        Returns:
            Number of consent records deleted
        """
        async with self.pool.acquire() as conn:
            status = await conn.execute(
                "DELETE FROM consent_records WHERE user_id = $1",
                user_id,
            )
            # Last token of the status string ("DELETE N") is the row count
            deleted = status.split()[-1]
            return int(deleted)
# ============================================================================
# PostgreSQL Conversation Store
# ============================================================================
class PostgresConversationStore(ConversationStore):
    """
    Conversation persistence on top of the PostgreSQL ``conversations`` table

    GDPR Article 5(1)(e): Storage limitation (90-day retention)
    - Messages and metadata are written as serialized JSON
    - Retention cleanup is not performed by this class
    """

    def __init__(self, pool: asyncpg.Pool):
        """
        Bind the store to a connection pool

        Args:
            pool: asyncpg connection pool used for every query
        """
        self.pool = pool

    @staticmethod
    def _row_to_conversation(row: asyncpg.Record) -> Conversation:
        """Convert one ``conversations`` row into a Conversation"""
        # NOTE(review): list_user_conversations orders with NULLS LAST, which
        # suggests last_message_at may be NULL; a NULL would make .isoformat()
        # raise below - confirm against the schema.
        return Conversation(
            conversation_id=row["conversation_id"],
            user_id=row["user_id"],
            title=row["title"],
            # Empty / NULL messages are reported as an empty list
            messages=json.loads(row["messages"]) if row["messages"] else [],
            # ISO strings with "+00:00" spelled as "Z"
            created_at=row["created_at"].isoformat().replace("+00:00", "Z"),
            last_message_at=row["last_message_at"].isoformat().replace("+00:00", "Z"),
            archived=row["archived"],
            # Empty / NULL metadata is reported as an empty dict
            metadata=json.loads(row["metadata"]) if row["metadata"] else {},
        )

    async def create(self, conversation: Conversation) -> str:
        """
        Insert a new conversation

        Args:
            conversation: Conversation to persist; ``created_at`` and
                ``last_message_at`` are parsed with ``datetime.fromisoformat``

        Returns:
            conversation_id of the inserted conversation
        """
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                INSERT INTO conversations
                (conversation_id, user_id, title, messages, created_at, last_message_at, archived, metadata)
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
                """,
                conversation.conversation_id,
                conversation.user_id,
                conversation.title,
                json.dumps(conversation.messages),
                # The model carries ISO strings; asyncpg is handed datetime objects
                datetime.fromisoformat(conversation.created_at),
                datetime.fromisoformat(conversation.last_message_at),
                conversation.archived,
                json.dumps(conversation.metadata),
            )
            return conversation.conversation_id

    async def get(self, conversation_id: str) -> Conversation | None:
        """
        Fetch one conversation by its identifier

        Args:
            conversation_id: Conversation identifier

        Returns:
            The stored conversation, or None when no row matches
        """
        async with self.pool.acquire() as conn:
            record = await conn.fetchrow(
                """
                SELECT conversation_id, user_id, title, messages, created_at, last_message_at, archived, metadata
                FROM conversations
                WHERE conversation_id = $1
                """,
                conversation_id,
            )

            return None if record is None else self._row_to_conversation(record)

    async def list_user_conversations(self, user_id: str, archived: bool | None = None) -> list[Conversation]:
        """
        List the conversations of a user

        Args:
            user_id: User identifier
            archived: Filter by archived status (None = all, True = archived only, False = active only)

        Returns:
            Conversations ordered by last_message_at descending (NULLs last)
        """
        if archived is not None:
            # Caller asked for one archived state only
            query = """
                SELECT conversation_id, user_id, title, messages, created_at, last_message_at, archived, metadata
                FROM conversations
                WHERE user_id = $1 AND archived = $2
                ORDER BY last_message_at DESC NULLS LAST
            """
            args: list[str | bool] = [user_id, archived]
        else:
            query = """
                SELECT conversation_id, user_id, title, messages, created_at, last_message_at, archived, metadata
                FROM conversations
                WHERE user_id = $1
                ORDER BY last_message_at DESC NULLS LAST
            """
            args = [user_id]

        async with self.pool.acquire() as conn:
            found = await conn.fetch(query, *args)

            return [self._row_to_conversation(item) for item in found]

    async def update(self, conversation_id: str, updates: dict[str, Any]) -> bool:
        """
        Apply a partial update to a conversation

        Args:
            conversation_id: Conversation identifier
            updates: Fields to change (title, messages, archived, last_message_at, metadata);
                any other key is silently ignored

        Returns:
            True when exactly one row was updated, False when no supported
            field was supplied or no row matched
        """
        # Updatable column -> conversion applied before binding (None = bind as-is)
        encoders: dict[str, Any] = {
            "last_message_at": datetime.fromisoformat,  # ISO string -> datetime for asyncpg
            "title": None,
            "archived": None,
            "messages": json.dumps,
            "metadata": json.dumps,
        }

        assignments: list[str] = []
        bound: list[Any] = []

        for column, raw in updates.items():
            if column not in encoders:
                continue
            encode = encoders[column]
            bound.append(raw if encode is None else encode(raw))
            # Placeholder number == position of the value just bound
            assignments.append(f"{column} = ${len(bound)}")

        if not assignments:
            return False

        # conversation_id is always the last positional parameter
        bound.append(conversation_id)

        # SECURITY: Safe from SQL injection because:
        # 1. Only the column names listed in ``encoders`` above reach the SET clause
        # 2. All values are passed as query parameters ($1, $2, etc.) via *bound
        # 3. Only SQL structure (not data) is constructed via f-string
        query = f"""
            UPDATE conversations
            SET {", ".join(assignments)}
            WHERE conversation_id = ${len(bound)}
        """  # nosec B608 - Safe: column names from a fixed set, values parameterized

        async with self.pool.acquire() as conn:
            # SECURITY: Query is safe - column names come from the fixed set checked above,
            # values are parameterized. See nosec B608 comment on query string.
            # nosemgrep: python.sqlalchemy.security.sqlalchemy-execute-raw-query.sqlalchemy-execute-raw-query
            status = await conn.execute(query, *bound)
            # Last token of the status string ("UPDATE N") is the row count
            return status.split()[-1] == "1"

    async def delete(self, conversation_id: str) -> bool:
        """
        Remove one conversation

        Args:
            conversation_id: Conversation identifier

        Returns:
            True when exactly one row was deleted, False otherwise
        """
        async with self.pool.acquire() as conn:
            status = await conn.execute(
                "DELETE FROM conversations WHERE conversation_id = $1",
                conversation_id,
            )
            return status.split()[-1] == "1"

    async def delete_user_conversations(self, user_id: str) -> int:
        """
        Remove every conversation of a user (GDPR Article 17)

        Args:
            user_id: User identifier

        Returns:
            Number of conversations deleted
        """
        async with self.pool.acquire() as conn:
            status = await conn.execute(
                "DELETE FROM conversations WHERE user_id = $1",
                user_id,
            )
            # Last token of the status string ("DELETE N") is the row count
            deleted = status.split()[-1]
            return int(deleted)
# ============================================================================
# PostgreSQL Audit Log Store
# ============================================================================
class PostgresAuditLogStore(AuditLogStore):
    """
    Audit log persistence on top of the PostgreSQL ``audit_logs`` table

    HIPAA §164.312(b): Audit controls (7-year retention)
    SOC2 CC6.6: Audit logging and monitoring

    Features:
    - Entries are only inserted, never deleted, by this class
    - Anonymization support for GDPR Article 17 (``anonymize_user_logs``)
    - Per-user queries with optional time-range filters
    """

    def __init__(self, pool: asyncpg.Pool):
        """
        Bind the store to a connection pool

        Args:
            pool: asyncpg connection pool used for every query
        """
        self.pool = pool

    @staticmethod
    def _row_to_entry(row: asyncpg.Record) -> AuditLogEntry:
        """Convert one ``audit_logs`` row into an AuditLogEntry"""
        return AuditLogEntry(
            log_id=row["log_id"],
            # NULL user_id / resource_type come back as empty strings
            user_id=row["user_id"] or "",
            action=row["action"],
            resource_type=row["resource_type"] or "",
            resource_id=row["resource_id"],
            # ISO string with "+00:00" spelled as "Z"
            timestamp=row["timestamp"].isoformat().replace("+00:00", "Z"),
            ip_address=row["ip_address"],
            user_agent=row["user_agent"],
            # Empty / NULL metadata is reported as an empty dict
            metadata=json.loads(row["metadata"]) if row["metadata"] else {},
        )

    async def log(self, entry: AuditLogEntry) -> str:
        """
        Append an audit entry

        Args:
            entry: Audit log entry to insert; ``timestamp`` is parsed with
                ``datetime.fromisoformat``

        Returns:
            log_id of the inserted entry
        """
        # An empty user_id (system events) is stored as NULL
        actor = entry.user_id or None

        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                INSERT INTO audit_logs
                (log_id, user_id, action, resource_type, resource_id, timestamp, ip_address, user_agent, metadata)
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
                """,
                entry.log_id,
                actor,
                entry.action,
                entry.resource_type,
                entry.resource_id,
                # The entry carries an ISO string; asyncpg is handed a datetime
                datetime.fromisoformat(entry.timestamp),
                entry.ip_address,
                entry.user_agent,
                json.dumps(entry.metadata),
            )
            return entry.log_id

    async def get(self, log_id: str) -> AuditLogEntry | None:
        """
        Fetch one audit entry by its identifier

        Args:
            log_id: Audit log identifier

        Returns:
            The stored entry, or None when no row matches
        """
        async with self.pool.acquire() as conn:
            record = await conn.fetchrow(
                """
                SELECT log_id, user_id, action, resource_type, resource_id, timestamp, ip_address, user_agent, metadata
                FROM audit_logs
                WHERE log_id = $1
                """,
                log_id,
            )

            return None if record is None else self._row_to_entry(record)

    async def list_user_logs(
        self, user_id: str, start_date: datetime | None = None, end_date: datetime | None = None, limit: int = 100
    ) -> list[AuditLogEntry]:
        """
        List the audit entries of a user (HIPAA/SOC2 compliance queries)

        Args:
            user_id: User identifier
            start_date: Optional inclusive lower bound on the entry timestamp
            end_date: Optional inclusive upper bound on the entry timestamp
            limit: Maximum number of entries to return

        Returns:
            Audit log entries ordered by timestamp descending
        """
        filters = ["user_id = $1"]
        args: list[str | datetime | int] = [user_id]

        # Each supplied bound adds one condition and one positional parameter
        for comparison, bound in ((">=", start_date), ("<=", end_date)):
            if bound is None:
                continue
            args.append(bound)
            filters.append(f"timestamp {comparison} ${len(args)}")

        # limit is always the last positional parameter
        args.append(limit)

        # SECURITY: Safe from SQL injection because:
        # 1. WHERE conditions are fixed fragments assembled here, never caller-supplied text
        # 2. All values are passed as query parameters ($1, $2, etc.) via *args
        # 3. Only SQL structure (not data) is constructed via f-string
        query = f"""
            SELECT log_id, user_id, action, resource_type, resource_id, timestamp, ip_address, user_agent, metadata
            FROM audit_logs
            WHERE {" AND ".join(filters)}
            ORDER BY timestamp DESC
            LIMIT ${len(args)}
        """  # nosec B608 - Safe: conditions are fixed fragments, values parameterized

        async with self.pool.acquire() as conn:
            found = await conn.fetch(query, *args)

            return [self._row_to_entry(item) for item in found]

    async def anonymize_user_logs(self, user_id: str) -> int:
        """
        Anonymize the audit entries of a user (GDPR Article 17 - Right to Erasure)

        The rows themselves are kept so the audit trail survives; only the
        user_id column is overwritten with the literal 'anonymized'.

        Args:
            user_id: User identifier to anonymize

        Returns:
            Number of audit logs anonymized
        """
        async with self.pool.acquire() as conn:
            status = await conn.execute(
                """
                UPDATE audit_logs
                SET user_id = 'anonymized'
                WHERE user_id = $1
                """,
                user_id,
            )
            # Last token of the status string ("UPDATE N") is the row count
            touched = status.split()[-1]
            return int(touched)