AWS Lambda + Kinesis 实时处理 IoT 健康数据流
概述
物联网医疗设备(可穿戴设备、远程监控设备)产生的健康数据需要实时处理和分析。本文将介绍如何使用 AWS 无服务器架构构建可扩展的实时数据处理管道。
架构概览
```text
┌─────────────────────────────────────────────────────────────────┐
│                           IoT 设备层                            │
│  ┌──────────┐  ┌──────────┐  ┌──────────┐  ┌──────────┐         │
│  │ 智能手环 │  │  血压计  │  │  血糖仪  │  │ ECG 设备 │         │
│  └──────────┘  └──────────┘  └──────────┘  └──────────┘         │
└─────────────────────────────────────────────────────────────────┘
                                 │ MQTT/HTTPS
                                 ▼
┌─────────────────────────────────────────────────────────────────┐
│                          AWS IoT Core                           │
│          • 设备注册与认证    • 消息路由    • 设备影子           │
└─────────────────────────────────────────────────────────────────┘
                                 │
                                 ▼
┌─────────────────────────────────────────────────────────────────┐
│                      Kinesis Data Streams                       │
│          • 健康指标流    • 告警事件流    • 设备状态流           │
└─────────────────────────────────────────────────────────────────┘
                                 │
            ┌────────────────────┼────────────────────┐
            ▼                    ▼                    ▼
  ┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐
  │   Lambda: 处理   │ │   Lambda: 告警   │ │   Lambda: 归档   │
  │  数据清洗/转换   │ │  异常检测/通知   │ │   长期存储/S3    │
  └──────────────────┘ └──────────────────┘ └──────────────────┘
            │                    │                    │
            ▼                    ▼                    ▼
  ┌──────────────────┐ ┌──────────────────┐ ┌──────────────────┐
  │    Timestream    │ │     SNS/SES      │ │   S3/Redshift    │
  │    时序数据库    │ │     告警通知     │ │      数据湖      │
  └──────────────────┘ └──────────────────┘ └──────────────────┘
```
1. AWS IoT Core 配置
设备注册与认证
code
// lib/iot/device-manager.ts
import {
  AttachPolicyCommand,
  AttachThingPrincipalCommand,
  CreateKeysAndCertificateCommand,
  CreateThingCommand,
  DeleteThingCommand,
  DescribeEndpointCommand,
  DetachThingPrincipalCommand,
  IoTClient,
  ListThingPrincipalsCommand,
} from '@aws-sdk/client-iot';
// Shared AWS IoT client for this module; the region is read from the AWS_REGION environment variable.
const iotClient = new IoTClient({ region: process.env.AWS_REGION });
/** Parameters accepted by IoTDeviceManager.registerDevice. */
export interface DeviceRegistration {
// Device identifier; becomes the last segment of the generated thing name.
deviceId: string;
// Device category; also used as the thing type name and as the prefix of the attached policy name.
deviceType: 'smartwatch' | 'blood_pressure' | 'glucose_meter' | 'ecg';
// Owning user; stored as a thing attribute and embedded in the thing name.
userId: string;
// Owning tenant; stored as a thing attribute.
tenantId: string;
}
/**
 * Registers and removes AWS IoT things together with their X.509 credentials.
 */
export class IoTDeviceManager {
  /**
   * Registers a new IoT device.
   *
   * Steps: create the thing, create an active certificate/key pair, attach the
   * per-device-type policy to the certificate, attach the certificate to the
   * thing, and resolve the account's data endpoint.
   *
   * @param params - Device, user and tenant identifiers plus the device type.
   * @returns The thing name, the certificate material (the private key is only
   *   available at creation time) and the HTTPS data endpoint.
   * @throws Error if IoT returns incomplete certificate material or no endpoint;
   *   SDK errors from any of the IoT calls are propagated unchanged.
   */
  static async registerDevice(params: DeviceRegistration): Promise<{
    thingName: string;
    certificateArn: string;
    certificatePem: string;
    privateKey: string;
    endpoint: string;
  }> {
    const { deviceId, deviceType, userId, tenantId } = params;

    // 1. Create the thing (the registry entry that represents the device).
    const thingName = `${deviceType}_${userId}_${deviceId}`;
    await iotClient.send(
      new CreateThingCommand({
        thingName,
        thingTypeName: deviceType,
        attributePayload: {
          attributes: {
            userId,
            tenantId,
            deviceType,
            registeredAt: new Date().toISOString(),
          },
        },
      })
    );

    // 2. Create the device certificate and key pair.
    const certResponse = await iotClient.send(
      new CreateKeysAndCertificateCommand({ setAsActive: true })
    );
    const { certificateArn, certificatePem } = certResponse;
    const privateKey = certResponse.keyPair?.PrivateKey;
    // Fail loudly: handing back an empty key would leave the device unable to connect.
    if (!certificateArn || !certificatePem || !privateKey) {
      throw new Error(`IoT returned incomplete certificate material for ${thingName}`);
    }

    // 3. Attach the device-type policy to the certificate.
    await iotClient.send(
      new AttachPolicyCommand({
        policyName: `${deviceType}_policy`,
        target: certificateArn,
      })
    );

    // 4. Attach the certificate to the thing. The device policies use
    //    ${iot:Connection.Thing.ThingName}, which needs this association.
    await iotClient.send(
      new AttachThingPrincipalCommand({ thingName, principal: certificateArn })
    );

    // 5. Resolve the IoT data endpoint.
    const endpoint = await this.getDataEndpoint();

    return { thingName, certificateArn, certificatePem, privateKey, endpoint };
  }

  /**
   * Deletes a device. Attached principals (certificates) are detached first.
   * The certificates themselves are not deactivated or deleted.
   *
   * @param thingName - Name of the thing to remove.
   */
  static async deregisterDevice(thingName: string): Promise<void> {
    try {
      let nextToken: string | undefined;
      do {
        const page = await iotClient.send(
          new ListThingPrincipalsCommand({ thingName, nextToken })
        );
        for (const principal of page.principals ?? []) {
          await iotClient.send(new DetachThingPrincipalCommand({ thingName, principal }));
        }
        nextToken = page.nextToken;
      } while (nextToken);
    } catch (error: unknown) {
      // A missing thing has nothing to detach; fall through to the delete call.
      if ((error as { name?: string } | null)?.name !== 'ResourceNotFoundException') {
        throw error;
      }
    }
    await iotClient.send(new DeleteThingCommand({ thingName }));
  }

  /**
   * Looks up the account's ATS data endpoint and returns it as an HTTPS URL.
   *
   * @throws Error if the service response carries no endpoint address.
   */
  private static async getDataEndpoint(): Promise<string> {
    // The bare IoTClient only exposes send(); operations are issued as commands.
    const { endpointAddress } = await iotClient.send(
      new DescribeEndpointCommand({ endpointType: 'iot:Data-ATS' })
    );
    if (!endpointAddress) {
      throw new Error('IoT DescribeEndpoint returned no endpoint address');
    }
    return `https://${endpointAddress}`;
  }
}
Code collapsed
IoT 策略配置
code
// policies/smartwatch_policy.json
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": [
        "iot:Connect"
      ],
      "Resource": [
        "arn:aws:iot:region:account:client/${iot:Connection.Thing.ThingName}"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "iot:Publish"
      ],
      "Resource": [
        "arn:aws:iot:region:account:topic/health/${iot:Connection.Thing.ThingName}/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "iot:Subscribe"
      ],
      "Resource": [
        "arn:aws:iot:region:account:topicfilter/health/${iot:Connection.Thing.ThingName}/commands/*"
      ]
    },
    {
      "Effect": "Allow",
      "Action": [
        "iot:Receive"
      ],
      "Resource": [
        "arn:aws:iot:region:account:topic/health/${iot:Connection.Thing.ThingName}/commands/*"
      ]
    }
  ]
}
Code collapsed
消息路由规则
code
// lib/iot/message-router.ts
import {
IoTDataPlaneClient,
PublishCommand,
} from '@aws-sdk/client-iot-data-plane';
// IoT data-plane client; endpoint and region are read from the IOT_DATA_ENDPOINT and AWS_REGION environment variables.
const iotDataClient = new IoTDataPlaneClient({
endpoint: process.env.IOT_DATA_ENDPOINT,
region: process.env.AWS_REGION,
});
/** Shape of a health reading accepted by IoTMessageRouter.publishMetrics. */
export interface HealthMetric {
// Device identifier; used as the middle segment of the publish topic.
deviceId: string;
userId: string;
tenantId: string;
// NOTE(review): downstream code divides this by 1000 to get seconds, so it appears to be epoch milliseconds — confirm.
timestamp: number;
// Individual readings; every field is optional so a device can report only what it measures.
// NOTE(review): units are not stated anywhere in this file — confirm with the device firmware.
metrics: {
heartRate?: number;
bloodPressure?: { systolic: number; diastolic: number };
bloodGlucose?: number;
steps?: number;
sleep?: { duration: number; quality: number };
temperature?: number;
spo2?: number;
};
// NOTE(review): presumably a percentage (it is stored under the measure name 'percentage') — confirm.
battery?: number;
}
/**
 * Publishes JSON messages to AWS IoT Core topics under the `health/` prefix.
 */
export class IoTMessageRouter {
  /**
   * Publishes a health reading to `health/<deviceId>/metrics`.
   * The outgoing message is the reading plus a `receivedAt` epoch-millisecond stamp.
   */
  static async publishMetrics(data: HealthMetric): Promise<void> {
    const envelope = { ...data, receivedAt: Date.now() };
    await IoTMessageRouter.publishJson(`health/${data.deviceId}/metrics`, envelope);
  }

  /**
   * Sends a command to a device on `health/<thingName>/commands/<command>`.
   * The message carries the command name, the caller's payload and a timestamp.
   */
  static async sendCommand(
    thingName: string,
    command: string,
    payload: any
  ): Promise<void> {
    const message = { command, payload, timestamp: Date.now() };
    await IoTMessageRouter.publishJson(`health/${thingName}/commands/${command}`, message);
  }

  /** Serializes `body` as JSON and publishes it with QoS 1 (at-least-once delivery). */
  private static async publishJson(topic: string, body: unknown): Promise<void> {
    const request = new PublishCommand({
      topic,
      payload: JSON.stringify(body),
      qos: 1,
    });
    await iotDataClient.send(request);
  }
}
Code collapsed
2. Kinesis 数据流处理
Kinesis 流创建与配置
code
// lib/kinesis/stream-manager.ts
import {
KinesisClient,
CreateStreamCommand,
DescribeStreamCommand,
PutRecordCommand,
PutRecordsCommand,
} from '@aws-sdk/client-kinesis';
// Shared Kinesis client for this module; the region is read from the AWS_REGION environment variable.
const kinesisClient = new KinesisClient({ region: process.env.AWS_REGION });
/** Settings accepted by KinesisStreamManager.createStream. */
export interface StreamConfig {
// Name of the stream to create.
streamName: string;
// Number of shards requested at creation time.
shardCount: number;
// NOTE(review): declared here but not read by createStream in this file, so the service default retention applies — confirm intent.
retentionPeriodHours: number;
}
/**
 * Thin helper around the Kinesis Data Streams client: stream creation and
 * JSON record publishing.
 */
export class KinesisStreamManager {
  /** PutRecords accepts at most this many records per request. */
  private static readonly MAX_PUT_RECORDS_BATCH = 500;

  /**
   * Creates a stream and waits until it is ACTIVE.
   *
   * NOTE(review): `config.retentionPeriodHours` is not applied here; the stream
   * keeps the service default retention unless it is raised separately.
   *
   * @throws Error if the stream does not become ACTIVE within the wait window.
   */
  static async createStream(config: StreamConfig): Promise<void> {
    await kinesisClient.send(
      new CreateStreamCommand({
        StreamName: config.streamName,
        ShardCount: config.shardCount,
      })
    );
    await this.waitForStreamActive(config.streamName);
  }

  /**
   * Publishes a single JSON-serialized record.
   *
   * @returns The sequence number assigned by Kinesis, or '' if none was returned.
   */
  static async putRecord(
    streamName: string,
    partitionKey: string,
    data: any
  ): Promise<string> {
    const result = await kinesisClient.send(
      new PutRecordCommand({
        StreamName: streamName,
        PartitionKey: partitionKey,
        Data: KinesisStreamManager.encode(data),
      })
    );
    return result.SequenceNumber || '';
  }

  /**
   * Publishes records in batches of at most 500 (the PutRecords limit).
   * An empty input sends nothing. Failed records are counted, not retried.
   *
   * @returns How many records Kinesis accepted and how many it rejected.
   */
  static async putRecords(
    streamName: string,
    records: Array<{ partitionKey: string; data: any }>
  ): Promise<{ successCount: number; failedCount: number }> {
    const batchSize = KinesisStreamManager.MAX_PUT_RECORDS_BATCH;
    let failedCount = 0;

    for (let start = 0; start < records.length; start += batchSize) {
      const batch = records.slice(start, start + batchSize).map((r) => ({
        PartitionKey: r.partitionKey,
        Data: KinesisStreamManager.encode(r.data),
      }));
      const result = await kinesisClient.send(
        new PutRecordsCommand({ StreamName: streamName, Records: batch })
      );
      // Per-record failures are reported in the response, not thrown.
      failedCount += result.Records?.filter((r) => r.ErrorCode !== undefined).length ?? 0;
    }

    return {
      successCount: records.length - failedCount,
      failedCount,
    };
  }

  /** Serializes a value to UTF-8 JSON bytes; the Kinesis `Data` field is binary. */
  private static encode(data: any): Uint8Array {
    return Buffer.from(JSON.stringify(data), 'utf-8');
  }

  /**
   * Polls DescribeStream every 5 seconds until the stream reports ACTIVE.
   *
   * @throws Error once `maxWaitSeconds` has elapsed without the stream becoming ACTIVE.
   */
  private static async waitForStreamActive(
    streamName: string,
    maxWaitSeconds = 300
  ): Promise<void> {
    const startTime = Date.now();
    while (Date.now() - startTime < maxWaitSeconds * 1000) {
      const response = await kinesisClient.send(
        new DescribeStreamCommand({ StreamName: streamName })
      );
      if (response.StreamDescription?.StreamStatus === 'ACTIVE') {
        return;
      }
      await new Promise((resolve) => setTimeout(resolve, 5000));
    }
    throw new Error(`流 ${streamName} 未在预期时间内变为 ACTIVE`);
  }
}
// Names of the Kinesis streams referenced by the pipeline (metrics, alerts, device status).
export const HEALTH_METRICS_STREAM = 'health-metrics-stream';
export const HEALTH_ALERTS_STREAM = 'health-alerts-stream';
export const DEVICE_STATUS_STREAM = 'device-status-stream';
Code collapsed
IoT Core 到 Kinesis 路由规则
code
-- IoT Core rule SQL
-- Rule: route_health_metrics_to_kinesis
-- topic(n) is 1-indexed: for 'health/<thing>/metrics', topic(2) is the wildcard
-- segment and topic(3) is always the literal 'metrics', so it cannot carry a
-- device type. The wildcard segment is exposed as thingName instead.
SELECT
  *,
  topic(2) as thingName,
  clientId() as deviceId
FROM 'health/+/metrics'
WHERE userId IS NOT NULL AND tenantId IS NOT NULL
Code collapsed
code
// Rule action configuration: forwards matched messages to the health-metrics Kinesis stream.
const ruleActions = [
{
kinesis: {
streamName: HEALTH_METRICS_STREAM,
// Plain single-quoted string, so JavaScript does not interpolate it; the '${...}' text is passed through as-is.
// NOTE(review): presumably resolved per message by IoT Core substitution templates — confirm.
partitionKey: '${tenantId}:${userId}',
// IAM role IoT Core uses to write to the stream, read from IOT_KINESIS_ROLE_ARN.
roleArn: process.env.IOT_KINESIS_ROLE_ARN,
},
},
];
Code collapsed
3. Lambda 函数实现
数据清洗与转换 Lambda
code
// lambdas/process-health-metrics/index.ts
import {
KinesisStreamEvent,
KinesisStreamHandler,
} from 'aws-lambda';
import { TimestreamWriteClient, WriteRecordsCommand } from '@aws-sdk/client-timestream-write';
// Timestream write client; the region is read from the AWS_REGION environment variable.
const timestreamClient = new TimestreamWriteClient({ region: process.env.AWS_REGION });

/**
 * Kinesis-triggered Lambda: decodes each record, validates and cleans the
 * health metric, converts it to Timestream records and writes the whole batch.
 * Records that fail validation or throw during processing are logged and skipped.
 */
export const handler: KinesisStreamHandler = async (event) => {
  const pending: any[] = [];

  event.Records.forEach((entry) => {
    try {
      // Kinesis delivers the payload base64-encoded.
      const json = Buffer.from(entry.kinesis.data, 'base64').toString('utf-8');
      const metric = JSON.parse(json);

      const verdict = validateHealthMetric(metric);
      if (verdict.isValid) {
        pending.push(...convertToTimestreamRecords(cleanHealthMetric(metric)));
      } else {
        console.error('无效的健康指标:', verdict.errors);
      }
    } catch (err) {
      console.error('处理记录时出错:', err);
    }
  });

  if (pending.length === 0) {
    return;
  }
  await writeTimestreamRecords(pending);
};
/**
 * Validates a raw health metric payload.
 *
 * Checks that `userId`, `tenantId` and `timestamp` are present, and that heart
 * rate (30-220), systolic pressure (60-250) and SpO2 (70-100) lie in range
 * whenever they are supplied. A supplied reading of 0 or a non-numeric value
 * is reported as out of range instead of being skipped.
 *
 * @param data - Parsed payload; may be null or lack any field.
 * @returns `isValid` plus the list of human-readable problems found.
 */
function validateHealthMetric(data: any): {
  isValid: boolean;
  errors: string[];
} {
  const errors: string[] = [];
  const input = data ?? {};

  // Required fields.
  if (!input.userId) errors.push('缺少 userId');
  if (!input.tenantId) errors.push('缺少 tenantId');
  if (!input.timestamp) errors.push('缺少 timestamp');

  // A reading is only checked when present (neither null nor undefined).
  const outOfRange = (reading: unknown, min: number, max: number): boolean => {
    if (reading === null || reading === undefined) return false;
    const value = Number(reading);
    return Number.isNaN(value) || value < min || value > max;
  };

  if (outOfRange(input.metrics?.heartRate, 30, 220)) {
    errors.push('心率超出正常范围 (30-220 BPM)');
  }
  if (outOfRange(input.metrics?.bloodPressure?.systolic, 60, 250)) {
    errors.push('收缩压超出正常范围');
  }
  if (outOfRange(input.metrics?.spo2, 70, 100)) {
    errors.push('血氧饱和度超出正常范围');
  }

  return {
    isValid: errors.length === 0,
    errors,
  };
}
/**
 * Normalizes a validated health metric.
 *
 * Identifiers are coerced to strings, millisecond timestamps are floored to
 * whole seconds, and blood pressure is flattened into `systolicBP` /
 * `diastolicBP`. Absent readings become `null`; a legitimate reading of 0
 * (for example `steps: 0` or `battery: 0`) is preserved.
 *
 * @param data - Payload that already passed validateHealthMetric.
 * @returns The normalized metric object.
 */
function cleanHealthMetric(data: any) {
  const metrics = data.metrics ?? {};
  return {
    userId: String(data.userId),
    tenantId: String(data.tenantId),
    deviceId: String(data.deviceId),
    timestamp: Math.floor(data.timestamp / 1000), // milliseconds -> seconds
    metrics: {
      // `??` rather than `||`: only null/undefined count as "missing".
      heartRate: metrics.heartRate ?? null,
      systolicBP: metrics.bloodPressure?.systolic ?? null,
      diastolicBP: metrics.bloodPressure?.diastolic ?? null,
      bloodGlucose: metrics.bloodGlucose ?? null,
      steps: metrics.steps ?? null,
      temperature: metrics.temperature ?? null,
      spo2: metrics.spo2 ?? null,
    },
    battery: data.battery ?? null,
    // A missing receivedAt yields null rather than NaN.
    receivedAt: data.receivedAt != null ? Math.floor(data.receivedAt / 1000) : null,
  };
}
/**
 * Converts a cleaned health metric into Timestream record objects.
 *
 * One record is produced per metric that is neither null nor undefined
 * (measure name 'value'), plus one battery record (measure name 'percentage')
 * when a battery level is present. Every record carries the same four
 * dimensions and is tagged with the target `Database` and `Table`, which the
 * writer reads back from the first record of each batch.
 *
 * @param data - Output of cleanHealthMetric; `metrics` and `battery` may be absent.
 * @returns The records, in metric-map order followed by the battery record.
 */
function convertToTimestreamRecords(data: any) {
  const database = 'health_metrics_db';
  const table = 'user_metrics';

  // Payload metric key -> value of the metric_type dimension.
  const metricMap: Record<string, string> = {
    heartRate: 'heart_rate',
    systolicBP: 'systolic_bp',
    diastolicBP: 'diastolic_bp',
    bloodGlucose: 'blood_glucose',
    steps: 'steps',
    temperature: 'temperature',
    spo2: 'spo2',
  };

  const buildRecord = (metricType: string, measureName: string, reading: unknown) => ({
    Dimensions: [
      { Name: 'user_id', Value: data.userId },
      { Name: 'tenant_id', Value: data.tenantId },
      { Name: 'device_id', Value: data.deviceId },
      { Name: 'metric_type', Value: metricType },
    ],
    MeasureName: measureName,
    MeasureValue: String(reading),
    MeasureValueType: 'DOUBLE',
    Time: data.timestamp.toString(),
    TimeUnit: 'SECONDS',
    Database: database,
    Table: table,
  });

  const metrics = data.metrics ?? {};
  const records: any[] = [];

  for (const [key, metricType] of Object.entries(metricMap)) {
    const reading = metrics[key];
    if (reading !== null && reading !== undefined) {
      records.push(buildRecord(metricType, 'value', reading));
    }
  }

  // `!= null` also covers undefined, so a payload without a battery field is skipped.
  if (data.battery != null) {
    records.push(buildRecord('battery', 'percentage', data.battery));
  }

  return records;
}
/**
 * Writes records to Timestream in consecutive chunks of up to 100 records.
 * The database and table of each request are taken from the chunk's first record.
 */
async function writeTimestreamRecords(records: any[]) {
  // Chunk size used for each WriteRecords request.
  const batchSize = 100;
  let offset = 0;

  while (offset < records.length) {
    const chunk = records.slice(offset, offset + batchSize);
    const [first] = chunk;
    const request = new WriteRecordsCommand({
      DatabaseName: first.Database,
      TableName: first.Table,
      Records: chunk,
    });
    await timestreamClient.send(request);
    offset += batchSize;
  }
}
Code collapsed
异常检测与告警 Lambda
code
// lambdas/detect-health-alerts/index.ts
import { KinesisStreamHandler } from 'aws-lambda';
import { PublishCommand, SNSClient } from '@aws-sdk/client-sns';
// SNS client used to publish alerts; the region is read from the AWS_REGION environment variable.
const snsClient = new SNSClient({ region: process.env.AWS_REGION });
/** An alert produced by one of the detect* functions and published by sendAlert. */
interface HealthAlert {
userId: string;
tenantId: string;
// Severity of the alert.
alertType: 'critical' | 'warning' | 'info';
// Which check raised the alert, e.g. 'heart_rate', 'blood_pressure', 'spo2', 'device_offline'.
category: string;
// Human-readable description sent to subscribers.
message: string;
// The measured value that triggered the alert.
value: number;
// The limit the value was compared against.
threshold: number;
// Timestamp copied from the reading, or the detection time for offline alerts.
timestamp: number;
}
/**
 * Kinesis-triggered Lambda: decodes each record, runs every anomaly detector
 * on it and publishes the resulting alerts one by one.
 * A record that fails to parse or makes a detector throw is logged and skipped;
 * alerts collected before the failure are kept.
 */
export const handler: KinesisStreamHandler = async (event) => {
  const alerts: HealthAlert[] = [];

  for (const record of event.Records) {
    try {
      const payload = JSON.parse(
        Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
      );
      // Run each detector in turn.
      alerts.push(...detectHeartRateAlerts(payload));
      alerts.push(...detectBloodPressureAlerts(payload));
      alerts.push(...detectBloodGlucoseAlerts(payload));
      alerts.push(...detectSpo2Alerts(payload));
      alerts.push(...detectDeviceOfflineAlerts(payload));
    } catch (error) {
      console.error('检测异常时出错:', error);
    }
  }

  // Publish sequentially.
  for (const alert of alerts) {
    await sendAlert(alert);
  }
};

/**
 * Detects abnormal blood glucose readings.
 *
 * NOTE(review): thresholds assume readings in mg/dL (critical below 54 or
 * above 250, warning below 70 or above 180). The payload does not state its
 * unit — confirm the unit and the limits with clinical stakeholders.
 *
 * @param data - Parsed metric payload; `metrics.bloodGlucose` may be absent.
 * @returns Zero or one alert.
 */
function detectBloodGlucoseAlerts(data: any): HealthAlert[] {
  const alerts: HealthAlert[] = [];
  const glucose = data.metrics?.bloodGlucose;
  if (!glucose) return alerts;

  if (glucose > 250 || glucose < 54) {
    alerts.push({
      userId: data.userId,
      tenantId: data.tenantId,
      alertType: 'critical',
      category: 'blood_glucose',
      message: glucose > 250
        ? '血糖过高!请立即就医'
        : '血糖过低!请立即就医',
      value: glucose,
      threshold: glucose > 250 ? 250 : 54,
      timestamp: data.timestamp,
    });
  } else if (glucose > 180 || glucose < 70) {
    alerts.push({
      userId: data.userId,
      tenantId: data.tenantId,
      alertType: 'warning',
      category: 'blood_glucose',
      message: `血糖异常 (${glucose} mg/dL),请留意`,
      value: glucose,
      threshold: glucose > 180 ? 180 : 70,
      timestamp: data.timestamp,
    });
  }
  return alerts;
}
/**
 * Detects abnormal heart rate readings.
 * Critical above 180 or below 40 BPM; warning above 120 or below 50 BPM.
 * A missing (falsy) heart rate yields no alert.
 */
function detectHeartRateAlerts(data: any): HealthAlert[] {
  const bpm = data.metrics?.heartRate;
  if (!bpm) {
    return [];
  }

  // Builds an alert with the fields shared by both severities.
  const build = (
    alertType: 'critical' | 'warning',
    message: string,
    threshold: number
  ): HealthAlert => ({
    userId: data.userId,
    tenantId: data.tenantId,
    alertType,
    category: 'heart_rate',
    message,
    value: bpm,
    threshold,
    timestamp: data.timestamp,
  });

  const dangerouslyHigh = bpm > 180;
  if (dangerouslyHigh || bpm < 40) {
    const message = dangerouslyHigh ? '心率过高!请立即就医' : '心率过低!请立即就医';
    return [build('critical', message, dangerouslyHigh ? 180 : 40)];
  }

  const elevated = bpm > 120;
  if (elevated || bpm < 50) {
    return [build('warning', `心率异常 (${bpm} BPM),请留意`, elevated ? 120 : 50)];
  }

  return [];
}
/**
 * Detects elevated blood pressure.
 *
 * Critical when systolic > 180 or diastolic > 120; warning when systolic > 140
 * or diastolic > 90. The alert's `value` and `threshold` describe the reading
 * that actually tripped the check: systolic when it exceeds its limit,
 * otherwise diastolic.
 *
 * @param data - Parsed metric payload; both readings must be present (truthy).
 * @returns Zero or one alert.
 */
function detectBloodPressureAlerts(data: any): HealthAlert[] {
  const alerts: HealthAlert[] = [];
  const bp = data.metrics?.bloodPressure;
  if (!bp?.systolic || !bp?.diastolic) return alerts;

  // Picks the reading responsible for the alert so value and threshold agree.
  const trigger = (systolicLimit: number, diastolicLimit: number) =>
    bp.systolic > systolicLimit
      ? { value: bp.systolic, threshold: systolicLimit }
      : { value: bp.diastolic, threshold: diastolicLimit };

  if (bp.systolic > 180 || bp.diastolic > 120) {
    // Hypertensive crisis.
    const { value, threshold } = trigger(180, 120);
    alerts.push({
      userId: data.userId,
      tenantId: data.tenantId,
      alertType: 'critical',
      category: 'blood_pressure',
      message: '血压过高!高血压危机,请立即就医',
      value,
      threshold,
      timestamp: data.timestamp,
    });
  } else if (bp.systolic > 140 || bp.diastolic > 90) {
    // Stage 2 hypertension range.
    const { value, threshold } = trigger(140, 90);
    alerts.push({
      userId: data.userId,
      tenantId: data.tenantId,
      alertType: 'warning',
      category: 'blood_pressure',
      message: `血压偏高 (${bp.systolic}/${bp.diastolic} mmHg)`,
      value,
      threshold,
      timestamp: data.timestamp,
    });
  }
  return alerts;
}
/**
 * Detects low blood-oxygen saturation.
 * Critical below 88, warning below 92. A missing (falsy) reading yields no alert.
 */
function detectSpo2Alerts(data: any): HealthAlert[] {
  const saturation = data.metrics?.spo2;
  if (!saturation) {
    return [];
  }

  const isCritical = saturation < 88;
  if (!isCritical && !(saturation < 92)) {
    return [];
  }

  const alert: HealthAlert = {
    userId: data.userId,
    tenantId: data.tenantId,
    alertType: isCritical ? 'critical' : 'warning',
    category: 'spo2',
    message: isCritical ? '血氧饱和度过低!请立即就医' : `血氧饱和度偏低 (${saturation}%)`,
    value: saturation,
    threshold: isCritical ? 88 : 92,
    timestamp: data.timestamp,
  };
  return [alert];
}
/**
 * Flags a reading whose timestamp is more than 24 hours older than the current time.
 * The alert's value is the elapsed time in milliseconds and its timestamp is the
 * detection time.
 */
function detectDeviceOfflineAlerts(data: any): HealthAlert[] {
  const offlineThreshold = 24 * 60 * 60 * 1000; // 24 hours in milliseconds
  const reportedAt = data.timestamp;
  const detectedAt = Date.now();
  const silentFor = detectedAt - reportedAt;

  if (!(silentFor > offlineThreshold)) {
    return [];
  }

  return [
    {
      userId: data.userId,
      tenantId: data.tenantId,
      alertType: 'warning',
      category: 'device_offline',
      message: `设备 ${data.deviceId} 已超过 24 小时未同步数据`,
      value: silentFor,
      threshold: offlineThreshold,
      timestamp: detectedAt,
    },
  ];
}
/**
 * Publishes one alert to the SNS topic named by ALERTS_SNS_TOPIC_ARN.
 * The alert is sent as JSON; severity, user and tenant are also exposed as
 * string message attributes.
 */
async function sendAlert(alert: HealthAlert): Promise<void> {
  const stringAttribute = (value: string) => ({
    DataType: 'String',
    StringValue: value,
  });

  const notification = new PublishCommand({
    TopicArn: process.env.ALERTS_SNS_TOPIC_ARN,
    Message: JSON.stringify(alert),
    Subject: `[${alert.alertType.toUpperCase()}] ${alert.category}`,
    MessageAttributes: {
      alert_type: stringAttribute(alert.alertType),
      user_id: stringAttribute(alert.userId),
      tenant_id: stringAttribute(alert.tenantId),
    },
  });

  await snsClient.send(notification);
}
Code collapsed
数据归档 Lambda
code
// lambdas/archive-health-data/index.ts
import {
KinesisStreamHandler,
ScheduledHandler,
} from 'aws-lambda';
import {
S3Client,
PutObjectCommand,
} from '@aws-sdk/client-s3';
// S3 client used for archiving; the region is read from the AWS_REGION environment variable.
const s3Client = new S3Client({ region: process.env.AWS_REGION });

/**
 * Kinesis-triggered Lambda that archives raw stream records to S3.
 *
 * Records are grouped by the UTC calendar date of `payload.timestamp`. Each
 * group is written to its own object, keyed by the date plus the event ID of
 * the group's first record. Separate invocations therefore never overwrite
 * each other, while a retry of the same batch rewrites the same object.
 * Records that fail to parse or carry an invalid timestamp are logged and skipped.
 */
export const streamHandler: KinesisStreamHandler = async (event) => {
  const groups = new Map<string, { objectKey: string; records: any[] }>();
  const pad2 = (n: number): string => String(n).padStart(2, '0');

  for (const record of event.Records) {
    try {
      const payload = JSON.parse(
        Buffer.from(record.kinesis.data, 'base64').toString('utf-8')
      );

      const date = new Date(payload.timestamp);
      if (Number.isNaN(date.getTime())) {
        throw new Error(`invalid timestamp: ${String(payload.timestamp)}`);
      }
      // UTC getters keep the partitioning independent of the runtime's time zone.
      const dateKey = `${date.getUTCFullYear()}/${pad2(date.getUTCMonth() + 1)}/${pad2(date.getUTCDate())}`;

      let group = groups.get(dateKey);
      if (!group) {
        // eventID has the form "<shardId>:<sequenceNumber>", unique per record.
        const batchId = record.eventID.replace(/:/g, '_');
        group = { objectKey: `${dateKey}/${batchId}`, records: [] };
        groups.set(dateKey, group);
      }
      group.records.push(payload);
    } catch (error) {
      console.error('归档记录时出错:', error);
    }
  }

  // One S3 object per date group.
  for (const { objectKey, records } of groups.values()) {
    await archiveToS3(objectKey, records);
  }
};
/**
 * Scheduled batch archive: queries the previous local calendar day from
 * Timestream, groups the rows by tenant and writes one S3 object per tenant.
 */
export const scheduledHandler: ScheduledHandler = async (event) => {
  // Window end: today at local midnight.
  const windowEnd = new Date();
  windowEnd.setHours(0, 0, 0, 0);

  // Window start: yesterday at local midnight.
  const windowStart = new Date();
  windowStart.setDate(windowStart.getDate() - 1);
  windowStart.setHours(0, 0, 0, 0);

  const rows = await queryTimestreamData(windowStart.getTime(), windowEnd.getTime());
  const rowsByTenant = groupByTenant(rows);

  // Sequential uploads, one object per tenant.
  for (const tenantId of Object.keys(rowsByTenant)) {
    const objectKey = `tenant/${tenantId}/${formatDate(windowStart)}`;
    await archiveToS3(objectKey, rowsByTenant[tenantId]);
  }
};
/**
 * Uploads records as pretty-printed JSON to `health-data/<key>.json` in the
 * bucket named by HEALTH_DATA_ARCHIVE_BUCKET, with AES256 server-side
 * encryption and record-count / archive-time metadata.
 */
async function archiveToS3(key: string, data: any[]): Promise<void> {
  const body = JSON.stringify(data, null, 2);
  const metadata = {
    recordCount: data.length.toString(),
    archivedAt: new Date().toISOString(),
  };

  const upload = new PutObjectCommand({
    Bucket: process.env.HEALTH_DATA_ARCHIVE_BUCKET,
    Key: `health-data/${key}.json`,
    Body: body,
    ContentType: 'application/json',
    ServerSideEncryption: 'AES256',
    Metadata: metadata,
  });

  await s3Client.send(upload);
}
/**
* Queries Timestream for data in the given time window.
*
* Placeholder: the current body ignores both arguments and always returns an empty array.
*
* @param startTime - Window start; the caller passes epoch milliseconds from Date.getTime().
* @param endTime - Window end; the caller passes epoch milliseconds from Date.getTime().
* @returns The matching rows (currently always empty).
*/
async function queryTimestreamData(
startTime: number,
endTime: number
): Promise<any[]> {
// Implementation omitted - intended to use the Timestream Query API.
return [];
}
/**
 * Groups records by `tenantId`; records without one go under 'unknown'.
 *
 * The result is a prototype-less object, so tenant IDs that collide with
 * Object.prototype members (for example 'constructor' or '__proto__') are
 * stored as ordinary keys instead of throwing.
 *
 * @param data - Records to group; order within each group is preserved.
 * @returns Map-like object from tenant ID to that tenant's records.
 */
function groupByTenant(data: any[]): Record<string, any[]> {
  const groups: Record<string, any[]> = Object.create(null);
  for (const record of data) {
    const tenantId = record.tenantId || 'unknown';
    if (!groups[tenantId]) {
      groups[tenantId] = [];
    }
    groups[tenantId].push(record);
  }
  return groups;
}
/** Formats a date as `YYYY-MM-DD` using the runtime's local time zone. */
function formatDate(date: Date): string {
  const twoDigits = (n: number): string => String(n).padStart(2, '0');
  const parts = [
    date.getFullYear(),
    twoDigits(date.getMonth() + 1),
    twoDigits(date.getDate()),
  ];
  return parts.join('-');
}
Code collapsed
4. 数据库设计
Timestream 表结构
code
-- NOTE(review): illustrative schema only. Timestream databases and tables are
-- presumably created through the API/console rather than SQL DDL, and the
-- "amazonkinesis.tagging" option below does not look like a Timestream setting — confirm before use.
-- Create the database
CREATE DATABASE IF NOT EXISTS health_metrics_db;
-- Create the user metrics table (one row per user/device/metric reading)
CREATE TABLE IF NOT EXISTS user_metrics (
user_id VARCHAR(50),
tenant_id VARCHAR(50),
device_id VARCHAR(100),
metric_type VARCHAR(50),
time TIMESTAMP,
value DOUBLE
)
WITH (
amazonkinesis.tagging = '{"Environment": "Production", "Application": "HealthMonitor"}'
);
-- Create the alerts table
CREATE TABLE IF NOT EXISTS health_alerts (
user_id VARCHAR(50),
tenant_id VARCHAR(50),
alert_type VARCHAR(20),
category VARCHAR(50),
time TIMESTAMP,
severity INTEGER,
message VARCHAR(500)
);
Code collapsed
DynamoDB 设备状态表
code
// lib/dynamodb/device-state.ts
import {
DynamoDBClient,
PutItemCommand,
UpdateItemCommand,
GetItemCommand,
} from '@aws-sdk/client-dynamodb';
// Shared DynamoDB client for this module; the region is read from the AWS_REGION environment variable.
const ddbClient = new DynamoDBClient({ region: process.env.AWS_REGION });
/** Device state as returned by DeviceStateManager.getState. */
export interface DeviceState {
// Table key (stored as the device_id attribute).
deviceId: string;
userId: string;
tenantId: string;
// Written from Date.now(), i.e. epoch milliseconds.
lastSeen: number;
// Levels below 20 are classified as 'low_battery'.
batteryLevel: number;
firmwareVersion: string;
status: 'online' | 'offline' | 'low_battery';
}
/**
 * Reads and writes per-device state in the `device_states` DynamoDB table,
 * keyed by `device_id`.
 */
export class DeviceStateManager {
  private static tableName = 'device_states';

  /**
   * Upserts a device's state.
   *
   * `last_seen` is always set to the current time. The battery level is only
   * written when provided. Status is derived from the battery level when it
   * is known; otherwise the caller's `status` is used, falling back to
   * 'online'. `userId`, `tenantId` and `firmwareVersion` are persisted when
   * supplied so that getState can return them.
   *
   * @param state - Partial state; `deviceId` is required.
   */
  static async updateState(state: Partial<DeviceState> & { deviceId: string }): Promise<void> {
    const battery = state.batteryLevel;
    const status =
      battery != null ? this.determineStatus(battery) : state.status ?? 'online';

    // `status` is a DynamoDB reserved word, so it must be referenced through an alias.
    const assignments = ['last_seen = :lastSeen', '#status = :status'];
    const values: Record<string, { S: string } | { N: string }> = {
      ':lastSeen': { N: String(Date.now()) },
      ':status': { S: status },
    };

    if (battery != null) {
      assignments.push('battery_level = :battery');
      values[':battery'] = { N: String(battery) };
    }

    const optionalStrings: Array<[string, string, string | undefined]> = [
      ['user_id', ':userId', state.userId],
      ['tenant_id', ':tenantId', state.tenantId],
      ['firmware_version', ':firmwareVersion', state.firmwareVersion],
    ];
    for (const [attribute, placeholder, value] of optionalStrings) {
      if (value != null) {
        assignments.push(`${attribute} = ${placeholder}`);
        values[placeholder] = { S: value };
      }
    }

    await ddbClient.send(
      new UpdateItemCommand({
        TableName: this.tableName,
        Key: {
          device_id: { S: state.deviceId },
        },
        UpdateExpression: `SET ${assignments.join(', ')}`,
        ExpressionAttributeNames: { '#status': 'status' },
        ExpressionAttributeValues: values,
      })
    );
  }

  /**
   * Fetches a device's state.
   *
   * Attributes missing from the stored item fall back to neutral defaults
   * (empty string, 0, 'offline') instead of throwing.
   *
   * @returns The state, or null when no item exists for the device.
   */
  static async getState(deviceId: string): Promise<DeviceState | null> {
    const result = await ddbClient.send(
      new GetItemCommand({
        TableName: this.tableName,
        Key: {
          device_id: { S: deviceId },
        },
      })
    );

    const item = result.Item;
    if (!item) {
      return null;
    }

    return {
      deviceId: item.device_id?.S ?? deviceId,
      userId: item.user_id?.S ?? '',
      tenantId: item.tenant_id?.S ?? '',
      lastSeen: Number(item.last_seen?.N ?? 0),
      batteryLevel: Number(item.battery_level?.N ?? 0),
      firmwareVersion: item.firmware_version?.S ?? '',
      status: this.parseStatus(item.status?.S),
    };
  }

  /** Maps a battery level to a status: below 20 is 'low_battery', otherwise 'online'. */
  private static determineStatus(batteryLevel: number): 'online' | 'offline' | 'low_battery' {
    if (batteryLevel < 20) return 'low_battery';
    return 'online';
  }

  /** Narrows a stored status string to the known values; anything else becomes 'offline'. */
  private static parseStatus(raw: string | undefined): 'online' | 'offline' | 'low_battery' {
    return raw === 'online' || raw === 'low_battery' ? raw : 'offline';
  }
}
Code collapsed
5. 监控与告警
CloudWatch 自定义指标
code
// lib/cloudwatch/metrics.ts
import {
CloudWatchClient,
PutMetricDataCommand,
} from '@aws-sdk/client-cloudwatch';
// CloudWatch client for custom metrics; the region is read from the AWS_REGION environment variable.
const cloudWatch = new CloudWatchClient({ region: process.env.AWS_REGION });

/**
 * Publishes the pipeline's custom CloudWatch metrics.
 */
export class CloudWatchMetrics {
  /**
   * Records processing latency in milliseconds for a stream
   * (namespace HealthMonitor/Processing, storage resolution 60).
   */
  static async recordProcessingLatency(
    streamName: string,
    latencyMs: number
  ): Promise<void> {
    await CloudWatchMetrics.emit({
      namespace: 'HealthMonitor/Processing',
      metricName: 'ProcessingLatency',
      dimensions: [{ Name: 'StreamName', Value: streamName }],
      value: latencyMs,
      unit: 'Milliseconds',
      storageResolution: 60,
    });
  }

  /**
   * Records how many alerts of a given type were raised for a tenant
   * (namespace HealthMonitor/Alerts).
   */
  static async recordAlertCount(
    alertType: string,
    tenantId: string,
    count: number
  ): Promise<void> {
    await CloudWatchMetrics.emit({
      namespace: 'HealthMonitor/Alerts',
      metricName: 'AlertCount',
      dimensions: [
        { Name: 'AlertType', Value: alertType },
        { Name: 'TenantId', Value: tenantId },
      ],
      value: count,
      unit: 'Count',
    });
  }

  /**
   * Records one device-status observation for a device type
   * (namespace HealthMonitor/Devices, value 1).
   */
  static async recordDeviceStatus(
    deviceType: string,
    status: 'online' | 'offline'
  ): Promise<void> {
    await CloudWatchMetrics.emit({
      namespace: 'HealthMonitor/Devices',
      metricName: 'DeviceStatus',
      dimensions: [
        { Name: 'DeviceType', Value: deviceType },
        { Name: 'Status', Value: status },
      ],
      value: 1,
      unit: 'Count',
    });
  }

  /** Sends a single metric datum; StorageResolution is included only when given. */
  private static async emit(metric: {
    namespace: string;
    metricName: string;
    dimensions: Array<{ Name: string; Value: string }>;
    value: number;
    unit: 'Milliseconds' | 'Count';
    storageResolution?: number;
  }): Promise<void> {
    const resolution =
      metric.storageResolution === undefined
        ? {}
        : { StorageResolution: metric.storageResolution };

    const request = new PutMetricDataCommand({
      Namespace: metric.namespace,
      MetricData: [
        {
          MetricName: metric.metricName,
          Dimensions: metric.dimensions,
          Value: metric.value,
          Unit: metric.unit,
          ...resolution,
        },
      ],
    });
    await cloudWatch.send(request);
  }
}
Code collapsed
6. 合规检查清单
HIPAA 合规
- 传输中数据加密(TLS 1.3)
- 静态数据加密(S3 SSE、Timestream 加密)
- 设备认证(X.509 证书)
- 访问控制(IAM 角色、策略)
- 审计日志(CloudTrail、DynamoDB Streams)
- 业务连续性(多 AZ 部署)
数据安全
- API 数据验证
- 异常检测与告警
- 数据保留策略
- 安全数据删除
- 定期备份
运维监控
- Lambda 函数监控
- Kinesis 流监控
- 设备连接状态监控
- 成本监控与优化
参考资料
免责声明:本文提供的架构仅供参考。在生产环境中处理健康数据前,请确保符合 HIPAA、GDPR 等法规要求,并进行全面的安全审计。