Spaces:
Sleeping
Sleeping
"""
Security Module - HIPAA/GDPR Compliance Features
Implements authentication, authorization, audit logging, and encryption
"""
import hashlib
import ipaddress
import json
import logging
import os
import secrets
from datetime import datetime, timedelta
from functools import wraps
from typing import Dict, List, Any, Optional

import jwt
from fastapi import HTTPException, Request, Depends
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
logger = logging.getLogger(__name__)

# Security configuration
# The JWT signing key is taken from the SECRET_KEY environment variable so
# that issued tokens remain verifiable after a restart and by every worker
# process. When the variable is unset or empty a random key is generated for
# this process only; tokens signed with it cannot be verified by any other
# process and become invalid on restart, so the fallback is suitable for
# local development only.
SECRET_KEY = os.environ.get("SECRET_KEY") or secrets.token_urlsafe(32)
ALGORITHM = "HS256"  # signing algorithm passed to jwt.encode / jwt.decode
ACCESS_TOKEN_EXPIRE_MINUTES = 30  # lifetime of tokens from create_access_token
class AuditLogger:
    """
    HIPAA-compliant audit logging
    Tracks all access to PHI (Protected Health Information)

    Entries are serialized to JSON and emitted through the module logger at
    INFO level. ``audit_log_path`` is stored on the instance, but this class
    does not open or write that file itself.
    """

    def __init__(self):
        self.audit_log_path = "logs/audit.log"
        logger.info("Audit Logger initialized")

    def log_access(
        self,
        user_id: str,
        action: str,
        resource: str,
        ip_address: str,
        status: str,
        details: Optional[Dict[str, Any]] = None
    ):
        """Log access to medical data

        Args:
            user_id: Identifier of the acting user.
            action: Name of the action performed (e.g. "API_ACCESS").
            resource: What was accessed (path, document reference, ...).
            ip_address: Client address; anonymized before it is logged.
            status: Outcome label recorded with the entry.
            details: Optional extra fields; an empty dict is logged when omitted.

        Logging is best-effort: any failure is reported through the module
        logger and never propagates to the caller.
        """
        try:
            audit_entry = {
                # Naive UTC timestamp, ISO-8601 without an offset suffix.
                "timestamp": datetime.utcnow().isoformat(),
                "user_id": user_id,
                "action": action,
                "resource": resource,
                "ip_address": self._anonymize_ip(ip_address),
                "status": status,
                "details": details or {}
            }
            # default=str: a non-JSON value in ``details`` (datetime, UUID, ...)
            # is stringified instead of causing the whole entry to be dropped.
            logger.info("AUDIT: %s", json.dumps(audit_entry, default=str))
            # In production, also store in database for long-term retention
        except Exception as e:
            logger.error("Audit logging failed: %s", e)

    def _anonymize_ip(self, ip_address: str) -> str:
        """Anonymize IP address for GDPR compliance

        IPv4 addresses keep their first three octets ("a.b.c.xxx"). IPv6
        addresses keep their first four 16-bit groups ("g1:g2:g3:g4:xxxx"),
        taken from the fully expanded form so that compressed notation such
        as "::1" cannot leak the low-order bits. IPv4-mapped IPv6 addresses
        are treated as the IPv4 address they embed. Anything that is not an
        IP literal yields "unknown"; the raw input is never echoed back.
        """
        candidate = ip_address.strip() if isinstance(ip_address, str) else ip_address
        try:
            parsed = ipaddress.ip_address(candidate)
        except ValueError:
            return "unknown"

        if parsed.version == 6:
            embedded_v4 = parsed.ipv4_mapped
            if embedded_v4 is None:
                leading_groups = parsed.exploded.split(":")[:4]
                # Drop leading zeros so already-expanded input keeps the same
                # textual form it had before ("2001:db8:...", not "2001:0db8:...").
                shortened = (format(int(group, 16), "x") for group in leading_groups)
                return ":".join(shortened) + ":xxxx"
            parsed = embedded_v4

        return str(parsed).rsplit(".", 1)[0] + ".xxx"

    def log_phi_access(
        self,
        user_id: str,
        document_id: str,
        action: str,
        ip_address: str,
        status: str = "SUCCESS"
    ):
        """Specific logging for PHI access

        Records ``action`` prefixed with "PHI_" against "document:<document_id>"
        and flags the entry with ``phi_accessed``. ``status`` defaults to
        "SUCCESS"; pass another label to record a denied or failed attempt.
        """
        self.log_access(
            user_id=user_id,
            action=f"PHI_{action}",
            resource=f"document:{document_id}",
            ip_address=ip_address,
            status=status,
            details={"phi_accessed": True}
        )
class SecurityManager:
    """
    Manages authentication, authorization, and encryption

    Issues and verifies JWT access tokens signed with the module-level
    SECRET_KEY and ALGORITHM, provides a FastAPI dependency that resolves the
    calling user, and offers helpers for hashing identifiers and scrubbing
    error details from response dictionaries.
    """

    def __init__(self):
        self.audit_logger = AuditLogger()
        # Stored on the instance; get_current_user declares its own HTTPBearer
        # dependency in its signature and does not read this attribute.
        self.security_bearer = HTTPBearer(auto_error=False)
        logger.info("Security Manager initialized")

    def create_access_token(self, user_id: str, email: str) -> str:
        """Create JWT access token

        Args:
            user_id: Stored in the "sub" claim.
            email: Stored in the "email" claim.

        Returns:
            The encoded token, expiring ACCESS_TOKEN_EXPIRE_MINUTES after issue.
        """
        # Read the clock once so "iat" and "exp" are derived from the same instant.
        issued_at = datetime.utcnow()
        payload = {
            "sub": user_id,
            "email": email,
            "exp": issued_at + timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES),
            "iat": issued_at
        }
        return jwt.encode(payload, SECRET_KEY, algorithm=ALGORITHM)

    def verify_token(self, token: str) -> Optional[Dict[str, Any]]:
        """Verify and decode JWT token

        Returns:
            The decoded claims, or None when the token is expired or otherwise
            invalid (the reason is logged at WARNING level).
        """
        try:
            return jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
        except jwt.ExpiredSignatureError:
            logger.warning("Token expired")
            return None
        except jwt.InvalidTokenError as e:
            # InvalidTokenError is PyJWT's base class for decode failures
            # (bad signature, malformed token, ...). The module has no
            # "JWTError" attribute, so naming it here would itself raise
            # AttributeError whenever an invalid token was presented.
            logger.warning("Token verification failed: %s", e)
            return None

    def _audit_api_access(self, request: Request, user_id: str, status: str) -> None:
        """Write an API_ACCESS audit entry for ``request`` on behalf of ``user_id``."""
        client_ip = request.client.host if request.client else "unknown"
        self.audit_logger.log_access(
            user_id=user_id,
            action="API_ACCESS",
            resource=request.url.path,
            ip_address=client_ip,
            status=status
        )

    async def get_current_user(
        self,
        request: Request,
        credentials: Optional[HTTPAuthorizationCredentials] = Depends(HTTPBearer(auto_error=False))
    ) -> Dict[str, Any]:
        """
        FastAPI dependency for protected routes
        Validates JWT token and returns user info

        Returns:
            A dict with "user_id", "email" and "is_anonymous". Requests that
            carry no bearer credentials are NOT rejected: they resolve to a
            fixed anonymous user and are audited with status
            "WARNING_ANONYMOUS".

        Raises:
            HTTPException: 401 when credentials are supplied but the token is
                invalid or expired.
        """
        # For development/demo, allow anonymous access but log it
        if not credentials:
            logger.warning("Anonymous access - should be restricted in production")
            self._audit_api_access(request, "anonymous", "WARNING_ANONYMOUS")
            return {
                "user_id": "anonymous",
                # NOTE(review): this literal looks like an e-mail obfuscation
                # placeholder rather than a real address - confirm intended value.
                "email": "[email protected]",
                "is_anonymous": True
            }

        payload = self.verify_token(credentials.credentials)
        if not payload:
            raise HTTPException(
                status_code=401,
                detail="Invalid or expired authentication token"
            )

        user_info = {
            "user_id": payload.get("sub"),
            "email": payload.get("email"),
            "is_anonymous": False
        }
        self._audit_api_access(request, user_info["user_id"], "SUCCESS")
        return user_info

    def hash_phi_identifier(self, identifier: str) -> str:
        """
        Hash PHI identifiers for pseudonymization
        Required for GDPR compliance

        Returns the hex SHA-256 digest of the UTF-8 encoded identifier.
        """
        # NOTE(review): the digest is unsalted and unkeyed, so the same input
        # always maps to the same output; short or guessable identifiers may
        # be recoverable by enumeration - consider a keyed hash.
        return hashlib.sha256(identifier.encode()).hexdigest()

    def sanitize_response(self, data: Dict[str, Any]) -> Dict[str, Any]:
        """
        Remove or redact sensitive information from API responses

        If ``data`` has an "error" key its value is replaced with a generic
        message. The dict is modified in place and the same object is returned.
        """
        # In production, implement comprehensive PII/PHI redaction
        # For now, basic sanitization
        if "error" in data:
            # Don't expose internal error details
            data["error"] = "An error occurred during processing"
        return data
class DataEncryption:
    """
    Handles encryption of data at rest and in transit
    Required for HIPAA/GDPR compliance

    NOTE: encrypt_data and decrypt_data are placeholders that return their
    input unchanged; no encryption is performed by this class as written.
    """

    def __init__(self):
        # In production, use proper key management (e.g., AWS KMS, Azure Key Vault)
        self.encryption_key = self._load_or_generate_key()
        logger.info("Data Encryption initialized")

    def _load_or_generate_key(self) -> bytes:
        """Return a freshly generated random 32-byte key.

        Nothing is loaded or persisted here, so every instance gets a
        different key.
        """
        # In production, load from secure key management system
        # For demo, generate a key
        return secrets.token_bytes(32)

    def encrypt_data(self, data: bytes) -> bytes:
        """
        Encrypt sensitive data using AES-256

        Placeholder: logs a warning and returns ``data`` unchanged.
        """
        # In production, implement proper AES-256 encryption
        # For now, return as-is (encryption would require cryptography library)
        logger.warning("Encryption not fully implemented - add cryptography library")
        return data

    def decrypt_data(self, encrypted_data: bytes) -> bytes:
        """Decrypt data

        Placeholder: logs a warning and returns ``encrypted_data`` unchanged.
        """
        logger.warning("Decryption not fully implemented - add cryptography library")
        return encrypted_data

    def secure_delete(self, file_path: str):
        """
        Securely delete files containing PHI
        HIPAA requires secure deletion

        Overwrites the file's current contents once with random bytes,
        flushes them to the storage device, then removes the file. A path
        that does not exist is ignored. Failures are logged, not raised.
        """
        import os

        chunk_size = 1024 * 1024  # bound memory use for large files
        try:
            # In production, overwrite file multiple times before deletion
            if os.path.exists(file_path):
                remaining = os.path.getsize(file_path)
                # "r+b" writes over the existing bytes. Opening with "wb"
                # truncates the file first, after which the random data may
                # be written to different blocks than the original content.
                with open(file_path, "r+b") as f:
                    while remaining > 0:
                        step = min(remaining, chunk_size)
                        f.write(secrets.token_bytes(step))
                        remaining -= step
                    # Ask the OS to push the overwrite to the device before
                    # the directory entry is removed.
                    f.flush()
                    os.fsync(f.fileno())
                os.remove(file_path)
                logger.info("Securely deleted file: %s", file_path)
        except Exception as e:
            logger.error("Secure deletion failed: %s", e)
class ComplianceValidator:
    """
    Reports which HIPAA/GDPR-related features are marked as implemented
    """

    def __init__(self):
        # Feature name -> whether it is considered implemented.
        self.required_features = {
            "encryption_at_rest": False,  # Would be True in production
            "encryption_in_transit": True,  # HTTPS enforced
            "access_logging": True,
            "user_authentication": True,  # Available but not enforced in demo
            "data_retention_policy": False,  # Would implement in production
            "right_to_erasure": False,  # GDPR - would implement in production
            "consent_management": False  # Would implement in production
        }

    def check_compliance(self) -> Dict[str, Any]:
        """Summarize the feature flags as a score, percentage and status."""
        flags = self.required_features
        total = len(flags)
        enabled = len([name for name, is_on in flags.items() if is_on])

        if enabled < total:
            status = "DEMO_MODE"
        else:
            status = "COMPLIANT"

        report = {"compliance_score": f"{enabled}/{total}"}
        report["percentage"] = round((enabled / total) * 100, 1)
        report["features"] = flags
        report["status"] = status
        report["recommendations"] = self._get_recommendations()
        return report

    def _get_recommendations(self) -> List[str]:
        """List an "Implement ..." hint for every feature that is switched off."""
        return [
            f"Implement {name.replace('_', ' ').title()}"
            for name, is_on in self.required_features.items()
            if not is_on
        ]
# Module-wide SecurityManager, created on first request
_security_manager = None


def get_security_manager() -> SecurityManager:
    """Return the shared SecurityManager, constructing it on the first call."""
    global _security_manager
    if _security_manager is not None:
        return _security_manager
    _security_manager = SecurityManager()
    return _security_manager
# Decorator for protected routes
def require_auth(func):
    """Decorator to protect endpoints with authentication

    As written this does NOT enforce authentication: it logs a warning naming
    the endpoint and then awaits the wrapped coroutine function with the
    arguments it was given.

    Args:
        func: The coroutine function to wrap.

    Returns:
        An async wrapper that carries the wrapped function's metadata.
    """
    # wraps() copies __name__/__doc__ and sets __wrapped__, so tools that
    # inspect the endpoint (inspect.signature follows __wrapped__) see the
    # original parameters instead of a bare (*args, **kwargs).
    @wraps(func)
    async def wrapper(*args, **kwargs):
        # In production, enforce authentication
        # For demo, log warning and allow access
        logger.warning("Protected endpoint accessed: %s", func.__name__)
        return await func(*args, **kwargs)
    return wrapper