康心伴Logo
康心伴WellAlly
Data & Privacy

AWS Lambda + Kinesis 实时处理 IoT 健康数据流

5 分钟阅读

AWS Lambda + Kinesis 实时处理 IoT 健康数据流

概述

物联网医疗设备(可穿戴设备、远程监控设备)产生的健康数据需要实时处理和分析。本文将介绍如何使用 AWS 无服务器架构构建可扩展的实时数据处理管道。

架构概览

code
┌─────────────────────────────────────────────────────────────────┐
│                        IoT 设备层                               │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐        │
│  │ 智能手环 │  │ 血压计   │  │ 血糖仪   │  │ ECG 设备 │        │
│  └──────────┘  └──────────┘  └──────────┘  └──────────┘        │
└─────────────────────────────────────────────────────────────────┘
                              │ MQTT/HTTPS
                              ▼
┌─────────────────────────────────────────────────────────────────┐
│                      AWS IoT Core                              │
│  • 设备注册与认证    • 消息路由    • 设备影子                   │
└─────────────────────────────────────────────────────────────────┘
                              │
                              ▼
┌─────────────────────────────────────────────────────────────────┐
│                   Kinesis Data Streams                         │
│  • 健康指标流       • 告警事件流    • 设备状态流                │
└─────────────────────────────────────────────────────────────────┘
                              │
         ┌────────────────────┼────────────────────┐
         ▼                    ▼                    ▼
┌──────────────────┐  ┌──────────────────┐  ┌──────────────────┐
│   Lambda: 处理   │  │ Lambda: 告警     │  │ Lambda: 归档     │
│   数据清洗/转换  │  │ 异常检测/通知    │  │ 长期存储/S3      │
└──────────────────┘  └──────────────────┘  └──────────────────┘
         │                    │                    │
         ▼                    ▼                    ▼
┌──────────────────┐  ┌──────────────────┐  ┌──────────────────┐
│  Timestream      │  │  SNS/SES         │  │  S3/Redshift     │
│  时序数据库      │  │  告警通知        │  │  数据湖          │
└──────────────────┘  └──────────────────┘  └──────────────────┘
Code collapsed

1. AWS IoT Core 配置

设备注册与认证

code
// lib/iot/device-manager.ts
import {
  IoTClient,
  CreateThingCommand,
  CreateKeysAndCertificateCommand,
  AttachPolicyCommand,
  DeleteThingCommand,
} from '@aws-sdk/client-iot';

const iotClient = new IoTClient({ region: process.env.AWS_REGION });

export interface DeviceRegistration {
  deviceId: string;
  deviceType: 'smartwatch' | 'blood_pressure' | 'glucose_meter' | 'ecg';
  userId: string;
  tenantId: string;
}

export class IoTDeviceManager {
  /**
   * 注册新 IoT 设备
   */
  static async registerDevice(params: DeviceRegistration): Promise<{
    thingName: string;
    certificateArn: string;
    certificatePem: string;
    privateKey: string;
    endpoint: string;
  }> {
    const { deviceId, deviceType, userId, tenantId } = params;

    // 1. 创建 Thing(设备数字孪生)
    const thingName = `${deviceType}_${userId}_${deviceId}`;

    await iotClient.send(
      new CreateThingCommand({
        thingName,
        thingTypeName: deviceType,
        attributePayload: {
          attributes: {
            userId,
            tenantId,
            deviceType,
            registeredAt: new Date().toISOString(),
          },
        },
      })
    );

    // 2. 创建设备证书和密钥
    const certResponse = await iotClient.send(
      new CreateKeysAndCertificateCommand({
        setAsActive: true,
      })
    );

    // 3. 附加策略(基于设备类型的权限)
    await iotClient.send(
      new AttachPolicyCommand({
        policyName: `${deviceType}_policy`,
        target: certResponse.certificateArn,
      })
    );

    // 4. 获取 IoT 数据端点
    const endpoint = await this.getDataEndpoint();

    return {
      thingName,
      certificateArn: certResponse.certificateArn!,
      certificatePem: certResponse.certificatePem!,
      privateKey: certResponse.keyPair?.PrivateKey || '',
      endpoint,
    };
  }

  /**
   * 删除设备
   */
  static async deregisterDevice(thingName: string): Promise<void> {
    await iotClient.send(new DeleteThingCommand({ thingName }));
  }

  /**
   * 获取 IoT 数据端点
   */
  private static async getDataEndpoint(): Promise<string> {
    const { endpointAddress } = await iotClient.describeEndpoint({
      endpointType: 'iot:Data-ATS',
    });

    return `https://${endpointAddress}`;
  }
}
Code collapsed

IoT 策略配置

code
// policies/smartwatch_policy.json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "iot:Connect"
      ],
      "Resource": [
        "arn:aws:iot:region:account:client/${iot:Connection.Thing.ThingName}"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "iot:Publish"
      ],
      "Resource": [
        "arn:aws:iot:region:account:topic/health/${iot:Connection.Thing.ThingName}/#"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "iot:Subscribe"
      ],
      "Resource": [
        "arn:aws:iot:region:account:topicfilter/health/${iot:Connection.Thing.ThingName}/commands"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "iot:Receive"
      ],
      "Resource": [
        "arn:aws:iot:region:account:topic/health/${iot:Connection.Thing.ThingName}/commands/*"
      ]
    }
  ]
}
Code collapsed

消息路由规则

code
// lib/iot/message-router.ts
import {
  IoTDataPlaneClient,
  PublishCommand,
} from '@aws-sdk/client-iot-data-plane';

const iotDataClient = new IoTDataPlaneClient({
  endpoint: process.env.IOT_DATA_ENDPOINT,
  region: process.env.AWS_REGION,
});

export interface HealthMetric {
  deviceId: string;
  userId: string;
  tenantId: string;
  timestamp: number;
  metrics: {
    heartRate?: number;
    bloodPressure?: { systolic: number; diastolic: number };
    bloodGlucose?: number;
    steps?: number;
    sleep?: { duration: number; quality: number };
    temperature?: number;
    spo2?: number;
  };
  battery?: number;
}

export class IoTMessageRouter {
  /**
   * 发布健康指标到 IoT Core
   */
  static async publishMetrics(data: HealthMetric): Promise<void> {
    const topic = `health/${data.deviceId}/metrics`;

    await iotDataClient.send(
      new PublishCommand({
        topic,
        payload: JSON.stringify({
          ...data,
          receivedAt: Date.now(),
        }),
        qos: 1, // 至少一次传递
      })
    );
  }

  /**
   * 发送命令到设备
   */
  static async sendCommand(
    thingName: string,
    command: string,
    payload: any
  ): Promise<void> {
    const topic = `health/${thingName}/commands/${command}`;

    await iotDataClient.send(
      new PublishCommand({
        topic,
        payload: JSON.stringify({
          command,
          payload,
          timestamp: Date.now(),
        }),
        qos: 1,
      })
    );
  }
}
Code collapsed

2. Kinesis 数据流处理

Kinesis 流创建与配置

code
// lib/kinesis/stream-manager.ts
import {
  KinesisClient,
  CreateStreamCommand,
  DescribeStreamCommand,
  PutRecordCommand,
  PutRecordsCommand,
} from '@aws-sdk/client-kinesis';

const kinesisClient = new KinesisClient({ region: process.env.AWS_REGION });

export interface StreamConfig {
  streamName: string;
  shardCount: number;
  retentionPeriodHours: number;
}

export class KinesisStreamManager {
  /**
   * 创建 Kinesis 数据流
   */
  static async createStream(config: StreamConfig): Promise<void> {
    await kinesisClient.send(
      new CreateStreamCommand({
        StreamName: config.streamName,
        ShardCount: config.shardCount,
      })
    );

    // 等待流变为 ACTIVE 状态
    await this.waitForStreamActive(config.streamName);
  }

  /**
   * 发布单条记录到流
   */
  static async putRecord(
    streamName: string,
    partitionKey: string,
    data: any
  ): Promise<string> {
    const result = await kinesisClient.send(
      new PutRecordCommand({
        StreamName: streamName,
        PartitionKey: partitionKey,
        Data: JSON.stringify(data),
      })
    );

    return result.SequenceNumber || '';
  }

  /**
   * 批量发布记录到流
   */
  static async putRecords(
    streamName: string,
    records: Array<{ partitionKey: string; data: any }>
  ): Promise<{ successCount: number; failedCount: number }> {
    const kinesisRecords = records.map((r) => ({
      PartitionKey: r.partitionKey,
      Data: JSON.stringify(r.data),
    }));

    const result = await kinesisClient.send(
      new PutRecordsCommand({
        StreamName: streamName,
        Records: kinesisRecords,
      })
    );

    const failedCount = result.Records?.filter(
      (r) => r.ErrorCode !== undefined
    ).length || 0;

    return {
      successCount: records.length - failedCount,
      failedCount,
    };
  }

  private static async waitForStreamActive(
    streamName: string,
    maxWaitSeconds = 300
  ): Promise<void> {
    const startTime = Date.now();

    while (Date.now() - startTime < maxWaitSeconds * 1000) {
      const response = await kinesisClient.send(
        new DescribeStreamCommand({ StreamName: streamName })
      );

      if (response.StreamDescription?.StreamStatus === 'ACTIVE') {
        return;
      }

      await new Promise((resolve) => setTimeout(resolve, 5000));
    }

    throw new Error(`流 ${streamName} 未在预期时间内变为 ACTIVE`);
  }
}

// 初始化流
export const HEALTH_METRICS_STREAM = 'health-metrics-stream';
export const HEALTH_ALERTS_STREAM = 'health-alerts-stream';
export const DEVICE_STATUS_STREAM = 'device-status-stream';
Code collapsed

IoT Core 到 Kinesis 路由规则

code
-- IoT Core 规则 SQL
-- 规则: route_health_metrics_to_kinesis

SELECT
  *,
  topic(3) as deviceType,
  clientId() as deviceId
FROM 'health/+/metrics'
WHERE userId IS NOT NULL AND tenantId IS NOT NULL
Code collapsed
code
// 规则操作配置
const ruleActions = [
  {
    kinesis: {
      streamName: HEALTH_METRICS_STREAM,
      partitionKey: '${tenantId}:${userId}',
      roleArn: process.env.IOT_KINESIS_ROLE_ARN,
    },
  },
];
Code collapsed

3. Lambda 函数实现

数据清洗与转换 Lambda

code
// lambdas/process-health-metrics/index.ts
import {
  KinesisStreamEvent,
  KinesisStreamHandler,
} from 'aws-lambda';

import { TimestreamWriteClient, WriteRecordsCommand } from '@aws-sdk/client-timestream-write';

const timestreamClient = new TimestreamWriteClient({
  region: process.env.AWS_REGION,
});

export const handler: KinesisStreamHandler = async (event) => {
  const recordsToWrite: any[] = [];

  for (const record of event.Records) {
    try {
      // 解析 Kinesis 记录
      const payload = JSON.parse(
        Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
      );

      // 数据验证
      const validated = validateHealthMetric(payload);

      if (!validated.isValid) {
        console.error('无效的健康指标:', validated.errors);
        continue;
      }

      // 数据清洗
      const cleaned = cleanHealthMetric(payload);

      // 数据转换
      const timestreamRecords = convertToTimestreamRecords(cleaned);
      recordsToWrite.push(...timestreamRecords);

    } catch (error) {
      console.error('处理记录时出错:', error);
    }
  }

  // 批量写入 Timestream
  if (recordsToWrite.length > 0) {
    await writeTimestreamRecords(recordsToWrite);
  }
};

/**
 * 验证健康指标数据
 */
function validateHealthMetric(data: any): {
  isValid: boolean;
  errors: string[];
} {
  const errors: string[] = [];

  // 必填字段验证
  if (!data.userId) errors.push('缺少 userId');
  if (!data.tenantId) errors.push('缺少 tenantId');
  if (!data.timestamp) errors.push('缺少 timestamp');

  // 数值范围验证
  if (data.metrics?.heartRate) {
    if (data.metrics.heartRate < 30 || data.metrics.heartRate > 220) {
      errors.push('心率超出正常范围 (30-220 BPM)');
    }
  }

  if (data.metrics?.bloodPressure?.systolic) {
    if (data.metrics.bloodPressure.systolic < 60 || data.metrics.bloodPressure.systolic > 250) {
      errors.push('收缩压超出正常范围');
    }
  }

  if (data.metrics?.spo2) {
    if (data.metrics.spo2 < 70 || data.metrics.spo2 > 100) {
      errors.push('血氧饱和度超出正常范围');
    }
  }

  return {
    isValid: errors.length === 0,
    errors,
  };
}

/**
 * 清洗数据
 */
function cleanHealthMetric(data: any) {
  return {
    userId: String(data.userId),
    tenantId: String(data.tenantId),
    deviceId: String(data.deviceId),
    timestamp: Math.floor(data.timestamp / 1000), // 转换为秒
    metrics: {
      heartRate: data.metrics?.heartRate || null,
      systolicBP: data.metrics?.bloodPressure?.systolic || null,
      diastolicBP: data.metrics?.bloodPressure?.diastolic || null,
      bloodGlucose: data.metrics?.bloodGlucose || null,
      steps: data.metrics?.steps || null,
      temperature: data.metrics?.temperature || null,
      spo2: data.metrics?.spo2 || null,
    },
    battery: data.battery || null,
    receivedAt: Math.floor(data.receivedAt / 1000),
  };
}

/**
 * 转换为 Timestream 记录格式
 */
function convertToTimestreamRecords(data: any) {
  const database = 'health_metrics_db';
  const table = 'user_metrics';

  const records = [];

  // 为每个非空指标创建记录
  const metricMap = {
    heartRate: 'heart_rate',
    systolicBP: 'systolic_bp',
    diastolicBP: 'diastolic_bp',
    bloodGlucose: 'blood_glucose',
    steps: 'steps',
    temperature: 'temperature',
    spo2: 'spo2',
  };

  for (const [key, value] of Object.entries(metricMap)) {
    if (data.metrics[key] !== null && data.metrics[key] !== undefined) {
      records.push({
        Dimensions: [
          { Name: 'user_id', Value: data.userId },
          { Name: 'tenant_id', Value: data.tenantId },
          { Name: 'device_id', Value: data.deviceId },
          { Name: 'metric_type', Value: value },
        ],
        MeasureName: 'value',
        MeasureValue: data.metrics[key].toString(),
        MeasureValueType: 'DOUBLE',
        Time: data.timestamp.toString(),
        TimeUnit: 'SECONDS',
      });
    }
  }

  // 添加电池状态记录
  if (data.battery !== null) {
    records.push({
      Dimensions: [
        { Name: 'user_id', Value: data.userId },
        { Name: 'tenant_id', Value: data.tenantId },
        { Name: 'device_id', Value: data.deviceId },
        { Name: 'metric_type', Value: 'battery' },
      ],
      MeasureName: 'percentage',
      MeasureValue: data.battery.toString(),
      MeasureValueType: 'DOUBLE',
      Time: data.timestamp.toString(),
      TimeUnit: 'SECONDS',
    });
  }

  return records.map((r) => ({ ...r, Database: database, Table: table }));
}

/**
 * 写入 Timestream
 */
async function writeTimestreamRecords(records: any[]) {
  // Timestream 批量写入限制为 100 条记录
  const batchSize = 100;

  for (let i = 0; i < records.length; i += batchSize) {
    const batch = records.slice(i, i + batchSize);

    await timestreamClient.send(
      new WriteRecordsCommand({
        DatabaseName: batch[0].Database,
        TableName: batch[0].Table,
        Records: batch,
      })
    );
  }
}
Code collapsed

异常检测与告警 Lambda

code
// lambdas/detect-health-alerts/index.ts
import { KinesisStreamHandler } from 'aws-lambda';
import { PublishCommand, SNSClient } from '@aws-sdk/client-sns';

const snsClient = new SNSClient({ region: process.env.AWS_REGION });

interface HealthAlert {
  userId: string;
  tenantId: string;
  alertType: 'critical' | 'warning' | 'info';
  category: string;
  message: string;
  value: number;
  threshold: number;
  timestamp: number;
}

export const handler: KinesisStreamHandler = async (event) => {
  const alerts: HealthAlert[] = [];

  for (const record of event.Records) {
    try {
      const payload = JSON.parse(
        Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
      );

      // 执行各种检测
      alerts.push(...detectHeartRateAlerts(payload));
      alerts.push(...detectBloodPressureAlerts(payload));
      alerts.push(...detectBloodGlucoseAlerts(payload));
      alerts.push(...detectSpo2Alerts(payload));
      alerts.push(...detectDeviceOfflineAlerts(payload));

    } catch (error) {
      console.error('检测异常时出错:', error);
    }
  }

  // 发送告警
  for (const alert of alerts) {
    await sendAlert(alert);
  }
};

/**
 * 检测心率异常
 */
function detectHeartRateAlerts(data: any): HealthAlert[] {
  const alerts: HealthAlert[] = [];
  const heartRate = data.metrics?.heartRate;

  if (!heartRate) return alerts;

  // 严重异常
  if (heartRate > 180 || heartRate < 40) {
    alerts.push({
      userId: data.userId,
      tenantId: data.tenantId,
      alertType: 'critical',
      category: 'heart_rate',
      message: heartRate > 180
        ? '心率过高!请立即就医'
        : '心率过低!请立即就医',
      value: heartRate,
      threshold: heartRate > 180 ? 180 : 40,
      timestamp: data.timestamp,
    });
  }
  // 警告
  else if (heartRate > 120 || heartRate < 50) {
    alerts.push({
      userId: data.userId,
      tenantId: data.tenantId,
      alertType: 'warning',
      category: 'heart_rate',
      message: `心率异常 (${heartRate} BPM),请留意`,
      value: heartRate,
      threshold: heartRate > 120 ? 120 : 50,
      timestamp: data.timestamp,
    });
  }

  return alerts;
}

/**
 * 检测血压异常
 */
function detectBloodPressureAlerts(data: any): HealthAlert[] {
  const alerts: HealthAlert[] = [];
  const bp = data.metrics?.bloodPressure;

  if (!bp?.systolic || !bp?.diastolic) return alerts;

  // 高血压危机
  if (bp.systolic > 180 || bp.diastolic > 120) {
    alerts.push({
      userId: data.userId,
      tenantId: data.tenantId,
      alertType: 'critical',
      category: 'blood_pressure',
      message: '血压过高!高血压危机,请立即就医',
      value: bp.systolic,
      threshold: 180,
      timestamp: data.timestamp,
    });
  }
  // 高血压 2 期
  else if (bp.systolic > 140 || bp.diastolic > 90) {
    alerts.push({
      userId: data.userId,
      tenantId: data.tenantId,
      alertType: 'warning',
      category: 'blood_pressure',
      message: `血压偏高 (${bp.systolic}/${bp.diastolic} mmHg)`,
      value: bp.systolic,
      threshold: 140,
      timestamp: data.timestamp,
    });
  }

  return alerts;
}

/**
 * 检测血氧异常
 */
function detectSpo2Alerts(data: any): HealthAlert[] {
  const alerts: HealthAlert[] = [];
  const spo2 = data.metrics?.spo2;

  if (!spo2) return alerts;

  if (spo2 < 88) {
    alerts.push({
      userId: data.userId,
      tenantId: data.tenantId,
      alertType: 'critical',
      category: 'spo2',
      message: '血氧饱和度过低!请立即就医',
      value: spo2,
      threshold: 88,
      timestamp: data.timestamp,
    });
  } else if (spo2 < 92) {
    alerts.push({
      userId: data.userId,
      tenantId: data.tenantId,
      alertType: 'warning',
      category: 'spo2',
      message: `血氧饱和度偏低 (${spo2}%)`,
      value: spo2,
      threshold: 92,
      timestamp: data.timestamp,
    });
  }

  return alerts;
}

/**
 * 检测设备离线
 */
function detectDeviceOfflineAlerts(data: any): HealthAlert[] {
  const alerts: HealthAlert[] = [];
  const lastSeen = data.timestamp;
  const now = Date.now();
  const offlineThreshold = 24 * 60 * 60 * 1000; // 24 小时

  if (now - lastSeen > offlineThreshold) {
    alerts.push({
      userId: data.userId,
      tenantId: data.tenantId,
      alertType: 'warning',
      category: 'device_offline',
      message: `设备 ${data.deviceId} 已超过 24 小时未同步数据`,
      value: now - lastSeen,
      threshold: offlineThreshold,
      timestamp: now,
    });
  }

  return alerts;
}

/**
 * 发送告警通知
 */
async function sendAlert(alert: HealthAlert): Promise<void> {
  // 发布到 SNS 主题
  await snsClient.send(
    new PublishCommand({
      TopicArn: process.env.ALERTS_SNS_TOPIC_ARN,
      Message: JSON.stringify(alert),
      Subject: `[${alert.alertType.toUpperCase()}] ${alert.category}`,
      MessageAttributes: {
        alert_type: {
          DataType: 'String',
          StringValue: alert.alertType,
        },
        user_id: {
          DataType: 'String',
          StringValue: alert.userId,
        },
        tenant_id: {
          DataType: 'String',
          StringValue: alert.tenantId,
        },
      },
    })
  );
}
Code collapsed

数据归档 Lambda

code
// lambdas/archive-health-data/index.ts
import {
  KinesisStreamHandler,
  ScheduledHandler,
} from 'aws-lambda';
import {
  S3Client,
  PutObjectCommand,
} from '@aws-sdk/client-s3';

const s3Client = new S3Client({ region: process.env.AWS_REGION });

/**
 * 实时流数据归档
 */
export const streamHandler: KinesisStreamHandler = async (event) => {
  const recordsByDate: Map<string, any[]> = new Map();

  for (const record of event.Records) {
    try {
      const payload = JSON.parse(
        Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
      );

      // 按日期分组
      const date = new Date(payload.timestamp);
      const dateKey = `${date.getFullYear()}/${String(date.getMonth() + 1).padStart(2, '0')}/${String(date.getDate()).padStart(2, '0')}`;

      if (!recordsByDate.has(dateKey)) {
        recordsByDate.set(dateKey, []);
      }

      recordsByDate.get(dateKey)!.push(payload);

    } catch (error) {
      console.error('归档记录时出错:', error);
    }
  }

  // 按日期写入 S3
  for (const [dateKey, records] of recordsByDate.entries()) {
    await archiveToS3(dateKey, records);
  }
};

/**
 * 定期批量归档(从 Timestream 导出到 S3)
 */
export const scheduledHandler: ScheduledHandler = async (event) => {
  const yesterday = new Date();
  yesterday.setDate(yesterday.getDate() - 1);
  yesterday.setHours(0, 0, 0, 0);

  const today = new Date();
  today.setHours(0, 0, 0, 0);

  // 从 Timestream 查询数据
  const data = await queryTimestreamData(yesterday.getTime(), today.getTime());

  // 按租户分组
  const dataByTenant = groupByTenant(data);

  // 写入 S3(按租户隔离)
  for (const [tenantId, records] of Object.entries(dataByTenant)) {
    await archiveToS3(
      `tenant/${tenantId}/${formatDate(yesterday)}`,
      records
    );
  }
};

/**
 * 归档到 S3
 */
async function archiveToS3(key: string, data: any[]): Promise<void> {
  await s3Client.send(
    new PutObjectCommand({
      Bucket: process.env.HEALTH_DATA_ARCHIVE_BUCKET,
      Key: `health-data/${key}.json`,
      Body: JSON.stringify(data, null, 2),
      ContentType: 'application/json',
      ServerSideEncryption: 'AES256',
      Metadata: {
        recordCount: data.length.toString(),
        archivedAt: new Date().toISOString(),
      },
    })
  );
}

/**
 * 查询 Timestream 数据
 */
async function queryTimestreamData(
  startTime: number,
  endTime: number
): Promise<any[]> {
  // 实现略 - 使用 Timestream Query API
  return [];
}

/**
 * 按租户分组
 */
function groupByTenant(data: any[]): Record<string, any[]> {
  return data.reduce((acc, record) => {
    const tenantId = record.tenantId || 'unknown';
    if (!acc[tenantId]) {
      acc[tenantId] = [];
    }
    acc[tenantId].push(record);
    return acc;
  }, {} as Record<string, any[]>);
}

function formatDate(date: Date): string {
  return `${date.getFullYear()}-${String(date.getMonth() + 1).padStart(2, '0')}-${String(date.getDate()).padStart(2, '0')}`;
}
Code collapsed

4. 数据库设计

Timestream 表结构

code
-- 创建数据库
CREATE DATABASE IF NOT EXISTS health_metrics_db;

-- 创建用户指标表
CREATE TABLE IF NOT EXISTS user_metrics (
    user_id VARCHAR(50),
    tenant_id VARCHAR(50),
    device_id VARCHAR(100),
    metric_type VARCHAR(50),
    time TIMESTAMP,
    value DOUBLE
)
WITH (
    amazonkinesis.tagging = '{"Environment": "Production", "Application": "HealthMonitor"}'
);

-- 创建告警表
CREATE TABLE IF NOT EXISTS health_alerts (
    user_id VARCHAR(50),
    tenant_id VARCHAR(50),
    alert_type VARCHAR(20),
    category VARCHAR(50),
    time TIMESTAMP,
    severity INTEGER,
    message VARCHAR(500)
);
Code collapsed

DynamoDB 设备状态表

code
// lib/dynamodb/device-state.ts
import {
  DynamoDBClient,
  PutItemCommand,
  UpdateItemCommand,
  GetItemCommand,
} from '@aws-sdk/client-dynamodb';

const ddbClient = new DynamoDBClient({ region: process.env.AWS_REGION });

export interface DeviceState {
  deviceId: string;
  userId: string;
  tenantId: string;
  lastSeen: number;
  batteryLevel: number;
  firmwareVersion: string;
  status: 'online' | 'offline' | 'low_battery';
}

export class DeviceStateManager {
  private static tableName = 'device_states';

  /**
   * 更新设备状态
   */
  static async updateState(state: Partial<DeviceState> & { deviceId: string }): Promise<void> {
    await ddbClient.send(
      new UpdateItemCommand({
        TableName: this.tableName,
        Key: {
          device_id: { S: state.deviceId },
        },
        UpdateExpression: `
          SET last_seen = :lastSeen,
              battery_level = :battery,
              status = :status
        `,
        ExpressionAttributeValues: {
          ':lastSeen': { N: String(Date.now()) },
          ':battery': { N: String(state.batteryLevel || 0) },
          ':status': { S: this.determineStatus(state.batteryLevel || 0) },
        },
      })
    );
  }

  /**
   * 获取设备状态
   */
  static async getState(deviceId: string): Promise<DeviceState | null> {
    const result = await ddbClient.send(
      new GetItemCommand({
        TableName: this.tableName,
        Key: {
          device_id: { S: deviceId },
        },
      })
    );

    if (!result.Item) {
      return null;
    }

    return {
      deviceId: result.Item.device_id.S!,
      userId: result.Item.user_id.S!,
      tenantId: result.Item.tenant_id.S!,
      lastSeen: parseInt(result.Item.last_seen.N!),
      batteryLevel: parseInt(result.Item.battery_level.N!),
      firmwareVersion: result.Item.firmware_version.S!,
      status: result.Item.status.S! as any,
    };
  }

  private static determineStatus(batteryLevel: number): 'online' | 'offline' | 'low_battery' {
    if (batteryLevel < 20) return 'low_battery';
    return 'online';
  }
}
Code collapsed

5. 监控与告警

CloudWatch 自定义指标

code
// lib/cloudwatch/metrics.ts
import {
  CloudWatchClient,
  PutMetricDataCommand,
} from '@aws-sdk/client-cloudwatch';

const cloudWatch = new CloudWatchClient({ region: process.env.AWS_REGION });

export class CloudWatchMetrics {
  /**
   * 记录处理延迟指标
   */
  static async recordProcessingLatency(
    streamName: string,
    latencyMs: number
  ): Promise<void> {
    await cloudWatch.send(
      new PutMetricDataCommand({
        Namespace: 'HealthMonitor/Processing',
        MetricData: [
          {
            MetricName: 'ProcessingLatency',
            Dimensions: [
              { Name: 'StreamName', Value: streamName },
            ],
            Value: latencyMs,
            Unit: 'Milliseconds',
            StorageResolution: 60,
          },
        ],
      })
    );
  }

  /**
   * 记录告警数量
   */
  static async recordAlertCount(
    alertType: string,
    tenantId: string,
    count: number
  ): Promise<void> {
    await cloudWatch.send(
      new PutMetricDataCommand({
        Namespace: 'HealthMonitor/Alerts',
        MetricData: [
          {
            MetricName: 'AlertCount',
            Dimensions: [
              { Name: 'AlertType', Value: alertType },
              { Name: 'TenantId', Value: tenantId },
            ],
            Value: count,
            Unit: 'Count',
          },
        ],
      })
    );
  }

  /**
   * 记录设备状态
   */
  static async recordDeviceStatus(
    deviceType: string,
    status: 'online' | 'offline'
  ): Promise<void> {
    await cloudWatch.send(
      new PutMetricDataCommand({
        Namespace: 'HealthMonitor/Devices',
        MetricData: [
          {
            MetricName: 'DeviceStatus',
            Dimensions: [
              { Name: 'DeviceType', Value: deviceType },
              { Name: 'Status', Value: status },
            ],
            Value: 1,
            Unit: 'Count',
          },
        ],
      })
    );
  }
}
Code collapsed

合规检查清单

HIPAA 合规

  • 传输中数据加密(TLS 1.3)
  • 静态数据加密(S3 SSE、Timestream 加密)
  • 设备认证(X.509 证书)
  • 访问控制(IAM 角色、策略)
  • 审计日志(CloudTrail、DynamoDB Streams)
  • 业务连续性(多 AZ 部署)

数据安全

  • API 数据验证
  • 异常检测与告警
  • 数据保留策略
  • 安全数据删除
  • 定期备份

运维监控

  • Lambda 函数监控
  • Kinesis 流监控
  • 设备连接状态监控
  • 成本监控与优化

参考资料


免责声明:本文提供的架构仅供参考。在生产环境中处理健康数据前,请确保符合 HIPAA、GDPR 等法规要求,并进行全面的安全审计。

#

文章标签

aws
lambda
kinesis
iot
real-time
serverless
healthtech

觉得这篇文章有帮助?

立即体验康心伴,开始您的健康管理之旅