Coverage for src/mcp_server_langgraph/compliance/gdpr/postgres_storage.py: 28%

184 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 00:43 +0000

1""" 

2PostgreSQL Storage Implementation for GDPR/HIPAA/SOC2 Compliance 

3 

4Implements persistent storage for: 

5- User profiles 

6- User preferences 

7- Consent records (7-year retention) 

8- Conversations (90-day retention) 

9- Audit logs (7-year retention) 

10 

11Compliance Requirements: 

12- GDPR Articles 5, 15, 17 (retention, access, erasure) 

13- HIPAA §164.312(b), §164.316(b)(2)(i) (audit controls, 7-year retention) 

14- SOC2 CC6.6, PI1.4 (audit logs, data retention) 

15 

16Design: Pure PostgreSQL (not hybrid) for: 

17- Simplicity (one source of truth) 

18- ACID guarantees (atomic GDPR operations) 

19- Cost efficiency (14x cheaper than Redis for 7-year data) 

20- Compliance auditing (single system to audit) 

21""" 

22 

23import json 

24from datetime import datetime, UTC 

25from typing import Any 

26 

27import asyncpg 

28 

29from mcp_server_langgraph.compliance.gdpr.storage import ( 

30 AuditLogEntry, 

31 AuditLogStore, 

32 ConsentRecord, 

33 ConsentStore, 

34 Conversation, 

35 ConversationStore, 

36 PreferencesStore, 

37 UserPreferences, 

38 UserProfile, 

39 UserProfileStore, 

40) 

41 

42# ============================================================================ 

43# PostgreSQL User Profile Store 

44# ============================================================================ 

45 

46 

class PostgresUserProfileStore(UserProfileStore):
    """
    PostgreSQL implementation of user profile storage (table ``user_profiles``).

    GDPR Compliance:
    - Article 15: Right to access (get method)
    - Article 17: Right to erasure (delete method)

    Note:
        update() never writes ``last_updated`` itself; it relies on a database
        trigger (defined in the schema, outside this module) to refresh it.
    """

    # Allowlist of columns update() writes verbatim. Together with
    # _JSON_UPDATE_COLUMNS this is the ONLY source of column names that are
    # interpolated into the dynamic UPDATE statement built by update().
    _PLAIN_UPDATE_COLUMNS: frozenset[str] = frozenset({"email", "full_name", "username"})
    # Allowlist of columns update() serializes with json.dumps before writing.
    _JSON_UPDATE_COLUMNS: frozenset[str] = frozenset({"metadata"})

    def __init__(self, pool: asyncpg.Pool):
        """
        Initialize PostgreSQL user profile store

        Args:
            pool: asyncpg connection pool; every method acquires its own connection
        """
        self.pool = pool

    async def create(self, profile: UserProfile) -> bool:
        """
        Create a new user profile

        Args:
            profile: User profile to create. ``created_at`` and ``last_updated``
                must be ISO-8601 strings accepted by ``datetime.fromisoformat``.

        Returns:
            True if created successfully, False if the insert violates a unique
            constraint (e.g. the user already exists)
        """
        try:
            async with self.pool.acquire() as conn:
                await conn.execute(
                    """
                    INSERT INTO user_profiles
                    (user_id, username, email, full_name, created_at, last_updated, metadata)
                    VALUES ($1, $2, $3, $4, $5, $6, $7)
                    """,
                    profile.user_id,
                    profile.username,
                    profile.email,
                    profile.full_name,
                    # asyncpg needs datetime objects for timestamp columns, not ISO strings
                    datetime.fromisoformat(profile.created_at),
                    datetime.fromisoformat(profile.last_updated),
                    json.dumps(profile.metadata),
                )
            return True
        except asyncpg.UniqueViolationError:
            # User already exists
            return False

    async def get(self, user_id: str) -> UserProfile | None:
        """
        Get user profile by ID (GDPR Article 15 - Right to access)

        Args:
            user_id: User identifier

        Returns:
            User profile if found, None otherwise. Timestamps are returned as
            ISO-8601 strings with a trailing "Z" in place of "+00:00".
        """
        async with self.pool.acquire() as conn:
            row = await conn.fetchrow(
                """
                SELECT user_id, username, email, full_name, created_at, last_updated, metadata
                FROM user_profiles
                WHERE user_id = $1
                """,
                user_id,
            )

        if row is None:
            return None

        return UserProfile(
            user_id=row["user_id"],
            username=row["username"],
            email=row["email"],
            full_name=row["full_name"],
            created_at=row["created_at"].isoformat().replace("+00:00", "Z"),
            last_updated=row["last_updated"].isoformat().replace("+00:00", "Z"),
            metadata=json.loads(row["metadata"]) if row["metadata"] else {},
        )

    async def update(self, user_id: str, updates: dict[str, Any]) -> bool:
        """
        Update user profile

        Args:
            user_id: User identifier
            updates: Fields to update. Only keys in the class allowlists
                (email, full_name, username, metadata) are applied; any other
                key is silently ignored.

        Returns:
            True if exactly one row was updated. False if the user was not found
            or if ``updates`` contained no allowlisted key (no query is issued).

        Note:
            last_updated is expected to be refreshed by a database trigger.
        """
        set_clauses: list[str] = []
        values: list[Any] = []

        for key, value in updates.items():
            if key in self._PLAIN_UPDATE_COLUMNS:
                values.append(value)
            elif key in self._JSON_UPDATE_COLUMNS:
                values.append(json.dumps(value))
            else:
                continue
            # Placeholder number == position of the value just appended.
            set_clauses.append(f"{key} = ${len(values)}")

        if not set_clauses:
            return False

        # user_id is always the final bound parameter
        values.append(user_id)

        # SECURITY: Safe from SQL injection because:
        # 1. Column names come only from _PLAIN_UPDATE_COLUMNS / _JSON_UPDATE_COLUMNS
        # 2. Every value (including user_id) is a bound parameter ($1, $2, ...)
        # 3. The f-string builds SQL structure only, never data
        query = f"""
            UPDATE user_profiles
            SET {", ".join(set_clauses)}
            WHERE user_id = ${len(values)}
        """  # nosec B608 - column names from class allowlists, values parameterized

        async with self.pool.acquire() as conn:
            # SECURITY: see comment on the query string above.
            # nosemgrep: python.sqlalchemy.security.sqlalchemy-execute-raw-query.sqlalchemy-execute-raw-query
            result = await conn.execute(query, *values)
            # asyncpg returns the command tag, e.g. "UPDATE 1"
            row_count: str = result.split()[-1]
            return row_count == "1"

    async def delete(self, user_id: str) -> bool:
        """
        Delete user profile (GDPR Article 17 - Right to erasure)

        Args:
            user_id: User identifier

        Returns:
            True if exactly one row was deleted, False if the user was not found

        Note:
            Only ``user_profiles`` is touched here. Related rows in other tables
            disappear only if the schema declares ON DELETE CASCADE foreign keys
            (the design intends preferences, conversations and consents to
            cascade — confirm against the schema).
        """
        async with self.pool.acquire() as conn:
            result = await conn.execute(
                "DELETE FROM user_profiles WHERE user_id = $1",
                user_id,
            )
            # Command tag is "DELETE N"
            row_count: str = result.split()[-1]
            return row_count == "1"

207 

208 

209# ============================================================================ 

210# PostgreSQL Preferences Store 

211# ============================================================================ 

212 

213 

class PostgresPreferencesStore(PreferencesStore):
    """
    PostgreSQL implementation of user preferences storage (table ``user_preferences``)

    Features:
    - UPSERT (INSERT ... ON CONFLICT UPDATE) for set operations
    - Shallow (top-level key) merge for partial updates, done atomically under
      a row lock so concurrent partial updates do not lose each other's keys
    - ``updated_at`` is written explicitly by set() and update()
    """

    def __init__(self, pool: asyncpg.Pool):
        """
        Initialize PostgreSQL preferences store

        Args:
            pool: asyncpg connection pool; every method acquires its own connection
        """
        self.pool = pool

    async def get(self, user_id: str) -> UserPreferences | None:
        """
        Get user preferences

        Args:
            user_id: User identifier

        Returns:
            The stored preferences (empty dict if the column is empty/NULL),
            or None if the user has no preferences row
        """
        async with self.pool.acquire() as conn:
            row = await conn.fetchrow(
                """
                SELECT user_id, preferences, updated_at
                FROM user_preferences
                WHERE user_id = $1
                """,
                user_id,
            )

        if row is None:
            return None

        return UserPreferences(
            user_id=row["user_id"],
            preferences=json.loads(row["preferences"]) if row["preferences"] else {},
            updated_at=row["updated_at"].isoformat().replace("+00:00", "Z"),
        )

    async def set(self, user_id: str, preferences: dict[str, Any]) -> bool:
        """
        Set user preferences (UPSERT - insert or replace)

        Args:
            user_id: User identifier
            preferences: Preferences dictionary; replaces any existing value entirely

        Returns:
            True if successful (database errors propagate as exceptions)
        """
        now = datetime.now(UTC)

        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                INSERT INTO user_preferences (user_id, preferences, updated_at)
                VALUES ($1, $2, $3)
                ON CONFLICT (user_id)
                DO UPDATE SET
                    preferences = EXCLUDED.preferences,
                    updated_at = EXCLUDED.updated_at
                """,
                user_id,
                json.dumps(preferences),
                now,
            )
        return True

    async def update(self, user_id: str, updates: dict[str, Any]) -> bool:
        """
        Update specific preferences (shallow merge with existing)

        Keys in ``updates`` overwrite existing top-level keys; other existing
        keys are preserved. The read and the write happen in one transaction on
        one connection, with the row locked (SELECT ... FOR UPDATE), so two
        concurrent partial updates cannot silently drop each other's changes.

        Args:
            user_id: User identifier
            updates: Partial preferences to merge in

        Returns:
            True if successful, False if the user has no preferences row
        """
        async with self.pool.acquire() as conn:
            async with conn.transaction():
                row = await conn.fetchrow(
                    """
                    SELECT preferences
                    FROM user_preferences
                    WHERE user_id = $1
                    FOR UPDATE
                    """,
                    user_id,
                )
                if row is None:
                    return False

                existing = json.loads(row["preferences"]) if row["preferences"] else {}
                merged = {**existing, **updates}

                await conn.execute(
                    """
                    UPDATE user_preferences
                    SET preferences = $2,
                        updated_at = $3
                    WHERE user_id = $1
                    """,
                    user_id,
                    json.dumps(merged),
                    datetime.now(UTC),
                )
        return True

    async def delete(self, user_id: str) -> bool:
        """
        Delete user preferences

        Args:
            user_id: User identifier

        Returns:
            True if exactly one row was deleted, False otherwise
        """
        async with self.pool.acquire() as conn:
            result = await conn.execute(
                "DELETE FROM user_preferences WHERE user_id = $1",
                user_id,
            )
            # Command tag is "DELETE N"
            row_count: str = result.split()[-1]
            return row_count == "1"

314 

315 

316# ============================================================================ 

317# PostgreSQL Consent Store 

318# ============================================================================ 

319 

320 

class PostgresConsentStore(ConsentStore):
    """
    Consent record storage backed by the ``consent_records`` PostgreSQL table.

    GDPR Article 7 (conditions for consent):
    - create() only ever inserts, so the table accumulates a history of consent
      decisions rather than overwriting them
    - the design calls for 7-year retention; enforcement lives outside this class
    - the most recent record for a consent type reflects the current decision
    """

    def __init__(self, pool: asyncpg.Pool):
        """
        Build the store on top of an asyncpg pool.

        Args:
            pool: asyncpg connection pool; each call checks out its own connection
        """
        self.pool = pool

    @staticmethod
    def _record_from_row(row: asyncpg.Record) -> ConsentRecord:
        """Convert one ``consent_records`` row into a ConsentRecord model."""
        raw_metadata = row["metadata"]
        return ConsentRecord(
            consent_id=row["consent_id"],
            user_id=row["user_id"],
            consent_type=row["consent_type"],
            granted=row["granted"],
            # Normalize a UTC offset to the "Z" suffix
            timestamp=row["timestamp"].isoformat().replace("+00:00", "Z"),
            ip_address=row["ip_address"],
            user_agent=row["user_agent"],
            metadata=json.loads(raw_metadata) if raw_metadata else {},
        )

    async def create(self, record: ConsentRecord) -> str:
        """
        Insert a new consent record (never updates an existing one).

        Args:
            record: Consent decision to persist; ``timestamp`` must be an ISO-8601
                string accepted by ``datetime.fromisoformat``

        Returns:
            The ``consent_id`` of the stored record
        """
        recorded_at = datetime.fromisoformat(record.timestamp)  # asyncpg wants a datetime
        metadata_json = json.dumps(record.metadata)

        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                INSERT INTO consent_records
                (consent_id, user_id, consent_type, granted, timestamp, ip_address, user_agent, metadata)
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
                """,
                record.consent_id,
                record.user_id,
                record.consent_type,
                record.granted,
                recorded_at,
                record.ip_address,
                record.user_agent,
                metadata_json,
            )
        return record.consent_id

    async def get_user_consents(self, user_id: str) -> list[ConsentRecord]:
        """
        Fetch the full consent history of one user.

        Args:
            user_id: User identifier

        Returns:
            Every consent record for the user, newest first (empty list if none)
        """
        async with self.pool.acquire() as conn:
            fetched = await conn.fetch(
                """
                SELECT consent_id, user_id, consent_type, granted, timestamp, ip_address, user_agent, metadata
                FROM consent_records
                WHERE user_id = $1
                ORDER BY timestamp DESC
                """,
                user_id,
            )

        return list(map(self._record_from_row, fetched))

    async def get_latest_consent(self, user_id: str, consent_type: str) -> ConsentRecord | None:
        """
        Fetch the newest consent record of one type for one user.

        Args:
            user_id: User identifier
            consent_type: Kind of consent (e.g. analytics, marketing)

        Returns:
            The most recent matching record, or None when none exists
        """
        async with self.pool.acquire() as conn:
            newest = await conn.fetchrow(
                """
                SELECT consent_id, user_id, consent_type, granted, timestamp, ip_address, user_agent, metadata
                FROM consent_records
                WHERE user_id = $1 AND consent_type = $2
                ORDER BY timestamp DESC
                LIMIT 1
                """,
                user_id,
                consent_type,
            )

        return None if newest is None else self._record_from_row(newest)

    async def delete_user_consents(self, user_id: str) -> int:
        """
        Remove every consent record belonging to a user.

        The design intends consent records to be retained for 7 years, so this
        is meant only for exceptional GDPR Right-to-Erasure requests.
        NOTE(review): the user-profile delete path documents an FK cascade to
        consents, which would conflict with that retention intent — confirm
        against the schema.

        Args:
            user_id: User identifier

        Returns:
            How many consent rows were removed
        """
        async with self.pool.acquire() as conn:
            status = await conn.execute(
                "DELETE FROM consent_records WHERE user_id = $1",
                user_id,
            )
        # Command tag looks like "DELETE <count>"
        _, _, deleted = status.rpartition(" ")
        return int(deleted)

461 

462 

463# ============================================================================ 

464# PostgreSQL Conversation Store 

465# ============================================================================ 

466 

467 

class PostgresConversationStore(ConversationStore):
    """
    PostgreSQL implementation of conversation storage (table ``conversations``)

    GDPR Article 5(1)(e): Storage limitation (90-day retention)
    - Messages are serialized to JSON text on write and parsed on read
      (the column is intended to be JSONB — confirm against the schema)
    - Retention enforcement is not performed by this class
    """

    # Allowlists for update(). These are the ONLY column names ever interpolated
    # into the dynamic UPDATE statement; everything else in `updates` is ignored.
    # Parsed from an ISO-8601 string into a datetime before binding.
    _TIMESTAMP_UPDATE_COLUMNS: frozenset[str] = frozenset({"last_message_at"})
    # Bound as-is.
    _PLAIN_UPDATE_COLUMNS: frozenset[str] = frozenset({"title", "archived"})
    # Serialized with json.dumps before binding.
    _JSON_UPDATE_COLUMNS: frozenset[str] = frozenset({"messages", "metadata"})

    def __init__(self, pool: asyncpg.Pool):
        """
        Initialize PostgreSQL conversation store

        Args:
            pool: asyncpg connection pool; every method acquires its own connection
        """
        self.pool = pool

    @staticmethod
    def _conversation_from_row(row: asyncpg.Record) -> Conversation:
        """Convert one ``conversations`` row into a Conversation model."""
        # NOTE(review): list_user_conversations orders with NULLS LAST, which
        # suggests last_message_at may be NULL; this mapping would raise
        # AttributeError on such a row — confirm the column is NOT NULL.
        return Conversation(
            conversation_id=row["conversation_id"],
            user_id=row["user_id"],
            title=row["title"],
            messages=json.loads(row["messages"]) if row["messages"] else [],
            created_at=row["created_at"].isoformat().replace("+00:00", "Z"),
            last_message_at=row["last_message_at"].isoformat().replace("+00:00", "Z"),
            archived=row["archived"],
            metadata=json.loads(row["metadata"]) if row["metadata"] else {},
        )

    async def create(self, conversation: Conversation) -> str:
        """
        Create a new conversation

        Args:
            conversation: Conversation to insert; ``created_at`` and
                ``last_message_at`` must be ISO-8601 strings

        Returns:
            The ``conversation_id`` of the inserted row
        """
        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                INSERT INTO conversations
                (conversation_id, user_id, title, messages, created_at, last_message_at, archived, metadata)
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8)
                """,
                conversation.conversation_id,
                conversation.user_id,
                conversation.title,
                json.dumps(conversation.messages),
                # asyncpg needs datetime objects for timestamp columns, not ISO strings
                datetime.fromisoformat(conversation.created_at),
                datetime.fromisoformat(conversation.last_message_at),
                conversation.archived,
                json.dumps(conversation.metadata),
            )
        return conversation.conversation_id

    async def get(self, conversation_id: str) -> Conversation | None:
        """
        Get conversation by ID

        Args:
            conversation_id: Conversation identifier

        Returns:
            The conversation, or None if it does not exist
        """
        async with self.pool.acquire() as conn:
            row = await conn.fetchrow(
                """
                SELECT conversation_id, user_id, title, messages, created_at, last_message_at, archived, metadata
                FROM conversations
                WHERE conversation_id = $1
                """,
                conversation_id,
            )

        if row is None:
            return None
        return self._conversation_from_row(row)

    async def list_user_conversations(self, user_id: str, archived: bool | None = None) -> list[Conversation]:
        """
        List all conversations for a user

        Args:
            user_id: User identifier
            archived: Filter by archived status (None = all, True = archived only, False = active only)

        Returns:
            List of conversations ordered by last_message_at descending (NULLs last)
        """
        if archived is None:
            query = """
                SELECT conversation_id, user_id, title, messages, created_at, last_message_at, archived, metadata
                FROM conversations
                WHERE user_id = $1
                ORDER BY last_message_at DESC NULLS LAST
            """
            params: list[str | bool] = [user_id]
        else:
            query = """
                SELECT conversation_id, user_id, title, messages, created_at, last_message_at, archived, metadata
                FROM conversations
                WHERE user_id = $1 AND archived = $2
                ORDER BY last_message_at DESC NULLS LAST
            """
            params = [user_id, archived]

        async with self.pool.acquire() as conn:
            rows = await conn.fetch(query, *params)

        return [self._conversation_from_row(row) for row in rows]

    async def update(self, conversation_id: str, updates: dict[str, Any]) -> bool:
        """
        Update conversation

        Args:
            conversation_id: Conversation identifier
            updates: Fields to update (title, messages, archived, last_message_at,
                metadata). ``last_message_at`` must be an ISO-8601 string. Any
                other key is silently ignored.

        Returns:
            True if exactly one row was updated. False if the conversation was
            not found or ``updates`` contained no allowlisted key (no query is issued).
        """
        set_clauses: list[str] = []
        values: list[Any] = []

        for key, value in updates.items():
            if key in self._TIMESTAMP_UPDATE_COLUMNS:
                # asyncpg needs a datetime object, not an ISO string
                values.append(datetime.fromisoformat(value))
            elif key in self._PLAIN_UPDATE_COLUMNS:
                values.append(value)
            elif key in self._JSON_UPDATE_COLUMNS:
                values.append(json.dumps(value))
            else:
                continue
            # Placeholder number == position of the value just appended.
            set_clauses.append(f"{key} = ${len(values)}")

        if not set_clauses:
            return False

        # conversation_id is always the final bound parameter
        values.append(conversation_id)

        # SECURITY: Safe from SQL injection because:
        # 1. Column names come only from the class-level *_UPDATE_COLUMNS allowlists
        # 2. Every value (including conversation_id) is a bound parameter ($1, $2, ...)
        # 3. The f-string builds SQL structure only, never data
        query = f"""
            UPDATE conversations
            SET {", ".join(set_clauses)}
            WHERE conversation_id = ${len(values)}
        """  # nosec B608 - column names from class allowlists, values parameterized

        async with self.pool.acquire() as conn:
            # SECURITY: see comment on the query string above.
            # nosemgrep: python.sqlalchemy.security.sqlalchemy-execute-raw-query.sqlalchemy-execute-raw-query
            result = await conn.execute(query, *values)
            # Command tag is "UPDATE N"
            row_count: str = result.split()[-1]
            return row_count == "1"

    async def delete(self, conversation_id: str) -> bool:
        """
        Delete conversation

        Args:
            conversation_id: Conversation identifier

        Returns:
            True if exactly one row was deleted, False otherwise
        """
        async with self.pool.acquire() as conn:
            result = await conn.execute(
                "DELETE FROM conversations WHERE conversation_id = $1",
                conversation_id,
            )
            # Command tag is "DELETE N"
            row_count: str = result.split()[-1]
            return row_count == "1"

    async def delete_user_conversations(self, user_id: str) -> int:
        """
        Delete all conversations for a user (GDPR Article 17)

        Args:
            user_id: User identifier

        Returns:
            Number of conversations deleted
        """
        async with self.pool.acquire() as conn:
            result = await conn.execute(
                "DELETE FROM conversations WHERE user_id = $1",
                user_id,
            )
            # Command tag is "DELETE N"
            return int(result.split()[-1])

657 

658 

659# ============================================================================ 

660# PostgreSQL Audit Log Store 

661# ============================================================================ 

662 

663 

class PostgresAuditLogStore(AuditLogStore):
    """
    PostgreSQL implementation of audit log storage (table ``audit_logs``)

    HIPAA §164.312(b): Audit controls (7-year retention)
    SOC2 CC6.6: Audit logging and monitoring

    Features:
    - Append-only from this class's perspective: entries are inserted, and the
      only mutation is anonymize_user_logs()
    - Anonymization support for GDPR Article 17
    - Time-range queries per user (index design lives in the schema)
    """

    def __init__(self, pool: asyncpg.Pool):
        """
        Initialize PostgreSQL audit log store

        Args:
            pool: asyncpg connection pool; every method acquires its own connection
        """
        self.pool = pool

    @staticmethod
    def _entry_from_row(row: asyncpg.Record) -> AuditLogEntry:
        """Convert one ``audit_logs`` row into an AuditLogEntry model."""
        return AuditLogEntry(
            log_id=row["log_id"],
            # NULL user_id (system events) maps back to "" as accepted by log()
            user_id=row["user_id"] or "",
            action=row["action"],
            resource_type=row["resource_type"] or "",
            resource_id=row["resource_id"],
            timestamp=row["timestamp"].isoformat().replace("+00:00", "Z"),
            ip_address=row["ip_address"],
            user_agent=row["user_agent"],
            metadata=json.loads(row["metadata"]) if row["metadata"] else {},
        )

    async def log(self, entry: AuditLogEntry) -> str:
        """
        Log an audit entry (insert only)

        Args:
            entry: Audit log entry to create; ``timestamp`` must be an ISO-8601
                string. An empty ``user_id`` (system event) is stored as NULL.

        Returns:
            log_id of created entry
        """
        # Empty user_id (system events) is stored as NULL
        user_id = entry.user_id or None

        async with self.pool.acquire() as conn:
            await conn.execute(
                """
                INSERT INTO audit_logs
                (log_id, user_id, action, resource_type, resource_id, timestamp, ip_address, user_agent, metadata)
                VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
                """,
                entry.log_id,
                user_id,
                entry.action,
                entry.resource_type,
                entry.resource_id,
                # asyncpg needs a datetime object, not an ISO string
                datetime.fromisoformat(entry.timestamp),
                entry.ip_address,
                entry.user_agent,
                json.dumps(entry.metadata),
            )
        return entry.log_id

    async def get(self, log_id: str) -> AuditLogEntry | None:
        """
        Get audit log entry by ID

        Args:
            log_id: Audit log identifier

        Returns:
            The entry, or None if it does not exist
        """
        async with self.pool.acquire() as conn:
            row = await conn.fetchrow(
                """
                SELECT log_id, user_id, action, resource_type, resource_id, timestamp, ip_address, user_agent, metadata
                FROM audit_logs
                WHERE log_id = $1
                """,
                log_id,
            )

        if row is None:
            return None
        return self._entry_from_row(row)

    async def list_user_logs(
        self, user_id: str, start_date: datetime | None = None, end_date: datetime | None = None, limit: int = 100
    ) -> list[AuditLogEntry]:
        """
        List audit logs for a user (HIPAA/SOC2 compliance queries)

        Args:
            user_id: User identifier
            start_date: Optional inclusive lower bound on timestamp
            end_date: Optional inclusive upper bound on timestamp
            limit: Maximum number of logs to return

        Returns:
            List of audit log entries ordered by timestamp descending
        """
        conditions = ["user_id = $1"]
        params: list[str | datetime | int] = [user_id]

        if start_date is not None:
            params.append(start_date)
            conditions.append(f"timestamp >= ${len(params)}")

        if end_date is not None:
            params.append(end_date)
            conditions.append(f"timestamp <= ${len(params)}")

        # limit is always the final bound parameter
        params.append(limit)

        # SECURITY: Safe from SQL injection because:
        # 1. WHERE conditions are fixed strings chosen in code above, never from input
        # 2. All values (user_id, dates, limit) are bound parameters ($1, $2, ...)
        # 3. The f-string builds SQL structure only, never data
        query = f"""
            SELECT log_id, user_id, action, resource_type, resource_id, timestamp, ip_address, user_agent, metadata
            FROM audit_logs
            WHERE {" AND ".join(conditions)}
            ORDER BY timestamp DESC
            LIMIT ${len(params)}
        """  # nosec B608 - conditions are fixed code strings, values parameterized

        async with self.pool.acquire() as conn:
            rows = await conn.fetch(query, *params)

        return [self._entry_from_row(row) for row in rows]

    async def anonymize_user_logs(self, user_id: str) -> int:
        """
        Anonymize audit logs for a user (GDPR Article 17 - Right to Erasure)

        Keeps the audit trail (action, resource, timestamp) for retention
        requirements but removes the direct identifiers: ``user_id`` is replaced
        with the literal 'anonymized', and ``ip_address`` / ``user_agent`` —
        both personal data under GDPR — are cleared to NULL.

        NOTE(review): ``metadata`` is left untouched and may still contain
        personal data, depending on what callers put into it.

        Args:
            user_id: User identifier to anonymize

        Returns:
            Number of audit logs anonymized
        """
        async with self.pool.acquire() as conn:
            result = await conn.execute(
                """
                UPDATE audit_logs
                SET user_id = 'anonymized',
                    ip_address = NULL,
                    user_agent = NULL
                WHERE user_id = $1
                """,
                user_id,
            )
            # Command tag is "UPDATE N"
            return int(result.split()[-1])