康心伴Logo
康心伴WellAlly
Health

Python健康数据异常检测:机器学习识别异常模式 | WellAlly康心伴

5 分钟阅读

Python健康数据异常检测:机器学习识别异常模式

概述

健康数据中的异常值可能预示重要事件:

  • 心率突增 → 压力/心律失常
  • 血压骤升 → 高血压危象
  • 睡眠骤降 → 睡眠障碍
  • 步数减少 → 活动下降/疾病

异常检测(Anomaly Detection)能自动识别这些偏离正常模式的数据点。


异常检测方法

常用算法

算法适用场景优点缺点
Z-Score单变量,正态分布简单快速只适用于正态分布
IQR单变量,任意分布鲁棒性强不考虑时序
Isolation Forest多变量高效准确需要较多数据
LSTM-AE时间序列捕捉时序依赖训练复杂
One-Class SVM少样本无需正常数据参数敏感

环境设置

安装依赖

code
pip install numpy
pip install pandas
pip install scikit-learn
pip install tensorflow
pip install matplotlib
pip install seaborn
Code collapsed

数据准备

模拟健康数据

code
# data_generation.py
import numpy as np
import pandas as pd
from datetime import datetime, timedelta

def generate_health_data(days=180, anomaly_rate=0.05):
    """
    生成模拟健康数据

    参数:
        days: 数据天数
        anomaly_rate: 异常值比例

    返回:
        健康数据DataFrame
    """
    np.random.seed(42)

    # 日期范围
    dates = pd.date_range(
        start=datetime.now() - timedelta(days=days),
        periods=days,
        freq='D'
    )

    # 1. 心率数据 (bpm)
    # 基础心率 + 日变化 + 随机波动
    base_hr = 70
    daily_variation = 5 * np.sin(2 * np.pi * np.arange(days) / 7)
    random_noise = np.random.normal(0, 3, days)
    heart_rate = base_hr + daily_variation + random_noise

    # 2. 血压数据
    base_sbp = 120  # 收缩压
    base_dbp = 80   # 舒张压
    sbp = base_sbp + np.random.normal(0, 5, days)
    dbp = base_dbp + np.random.normal(0, 3, days)

    # 3. 步数数据
    base_steps = 8000
    weekday_steps = 10000
    weekend_steps = 5000
    steps = []
    for i, date in enumerate(dates):
        if date.dayofweek >= 5:  # 周末
            steps.append(weekend_steps + np.random.normal(0, 1000))
        else:  # 工作日
            steps.append(weekday_steps + np.random.normal(0, 1500))

    # 4. 睡眠时长 (小时)
    base_sleep = 7.5
    sleep = base_sleep + np.random.normal(0, 1, days)

    # 5. 体重 (kg)
    base_weight = 70
    weight = base_weight + np.cumsum(np.random.normal(0, 0.1, days))

    # 创建DataFrame
    df = pd.DataFrame({
        'date': dates,
        'heart_rate': heart_rate,
        'sbp': sbp,
        'dbp': dbp,
        'steps': steps,
        'sleep_hours': sleep,
        'weight': weight
    })

    # 添加异常值
    num_anomalies = int(days * anomaly_rate)
    anomaly_indices = np.random.choice(days, num_anomalies, replace=False)

    for idx in anomaly_indices:
        anomaly_type = np.random.choice(['hr', 'bp', 'sleep', 'steps'])

        if anomaly_type == 'hr':
            # 心率异常
            df.loc[idx, 'heart_rate'] += np.random.choice([-30, 30])
        elif anomaly_type == 'bp':
            # 血压异常
            df.loc[idx, 'sbp'] += np.random.choice([-20, 20])
            df.loc[idx, 'dbp'] += np.random.choice([-10, 10])
        elif anomaly_type == 'sleep':
            # 睡眠异常
            df.loc[idx, 'sleep_hours'] += np.random.choice([-3, -4, -5])
        elif anomaly_type == 'steps':
            # 步数异常
            df.loc[idx, 'steps'] *= np.random.choice([0.2, 0.3])

    return df

# 生成数据
health_data = generate_health_data(days=180, anomaly_rate=0.05)
print(f"数据形状: {health_data.shape}")
print(health_data.head())
Code collapsed

统计方法异常检测

1. Z-Score方法

code
# zscore_detector.py
from scipy import stats
import numpy as np

class ZScoreAnomalyDetector:
    """Z-Score异常检测器"""

    def __init__(self, threshold: float = 3.0):
        """
        参数:
            threshold: Z-Score阈值(通常为3)
        """
        self.threshold = threshold
        self.mean = None
        self.std = None

    def fit(self, data: np.ndarray):
        """拟合正常数据"""
        self.mean = np.mean(data)
        self.std = np.std(data)
        return self

    def detect(self, data: np.ndarray) -> np.ndarray:
        """
        检测异常

        返回:
            布尔数组,True表示异常
        """
        z_scores = np.abs((data - self.mean) / self.std)
        return z_scores > self.threshold

    def get_anomaly_scores(self, data: np.ndarray) -> np.ndarray:
        """获取异常分数"""
        return np.abs((data - self.mean) / self.std)

# 使用示例
detector = ZScoreAnomalyDetector(threshold=3)
detector.fit(health_data['heart_rate'].values)

anomalies = detector.detect(health_data['heart_rate'].values)
anomaly_dates = health_data[anomalies]['date']

print(f"检测到{anomalies.sum()}个心率异常:")
print(anomaly_dates)
Code collapsed

2. IQR方法

code
# iqr_detector.py
class IQRAnomalyDetector:
    """IQR(四分位距)异常检测器"""

    def __init__(self, multiplier: float = 1.5):
        """
        参数:
            multiplier: IQR倍数(通常为1.5)
        """
        self.multiplier = multiplier
        self.q1 = None
        self.q3 = None
        self.iqr = None
        self.lower_bound = None
        self.upper_bound = None

    def fit(self, data: np.ndarray):
        """拟合数据"""
        self.q1 = np.percentile(data, 25)
        self.q3 = np.percentile(data, 75)
        self.iqr = self.q3 - self.q1
        self.lower_bound = self.q1 - self.multiplier * self.iqr
        self.upper_bound = self.q3 + self.multiplier * self.iqr
        return self

    def detect(self, data: np.ndarray) -> np.ndarray:
        """检测异常"""
        return (data < self.lower_bound) | (data > self.upper_bound)

    def get_bounds(self):
        """获取异常边界"""
        return {
            'lower': self.lower_bound,
            'upper': self.upper_bound
        }

# 使用示例
detector = IQRAnomalyDetector(multiplier=1.5)
detector.fit(health_data['steps'].values)

anomalies = detector.detect(health_data['steps'].values)
bounds = detector.get_bounds()

print(f"正常范围: {bounds['lower']:.0f} - {bounds['upper']:.0f} 步")
print(f"检测到{anomalies.sum()}个异常")
Code collapsed

机器学习方法

1. Isolation Forest

code
# isolation_forest_detector.py
from sklearn.ensemble import IsolationForest
import numpy as np

class IsolationForestAnomalyDetector:
    """Isolation Forest异常检测器"""

    def __init__(self, contamination: float = 0.05):
        """
        参数:
            contamination: 预期异常比例
        """
        self.contamination = contamination
        self.model = IsolationForest(
            contamination=contamination,
            random_state=42
        )

    def fit(self, X: np.ndarray):
        """
        拟合模型

        参数:
            X: 特征矩阵 (n_samples, n_features)
        """
        self.model.fit(X)
        return self

    def detect(self, X: np.ndarray) -> np.ndarray:
        """
        检测异常

        返回:
            -1表示异常,1表示正常
        """
        predictions = self.model.predict(X)
        return predictions == -1

    def get_anomaly_scores(self, X: np.ndarray) -> np.ndarray:
        """
        获取异常分数

        返回:
            越低越异常
        """
        return self.model.score_samples(X)

# 多变量异常检测
# 准备特征
features = ['heart_rate', 'sbp', 'dbp', 'steps', 'sleep_hours']
X = health_data[features].values

# 检测
detector = IsolationForestAnomalyDetector(contamination=0.05)
detector.fit(X)

anomalies = detector.detect(X)
anomaly_scores = detector.get_anomaly_scores(X)

# 结果
health_data['is_anomaly'] = anomalies
health_data['anomaly_score'] = anomaly_scores

print(f"检测到{anomalies.sum()}个多变量异常:")
print(health_data[anomalies][['date'] + features])
Code collapsed

2. One-Class SVM

code
# svm_detector.py
from sklearn.svm import OneClassSVM
from sklearn.preprocessing import StandardScaler

class OneClassSVMDetector:
    """One-Class SVM异常检测器"""

    def __init__(self, nu: float = 0.05):
        """
        参数:
            nu: 预期异常比例上限
        """
        self.nu = nu
        self.scaler = StandardScaler()
        self.model = OneClassSVM(nu=nu, kernel='rbf')

    def fit(self, X: np.ndarray):
        """拟合模型"""
        X_scaled = self.scaler.fit_transform(X)
        self.model.fit(X_scaled)
        return self

    def detect(self, X: np.ndarray) -> np.ndarray:
        """检测异常"""
        X_scaled = self.scaler.transform(X)
        predictions = self.model.predict(X_scaled)
        return predictions == -1

# 使用示例
detector = OneClassSVMDetector(nu=0.05)
detector.fit(X)

anomalies = detector.detect(X)
print(f"检测到{anomalies.sum()}个异常")
Code collapsed

时间序列异常检测

LSTM Autoencoder

code
# lstm_autoencoder.py
import tensorflow as tf
from tensorflow import keras
from tensorflow.keras import layers
import numpy as np

class LSTMAutoencoderDetector:
    """LSTM自编码器异常检测器"""

    def __init__(self, sequence_length: int = 7):
        """
        参数:
            sequence_length: 输入序列长度(天)
        """
        self.sequence_length = sequence_length
        self.model = None
        self.threshold = None

    def prepare_sequences(self, data: np.ndarray) -> np.ndarray:
        """
        准备时间序列数据

        参数:
            data: 1D数组

        返回:
            3D数组 (samples, sequence_length, features)
        """
        sequences = []
        for i in range(len(data) - self.sequence_length):
            sequences.append(data[i:i + self.sequence_length])

        return np.array(sequences)

    def build_model(self, input_shape: tuple):
        """构建LSTM自编码器"""
        self.model = keras.Sequential([
            # 编码器
            layers.LSTM(64, activation='relu',
                      input_shape=input_shape,
                      return_sequences=True),
            layers.LSTM(32, activation='relu', return_sequences=False),
            layers.RepeatVector(input_shape[0]),

            # 解码器
            layers.LSTM(32, activation='relu', return_sequences=True),
            layers.LSTM(64, activation='relu', return_sequences=True),
            layers.TimeDistributed(layers.Dense(1))
        ])

        self.model.compile(optimizer='adam', loss='mae')
        return self.model

    def fit(self, data: np.ndarray, epochs: int = 50):
        """
        训练模型

        参数:
            data: 原始数据(1D数组)
            epochs: 训练轮数
        """
        # 准备数据
        sequences = self.prepare_sequences(data)
        X = sequences[:, :-1, :]  # 除最后一个时间步
        y = sequences[:, 1:, :]   # 除第一个时间步

        # 构建并训练模型
        input_shape = (self.sequence_length - 1, 1)
        self.build_model(input_shape)

        history = self.model.fit(
            X, y,
            epochs=epochs,
            batch_size=32,
            validation_split=0.1,
            verbose=0
        )

        # 计算阈值(使用训练集的MAE)
        train_predictions = self.model.predict(X)
        train_mae = np.mean(np.abs(train_predictions - y), axis=(1, 2))
        self.threshold = np.mean(train_mae) + 3 * np.std(train_mae)

        return history

    def detect(self, data: np.ndarray) -> tuple:
        """
        检测异常

        返回:
            (anomalies, reconstruction_errors)
        """
        # 准备数据
        sequences = self.prepare_sequences(data)
        X = sequences[:, :-1, :]
        y = sequences[:, 1:, :]

        # 预测
        predictions = self.model.predict(X)

        # 计算重构误差
        mae = np.mean(np.abs(predictions - y), axis=(1, 2))

        # 检测异常
        anomalies = mae > self.threshold

        return anomalies, mae

# 使用示例
detector = LSTMAutoencoderDetector(sequence_length=7)

# 使用心率数据训练
hr_data = health_data['heart_rate'].values
detector.fit(hr_data, epochs=50)

# 检测异常
anomalies, errors = detector.detect(hr_data)

print(f"检测到{anomalies.sum()}个时间序列异常")
Code collapsed

可视化

异常可视化

code
# visualizer.py
import matplotlib.pyplot as plt
import seaborn as sns

def plot_anomalies(data, variable, anomalies, title):
    """绘制异常数据"""
    plt.figure(figsize=(15, 6))

    # 正常数据
    normal_mask = ~anomalies
    plt.scatter(data[normal_mask]['date'],
               data[normal_mask][variable],
               c='blue', label='正常', alpha=0.6)

    # 异常数据
    plt.scatter(data[anomalies]['date'],
               data[anomalies][variable],
               c='red', s=100, label='异常', marker='x')

    plt.xlabel('日期')
    plt.ylabel(variable)
    plt.title(title)
    plt.legend()
    plt.xticks(rotation=45)
    plt.tight_layout()
    plt.savefig(f'{variable}_anomalies.png')
    plt.close()

# 绘制心率异常
plot_anomalies(health_data, 'heart_rate',
              anomalies, '心率异常检测')

# 绘制步数异常
detector = IQRAnomalyDetector()
detector.fit(health_data['steps'].values)
steps_anomalies = detector.detect(health_data['steps'].values)

plot_anomalies(health_data, 'steps',
              steps_anomalies, '步数异常检测')
Code collapsed

多变量异常热图

code
def plot_multivariate_anomalies(data, anomaly_scores):
    """绘制多变量异常分数热图"""
    # 按日期排序
    data_sorted = data.sort_values('date')

    # 创建热图数据
    variables = ['heart_rate', 'sbp', 'dbp', 'steps', 'sleep_hours']
    heatmap_data = data_sorted[variables].T

    # 标准化
    heatmap_data = (heatmap_data - heatmap_data.mean(axis=1, keepdims=True)) / \
                   heatmap_data.std(axis=1, keepdims=True)

    # 绘制热图
    plt.figure(figsize=(20, 6))
    sns.heatmap(heatmap_data,
                cmap='RdYlBu_r',
                cbar_kws={'label': '标准化值'},
                xticklabels=False,
                yticklabels=variables)

    # 标记异常
    anomaly_indices = np.where(anomalies)[0]
    for idx in anomaly_indices:
        plt.axvline(x=idx, color='red', alpha=0.3, linewidth=1)

    plt.title('多变量健康数据异常检测')
    plt.xlabel('日期')
    plt.tight_layout()
    plt.savefig('multivariate_anomalies.png')
    plt.close()
Code collapsed

实时监测系统

异常预警

code
# alert_system.py
from typing import Dict, List
import smtplib
from email.mime.text import MIMEText

class HealthAnomalyAlertSystem:
    """健康异常预警系统"""

    def __init__(self, email_config: Dict = None):
        """
        参数:
            email_config: 邮件配置 {'smtp_server', 'port', 'username', 'password'}
        """
        self.email_config = email_config
        self.alert_history = []

    def check_and_alert(self, new_data: Dict, detectors: Dict):
        """
        检查新数据并发出警报

        参数:
            new_data: 新的健康数据 {'heart_rate': 75, 'sbp': 120, ...}
            detectors: 检测器字典 {'heart_rate': detector_obj, ...}
        """
        alerts = []

        for variable, value in new_data.items():
            if variable in detectors:
                detector = detectors[variable]

                # 检测是否异常
                is_anomaly = detector.detect(np.array([value]))[0]

                if is_anomaly:
                    alert = self._create_alert(variable, value, detector)
                    alerts.append(alert)

        # 发送警报
        if alerts:
            self._send_alerts(alerts)

        return alerts

    def _create_alert(self, variable: str, value: float, detector) -> Dict:
        """创建警报"""
        alert = {
            'variable': variable,
            'value': value,
            'timestamp': datetime.now(),
            'severity': self._get_severity(variable, value, detector),
            'message': self._get_alert_message(variable, value, detector)
        }

        self.alert_history.append(alert)
        return alert

    def _get_severity(self, variable: str, value: float, detector) -> str:
        """确定严重程度"""
        # 简化版严重程度判断
        if variable == 'heart_rate':
            if value > 120 or value < 50:
                return 'high'
            elif value > 100 or value < 55:
                return 'medium'
        elif variable == 'sbp':
            if value > 160 or value < 90:
                return 'high'
            elif value > 140 or value < 100:
                return 'medium'
        elif variable == 'sleep_hours':
            if value < 4:
                return 'high'
            elif value < 5:
                return 'medium'

        return 'low'

    def _get_alert_message(self, variable: str, value: float, detector) -> str:
        """生成警报消息"""
        messages = {
            'heart_rate': f"⚠️ 心率异常: {value:.0f} bpm",
            'sbp': f"⚠️ 收缩压异常: {value:.0f} mmHg",
            'dbp': f"⚠️ 舒张压异常: {value:.0f} mmHg",
            'steps': f"⚠️ 步数异常: {value:.0f} 步",
            'sleep_hours': f"⚠️ 睡眠不足: {value:.1f} 小时"
        }

        base_message = messages.get(variable, f"⚠️ {variable}异常: {value}")

        # 添加建议
        if variable == 'heart_rate' and value > 100:
            base_message += "\n建议: 深呼吸、放松、休息"
        elif variable == 'sleep_hours' and value < 6:
            base_message += "\n建议: 今晚早点休息,保证睡眠"

        return base_message

    def _send_alerts(self, alerts: List[Dict]):
        """发送警报"""
        # 过滤高严重度警报
        high_severity = [a for a in alerts if a['severity'] == 'high']

        if high_severity and self.email_config:
            self._send_email_alert(high_severity)

        # 推送通知(需要移动端SDK)
        # self._send_push_notification(alerts)

    def _send_email_alert(self, alerts: List[Dict]):
        """发送邮件警报"""
        if not self.email_config:
            return

        # 构建邮件内容
        subject = f"⚠️ 健康异常警报 ({len(alerts)}项)"

        body = "检测到以下健康异常:\n\n"
        for alert in alerts:
            body += f"{alert['message']}\n"
            body += f"时间: {alert['timestamp'].strftime('%Y-%m-%d %H:%M')}\n\n"

        body += "\n请关注您的健康状况。"

        # 发送邮件
        msg = MIMEText(body)
        msg['Subject'] = subject
        msg['From'] = self.email_config['username']
        msg['To'] = self.email_config['username']

        try:
            with smtplib.SMTP(
                self.email_config['smtp_server'],
                self.email_config['port']
            ) as server:
                server.starttls()
                server.login(
                    self.email_config['username'],
                    self.email_config['password']
                )
                server.send_message(msg)

            print(f"邮件警报已发送: {subject}")
        except Exception as e:
            print(f"邮件发送失败: {e}")
Code collapsed

关键要点

  1. 多方法结合:统计+机器学习更准确
  2. 时序数据特殊处理:考虑时间依赖
  3. 阈值需个性化:基于历史数据调整
  4. 及时预警很重要:早期干预效果好
  5. 假阳性不可避免:需专业评估确认

常见问题

如何降低误报?

  1. 增加数据量
  2. 使用历史基线个性化
  3. 结合多个检测器
  4. 调整阈值参数

能检测哪些异常?

可检测

  • 心率异常(过快/过慢)
  • 血压波动
  • 睡眠骤变
  • 活动量突降
  • 体重异常变化

难以检测

  • 渐进式恶化(趋势异常需专门算法)
  • 多变量复杂交互

隐私保护?

实现:

  1. 本地处理,数据不上传
  2. 数据加密存储
  3. 用户控制数据访问

参考资料

  • 异常检测综述论文
  • sklearn异常检测文档
  • 时间序列异常检测
  • 数字健康监测指南

发布日期:2026年3月8日 最后更新:2026年3月8日

免责声明: 本内容仅供教育参考,不能替代专业医疗建议。请咨询医生获取个性化诊断和治疗方案。

#

文章标签

Python
异常检测
健康数据分析
机器学习
时间序列

觉得这篇文章有帮助?

立即体验康心伴,开始您的健康管理之旅