Python同态加密健康数据教程:隐私保护机器学习
概述
健康数据是高度敏感的个人信息。传统的数据保护方法需要在数据使用时解密,这带来了隐私泄露风险。
同态加密(Homomorphic Encryption)是一种革命性的加密技术,允许在加密数据上直接进行计算,而无需先解密。
核心优势:
- 云端计算,本地解密
- 数据永不以明文形式暴露
- 符合HIPAA、GDPR等法规要求
同态加密基础
什么是同态加密?
code
传统加密:
明文数据 → 加密 → 密文存储
使用时: 密文 → 解密 → 明文 → 计算 → 结果
↑ 隐私泄露风险
同态加密:
明文数据 → 加密 → 密文
计算: 密文 + 密文 → 加密结果 → 解密 → 明文结果
↑ 全程加密保护
Code collapsed
同态加密类型
| 类型 | 支持运算 | 限制 | 应用场景 |
|---|---|---|---|
| 部分同态 | 加法或乘法之一 | 效率高 | 简单统计、投票系统 |
| 些许同态 | 加法和有限次乘法 | 乘法次数受限 | 简单机器学习 |
| 全同态(FHE) | 无限次加法和乘法 | 计算开销大 | 任意复杂计算 |
环境设置
安装依赖
code
# Microsoft SEAL(全同态加密库)
pip install seal
# PySyft(PyTorch隐私保护)
pip install syft
# TenSEAL( SEAL的Python接口)
pip install tenseal
# PyPaillier(加法同态)
pip install phe
Code collapsed
基础配置
code
import tenseal as ts
import numpy as np
import pandas as pd
from typing import List
# 配置同态加密参数
def setup_ckks_context():
"""
CKKS方案配置(支持浮点数运算)
适合机器学习应用
"""
# 参数说明:
# poly_modulus_degree: 多项式模数度(越大越安全但越慢)
# coeff_mod_bit_sizes: 系数模数位大小
# scale: 缩放因子(控制精度)
context = ts.context(
ts.SCHEME_TYPE.CKKS,
poly_modulus_degree=8192,
coeff_mod_bit_sizes=[60, 40, 40, 60]
)
# 设置全局缩放
context.global_scale = 2**40
# 生成密钥
context.generate_galois_keys()
return context
# 初始化加密上下文
enc_context = setup_ckks_context()
Code collapsed
基础加密操作
加密与解密
code
class HealthDataEncryptor:
def __init__(self, context):
self.context = context
# 生成公钥/私钥对
self.public_key = context
self.secret_key = context.secret_key()
def encrypt_value(self, value: float) -> ts.CKKSVector:
"""加密单个数值"""
encrypted = ts.ckks_vector(
self.context,
[value]
)
return encrypted
def encrypt_array(self, values: List[float]) -> ts.CKKSVector:
"""加密数值数组"""
encrypted = ts.ckks_vector(
self.context,
values
)
return encrypted
def decrypt(self, encrypted: ts.CKKSVector) -> np.ndarray:
"""解密"""
decrypted = encrypted.decrypt(self.secret_key)
return np.array(decrypted)
def save_keys(self, public_path: str, secret_path: str):
"""保存密钥到文件"""
self.public_key.save(public_path)
self.secret_key.save(secret_path)
def load_keys(self, public_path: str, secret_path: str):
"""从文件加载密钥"""
self.public_key = ts.context_from(public_path)
self.secret_key = ts.secret_key_from(secret_path)
# 使用示例
encryptor = HealthDataEncryptor(enc_context)
# 加密健康指标
heart_rate = encryptor.encrypt_value(72.5)
blood_pressure = encryptor.encrypt_array([120, 80])
# 解密验证
decrypted_hr = encryptor.decrypt(heart_rate)
print(f"心率: {decrypted_hr[0]:.1f} bpm")
Code collapsed
密文计算
基础运算
code
class SecureHealthCalculator:
"""安全健康计算器"""
def __init__(self, encryptor):
self.encryptor = encryptor
def calculate_bmi(self, height_cm: float, weight_kg: float):
"""
计算BMI(密文计算)
BMI = weight(kg) / height(m)²
"""
# 加密输入
enc_height = self.encryptor.encrypt_value(height_cm / 100) # 转为米
enc_weight = self.encryptor.encrypt_value(weight_kg)
# 密文计算
enc_height_squared = enc_height * enc_height
enc_bmi = enc_weight / enc_height_squared
# 解密结果
bmi = self.encryptor.decrypt(enc_bmi)[0]
return bmi
def calculate_average_heart_rate(self, heart_rates: List[float]):
"""计算平均心率(密文)"""
# 加密所有心率值
enc_rates = [
self.encryptor.encrypt_value(hr)
for hr in heart_rates
]
# 密文求和
enc_sum = enc_rates[0]
for rate in enc_rates[1:]:
enc_sum = enc_sum + rate
# 密文除法
enc_count = self.encryptor.encrypt_value(len(heart_rates))
enc_avg = enc_sum / enc_count
# 解密
avg = self.encryptor.decrypt(enc_avg)[0]
return avg
def calculate_daily_calories(self, steps: float, weight_kg: float):
"""
计算每日消耗卡路里
简化公式: calories = steps * 0.04 + weight * 0.5
"""
enc_steps = self.encryptor.encrypt_value(steps)
enc_weight = self.encryptor.encrypt_value(weight_kg)
# 密文计算
enc_step_cals = enc_steps * 0.04
enc_weight_cals = enc_weight * 0.5
enc_total = enc_step_cals + enc_weight_cals
# 解密
total_cals = self.encryptor.decrypt(enc_total)[0]
return total_cals
# 使用计算器
calculator = SecureHealthCalculator(encryptor)
# 计算BMI
bmi = calculator.calculate_bmi(175, 70)
print(f"BMI: {bmi:.1f}")
# 计算平均心率
avg_hr = calculator.calculate_average_heart_rate([72, 75, 70, 73, 71])
print(f"平均心率: {avg_hr:.1f} bpm")
# 计算卡路里
cals = calculator.calculate_daily_calories(10000, 70)
print(f"消耗卡路里: {cals:.0f} kcal")
Code collapsed
聚合统计
code
class SecureHealthStatistics:
"""安全健康统计分析"""
def __init__(self, encryptor):
self.encryptor = encryptor
def calculate_statistics(self, values: List[float]):
"""
计算均值、方差、标准差(密文)
"""
n = len(values)
# 加密所有值
enc_values = [
self.encryptor.encrypt_value(v)
for v in values
]
# 计算均值
enc_sum = enc_values[0]
for v in enc_values[1:]:
enc_sum = enc_sum + v
enc_n = self.encryptor.encrypt_value(n)
enc_mean = enc_sum / enc_n
# 计算方差: E[X²] - (E[X])²
enc_squared = [v * v for v in enc_values]
enc_sum_squared = enc_squared[0]
for s in enc_squared[1:]:
enc_sum_squared = enc_sum_squared + s
enc_mean_squared = enc_sum_squared / enc_n
enc_variance = enc_mean_squared - (enc_mean * enc_mean)
# 标准差
enc_std = ts.CKKSVector(
enc_variance.context(),
[0] # 占位
)
# 注意: CKKS不支持平方根,需要近似或使用其他方案
# 解密
mean = self.encryptor.decrypt(enc_mean)[0]
variance = self.encryptor.decrypt(enc_variance)[0]
return {
'mean': mean,
'variance': variance,
'std': np.sqrt(variance) # 明文计算平方根
}
# 使用统计模块
stats = SecureHealthStatistics(encryptor)
# 分析血压数据
blood_pressures = [120, 118, 122, 119, 121, 120, 123]
results = stats.calculate_statistics(blood_pressures)
print(f"收缩压统计:")
print(f" 均值: {results['mean']:.1f} mmHg")
print(f" 方差: {results['variance']:.2f}")
print(f" 标准差: {results['std']:.2f}")
Code collapsed
隐私保护机器学习
加密线性回归
code
class SecureLinearRegression:
"""加密线性回归"""
def __init__(self, encryptor):
self.encryptor = encryptor
self.weights = None
def train(self, X: np.ndarray, y: np.ndarray):
"""
训练线性回归模型(明文)
实际应用中应在可信环境中训练
"""
# 最小二乘法
X_with_bias = np.column_stack([np.ones(len(X)), X])
self.weights = np.linalg.inv(X_with_bias.T @ X_with_bias) @ X_with_bias.T @ y
return self.weights
def encrypt_model(self):
"""加密模型参数"""
if self.weights is None:
raise ValueError("模型未训练")
enc_weights = [
self.encryptor.encrypt_value(w)
for w in self.weights
]
return enc_weights
def predict_encrypted(self, enc_weights, x_new: float):
"""
使用加密模型进行预测
y = w0 + w1 * x
"""
enc_x = self.encryptor.encrypt_value(x_new)
# y = w0 + w1 * x
enc_prediction = enc_weights[0] + enc_weights[1] * enc_x
return enc_prediction
def predict(self, x_new: float):
"""明文预测(用于验证)"""
if self.weights is None:
raise ValueError("模型未训练")
return self.weights[0] + self.weights[1] * x_new
# 示例:预测卡路里消耗 vs 步数
regression = SecureLinearRegression(encryptor)
# 训练数据(明文)
steps = np.array([5000, 7000, 10000, 12000, 15000])
calories = np.array([250, 320, 450, 530, 640])
# 训练模型
weights = regression.train(steps, calories)
print(f"模型权重: {weights}")
# 加密模型
enc_weights = regression.encrypt_model()
# 使用加密模型预测
new_steps = 8000
enc_prediction = regression.predict_encrypted(enc_weights, new_steps)
predicted_cals = encryptor.decrypt(enc_prediction)[0]
print(f"预测: {new_steps}步 → {predicted_cals:.0f}卡路里")
Code collapsed
安全模型部署
code
class SecureHealthModelServer:
"""安全健康模型服务器"""
def __init__(self, model_path: str, context):
self.context = context
self.secret_key = context.secret_key()
self.load_model(model_path)
def load_model(self, path: str):
"""加载加密模型"""
# 实际应用中从文件加载
pass
def predict(self, encrypted_data: ts.CKKSVector):
"""
对加密数据进行预测
服务器始终不接触明文数据
"""
# 使用加密模型进行计算
encrypted_result = self._compute(encrypted_data)
return encrypted_result
def _compute(self, data):
"""实际计算逻辑"""
# 示例: 简单线性变换
# result = data * 2 + 10
return data * 2 + 10
# 客户端使用
class SecureHealthClient:
"""安全健康客户端"""
def __init__(self, server: SecureHealthModelServer, context):
self.server = server
self.context = context
self.public_key = context
self.secret_key = context.secret_key()
def query_prediction(self, health_value: float):
"""
查询预测(隐私保护)
"""
# 1. 本地加密数据
encrypted_data = ts.ckks_vector(
self.context,
[health_value]
)
# 2. 发送加密数据到服务器
encrypted_result = self.server.predict(encrypted_data)
# 3. 本地解密结果
result = encrypted_result.decrypt(self.secret_key)[0]
return result
# 使用示例
server = SecureHealthModelServer("model.bin", enc_context)
client = SecureHealthClient(server, enc_context)
# 客户端查询(服务器无法看到实际值)
prediction = client.query_prediction(80.5)
print(f"预测结果: {prediction:.1f}")
Code collapsed
实际应用场景
场景1:云端健康分析
code
class CloudHealthAnalyzer:
"""云端健康数据分析服务"""
def __init__(self):
self.context = setup_ckks_context()
self.encryptor = HealthDataEncryptor(self.context)
def analyze_user_data(self, encrypted_data: dict):
"""
分析用户健康数据(全程加密)
输入: 加密的健康指标
输出: 加密的分析结果
"""
results = {}
# 1. 计算BMI
if 'height' in encrypted_data and 'weight' in encrypted_data:
enc_bmi = self._calculate_bmi_encrypted(
encrypted_data['height'],
encrypted_data['weight']
)
results['bmi'] = enc_bmi
# 2. 计算健康评分
# (简化版,实际应更复杂)
if 'steps' in encrypted_data:
enc_score = self._calculate_activity_score(
encrypted_data['steps']
)
results['activity_score'] = enc_score
return results
def _calculate_bmi_encrypted(self, enc_height, enc_weight):
"""加密BMI计算"""
enc_height_m = enc_height * 0.01 # cm to m
enc_height_sq = enc_height_m * enc_height_m
return enc_weight / enc_height_sq
def _calculate_activity_score(self, enc_steps):
"""加密活动评分"""
# 简化评分: 步数 / 100
return enc_steps / 100
# 使用示例
analyzer = CloudHealthAnalyzer()
# 用户端加密数据
user_height = encryptor.encrypt_value(175)
user_weight = encryptor.encrypt_value(70)
user_steps = encryptor.encrypt_value(8500)
encrypted_data = {
'height': user_height,
'weight': user_weight,
'steps': user_steps
}
# 发送到云端分析(数据全程加密)
results = analyzer.analyze_user_data(encrypted_data)
# 用户端解密结果
bmi = encryptor.decrypt(results['bmi'])[0]
score = encryptor.decrypt(results['activity_score'])[0]
print(f"分析结果:")
print(f" BMI: {bmi:.1f}")
print(f" 活动评分: {score:.1f}")
Code collapsed
场景2:多方安全研究
code
class MultiPartyHealthResearch:
"""
多方安全健康研究
场景: 多家医院协作研究,但无法共享原始数据
"""
def __init__(self, context):
self.context = context
self.encryptor = HealthDataEncryptor(context)
self.participant_data = {}
def add_participant_data(self, hospital_id: str,
encrypted_stats: ts.CKKSVector):
"""
参与方添加加密统计数据
每个医院只贡献聚合后的加密统计
"""
self.participant_data[hospital_id] = encrypted_stats
def compute_global_statistics(self):
"""
计算全局统计(所有医院)
输入各医院的加密统计,输出加密的全局统计
"""
if not self.participant_data:
raise ValueError("没有参与者数据")
# 获取第一个参与者的数据
enc_global_sum = list(self.participant_data.values())[0]
# 密文聚合(加法同态)
for enc_data in list(self.participant_data.values())[1:]:
enc_global_sum = enc_global_sum + enc_data
# 计算均值
n = len(self.participant_data)
enc_n = self.encryptor.encrypt_value(n)
enc_mean = enc_global_sum / enc_n
return enc_mean
def participant_contribute(self, hospital_id: str,
local_data: List[float]):
"""
参与方贡献本地数据
"""
# 本地聚合统计
local_sum = sum(local_data)
# 加密后上传
enc_sum = self.encryptor.encrypt_value(local_sum)
self.add_participant_data(hospital_id, enc_sum)
# 使用示例
research = MultiPartyHealthResearch(enc_context)
# 各医院贡献加密数据
hospital_a = [70, 72, 68, 71, 69] # 平均血压
hospital_b = [122, 118, 125, 120, 119]
hospital_c = [75, 78, 73, 76, 74]
# 各医院本地加密并上传
research.participant_contribute('hospital_a', hospital_a)
research.participant_contribute('hospital_b', hospital_b)
research.participant_contribute('hospital_c', hospital_c)
# 计算全局统计(全程加密)
enc_global_mean = research.compute_global_statistics()
# 解密全局结果(可以由独立第三方解密)
global_mean = encryptor.decrypt(enc_global_mean)[0]
print(f"全局平均血压: {global_mean:.1f} mmHg")
# 服务器/其他医院无法知道各医院的原始数据
Code collapsed
关键要点
- 同态加密实现隐私计算:数据全程加密
- CKKS方案适合机器学习:支持浮点数运算
- 计算效率是挑战:比明文计算慢100-1000倍
- 需要密钥管理:安全的密钥存储和分发
- 应用场景特定:适合高隐私要求场景
常见问题
同态加密有多慢?
计算开销约100-1000倍明文计算。优化策略:
- 减少计算深度
- 使用批处理(SIMD)
- 选择合适的参数配置
为什么不用标准加密(AES)?
标准加密不支持密文计算。使用同态加密可以:
- 云端分析,本地解密
- 多方协作,互不信任
- 符合隐私法规要求
适合哪些应用?
高价值场景:
- 医疗研究数据分析
- 金融风险评估
- 投票系统
- 隐私保护AI
不适合:
- 实时性要求高
- 数据不敏感
- 成本敏感
参考资料
- Microsoft SEAL文档
- TenSEAL教程
- 同态加密标准(ISO/IEC)
- 隐私保护机器学习综述
发布日期:2026年3月8日 最后更新:2026年3月8日