构建HIPAA合规数据处理管道 (FastAPI + Vault)
在医疗健康领域,处理受保护健康信息(PHI)必须符合HIPAA(Health Insurance Portability and Accountability Act)要求。本文将教你如何使用FastAPI和HashiCorp Vault构建一个安全、合规的数据处理管道。
HIPAA合规要求
HIPAA安全规则包含三大核心部分:
| 要求 | 说明 | 实现方案 |
|---|---|---|
| 行政保障 | 政策和程序 | 访问控制、培训、审计 |
| 物理保障 | 设施和设备安全 | 数据中心、设备管理 |
| 技术保障 | 技术安全措施 | 加密、认证、审计 |
技术保障关键点:
- 访问控制 - 唯一用户ID、紧急访问程序
- 审计控制 - 记录所有PHI访问活动
- 完整性 - 防止PHI被不当修改
- 传输安全 - 加密网络传输
- 加密存储 - 加密静止数据
项目架构
code
hipaa-pipeline/
├── app/
│ ├── main.py # FastAPI应用
│ ├── config/
│ │ ├── vault.py # Vault配置
│ │ └── settings.py # 应用设置
│ ├── security/
│ │ ├── vault_client.py # Vault客户端
│ │ ├── encryption.py # 加密服务
│ │ ├── auth.py # 认证中间件
│ │ └── audit.py # 审计日志
│ ├── api/
│ │ ├── phi/ # PHI数据API
│ │ └── health/ # 健康检查
│ ├── models/
│ │ ├── phi.py # PHI数据模型
│ │ └── audit.py # 审计日志模型
│ └── utils/
│ ├── validators.py # 数据验证
│ └── transformers.py # 数据转换
├── docker/
│ ├── Dockerfile
│ └── docker-compose.yml
├── scripts/
│ ├── init_vault.sh # Vault初始化脚本
│ └── setup_audit.sh # 审计设置脚本
├── requirements.txt
└── .env.example
Code collapsed
1. 项目依赖
requirements.txt
code
fastapi==0.104.1
uvicorn[standard]==0.24.0
pydantic==2.5.0
python-dotenv==1.0.0
python-jose[cryptography]==3.3.0
# app/security/auth.py 使用的是 PyJWT 的 API(import jwt、jwt.ExpiredSignatureError),需单独安装
PyJWT==2.8.0
passlib[bcrypt]==1.7.4
pydantic-settings==2.1.0
# HashiCorp Vault
hvac==2.1.0
# 数据库
sqlalchemy==2.0.23
asyncpg==0.29.0
alembic==1.13.0
# 审计和日志
structlog==23.2.0
python-json-logger==2.0.7
# 加密
cryptography==41.0.7
# HTTP客户端
httpx==0.25.1
# 验证
email-validator==2.1.0
Code collapsed
docker-compose.yml
code
# Development-only stack: Vault runs with `server -dev` (in-memory, auto-unsealed,
# fixed root token) and every credential below is a placeholder. Do not reuse as-is
# for anything that holds real PHI.
version: '3.8'
services:
  vault:
    image: hashicorp/vault:latest
    ports:
      - "8200:8200"
    environment:
      - VAULT_DEV_ROOT_TOKEN_ID=dev-only-token
      - VAULT_DEV_LISTEN_ADDRESS=0.0.0.0:8200
    cap_add:
      - IPC_LOCK
    volumes:
      - ./vault/data:/vault/data
      - ./vault/config:/vault/config
      - ./scripts:/scripts
    command: server -dev
  api:
    build: ./docker
    ports:
      - "8000:8000"
    environment:
      - VAULT_ADDR=http://vault:8200
      # The API uses the dev root token here; outside dev, issue a scoped token
      # from the `phi-app` token role created by scripts/init_vault.sh.
      - VAULT_TOKEN=dev-only-token
      - DATABASE_URL=postgresql+asyncpg://user:pass@db:5432/phi_db
    depends_on:
      - vault
      - db
    volumes:
      - ./app:/app
  db:
    image: postgres:15-alpine
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=pass
      - POSTGRES_DB=phi_db
    volumes:
      - postgres_data:/var/lib/postgresql/data
    ports:
      - "5432:5432"
volumes:
  postgres_data:
Code collapsed
2. Vault配置和初始化
scripts/init_vault.sh
code
#!/bin/bash
# Vault initialization script (development environment).
# Safe to re-run: engines/devices that already exist are skipped, and the
# data-encryption keys are only generated on the first run.
set -euo pipefail

# Respect values already present in the environment; fall back to the dev server.
export VAULT_ADDR="${VAULT_ADDR:-http://localhost:8200}"
export VAULT_TOKEN="${VAULT_TOKEN:-dev-only-token}"

echo "初始化Vault..."

# Succeeds when a secrets engine is already mounted at path $1 (e.g. "secret/").
mount_exists() {
  vault secrets list -format=json | grep -q "\"$1\""
}

# Succeeds when an audit device is already enabled at path $1 (e.g. "file/").
audit_exists() {
  vault audit list -format=json 2>/dev/null | grep -q "\"$1\""
}

# Enable a file audit device (records every request/response).
if ! audit_exists "file/"; then
  vault audit enable file file_path=/vault/logs/audit.log
fi

# Enable the Transit encryption engine.
if ! mount_exists "transit/"; then
  vault secrets enable -path=transit transit
fi

# Create the Transit key used by the application.
vault write -f transit/keys/phi-data

# Enable KV v2. The `-dev` server already mounts kv-v2 at secret/, so the
# unconditional enable would fail there.
if ! mount_exists "secret/"; then
  vault secrets enable -path=secret kv-v2
fi

# Generate the data keys only once. Overwriting them would make every value
# already encrypted with the previous key unreadable by the application.
if ! vault kv get secret/phi/db >/dev/null 2>&1; then
  # Fernet expects URL-safe base64 of 32 random bytes.
  vault kv put secret/phi/db \
    encryption_key="$(openssl rand -base64 32 | tr '+/' '-_')" \
    hmac_key="$(openssl rand -base64 32)"
fi

# Access policy for the application.
vault policy write phi-admin - <<'EOF'
# Allow managing PHI data
path "transit/encrypt/phi-data" {
  capabilities = ["create", "update"]
}
path "transit/decrypt/phi-data" {
  capabilities = ["create", "update"]
}
path "secret/data/phi/*" {
  capabilities = ["create", "read", "update", "delete", "list"]
}
path "sys/audit" {
  capabilities = ["read", "list"]
}
EOF

# Token role the application tokens are issued from.
vault write auth/token/roles/phi-app \
  allowed_policies=phi-admin \
  token_ttl=1h \
  token_max_ttl=4h

echo "Vault初始化完成!"
Code collapsed
3. Vault客户端
app/security/vault_client.py
code
import base64
import os
from typing import Any, Dict, Optional

import hvac

from app.config.settings import settings
from app.utils.logger import get_logger

logger = get_logger(__name__)

# Transit key created by scripts/init_vault.sh (`vault write -f transit/keys/phi-data`).
DEFAULT_TRANSIT_KEY = "phi-data"


class VaultClient:
    """Thin wrapper around ``hvac.Client`` used by the PHI pipeline.

    Connects with ``settings.VAULT_ADDR`` / ``settings.VAULT_TOKEN`` when
    constructed and exposes Transit encrypt/decrypt, KV v2 read/write and
    token-revocation helpers.
    """

    def __init__(self):
        self.client = None
        self._connect()

    def _connect(self):
        """Create the hvac client and check that Vault accepts the token.

        Raises:
            Exception: if the token is rejected or the connection fails
                (the error is logged first).
        """
        try:
            self.client = hvac.Client(
                url=settings.VAULT_ADDR,
                token=settings.VAULT_TOKEN,
            )
            if self.client.is_authenticated():
                logger.info("成功连接到Vault")
            else:
                raise Exception("Vault认证失败")
        except Exception as e:
            logger.error(f"Vault连接失败: {e}")
            raise

    def encrypt_data(
        self,
        plaintext: str,
        mount_point: str = "transit",
        key_name: str = DEFAULT_TRANSIT_KEY,
    ) -> str:
        """Encrypt text with the Vault Transit engine.

        Args:
            plaintext: UTF-8 text to encrypt.
            mount_point: Mount path of the Transit engine.
            key_name: Transit key name (defaults to ``phi-data``).

        Returns:
            The Vault ciphertext string (``vault:v<N>:<base64>``).

        Raises:
            Exception: any hvac/Vault error, after logging it.
        """
        try:
            # Transit requires base64-encoded plaintext. The key name and the
            # mount point are separate arguments; joining them would request
            # `<mount>/encrypt/<mount>/<key>`.
            encoded = base64.b64encode(plaintext.encode("utf-8")).decode("ascii")
            response = self.client.secrets.transit.encrypt_data(
                name=key_name,
                plaintext=encoded,
                mount_point=mount_point,
            )
            return response["data"]["ciphertext"]
        except Exception as e:
            logger.error(f"加密失败: {e}")
            raise

    def decrypt_data(
        self,
        ciphertext: str,
        mount_point: str = "transit",
        key_name: str = DEFAULT_TRANSIT_KEY,
    ) -> str:
        """Decrypt a Vault Transit ciphertext.

        Args:
            ciphertext: Ciphertext previously returned by :meth:`encrypt_data`.
            mount_point: Mount path of the Transit engine.
            key_name: Transit key name (defaults to ``phi-data``).

        Returns:
            The decrypted UTF-8 text.

        Raises:
            Exception: any hvac/Vault error, after logging it.
        """
        try:
            response = self.client.secrets.transit.decrypt_data(
                name=key_name,
                ciphertext=ciphertext,
                mount_point=mount_point,
            )
            # Vault returns the plaintext base64-encoded (as a str).
            return base64.b64decode(response["data"]["plaintext"]).decode("utf-8")
        except Exception as e:
            logger.error(f"解密失败: {e}")
            raise

    def get_database_credentials(self) -> Dict[str, str]:
        """Read the data-encryption keys stored at KV v2 ``secret/phi/db``.

        Returns:
            The secret's key/value mapping (``encryption_key``, ``hmac_key``
            as written by scripts/init_vault.sh).

        Raises:
            Exception: any hvac/Vault error, after logging it.
        """
        try:
            response = self.client.secrets.kv.v2.read_secret_version(
                path="phi/db",
                mount_point="secret",
            )
            return response["data"]["data"]
        except Exception as e:
            logger.error(f"获取数据库密钥失败: {e}")
            raise

    def store_secret(self, path: str, data: Dict[str, Any]) -> bool:
        """Write ``data`` to KV v2 at ``secret/<path>``.

        Returns:
            True on success, False if the write failed (the error is logged).
        """
        try:
            self.client.secrets.kv.v2.create_or_update_secret(
                path=path,
                secret=data,
                mount_point="secret",
            )
            logger.info(f"密钥已存储到Vault: {path}")
            return True
        except Exception as e:
            logger.error(f"存储密钥失败: {e}")
            return False

    def revoke_token(self, token: Optional[str] = None) -> bool:
        """Revoke ``token``, or the client's own token when none is given.

        Returns:
            True on success, False if revocation failed (the error is logged).
        """
        try:
            if token:
                # revoke_self() takes no argument; revoking another token is
                # a separate endpoint (auth/token/revoke).
                self.client.auth.token.revoke_token(token)
            else:
                self.client.auth.token.revoke_self()
            logger.info("Vault token已撤销")
            return True
        except Exception as e:
            logger.error(f"撤销token失败: {e}")
            return False

    def get_audit_logs(self) -> list:
        """Placeholder: always returns an empty list.

        Vault audit entries are written to the audit device's file and are
        not readable through this API; reading them is not implemented here.
        """
        try:
            return []
        except Exception as e:
            logger.error(f"获取审计日志失败: {e}")
            return []


# Module-level singleton; note that importing this module connects to Vault.
vault_client = VaultClient()
Code collapsed
4. 数据加密服务
app/security/encryption.py
code
import base64
import json
from typing import Any, Dict

from cryptography.fernet import Fernet

from app.security.vault_client import vault_client
from app.utils.logger import get_logger

logger = get_logger(__name__)


class EncryptionService:
    """Symmetric (Fernet) encryption of whole PHI records and single fields.

    The Fernet key is the ``encryption_key`` value read once, at
    construction, from Vault KV ``secret/phi/db``.

    Every ciphertext produced here is ``base64(Fernet token)``. A Fernet token
    is already URL-safe base64, so the outer layer is redundant. It is kept so
    that values already stored in the database remain decryptable.
    """

    def __init__(self):
        self.vault = vault_client
        self._fernet = None
        self._initialize_cipher()

    def _initialize_cipher(self):
        """Fetch the key from Vault and build the Fernet cipher.

        Raises:
            KeyError: if the KV entry has no ``encryption_key``.
            ValueError: if the value is not a valid Fernet key.
            Exception: any Vault error. Every failure is logged first.
        """
        try:
            creds = self.vault.get_database_credentials()
            key = creds["encryption_key"].encode()
            self._fernet = Fernet(key)
            logger.info("加密器初始化成功")
        except Exception as e:
            logger.error(f"加密器初始化失败: {e}")
            raise

    def encrypt_phi(self, phi_data: Dict[str, Any]) -> str:
        """Serialize a PHI dict to JSON and encrypt it.

        Args:
            phi_data: JSON-serializable PHI record.

        Returns:
            ``base64(Fernet token)`` as text.
        """
        try:
            json_data = json.dumps(phi_data, ensure_ascii=False)
            encrypted = self._fernet.encrypt(json_data.encode())
            return base64.b64encode(encrypted).decode()
        except Exception as e:
            logger.error(f"PHI加密失败: {e}")
            raise

    def decrypt_phi(self, encrypted_data: str) -> Dict[str, Any]:
        """Reverse :meth:`encrypt_phi` and return the PHI dict.

        Raises:
            cryptography.fernet.InvalidToken: wrong key or tampered data.
        """
        try:
            encrypted_bytes = base64.b64decode(encrypted_data)
            decrypted = self._fernet.decrypt(encrypted_bytes)
            return json.loads(decrypted.decode())
        except Exception as e:
            logger.error(f"PHI解密失败: {e}")
            raise

    def encrypt_field(self, value: str) -> str:
        """Encrypt a single text field; same format as :meth:`encrypt_phi`."""
        try:
            encrypted = self._fernet.encrypt(value.encode())
            return base64.b64encode(encrypted).decode()
        except Exception as e:
            logger.error(f"字段加密失败: {e}")
            raise

    def decrypt_field(self, encrypted_value: str) -> str:
        """Reverse :meth:`encrypt_field`."""
        try:
            encrypted_bytes = base64.b64decode(encrypted_value)
            return self._fernet.decrypt(encrypted_bytes).decode()
        except Exception as e:
            logger.error(f"字段解密失败: {e}")
            raise


# Module-level singleton; importing this module reads the key from Vault.
encryption_service = EncryptionService()
Code collapsed
5. 审计服务
app/security/audit.py
code
from datetime import datetime
from typing import Any, Dict, Optional

from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.config.database import get_db
from app.models.audit import AuditLog
from app.utils.logger import get_logger

logger = get_logger(__name__)


class AuditService:
    """Write and query HIPAA audit records stored as ``AuditLog`` rows.

    Each writer adds one row and commits the session it was given. That also
    commits anything else pending in that session. On failure the writer logs
    the error and rolls the session back instead of raising.
    """

    @staticmethod
    async def log_access(
        db: AsyncSession,
        user_id: str,
        resource_type: str,
        resource_id: str,
        action: str,
        metadata: Optional[Dict[str, Any]] = None,
    ):
        """Record an access to a PHI resource (HIPAA 45 CFR 164.312(b) audit controls).

        Args:
            metadata: Optional request context. ``ip_address``, ``user_agent``
                and ``success`` (default True) are copied into their own
                columns; the whole dict is also stored in ``details``.
        """
        meta = metadata or {}
        try:
            audit_log = AuditLog(
                user_id=user_id,
                resource_type=resource_type,
                resource_id=resource_id,
                action=action,
                timestamp=datetime.utcnow(),
                ip_address=meta.get("ip_address"),
                user_agent=meta.get("user_agent"),
                success=meta.get("success", True),
                details=metadata,
            )
            db.add(audit_log)
            await db.commit()
            logger.info(
                f"审计日志已记录: {user_id} {action} {resource_type}/{resource_id}"
            )
        except Exception as e:
            logger.error(f"记录审计日志失败: {e}")
            await db.rollback()

    @staticmethod
    async def log_disclosure(
        db: AsyncSession,
        user_id: str,
        disclosure_type: str,
        disclosed_to: str,
        purpose: str,
        phi_description: str,
    ):
        """Record a PHI disclosure.

        Supports the accounting-of-disclosures requirement (HIPAA 45 CFR 164.528).
        The recipient is stored as ``resource_id``; the other details are stored
        in ``details``.
        """
        try:
            audit_log = AuditLog(
                user_id=user_id,
                resource_type="disclosure",
                resource_id=disclosed_to,
                action="disclose",
                timestamp=datetime.utcnow(),
                details={
                    "disclosure_type": disclosure_type,
                    "disclosed_to": disclosed_to,
                    "purpose": purpose,
                    "phi_description": phi_description,
                },
            )
            db.add(audit_log)
            await db.commit()
            logger.info(f"PHI披露已记录: {user_id} -> {disclosed_to}")
        except Exception as e:
            logger.error(f"记录披露日志失败: {e}")
            await db.rollback()

    @staticmethod
    async def log_security_incident(
        db: AsyncSession,
        incident_type: str,
        severity: str,
        description: str,
        affected_phi: Optional[str] = None,
    ):
        """Record a security incident attributed to the ``system`` user."""
        try:
            audit_log = AuditLog(
                user_id="system",
                resource_type="security_incident",
                resource_id=incident_type,
                action="incident",
                timestamp=datetime.utcnow(),
                details={
                    "severity": severity,
                    "description": description,
                    "affected_phi": affected_phi,
                },
            )
            db.add(audit_log)
            await db.commit()
            logger.warning(f"安全事件已记录: {incident_type} - {severity}")
        except Exception as e:
            logger.error(f"记录安全事件失败: {e}")
            # Without a rollback a failed commit leaves the session unusable
            # for the caller; the other writers already do this.
            await db.rollback()

    @staticmethod
    async def get_user_audit_trail(
        db: AsyncSession,
        user_id: str,
        start_date: datetime,
        end_date: datetime,
    ) -> list:
        """Return a user's audit rows in ``[start_date, end_date]``, newest first.

        Returns an empty list if the query fails (the error is logged).
        """
        try:
            result = await db.execute(
                select(AuditLog)
                .where(
                    AuditLog.user_id == user_id,
                    AuditLog.timestamp >= start_date,
                    AuditLog.timestamp <= end_date,
                )
                .order_by(AuditLog.timestamp.desc())
            )
            return result.scalars().all()
        except Exception as e:
            logger.error(f"获取审计追踪失败: {e}")
            return []
Code collapsed
6. 认证中间件
app/security/auth.py
code
from datetime import datetime, timedelta, timezone
from typing import Any, Dict, Optional

import jwt
from fastapi import HTTPException, Request, status
from fastapi.security import HTTPAuthorizationCredentials, HTTPBearer

from app.config.settings import settings
from app.security.audit import AuditService
from app.utils.logger import get_logger

logger = get_logger(__name__)
security = HTTPBearer()

# Issuer and audience written by create_token and enforced by verify_token.
TOKEN_ISSUER = "phi-api"
TOKEN_AUDIENCE = "phi-app"


class AuthMiddleware:
    """Issue and verify HS256 JWT access tokens that carry a unique ``user_id``."""

    def __init__(self):
        self.secret_key = settings.JWT_SECRET_KEY
        self.algorithm = "HS256"
        self.access_token_expire_minutes = 30

    async def verify_token(
        self,
        request: Request,
        credentials: HTTPAuthorizationCredentials,
    ) -> Dict[str, Any]:
        """Validate a bearer token and return its claims.

        Signature, ``exp``, ``iss`` and ``aud`` are all checked by PyJWT. The
        ``aud`` value must match what :meth:`create_token` writes; otherwise
        PyJWT rejects every token.

        Raises:
            HTTPException: 401 if the token is expired, invalid, or has no
                ``user_id`` claim.
        """
        try:
            payload = jwt.decode(
                credentials.credentials,
                self.secret_key,
                algorithms=[self.algorithm],
                audience=TOKEN_AUDIENCE,
                issuer=TOKEN_ISSUER,
                options={"require": ["exp", "iat", "iss", "aud"]},
            )
        except jwt.ExpiredSignatureError:
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="Token已过期",
            )
        except jwt.InvalidTokenError as e:
            logger.warning(f"无效token: {e}")
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="无效的认证凭证",
            )

        # Every route indexes payload['user_id']; reject here rather than 500 later.
        if not payload.get("user_id"):
            logger.warning("无效token: 缺少user_id")
            raise HTTPException(
                status_code=status.HTTP_401_UNAUTHORIZED,
                detail="无效的认证凭证",
            )

        await self._log_access(request, payload)
        return payload

    async def _log_access(self, request: Request, payload: Dict[str, Any]):
        """Log the authenticated request (application log only, not AuditLog)."""
        logger.info(
            f"用户访问: {payload.get('user_id')} "
            f"{request.method} {request.url.path}"
        )

    def create_token(
        self,
        user_id: str,
        roles: list,
        additional_claims: Optional[Dict[str, Any]] = None,
    ) -> str:
        """Create a signed access token for ``user_id``.

        ``additional_claims`` are merged first. The reserved claims (user_id,
        roles, iat, exp, iss, aud) always take precedence, so extra claims
        cannot extend the lifetime or change the identity of a token.
        """
        now = datetime.now(timezone.utc)
        payload = {
            **(additional_claims or {}),
            "user_id": user_id,
            "roles": roles,
            "iat": now,
            "exp": now + timedelta(minutes=self.access_token_expire_minutes),
            "iss": TOKEN_ISSUER,
            "aud": TOKEN_AUDIENCE,
        }

        token = jwt.encode(payload, self.secret_key, algorithm=self.algorithm)
        logger.info(f"为用户 {user_id} 创建token")
        return token


# Module-level instance shared by the routes.
auth_middleware = AuthMiddleware()
Code collapsed
7. PHI数据模型
app/models/phi.py
code
import uuid
from datetime import datetime

from sqlalchemy import Boolean, Column, DateTime, Integer, String, Text
from sqlalchemy.dialects.postgresql import JSONB, UUID
from sqlalchemy.orm import declarative_base

Base = declarative_base()


class PatientPHI(Base):
    """Patient PHI record: encrypted payloads plus hashes used for lookups."""

    __tablename__ = "patient_phi"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)

    # Encrypted PHI: the full JSON record plus individually encrypted fields.
    encrypted_phi = Column(Text, nullable=False)
    encrypted_name = Column(String(255))
    encrypted_ssn = Column(String(255))

    # Lookup hashes (64 hex chars); the plaintext values are never stored.
    name_hash = Column(String(64), index=True)
    ssn_hash = Column(String(64), index=True)

    # Access control
    access_level = Column(String(50))
    # NOTE(review): the create route stores the JWT user_id here, which only
    # works if user ids are UUID strings (AuditLog.user_id is String(255)) — confirm.
    created_by = Column(UUID(as_uuid=True))
    created_at = Column(DateTime, default=datetime.utcnow)
    updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)

    # Access tracking
    last_accessed_at = Column(DateTime)
    last_accessed_by = Column(UUID(as_uuid=True))
    access_count = Column(Integer, default=0)

    # `metadata` is reserved by the Declarative API (it holds the MetaData
    # object), so the attribute is renamed while the DB column keeps its name.
    metadata_ = Column("metadata", JSONB)

    # Soft delete
    is_deleted = Column(Boolean, default=False)
    deleted_at = Column(DateTime)


class AuditLog(Base):
    """Audit log table: one row per audited action."""

    __tablename__ = "audit_logs"

    id = Column(UUID(as_uuid=True), primary_key=True, default=uuid.uuid4)
    user_id = Column(String(255), nullable=False, index=True)
    resource_type = Column(String(100))
    resource_id = Column(String(255))
    action = Column(String(50))
    timestamp = Column(DateTime, default=datetime.utcnow, index=True)
    # 45 chars fits the longest textual IPv6 address form.
    ip_address = Column(String(45))
    user_agent = Column(Text)
    success = Column(Boolean, default=True)
    details = Column(JSONB)
Code collapsed
8. PHI API路由
app/api/phi/patients.py
code
import functools
import hashlib
import hmac
import uuid
from datetime import datetime
from typing import List

from fastapi import APIRouter, Depends, HTTPException, Request, status
from sqlalchemy import select
from sqlalchemy.ext.asyncio import AsyncSession

from app.config.database import get_db
from app.models.phi import PatientPHI
from app.schemas.phi import PatientPHICreate, PatientPHIResponse
from app.security.audit import AuditService
from app.security.auth import auth_middleware, security
from app.security.encryption import encryption_service
from app.utils.logger import get_logger
from app.utils.validators import validate_phi_data

logger = get_logger(__name__)

router = APIRouter(prefix="/api/v1/phi", tags=["PHI管理"])


@functools.lru_cache(maxsize=1)
def _hmac_key() -> bytes:
    """Return the lookup-hash key (``hmac_key`` in Vault ``secret/phi/db``), cached after the first read."""
    creds = encryption_service.vault.get_database_credentials()
    return creds["hmac_key"].encode("utf-8")


def _keyed_hash(value: str) -> str:
    """Return the HMAC-SHA256 hex digest (64 chars, matching the String(64) hash columns).

    A keyed hash is used because a plain SHA-256 of an SSN can be reversed by
    brute force over the small SSN space.
    """
    return hmac.new(_hmac_key(), value.encode("utf-8"), hashlib.sha256).hexdigest()


def hash_name(name: str) -> str:
    """Lookup hash for a patient name (trimmed and case-folded first)."""
    return _keyed_hash(name.strip().casefold())


def hash_ssn(ssn: str) -> str:
    """Lookup hash for an SSN; only digits are hashed, so formatting does not matter."""
    return _keyed_hash("".join(ch for ch in ssn if ch.isdigit()))


def _request_metadata(request: Request, **extra) -> dict:
    """Build the audit metadata for a request; ``request.client`` can be None."""
    return {
        "ip_address": request.client.host if request.client else None,
        "user_agent": request.headers.get("user-agent"),
        **extra,
    }


@router.post("/patients", response_model=PatientPHIResponse)
async def create_patient_phi(
    request: Request,
    patient_data: PatientPHICreate,
    db: AsyncSession = Depends(get_db),
    credentials=Depends(security),
):
    """Create an encrypted patient PHI record.

    Implements HIPAA 164.312(a)(1) access control and 164.312(e)(1) encryption.
    """
    user = await auth_middleware.verify_token(request, credentials)

    validate_phi_data(patient_data.model_dump())

    try:
        encrypted_phi = encryption_service.encrypt_phi({
            "name": patient_data.name,
            "ssn": patient_data.ssn,
            "dob": patient_data.dob.isoformat(),
            "address": patient_data.address,
            "phone": patient_data.phone,
            "email": patient_data.email,
            "medical_info": patient_data.medical_info,
        })

        patient = PatientPHI(
            encrypted_phi=encrypted_phi,
            encrypted_name=encryption_service.encrypt_field(patient_data.name),
            encrypted_ssn=encryption_service.encrypt_field(patient_data.ssn),
            name_hash=hash_name(patient_data.name),
            ssn_hash=hash_ssn(patient_data.ssn),
            access_level="standard",
            created_by=user["user_id"],
        )
        db.add(patient)
        # Flush to populate the Python-side defaults (id, created_at) and read
        # them now. After commit an AsyncSession expires the instance, and
        # touching the attributes would trigger lazy IO, which fails under asyncio.
        await db.flush()
        patient_id = patient.id
        created_at = patient.created_at
        await db.commit()

        await AuditService.log_access(
            db,
            user_id=user["user_id"],
            resource_type="patient_phi",
            resource_id=str(patient_id),
            action="create",
            metadata=_request_metadata(request),
        )

        return PatientPHIResponse(
            id=str(patient_id),
            created_at=created_at,
            message="PHI记录已安全创建",
        )
    except Exception as e:
        await db.rollback()
        logger.error(f"创建PHI记录失败: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="创建PHI记录失败",
        )


@router.get("/patients/{patient_id}")
async def get_patient_phi(
    patient_id: str,
    request: Request,
    db: AsyncSession = Depends(get_db),
    credentials=Depends(security),
):
    """Return a decrypted patient PHI record and audit the read.

    Raises:
        HTTPException: 403 if access is denied (the attempt is audited),
            404 if the id is malformed, unknown or soft-deleted, 500 otherwise.
    """
    user = await auth_middleware.verify_token(request, credentials)

    # NOTE(review): check_access_permission is neither defined nor imported in
    # this listing; it must be supplied (e.g. by an RBAC module) for this route to run.
    if not await check_access_permission(user["user_id"], patient_id):
        # Failed access attempts are audit events as well.
        await AuditService.log_access(
            db,
            user_id=user["user_id"],
            resource_type="patient_phi",
            resource_id=patient_id,
            action="read",
            metadata=_request_metadata(request, success=False),
        )
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="无权访问此PHI记录",
        )

    try:
        # A malformed id cannot match any UUID primary key: report "not found"
        # instead of letting the UUID column bind fail with a 500.
        try:
            record_id = uuid.UUID(patient_id)
        except ValueError:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="PHI记录不存在",
            )

        result = await db.execute(
            select(PatientPHI).where(
                PatientPHI.id == record_id,
                PatientPHI.is_deleted.is_(False),
            )
        )
        patient = result.scalar_one_or_none()

        if not patient:
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="PHI记录不存在",
            )

        phi_data = encryption_service.decrypt_phi(patient.encrypted_phi)

        patient.last_accessed_at = datetime.utcnow()
        patient.last_accessed_by = user["user_id"]
        # access_count may be NULL for rows inserted without the ORM default.
        patient.access_count = (patient.access_count or 0) + 1
        await db.commit()

        await AuditService.log_access(
            db,
            user_id=user["user_id"],
            resource_type="patient_phi",
            resource_id=patient_id,
            action="read",
            metadata=_request_metadata(request),
        )

        return phi_data
    except HTTPException:
        raise
    except Exception as e:
        logger.error(f"读取PHI记录失败: {e}")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail="读取PHI记录失败",
        )
Code collapsed
部署检查清单
HIPAA合规部署
- 所有PHI数据静态加密(本文示例使用存放在Vault KV中的密钥做Fernet加密;生产环境建议改用Vault Transit,使密钥始终不离开Vault)
- 启用Vault审计日志
- 配置TLS 1.3进行传输加密
- 实施基于角色的访问控制(RBAC)
- 启用完整的审计日志记录
- 定期备份加密密钥
- 实施最小权限原则
- 配置自动密钥轮换
- 建立业务连续性计划
- 签署BAA协议(如使用第三方服务)
通过本教程,你已掌握使用FastAPI和HashiCorp Vault构建符合HIPAA标准的数据处理管道的核心技术。这个架构为PHI数据的安全性提供了技术保障基础;完整的HIPAA合规还需要配合上文所述的行政与物理保障措施,以及正式的风险评估。