Coverage for src / mcp_server_langgraph / auth / hipaa.py: 96%

128 statements  

« prev     ^ index     » next       coverage.py v7.12.0, created at 2025-12-03 00:43 +0000

1""" 

2HIPAA Compliance Controls 

3 

4Implements HIPAA Security Rule technical safeguards: 

5- 164.312(a)(1): Unique User Identification 

6- 164.312(a)(2)(i): Emergency Access Procedure 

7- 164.312(a)(2)(iii): Automatic Logoff 

8- 164.312(b): Audit Controls 

9- 164.312(c)(1): Integrity Controls 

10 

11Note: Only required if processing Protected Health Information (PHI) 

12""" 

13 

14import hashlib 

15import hmac 

16from datetime import datetime, timedelta, UTC 

17 

18from pydantic import BaseModel, Field 

19 

20from mcp_server_langgraph.auth.session import SessionStore 

21from mcp_server_langgraph.integrations.alerting import Alert, AlertCategory, AlertingService, AlertSeverity 

22from mcp_server_langgraph.observability.telemetry import logger, metrics, tracer 

23 

24 

25class EmergencyAccessRequest(BaseModel): 

26 """Emergency access request for PHI data""" 

27 

28 user_id: str = Field(..., description="User requesting emergency access") 

29 reason: str = Field(..., min_length=10, description="Reason for emergency access (minimum 10 characters)") 

30 approver_id: str = Field(..., description="User ID of approver") 

31 duration_hours: int = Field(default=4, ge=1, le=24, description="Duration of emergency access (1-24 hours)") 

32 access_level: str = Field(default="PHI", description="Level of access granted") 

33 

34 

35class EmergencyAccessGrant(BaseModel): 

36 """Emergency access grant record""" 

37 

38 grant_id: str 

39 user_id: str 

40 reason: str 

41 approver_id: str 

42 granted_at: str # ISO timestamp 

43 expires_at: str # ISO timestamp 

44 access_level: str 

45 revoked: bool = False 

46 revoked_at: str | None = None 

47 

48 

49class PHIAuditLog(BaseModel): 

50 """HIPAA-compliant audit log for PHI access""" 

51 

52 timestamp: str # ISO format 

53 user_id: str 

54 action: str 

55 phi_accessed: bool 

56 patient_id: str | None = None 

57 resource_id: str | None = None 

58 ip_address: str 

59 user_agent: str 

60 success: bool 

61 failure_reason: str | None = None 

62 

63 

64class DataIntegrityCheck(BaseModel): 

65 """HIPAA integrity control - HMAC checksum for data""" 

66 

67 data_id: str 

68 checksum: str 

69 algorithm: str = "HMAC-SHA256" 

70 created_at: str 

71 

72 

73class HIPAAControls: 

74 """ 

75 HIPAA Security Rule technical safeguards implementation 

76 

77 Provides emergency access, audit logging, and integrity controls 

78 for systems processing Protected Health Information (PHI). 

79 """ 

80 

81 def __init__( 

82 self, 

83 session_store: SessionStore | None = None, 

84 integrity_secret: str | None = None, 

85 ): 

86 """ 

87 Initialize HIPAA controls 

88 

89 Args: 

90 session_store: Session storage for emergency access grants 

91 integrity_secret: Secret key for HMAC integrity checks (REQUIRED for production) 

92 If not provided, will attempt to load from environment/settings 

93 

94 Raises: 

95 ValueError: If integrity_secret is not provided and not in environment (fail-closed security pattern) 

96 """ 

97 self.session_store = session_store 

98 

99 # If not provided, try to load from environment/settings 

100 if not integrity_secret: 

101 import os 

102 

103 from mcp_server_langgraph.secrets.manager import get_secrets_manager 

104 

105 secrets_mgr = get_secrets_manager() 

106 integrity_secret = secrets_mgr.get_secret("HIPAA_INTEGRITY_SECRET", fallback=os.getenv("HIPAA_INTEGRITY_SECRET")) 

107 

108 # Validate integrity secret is configured (fail-closed security pattern) 

109 # HIPAA 164.312(c)(1) requires integrity controls for PHI 

110 if not integrity_secret: 110 ↛ 111line 110 didn't jump to line 111 because the condition on line 110 was never true

111 msg = ( 

112 "CRITICAL: HIPAA integrity secret not configured. " 

113 "Set HIPAA_INTEGRITY_SECRET environment variable or configure via Infisical. " 

114 "HIPAA controls cannot be initialized without a secure secret key for data integrity. " 

115 "(HIPAA 164.312(c)(1) - Integrity Controls)" 

116 ) 

117 raise ValueError(msg) 

118 

119 self.integrity_secret = integrity_secret 

120 

121 # In-memory storage for emergency access grants (replace with database) 

122 self._emergency_grants: dict[str, EmergencyAccessGrant] = {} 

123 

124 async def grant_emergency_access( 

125 self, 

126 user_id: str, 

127 reason: str, 

128 approver_id: str, 

129 duration_hours: int = 4, 

130 access_level: str = "PHI", 

131 ) -> EmergencyAccessGrant: 

132 """ 

133 Grant emergency access to PHI (HIPAA 164.312(a)(2)(i)) 

134 

135 Emergency access procedure allows authorized users to access PHI 

136 in emergency situations with proper approval and audit trail. 

137 

138 Args: 

139 user_id: User requesting emergency access 

140 reason: Detailed reason for emergency access 

141 approver_id: User ID of approver (must be authorized) 

142 duration_hours: Duration of access (1-24 hours, default 4) 

143 access_level: Level of access (default "PHI") 

144 

145 Returns: 

146 EmergencyAccessGrant with grant details 

147 

148 Example: 

149 grant = await controls.grant_emergency_access( 

150 user_id="user:doctor_smith", 

151 reason="Patient emergency - cardiac arrest in ER", 

152 approver_id="user:supervisor_jones", 

153 duration_hours=2 

154 ) 

155 """ 

156 with tracer.start_as_current_span("hipaa.grant_emergency_access") as span: 

157 span.set_attribute("user_id", user_id) 

158 span.set_attribute("approver_id", approver_id) 

159 span.set_attribute("duration_hours", duration_hours) 

160 

161 # Validate request (constructor validates fields) 

162 EmergencyAccessRequest( 

163 user_id=user_id, 

164 reason=reason, 

165 approver_id=approver_id, 

166 duration_hours=duration_hours, 

167 access_level=access_level, 

168 ) 

169 

170 # Generate grant ID 

171 grant_id = f"emergency_{datetime.now(UTC).strftime('%Y%m%d%H%M%S')}_{user_id.replace(':', '_')}" 

172 

173 # Calculate expiration 

174 granted_at = datetime.now(UTC) 

175 expires_at = granted_at + timedelta(hours=duration_hours) 

176 

177 # Create grant 

178 grant = EmergencyAccessGrant( 

179 grant_id=grant_id, 

180 user_id=user_id, 

181 reason=reason, 

182 approver_id=approver_id, 

183 granted_at=granted_at.isoformat().replace("+00:00", "Z"), 

184 expires_at=expires_at.isoformat().replace("+00:00", "Z"), 

185 access_level=access_level, 

186 ) 

187 

188 # Store grant 

189 self._emergency_grants[grant_id] = grant 

190 

191 # Log emergency access grant (HIPAA audit requirement) 

192 logger.warning( 

193 "HIPAA: Emergency access granted", 

194 extra={ 

195 "grant_id": grant_id, 

196 "user_id": user_id, 

197 "approver_id": approver_id, 

198 "reason": reason, 

199 "duration_hours": duration_hours, 

200 "expires_at": grant.expires_at, 

201 "access_level": access_level, 

202 }, 

203 ) 

204 

205 # Track metrics 

206 metrics.successful_calls.add(1, {"operation": "emergency_access_grant"}) 

207 

208 # Send alert to security team 

209 try: 

210 alerting_service = AlertingService() 

211 await alerting_service.initialize() 

212 

213 alert = Alert( 

214 title="HIPAA: Emergency Access Granted", 

215 description=f"Emergency PHI access granted to {user_id} by {approver_id}", 

216 severity=AlertSeverity.CRITICAL, 

217 category=AlertCategory.SECURITY, 

218 source="hipaa_emergency_access", 

219 metadata={ 

220 "grant_id": grant.grant_id, 

221 "user_id": user_id, 

222 "approver_id": approver_id, 

223 "reason": reason, 

224 "duration_hours": duration_hours, 

225 "expires_at": grant.expires_at, 

226 "access_level": access_level, 

227 }, 

228 ) 

229 

230 await alerting_service.send_alert(alert) 

231 logger.info("Emergency access alert sent", extra={"alert_id": alert.alert_id}) 

232 

233 except Exception as e: 

234 logger.error(f"Failed to send emergency access alert: {e}", exc_info=True) 

235 

236 return grant 

237 

238 async def revoke_emergency_access(self, grant_id: str, revoked_by: str) -> bool: 

239 """ 

240 Revoke emergency access grant 

241 

242 Args: 

243 grant_id: Grant ID to revoke 

244 revoked_by: User ID performing revocation 

245 

246 Returns: 

247 True if revoked, False if grant not found 

248 """ 

249 with tracer.start_as_current_span("hipaa.revoke_emergency_access"): 

250 grant = self._emergency_grants.get(grant_id) 

251 

252 if not grant: 

253 logger.warning(f"Emergency grant not found: {grant_id}") 

254 return False 

255 

256 grant.revoked = True 

257 grant.revoked_at = datetime.now(UTC).isoformat().replace("+00:00", "Z") 

258 

259 logger.warning( 

260 "HIPAA: Emergency access revoked", 

261 extra={ 

262 "grant_id": grant_id, 

263 "user_id": grant.user_id, 

264 "revoked_by": revoked_by, 

265 "revoked_at": grant.revoked_at, 

266 }, 

267 ) 

268 

269 return True 

270 

271 async def check_emergency_access(self, user_id: str) -> EmergencyAccessGrant | None: 

272 """ 

273 Check if user has active emergency access 

274 

275 Args: 

276 user_id: User ID to check 

277 

278 Returns: 

279 Active EmergencyAccessGrant or None 

280 """ 

281 now = datetime.now(UTC) 

282 

283 for grant in self._emergency_grants.values(): 

284 if grant.user_id == user_id and not grant.revoked: 

285 expires_at = datetime.fromisoformat(grant.expires_at.replace("Z", "+00:00")) 

286 if expires_at > now: 286 ↛ 283line 286 didn't jump to line 283 because the condition on line 286 was always true

287 return grant 

288 

289 return None 

290 

291 async def log_phi_access( # type: ignore[no-untyped-def] 

292 self, 

293 user_id: str, 

294 action: str, 

295 patient_id: str | None, 

296 resource_id: str | None, 

297 ip_address: str, 

298 user_agent: str, 

299 success: bool = True, 

300 failure_reason: str | None = None, 

301 ): 

302 """ 

303 Log PHI access (HIPAA 164.312(b) - Audit Controls) 

304 

305 All access to PHI must be logged with sufficient detail to: 

306 - Identify who accessed 

307 - What was accessed 

308 - When it was accessed 

309 - Where it was accessed from 

310 - Whether access was successful 

311 

312 Args: 

313 user_id: User accessing PHI 

314 action: Action performed (read, write, delete, etc.) 

315 patient_id: Patient identifier (if applicable) 

316 resource_id: Resource identifier 

317 ip_address: IP address of access 

318 user_agent: User agent string 

319 success: Whether access was successful 

320 failure_reason: Reason for failure (if unsuccessful) 

321 """ 

322 with tracer.start_as_current_span("hipaa.log_phi_access") as span: 

323 span.set_attribute("user_id", user_id) 

324 span.set_attribute("action", action) 

325 span.set_attribute("success", success) 

326 

327 log_entry = PHIAuditLog( 

328 timestamp=datetime.now(UTC).isoformat().replace("+00:00", "Z"), 

329 user_id=user_id, 

330 action=action, 

331 phi_accessed=True, 

332 patient_id=patient_id, 

333 resource_id=resource_id, 

334 ip_address=ip_address, 

335 user_agent=user_agent, 

336 success=success, 

337 failure_reason=failure_reason, 

338 ) 

339 

340 # Log to secure audit trail (tamper-proof) 

341 logger.warning( 

342 "HIPAA: PHI Access", 

343 extra=log_entry.model_dump(), 

344 ) 

345 

346 # Send to SIEM system via alerting service 

347 try: 

348 alerting_service = AlertingService() 

349 await alerting_service.initialize() 

350 

351 # Determine severity based on success 

352 alert_severity = AlertSeverity.INFO if success else AlertSeverity.WARNING # type: ignore[attr-defined] 

353 

354 alert = Alert( 

355 title=f"HIPAA: PHI Access {action.upper()}", 

356 description=f"PHI access {action} by {user_id}: resource {resource_id}", 

357 severity=alert_severity, 

358 category=AlertCategory.SECURITY, 

359 source="hipaa_audit", 

360 metadata=log_entry.model_dump(), 

361 ) 

362 

363 await alerting_service.send_alert(alert) 

364 

365 logger.debug("PHI access logged to SIEM", extra={"log_entry_id": log_entry.log_entry_id}) # type: ignore[attr-defined] 

366 

367 except Exception as e: 

368 logger.error(f"Failed to send PHI access to SIEM: {e}", exc_info=True) 

369 

370 # Track metrics 

371 if success: 

372 metrics.successful_calls.add(1, {"operation": "phi_access", "action": action}) 

373 else: 

374 metrics.failed_calls.add(1, {"operation": "phi_access", "action": action}) 

375 

376 def generate_checksum(self, data: str, data_id: str) -> DataIntegrityCheck: 

377 """ 

378 Generate HMAC checksum for data integrity (HIPAA 164.312(c)(1)) 

379 

380 Args: 

381 data: Data to generate checksum for 

382 data_id: Unique identifier for the data 

383 

384 Returns: 

385 DataIntegrityCheck with checksum 

386 """ 

387 checksum = hmac.new( 

388 self.integrity_secret.encode(), 

389 data.encode(), 

390 hashlib.sha256, 

391 ).hexdigest() 

392 

393 return DataIntegrityCheck( 

394 data_id=data_id, 

395 checksum=checksum, 

396 algorithm="HMAC-SHA256", 

397 created_at=datetime.now(UTC).isoformat().replace("+00:00", "Z"), 

398 ) 

399 

400 def verify_checksum(self, data: str, expected_checksum: str) -> bool: 

401 """ 

402 Verify data integrity using HMAC checksum 

403 

404 Args: 

405 data: Data to verify 

406 expected_checksum: Expected checksum 

407 

408 Returns: 

409 True if checksum matches, False otherwise 

410 

411 Raises: 

412 IntegrityError: If checksums don't match (in production) 

413 """ 

414 actual_checksum = hmac.new( 

415 self.integrity_secret.encode(), 

416 data.encode(), 

417 hashlib.sha256, 

418 ).hexdigest() 

419 

420 # Use constant-time comparison to prevent timing attacks 

421 is_valid = hmac.compare_digest(actual_checksum, expected_checksum) 

422 

423 if not is_valid: 

424 logger.error( 

425 "HIPAA: Data integrity check failed", 

426 extra={ 

427 "expected": expected_checksum[:8] + "...", 

428 "actual": actual_checksum[:8] + "...", 

429 }, 

430 ) 

431 

432 return is_valid 

433 

434 

435# Global HIPAA controls instance 

436_hipaa_controls: HIPAAControls | None = None 

437 

438 

439def get_hipaa_controls() -> HIPAAControls: 

440 """ 

441 Get or create global HIPAA controls instance. 

442 

443 Retrieves HIPAA integrity secret from settings. 

444 Will raise ValueError if secret is not configured (fail-closed pattern). 

445 

446 Returns: 

447 HIPAAControls instance 

448 

449 Raises: 

450 ValueError: If HIPAA integrity secret is not configured 

451 """ 

452 global _hipaa_controls 

453 

454 if _hipaa_controls is None: 

455 # Import here to avoid circular dependency 

456 from mcp_server_langgraph.core.config import settings 

457 

458 _hipaa_controls = HIPAAControls(integrity_secret=settings.hipaa_integrity_secret) 

459 

460 return _hipaa_controls 

461 

462 

463def set_hipaa_controls(controls: HIPAAControls) -> None: 

464 """ 

465 Set global HIPAA controls instance 

466 

467 Args: 

468 controls: HIPAAControls instance to use globally 

469 """ 

470 global _hipaa_controls 

471 _hipaa_controls = controls