Coverage for src / mcp_server_langgraph / auth / hipaa.py: 96%
128 statements
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
« prev ^ index » next coverage.py v7.12.0, created at 2025-12-03 00:43 +0000
1"""
2HIPAA Compliance Controls
4Implements HIPAA Security Rule technical safeguards:
5- 164.312(a)(1): Unique User Identification
6- 164.312(a)(2)(i): Emergency Access Procedure
7- 164.312(a)(2)(iii): Automatic Logoff
8- 164.312(b): Audit Controls
9- 164.312(c)(1): Integrity Controls
11Note: Only required if processing Protected Health Information (PHI)
12"""
14import hashlib
15import hmac
16from datetime import datetime, timedelta, UTC
18from pydantic import BaseModel, Field
20from mcp_server_langgraph.auth.session import SessionStore
21from mcp_server_langgraph.integrations.alerting import Alert, AlertCategory, AlertingService, AlertSeverity
22from mcp_server_langgraph.observability.telemetry import logger, metrics, tracer
class EmergencyAccessRequest(BaseModel):
    """
    Validated input for an emergency PHI access request.

    Building an instance enforces the constraints declared on each field:
    ``reason`` needs at least 10 characters and ``duration_hours`` must lie
    between 1 and 24 (inclusive).
    """

    # Who is asking for access.
    user_id: str = Field(
        ...,
        description="User requesting emergency access",
    )
    # Free-text justification; kept for the audit trail.
    reason: str = Field(
        ...,
        min_length=10,
        description="Reason for emergency access (minimum 10 characters)",
    )
    # Who signed off on the request.
    approver_id: str = Field(
        ...,
        description="User ID of approver",
    )
    # How long the grant stays valid, in whole hours.
    duration_hours: int = Field(
        default=4,
        ge=1,
        le=24,
        description="Duration of emergency access (1-24 hours)",
    )
    # Label of the access tier being requested.
    access_level: str = Field(
        default="PHI",
        description="Level of access granted",
    )
class EmergencyAccessGrant(BaseModel):
    """
    Record of one emergency access grant.

    Carries who received access, who approved it, the validity window and
    whether the grant has since been revoked.
    """

    # Identity of the grant and the parties involved.
    grant_id: str
    user_id: str
    reason: str
    approver_id: str

    # Validity window, stored as ISO timestamp strings.
    granted_at: str
    expires_at: str

    access_level: str

    # Revocation state: a grant starts out active with no revocation time.
    revoked: bool = Field(default=False)
    revoked_at: str | None = Field(default=None)  # ISO timestamp once revoked
class PHIAuditLog(BaseModel):
    """
    One audit-trail entry describing an attempted PHI access.

    Captures who acted, what they did, which patient/resource was involved,
    where the request came from and whether it succeeded.
    """

    # When and who.
    timestamp: str  # ISO format
    user_id: str

    # What was done and whether PHI was touched.
    action: str
    phi_accessed: bool
    patient_id: str | None = Field(default=None)
    resource_id: str | None = Field(default=None)

    # Where the request originated.
    ip_address: str
    user_agent: str

    # Outcome; failure_reason is only meaningful when success is False.
    success: bool
    failure_reason: str | None = Field(default=None)
class DataIntegrityCheck(BaseModel):
    """
    Integrity record for a piece of data (HIPAA integrity control).

    Pairs a data identifier with its checksum, the algorithm label and the
    time the checksum was produced.
    """

    data_id: str
    checksum: str
    # Algorithm label stored alongside the checksum.
    algorithm: str = Field(default="HMAC-SHA256")
    created_at: str  # timestamp string recorded when the checksum was made
73class HIPAAControls:
74 """
75 HIPAA Security Rule technical safeguards implementation
77 Provides emergency access, audit logging, and integrity controls
78 for systems processing Protected Health Information (PHI).
79 """
81 def __init__(
82 self,
83 session_store: SessionStore | None = None,
84 integrity_secret: str | None = None,
85 ):
86 """
87 Initialize HIPAA controls
89 Args:
90 session_store: Session storage for emergency access grants
91 integrity_secret: Secret key for HMAC integrity checks (REQUIRED for production)
92 If not provided, will attempt to load from environment/settings
94 Raises:
95 ValueError: If integrity_secret is not provided and not in environment (fail-closed security pattern)
96 """
97 self.session_store = session_store
99 # If not provided, try to load from environment/settings
100 if not integrity_secret:
101 import os
103 from mcp_server_langgraph.secrets.manager import get_secrets_manager
105 secrets_mgr = get_secrets_manager()
106 integrity_secret = secrets_mgr.get_secret("HIPAA_INTEGRITY_SECRET", fallback=os.getenv("HIPAA_INTEGRITY_SECRET"))
108 # Validate integrity secret is configured (fail-closed security pattern)
109 # HIPAA 164.312(c)(1) requires integrity controls for PHI
110 if not integrity_secret: 110 ↛ 111line 110 didn't jump to line 111 because the condition on line 110 was never true
111 msg = (
112 "CRITICAL: HIPAA integrity secret not configured. "
113 "Set HIPAA_INTEGRITY_SECRET environment variable or configure via Infisical. "
114 "HIPAA controls cannot be initialized without a secure secret key for data integrity. "
115 "(HIPAA 164.312(c)(1) - Integrity Controls)"
116 )
117 raise ValueError(msg)
119 self.integrity_secret = integrity_secret
121 # In-memory storage for emergency access grants (replace with database)
122 self._emergency_grants: dict[str, EmergencyAccessGrant] = {}
124 async def grant_emergency_access(
125 self,
126 user_id: str,
127 reason: str,
128 approver_id: str,
129 duration_hours: int = 4,
130 access_level: str = "PHI",
131 ) -> EmergencyAccessGrant:
132 """
133 Grant emergency access to PHI (HIPAA 164.312(a)(2)(i))
135 Emergency access procedure allows authorized users to access PHI
136 in emergency situations with proper approval and audit trail.
138 Args:
139 user_id: User requesting emergency access
140 reason: Detailed reason for emergency access
141 approver_id: User ID of approver (must be authorized)
142 duration_hours: Duration of access (1-24 hours, default 4)
143 access_level: Level of access (default "PHI")
145 Returns:
146 EmergencyAccessGrant with grant details
148 Example:
149 grant = await controls.grant_emergency_access(
150 user_id="user:doctor_smith",
151 reason="Patient emergency - cardiac arrest in ER",
152 approver_id="user:supervisor_jones",
153 duration_hours=2
154 )
155 """
156 with tracer.start_as_current_span("hipaa.grant_emergency_access") as span:
157 span.set_attribute("user_id", user_id)
158 span.set_attribute("approver_id", approver_id)
159 span.set_attribute("duration_hours", duration_hours)
161 # Validate request (constructor validates fields)
162 EmergencyAccessRequest(
163 user_id=user_id,
164 reason=reason,
165 approver_id=approver_id,
166 duration_hours=duration_hours,
167 access_level=access_level,
168 )
170 # Generate grant ID
171 grant_id = f"emergency_{datetime.now(UTC).strftime('%Y%m%d%H%M%S')}_{user_id.replace(':', '_')}"
173 # Calculate expiration
174 granted_at = datetime.now(UTC)
175 expires_at = granted_at + timedelta(hours=duration_hours)
177 # Create grant
178 grant = EmergencyAccessGrant(
179 grant_id=grant_id,
180 user_id=user_id,
181 reason=reason,
182 approver_id=approver_id,
183 granted_at=granted_at.isoformat().replace("+00:00", "Z"),
184 expires_at=expires_at.isoformat().replace("+00:00", "Z"),
185 access_level=access_level,
186 )
188 # Store grant
189 self._emergency_grants[grant_id] = grant
191 # Log emergency access grant (HIPAA audit requirement)
192 logger.warning(
193 "HIPAA: Emergency access granted",
194 extra={
195 "grant_id": grant_id,
196 "user_id": user_id,
197 "approver_id": approver_id,
198 "reason": reason,
199 "duration_hours": duration_hours,
200 "expires_at": grant.expires_at,
201 "access_level": access_level,
202 },
203 )
205 # Track metrics
206 metrics.successful_calls.add(1, {"operation": "emergency_access_grant"})
208 # Send alert to security team
209 try:
210 alerting_service = AlertingService()
211 await alerting_service.initialize()
213 alert = Alert(
214 title="HIPAA: Emergency Access Granted",
215 description=f"Emergency PHI access granted to {user_id} by {approver_id}",
216 severity=AlertSeverity.CRITICAL,
217 category=AlertCategory.SECURITY,
218 source="hipaa_emergency_access",
219 metadata={
220 "grant_id": grant.grant_id,
221 "user_id": user_id,
222 "approver_id": approver_id,
223 "reason": reason,
224 "duration_hours": duration_hours,
225 "expires_at": grant.expires_at,
226 "access_level": access_level,
227 },
228 )
230 await alerting_service.send_alert(alert)
231 logger.info("Emergency access alert sent", extra={"alert_id": alert.alert_id})
233 except Exception as e:
234 logger.error(f"Failed to send emergency access alert: {e}", exc_info=True)
236 return grant
238 async def revoke_emergency_access(self, grant_id: str, revoked_by: str) -> bool:
239 """
240 Revoke emergency access grant
242 Args:
243 grant_id: Grant ID to revoke
244 revoked_by: User ID performing revocation
246 Returns:
247 True if revoked, False if grant not found
248 """
249 with tracer.start_as_current_span("hipaa.revoke_emergency_access"):
250 grant = self._emergency_grants.get(grant_id)
252 if not grant:
253 logger.warning(f"Emergency grant not found: {grant_id}")
254 return False
256 grant.revoked = True
257 grant.revoked_at = datetime.now(UTC).isoformat().replace("+00:00", "Z")
259 logger.warning(
260 "HIPAA: Emergency access revoked",
261 extra={
262 "grant_id": grant_id,
263 "user_id": grant.user_id,
264 "revoked_by": revoked_by,
265 "revoked_at": grant.revoked_at,
266 },
267 )
269 return True
271 async def check_emergency_access(self, user_id: str) -> EmergencyAccessGrant | None:
272 """
273 Check if user has active emergency access
275 Args:
276 user_id: User ID to check
278 Returns:
279 Active EmergencyAccessGrant or None
280 """
281 now = datetime.now(UTC)
283 for grant in self._emergency_grants.values():
284 if grant.user_id == user_id and not grant.revoked:
285 expires_at = datetime.fromisoformat(grant.expires_at.replace("Z", "+00:00"))
286 if expires_at > now: 286 ↛ 283line 286 didn't jump to line 283 because the condition on line 286 was always true
287 return grant
289 return None
291 async def log_phi_access( # type: ignore[no-untyped-def]
292 self,
293 user_id: str,
294 action: str,
295 patient_id: str | None,
296 resource_id: str | None,
297 ip_address: str,
298 user_agent: str,
299 success: bool = True,
300 failure_reason: str | None = None,
301 ):
302 """
303 Log PHI access (HIPAA 164.312(b) - Audit Controls)
305 All access to PHI must be logged with sufficient detail to:
306 - Identify who accessed
307 - What was accessed
308 - When it was accessed
309 - Where it was accessed from
310 - Whether access was successful
312 Args:
313 user_id: User accessing PHI
314 action: Action performed (read, write, delete, etc.)
315 patient_id: Patient identifier (if applicable)
316 resource_id: Resource identifier
317 ip_address: IP address of access
318 user_agent: User agent string
319 success: Whether access was successful
320 failure_reason: Reason for failure (if unsuccessful)
321 """
322 with tracer.start_as_current_span("hipaa.log_phi_access") as span:
323 span.set_attribute("user_id", user_id)
324 span.set_attribute("action", action)
325 span.set_attribute("success", success)
327 log_entry = PHIAuditLog(
328 timestamp=datetime.now(UTC).isoformat().replace("+00:00", "Z"),
329 user_id=user_id,
330 action=action,
331 phi_accessed=True,
332 patient_id=patient_id,
333 resource_id=resource_id,
334 ip_address=ip_address,
335 user_agent=user_agent,
336 success=success,
337 failure_reason=failure_reason,
338 )
340 # Log to secure audit trail (tamper-proof)
341 logger.warning(
342 "HIPAA: PHI Access",
343 extra=log_entry.model_dump(),
344 )
346 # Send to SIEM system via alerting service
347 try:
348 alerting_service = AlertingService()
349 await alerting_service.initialize()
351 # Determine severity based on success
352 alert_severity = AlertSeverity.INFO if success else AlertSeverity.WARNING # type: ignore[attr-defined]
354 alert = Alert(
355 title=f"HIPAA: PHI Access {action.upper()}",
356 description=f"PHI access {action} by {user_id}: resource {resource_id}",
357 severity=alert_severity,
358 category=AlertCategory.SECURITY,
359 source="hipaa_audit",
360 metadata=log_entry.model_dump(),
361 )
363 await alerting_service.send_alert(alert)
365 logger.debug("PHI access logged to SIEM", extra={"log_entry_id": log_entry.log_entry_id}) # type: ignore[attr-defined]
367 except Exception as e:
368 logger.error(f"Failed to send PHI access to SIEM: {e}", exc_info=True)
370 # Track metrics
371 if success:
372 metrics.successful_calls.add(1, {"operation": "phi_access", "action": action})
373 else:
374 metrics.failed_calls.add(1, {"operation": "phi_access", "action": action})
376 def generate_checksum(self, data: str, data_id: str) -> DataIntegrityCheck:
377 """
378 Generate HMAC checksum for data integrity (HIPAA 164.312(c)(1))
380 Args:
381 data: Data to generate checksum for
382 data_id: Unique identifier for the data
384 Returns:
385 DataIntegrityCheck with checksum
386 """
387 checksum = hmac.new(
388 self.integrity_secret.encode(),
389 data.encode(),
390 hashlib.sha256,
391 ).hexdigest()
393 return DataIntegrityCheck(
394 data_id=data_id,
395 checksum=checksum,
396 algorithm="HMAC-SHA256",
397 created_at=datetime.now(UTC).isoformat().replace("+00:00", "Z"),
398 )
400 def verify_checksum(self, data: str, expected_checksum: str) -> bool:
401 """
402 Verify data integrity using HMAC checksum
404 Args:
405 data: Data to verify
406 expected_checksum: Expected checksum
408 Returns:
409 True if checksum matches, False otherwise
411 Raises:
412 IntegrityError: If checksums don't match (in production)
413 """
414 actual_checksum = hmac.new(
415 self.integrity_secret.encode(),
416 data.encode(),
417 hashlib.sha256,
418 ).hexdigest()
420 # Use constant-time comparison to prevent timing attacks
421 is_valid = hmac.compare_digest(actual_checksum, expected_checksum)
423 if not is_valid:
424 logger.error(
425 "HIPAA: Data integrity check failed",
426 extra={
427 "expected": expected_checksum[:8] + "...",
428 "actual": actual_checksum[:8] + "...",
429 },
430 )
432 return is_valid
# Process-wide HIPAAControls singleton; stays None until first requested or explicitly set.
_hipaa_controls: HIPAAControls | None = None


def get_hipaa_controls() -> HIPAAControls:
    """
    Return the shared HIPAAControls instance, building it on first use.

    On the first call the instance is constructed with the integrity secret
    taken from application settings and cached in the module-level global;
    later calls return that cached object.

    Returns:
        HIPAAControls instance

    Raises:
        ValueError: If HIPAA integrity secret is not configured
            (raised by the HIPAAControls constructor, fail-closed pattern)
    """
    global _hipaa_controls

    # Fast path: already initialised.
    if _hipaa_controls is not None:
        return _hipaa_controls

    # Import here to avoid circular dependency
    from mcp_server_langgraph.core.config import settings

    _hipaa_controls = HIPAAControls(integrity_secret=settings.hipaa_integrity_secret)
    return _hipaa_controls
def set_hipaa_controls(controls: HIPAAControls) -> None:
    """
    Replace the module-level HIPAA controls instance.

    Args:
        controls: HIPAAControls instance that subsequent global lookups should use
    """
    global _hipaa_controls
    # Overwrites whatever instance (if any) was stored before.
    _hipaa_controls = controls