Python健康数据异常检测:机器学习识别异常模式
概述
健康数据中的异常值可能预示重要事件:
- 心率突增 → 压力/心律失常
- 血压骤升 → 高血压危象
- 睡眠时长骤降 → 睡眠障碍
- 步数减少 → 活动下降/疾病
异常检测(Anomaly Detection)能自动识别这些偏离正常模式的数据点。
异常检测方法
常用算法
| 算法 | 适用场景 | 优点 | 缺点 |
|---|---|---|---|
| Z-Score | 单变量,正态分布 | 简单快速 | 只适用于正态分布 |
| IQR | 单变量,任意分布 | 鲁棒性强 | 不考虑时序 |
| Isolation Forest | 多变量 | 高效准确 | 需要较多数据 |
| LSTM-AE | 时间序列 | 捕捉时序依赖 | 训练复杂 |
| One-Class SVM | 少样本 | 无需异常样本(仅用正常数据训练) | 参数敏感 |
环境设置
安装依赖
code
pip install numpy
pip install pandas
pip install scikit-learn
pip install tensorflow
pip install matplotlib
pip install seaborn
Code collapsed
数据准备
模拟健康数据
code
# data_generation.py
import numpy as np
import pandas as pd
from datetime import datetime, timedelta
def generate_health_data(days: int = 180, anomaly_rate: float = 0.05,
                         seed: int = 42) -> pd.DataFrame:
    """
    Generate a synthetic daily health dataset with injected anomalies.

    Args:
        days: Number of daily records to generate.
        anomaly_rate: Fraction of days (0..1) that receive one injected
            anomaly.
        seed: Seed for the function's private random generator. The default
            (42) is the value that used to be hard-coded, so default calls
            stay reproducible.

    Returns:
        DataFrame with one row per day and the columns ``date``,
        ``heart_rate``, ``sbp``, ``dbp``, ``steps``, ``sleep_hours`` and
        ``weight``.

    Raises:
        ValueError: If ``anomaly_rate`` is outside the range [0, 1].
    """
    if not 0 <= anomaly_rate <= 1:
        raise ValueError("anomaly_rate must be between 0 and 1")

    # A private RandomState keeps the output reproducible without reseeding
    # NumPy's global generator as a side effect of calling this function.
    rng = np.random.RandomState(seed)

    # `days` consecutive daily timestamps, the first one `days` days ago.
    dates = pd.date_range(
        start=datetime.now() - timedelta(days=days),
        periods=days,
        freq='D'
    )

    # 1. Heart rate (bpm): baseline + 7-day sinusoidal cycle + Gaussian noise
    base_hr = 70
    weekly_cycle = 5 * np.sin(2 * np.pi * np.arange(days) / 7)
    heart_rate = base_hr + weekly_cycle + rng.normal(0, 3, days)

    # 2. Blood pressure: systolic (sbp) and diastolic (dbp) around a baseline
    base_sbp = 120
    base_dbp = 80
    sbp = base_sbp + rng.normal(0, 5, days)
    dbp = base_dbp + rng.normal(0, 3, days)

    # 3. Step count: weekdays are more active than weekends
    weekday_steps = 10000
    weekend_steps = 5000
    steps = [
        weekend_steps + rng.normal(0, 1000) if date.dayofweek >= 5
        else weekday_steps + rng.normal(0, 1500)
        for date in dates
    ]

    # 4. Sleep duration (hours)
    base_sleep = 7.5
    sleep = base_sleep + rng.normal(0, 1, days)

    # 5. Body weight (kg): random walk around the baseline
    base_weight = 70
    weight = base_weight + np.cumsum(rng.normal(0, 0.1, days))

    df = pd.DataFrame({
        'date': dates,
        'heart_rate': heart_rate,
        'sbp': sbp,
        'dbp': dbp,
        'steps': steps,
        'sleep_hours': sleep,
        'weight': weight
    })

    # Inject anomalies: each selected day gets exactly one anomaly type.
    num_anomalies = int(days * anomaly_rate)
    anomaly_indices = rng.choice(days, num_anomalies, replace=False)
    for idx in anomaly_indices:
        anomaly_type = rng.choice(['hr', 'bp', 'sleep', 'steps'])
        if anomaly_type == 'hr':
            # Heart-rate spike or drop
            df.loc[idx, 'heart_rate'] += rng.choice([-30, 30])
        elif anomaly_type == 'bp':
            # Blood-pressure excursion; the two readings shift independently
            df.loc[idx, 'sbp'] += rng.choice([-20, 20])
            df.loc[idx, 'dbp'] += rng.choice([-10, 10])
        elif anomaly_type == 'sleep':
            # Sharp loss of sleep
            df.loc[idx, 'sleep_hours'] += rng.choice([-3, -4, -5])
        elif anomaly_type == 'steps':
            # Activity collapses to 20-30% of the generated value
            df.loc[idx, 'steps'] *= rng.choice([0.2, 0.3])
    return df
# Build the demo dataset used by the rest of the examples.
health_data = generate_health_data(anomaly_rate=0.05, days=180)
print(f"数据形状: {health_data.shape}")
print(health_data.head(5))
Code collapsed
统计方法异常检测
1. Z-Score方法
code
# zscore_detector.py
from scipy import stats
import numpy as np
class ZScoreAnomalyDetector:
    """Univariate anomaly detector based on the absolute Z-score."""

    def __init__(self, threshold: float = 3.0):
        """
        Args:
            threshold: Absolute Z-score above which a point is flagged
                (3 is the customary choice).
        """
        self.threshold = threshold
        self.mean = None
        self.std = None

    def fit(self, data: np.ndarray):
        """
        Estimate the reference mean and standard deviation.

        Args:
            data: 1-D collection of reference values.

        Returns:
            The detector itself, so calls can be chained.

        Raises:
            ValueError: If ``data`` is empty.
        """
        values = np.asarray(data, dtype=float)
        if values.size == 0:
            raise ValueError("cannot fit on empty data")
        self.mean = np.mean(values)
        self.std = np.std(values)
        return self

    def detect(self, data: np.ndarray) -> np.ndarray:
        """
        Flag values whose anomaly score exceeds the threshold.

        Returns:
            Boolean array; True marks an anomaly.
        """
        return self.get_anomaly_scores(data) > self.threshold

    def get_anomaly_scores(self, data: np.ndarray) -> np.ndarray:
        """
        Return the absolute Z-score of every value (higher = more unusual).

        Raises:
            RuntimeError: If the detector has not been fitted.
        """
        if self.mean is None or self.std is None:
            raise RuntimeError("detector must be fitted before use")
        deviation = np.abs(np.asarray(data, dtype=float) - self.mean)
        if self.std == 0:
            # Constant reference data: dividing would give NaN/inf with
            # runtime warnings. Any deviation at all is infinitely unusual.
            return np.where(deviation > 0, np.inf, 0.0)
        return deviation / self.std
# Usage example: fit on the heart-rate column and flag its outliers.
detector = ZScoreAnomalyDetector(threshold=3).fit(health_data['heart_rate'].values)
anomalies = detector.detect(health_data['heart_rate'].values)
anomaly_dates = health_data.loc[anomalies, 'date']
print(f"检测到{anomalies.sum()}个心率异常:")
print(anomaly_dates)
Code collapsed
2. IQR方法
code
# iqr_detector.py
class IQRAnomalyDetector:
    """Univariate anomaly detector based on the interquartile range (IQR)."""

    def __init__(self, multiplier: float = 1.5):
        """
        Args:
            multiplier: How many IQRs beyond the quartiles a value may lie
                before it is flagged (1.5 is the customary choice).
        """
        self.multiplier = multiplier
        self.q1 = None
        self.q3 = None
        self.iqr = None
        self.lower_bound = None
        self.upper_bound = None

    def fit(self, data: np.ndarray):
        """
        Compute the quartiles and the resulting acceptance interval.

        Args:
            data: 1-D collection of reference values.

        Returns:
            The detector itself, so calls can be chained.

        Raises:
            ValueError: If ``data`` is empty.
        """
        values = np.asarray(data, dtype=float)
        if values.size == 0:
            raise ValueError("cannot fit on empty data")
        self.q1, self.q3 = np.percentile(values, [25, 75])
        self.iqr = self.q3 - self.q1
        margin = self.multiplier * self.iqr
        self.lower_bound = self.q1 - margin
        self.upper_bound = self.q3 + margin
        return self

    def detect(self, data: np.ndarray) -> np.ndarray:
        """
        Flag values that fall outside the fitted interval.

        Returns:
            Boolean array; True marks an anomaly.

        Raises:
            RuntimeError: If the detector has not been fitted.
        """
        if self.lower_bound is None or self.upper_bound is None:
            raise RuntimeError("detector must be fitted before use")
        # asarray lets plain lists be compared element-wise as well.
        values = np.asarray(data, dtype=float)
        return (values < self.lower_bound) | (values > self.upper_bound)

    def get_bounds(self):
        """Return the acceptance interval as ``{'lower': ..., 'upper': ...}``."""
        return {
            'lower': self.lower_bound,
            'upper': self.upper_bound
        }
# Usage example: derive the normal step-count range and count the outliers.
detector = IQRAnomalyDetector(multiplier=1.5).fit(health_data['steps'].values)
anomalies = detector.detect(health_data['steps'].values)
bounds = detector.get_bounds()
print(f"正常范围: {bounds['lower']:.0f} - {bounds['upper']:.0f} 步")
print(f"检测到{anomalies.sum()}个异常")
Code collapsed
机器学习方法
1. Isolation Forest
code
# isolation_forest_detector.py
from sklearn.ensemble import IsolationForest
import numpy as np
class IsolationForestAnomalyDetector:
    """Multivariate anomaly detector wrapping scikit-learn's IsolationForest."""

    def __init__(self, contamination: float = 0.05, random_state: int = 42):
        """
        Args:
            contamination: Expected fraction of anomalies in the data.
            random_state: Seed passed to the forest; the default (42) is the
                value that used to be hard-coded.
        """
        self.contamination = contamination
        self.random_state = random_state
        self.model = IsolationForest(
            contamination=contamination,
            random_state=random_state
        )

    def fit(self, X: np.ndarray):
        """
        Fit the forest.

        Args:
            X: Feature matrix of shape (n_samples, n_features).

        Returns:
            The detector itself, so calls can be chained.
        """
        self.model.fit(X)
        return self

    def detect(self, X: np.ndarray) -> np.ndarray:
        """
        Flag anomalous rows.

        Returns:
            Boolean array; True marks an anomaly. (The underlying model
            predicts -1 for anomalies and 1 for normal rows; that label is
            converted to a mask here.)
        """
        predictions = self.model.predict(X)
        return predictions == -1

    def get_anomaly_scores(self, X: np.ndarray) -> np.ndarray:
        """
        Return the model's per-row score.

        Returns:
            Array of scores; the LOWER the score, the more anomalous the row.
        """
        return self.model.score_samples(X)
# Multivariate anomaly detection
# Feature matrix: one row per day, one column per vital sign
features = ['heart_rate', 'sbp', 'dbp', 'steps', 'sleep_hours']
X = health_data[features].to_numpy()
# Fit and score in one pass over the same data
detector = IsolationForestAnomalyDetector(contamination=0.05).fit(X)
anomalies = detector.detect(X)
anomaly_scores = detector.get_anomaly_scores(X)
# Store the results alongside the raw data
health_data['is_anomaly'] = anomalies
health_data['anomaly_score'] = anomaly_scores
print(f"检测到{anomalies.sum()}个多变量异常:")
print(health_data.loc[anomalies, ['date', *features]])
Code collapsed
2. One-Class SVM
code
# svm_detector.py
from sklearn.svm import OneClassSVM
from sklearn.preprocessing import StandardScaler
class OneClassSVMDetector:
    """Anomaly detector wrapping a standardized One-Class SVM (RBF kernel)."""

    def __init__(self, nu: float = 0.05):
        """
        Args:
            nu: Upper bound on the expected fraction of anomalies.
        """
        self.nu = nu
        self.scaler = StandardScaler()
        self.model = OneClassSVM(nu=nu, kernel='rbf')

    def fit(self, X: np.ndarray):
        """
        Fit the scaler and the SVM on ``X``.

        Args:
            X: Feature matrix of shape (n_samples, n_features).

        Returns:
            The detector itself, so calls can be chained.
        """
        X_scaled = self.scaler.fit_transform(X)
        self.model.fit(X_scaled)
        return self

    def detect(self, X: np.ndarray) -> np.ndarray:
        """
        Flag anomalous rows.

        Returns:
            Boolean array; True marks an anomaly.
        """
        # Reuse the scaling learned in fit() so new data is comparable.
        X_scaled = self.scaler.transform(X)
        predictions = self.model.predict(X_scaled)
        return predictions == -1

    def get_anomaly_scores(self, X: np.ndarray) -> np.ndarray:
        """
        Return the signed distance of each row to the decision boundary.

        Provided so this detector exposes the same scoring entry point as
        the other detectors in this file.

        Returns:
            Array of scores; the LOWER the score, the more anomalous the row
            (negative values lie outside the learned region).
        """
        X_scaled = self.scaler.transform(X)
        return self.model.decision_function(X_scaled)
# Usage example: fit and evaluate on the same feature matrix.
detector = OneClassSVMDetector(nu=0.05).fit(X)
anomalies = detector.detect(X)
print(f"检测到{anomalies.sum()}个异常")
Code collapsed
时间序列异常检测
LSTM Autoencoder
code
# lstm_autoencoder.py
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np
class LSTMAutoencoderDetector:
    """
    Time-series anomaly detector built on an LSTM encoder/decoder.

    The series is cut into sliding windows. For each window the network
    receives all but the last time step and is trained to output all but
    the first one; windows whose mean absolute error exceeds a threshold
    learned on the training data are flagged.
    """

    def __init__(self, sequence_length: int = 7):
        """
        Args:
            sequence_length: Window length in time steps (days).

        Raises:
            ValueError: If ``sequence_length`` is smaller than 2, since each
                window must be split into an input and a shifted target.
        """
        if sequence_length < 2:
            raise ValueError("sequence_length must be at least 2")
        self.sequence_length = sequence_length
        self.model = None
        self.threshold = None

    def prepare_sequences(self, data: np.ndarray) -> np.ndarray:
        """
        Cut a series into overlapping windows.

        Args:
            data: 1-D array, or a single-column 2-D array of shape (n, 1).

        Returns:
            3-D array of shape (n - sequence_length + 1, sequence_length, 1).

        Raises:
            ValueError: If ``data`` has another shape or is shorter than one
                window.
        """
        series = np.asarray(data, dtype=float)
        if series.ndim == 2 and series.shape[1] == 1:
            series = series[:, 0]
        if series.ndim != 1:
            raise ValueError("data must be 1-D or a single-column 2-D array")
        if series.size < self.sequence_length:
            raise ValueError(
                "data must contain at least sequence_length values"
            )
        # "+ 1" so the final window, which ends on the last value, is kept.
        n_windows = series.size - self.sequence_length + 1
        windows = np.stack([
            series[start:start + self.sequence_length]
            for start in range(n_windows)
        ])
        # Trailing feature axis: the rest of the class indexes three axes.
        return windows[:, :, np.newaxis]

    def _split_windows(self, data: np.ndarray) -> tuple:
        """Return (inputs, targets): each window minus its last / first step."""
        sequences = self.prepare_sequences(data)
        return sequences[:, :-1, :], sequences[:, 1:, :]

    def build_model(self, input_shape: tuple):
        """
        Build and compile the network.

        Args:
            input_shape: (time_steps, features) of one input sample.

        Returns:
            The compiled Keras model (also stored on ``self.model``).
        """
        self.model = keras.Sequential([
            keras.Input(shape=input_shape),
            # Encoder
            layers.LSTM(64, activation='relu', return_sequences=True),
            layers.LSTM(32, activation='relu', return_sequences=False),
            layers.RepeatVector(input_shape[0]),
            # Decoder
            layers.LSTM(32, activation='relu', return_sequences=True),
            layers.LSTM(64, activation='relu', return_sequences=True),
            layers.TimeDistributed(layers.Dense(1))
        ])
        self.model.compile(optimizer='adam', loss='mae')
        return self.model

    def fit(self, data: np.ndarray, epochs: int = 50):
        """
        Train the model and derive the anomaly threshold.

        Args:
            data: Raw series (see ``prepare_sequences`` for accepted shapes).
            epochs: Number of training epochs.

        Returns:
            The Keras training history.
        """
        # NOTE(review): values are fed to the network unscaled; consider
        # standardizing the series if training turns out to be unstable.
        X, y = self._split_windows(data)
        input_shape = (self.sequence_length - 1, 1)
        self.build_model(input_shape)
        history = self.model.fit(
            X, y,
            epochs=epochs,
            batch_size=32,
            validation_split=0.1,
            verbose=0
        )
        # Threshold = mean + 3 standard deviations of the per-window MAE
        # measured on the training windows.
        train_predictions = self.model.predict(X, verbose=0)
        train_mae = np.mean(np.abs(train_predictions - y), axis=(1, 2))
        self.threshold = np.mean(train_mae) + 3 * np.std(train_mae)
        return history

    def detect(self, data: np.ndarray) -> tuple:
        """
        Flag anomalous windows.

        Returns:
            ``(anomalies, errors)``: a boolean array and the per-window mean
            absolute error. Both have one entry per sliding WINDOW, not per
            data point.

        Raises:
            RuntimeError: If the detector has not been fitted.
        """
        if self.model is None or self.threshold is None:
            raise RuntimeError("detector must be fitted before use")
        X, y = self._split_windows(data)
        predictions = self.model.predict(X, verbose=0)
        mae = np.mean(np.abs(predictions - y), axis=(1, 2))
        anomalies = mae > self.threshold
        return anomalies, mae
# Usage example
detector = LSTMAutoencoderDetector(sequence_length=7)
# Train on the heart-rate series, shaped as a single-feature column (n, 1):
# the detector slices its windows along three axes
# (samples, time steps, features), which a flat 1-D array cannot provide.
hr_data = health_data['heart_rate'].values.reshape(-1, 1)
detector.fit(hr_data, epochs=50)
# Detect anomalies; the result has one entry per sliding window, not per day.
anomalies, errors = detector.detect(hr_data)
print(f"检测到{anomalies.sum()}个时间序列异常")
Code collapsed
可视化
异常可视化
code
# visualizer.py
import matplotlib.pyplot as plt
import seaborn as sns
def plot_anomalies(data, variable, anomalies, title, output_path=None):
    """
    Scatter-plot one variable over time, highlighting anomalous points.

    Args:
        data: DataFrame containing a ``date`` column and ``variable``.
        variable: Name of the column to plot.
        anomalies: Boolean mask with one entry per row of ``data``
            (applied by position); True marks an anomaly.
        title: Figure title.
        output_path: Where to save the image. Defaults to
            ``'<variable>_anomalies.png'`` in the working directory.

    Raises:
        ValueError: If the mask length differs from the number of rows.
    """
    # Coerce once so lists work and "~" is a logical, not bitwise, negation.
    mask = np.asarray(anomalies, dtype=bool)
    if mask.shape != (len(data),):
        raise ValueError(
            f"anomalies must have one entry per row: expected {len(data)}, "
            f"got shape {mask.shape}"
        )
    normal_rows = data[~mask]
    anomalous_rows = data[mask]

    fig = plt.figure(figsize=(15, 6))
    try:
        # Normal points
        plt.scatter(normal_rows['date'],
                    normal_rows[variable],
                    c='blue', label='正常', alpha=0.6)
        # Anomalous points
        plt.scatter(anomalous_rows['date'],
                    anomalous_rows[variable],
                    c='red', s=100, label='异常', marker='x')
        plt.xlabel('日期')
        plt.ylabel(variable)
        plt.title(title)
        plt.legend()
        plt.xticks(rotation=45)
        plt.tight_layout()
        plt.savefig(output_path or f'{variable}_anomalies.png')
    finally:
        # Always release the figure, even if plotting or saving fails.
        plt.close(fig)
# Plot heart-rate anomalies.
# Compute a dedicated per-day mask here: the `anomalies` variable left over
# from the LSTM example has one entry per sliding window, so it cannot be
# used to index the rows of health_data.
hr_detector = ZScoreAnomalyDetector(threshold=3)
hr_detector.fit(health_data['heart_rate'].values)
hr_anomalies = hr_detector.detect(health_data['heart_rate'].values)
plot_anomalies(health_data, 'heart_rate',
               hr_anomalies, '心率异常检测')
# Plot step-count anomalies.
detector = IQRAnomalyDetector()
detector.fit(health_data['steps'].values)
steps_anomalies = detector.detect(health_data['steps'].values)
plot_anomalies(health_data, 'steps',
               steps_anomalies, '步数异常检测')
Code collapsed
多变量异常热图
code
def plot_multivariate_anomalies(data, anomaly_scores, anomalies=None):
    """
    Draw a heatmap of standardized health variables with anomaly markers.

    Args:
        data: DataFrame with a ``date`` column and the columns heart_rate,
            sbp, dbp, steps and sleep_hours.
        anomaly_scores: Per-row anomaly scores. Accepted to keep the call
            signature stable; the plot itself does not use them.
        anomalies: Optional boolean mask with one entry per row of ``data``
            (applied by position). When omitted, the ``is_anomaly`` column
            of ``data`` is used if present; otherwise nothing is marked.

    Raises:
        ValueError: If ``anomalies`` does not have one entry per row.
    """
    variables = ['heart_rate', 'sbp', 'dbp', 'steps', 'sleep_hours']

    if anomalies is not None:
        flags = np.asarray(anomalies, dtype=bool)
        if flags.shape != (len(data),):
            raise ValueError(
                f"anomalies must have one entry per row: expected "
                f"{len(data)}, got shape {flags.shape}"
            )
    elif 'is_anomaly' in data.columns:
        flags = data['is_anomaly'].to_numpy(dtype=bool)
    else:
        flags = np.zeros(len(data), dtype=bool)

    # Sort by date and reorder the flags identically, so each marker lands
    # on the heatmap column of the row it belongs to.
    order = np.argsort(data['date'].to_numpy(), kind='stable')
    data_sorted = data.iloc[order]
    flags = flags[order]

    # One heatmap row per variable, one column per day.
    heatmap_data = data_sorted[variables].T
    # Standardize each variable. pandas reductions take no `keepdims`, so
    # the per-row statistics are aligned explicitly along the index.
    row_mean = heatmap_data.mean(axis=1)
    row_std = heatmap_data.std(axis=1)
    heatmap_data = heatmap_data.sub(row_mean, axis=0).div(row_std, axis=0)

    fig = plt.figure(figsize=(20, 6))
    try:
        sns.heatmap(heatmap_data,
                    cmap='RdYlBu_r',
                    cbar_kws={'label': '标准化值'},
                    xticklabels=False,
                    yticklabels=variables)
        # Heatmap column i spans [i, i + 1]; draw through its centre.
        for idx in np.flatnonzero(flags):
            plt.axvline(x=idx + 0.5, color='red', alpha=0.3, linewidth=1)
        plt.title('多变量健康数据异常检测')
        plt.xlabel('日期')
        plt.tight_layout()
        plt.savefig('multivariate_anomalies.png')
    finally:
        # Always release the figure, even if plotting or saving fails.
        plt.close(fig)
Code collapsed
实时监测系统
异常预警
code
# alert_system.py
from typing import Dict, List
import smtplib
from email.mime.text import MIMEText
class HealthAnomalyAlertSystem:
"""健康异常预警系统"""
def __init__(self, email_config: Dict = None):
"""
参数:
email_config: 邮件配置 {'smtp_server', 'port', 'username', 'password'}
"""
self.email_config = email_config
self.alert_history = []
def check_and_alert(self, new_data: Dict, detectors: Dict):
"""
检查新数据并发出警报
参数:
new_data: 新的健康数据 {'heart_rate': 75, 'sbp': 120, ...}
detectors: 检测器字典 {'heart_rate': detector_obj, ...}
"""
alerts = []
for variable, value in new_data.items():
if variable in detectors:
detector = detectors[variable]
# 检测是否异常
is_anomaly = detector.detect(np.array([value]))[0]
if is_anomaly:
alert = self._create_alert(variable, value, detector)
alerts.append(alert)
# 发送警报
if alerts:
self._send_alerts(alerts)
return alerts
def _create_alert(self, variable: str, value: float, detector) -> Dict:
"""创建警报"""
alert = {
'variable': variable,
'value': value,
'timestamp': datetime.now(),
'severity': self._get_severity(variable, value, detector),
'message': self._get_alert_message(variable, value, detector)
}
self.alert_history.append(alert)
return alert
def _get_severity(self, variable: str, value: float, detector) -> str:
"""确定严重程度"""
# 简化版严重程度判断
if variable == 'heart_rate':
if value > 120 or value < 50:
return 'high'
elif value > 100 or value < 55:
return 'medium'
elif variable == 'sbp':
if value > 160 or value < 90:
return 'high'
elif value > 140 or value < 100:
return 'medium'
elif variable == 'sleep_hours':
if value < 4:
return 'high'
elif value < 5:
return 'medium'
return 'low'
def _get_alert_message(self, variable: str, value: float, detector) -> str:
"""生成警报消息"""
messages = {
'heart_rate': f"⚠️ 心率异常: {value:.0f} bpm",
'sbp': f"⚠️ 收缩压异常: {value:.0f} mmHg",
'dbp': f"⚠️ 舒张压异常: {value:.0f} mmHg",
'steps': f"⚠️ 步数异常: {value:.0f} 步",
'sleep_hours': f"⚠️ 睡眠不足: {value:.1f} 小时"
}
base_message = messages.get(variable, f"⚠️ {variable}异常: {value}")
# 添加建议
if variable == 'heart_rate' and value > 100:
base_message += "\n建议: 深呼吸、放松、休息"
elif variable == 'sleep_hours' and value < 6:
base_message += "\n建议: 今晚早点休息,保证睡眠"
return base_message
def _send_alerts(self, alerts: List[Dict]):
"""发送警报"""
# 过滤高严重度警报
high_severity = [a for a in alerts if a['severity'] == 'high']
if high_severity and self.email_config:
self._send_email_alert(high_severity)
# 推送通知(需要移动端SDK)
# self._send_push_notification(alerts)
def _send_email_alert(self, alerts: List[Dict]):
"""发送邮件警报"""
if not self.email_config:
return
# 构建邮件内容
subject = f"⚠️ 健康异常警报 ({len(alerts)}项)"
body = "检测到以下健康异常:\n\n"
for alert in alerts:
body += f"{alert['message']}\n"
body += f"时间: {alert['timestamp'].strftime('%Y-%m-%d %H:%M')}\n\n"
body += "\n请关注您的健康状况。"
# 发送邮件
msg = MIMEText(body)
msg['Subject'] = subject
msg['From'] = self.email_config['username']
msg['To'] = self.email_config['username']
try:
with smtplib.SMTP(
self.email_config['smtp_server'],
self.email_config['port']
) as server:
server.starttls()
server.login(
self.email_config['username'],
self.email_config['password']
)
server.send_message(msg)
print(f"邮件警报已发送: {subject}")
except Exception as e:
print(f"邮件发送失败: {e}")
Code collapsed
关键要点
- 多方法结合:统计+机器学习更准确
- 时序数据特殊处理:考虑时间依赖
- 阈值需个性化:基于历史数据调整
- 及时预警很重要:早期干预效果好
- 假阳性不可避免:需专业评估确认
常见问题
如何降低误报?
- 增加数据量
- 使用历史基线个性化
- 结合多个检测器
- 调整阈值参数
能检测哪些异常?
可检测:
- 心率异常(过快/过慢)
- 血压波动
- 睡眠骤变
- 活动量突降
- 体重异常变化
难以检测:
- 渐进式恶化(趋势异常需专门算法)
- 多变量复杂交互
隐私保护?
实现:
- 本地处理,数据不上传
- 数据加密存储
- 用户控制数据访问
参考资料
- 异常检测综述论文
- sklearn异常检测文档
- 时间序列异常检测
- 数字健康监测指南
发布日期:2026年3月8日 最后更新:2026年3月8日