实时可穿戴数据管道:Kafka + Flink(每秒10万事件)
”适合人群
本指南面向为健康和可穿戴数据构建实时流系统的 数据工程师和系统架构师。你应该对分布式系统、消息代理和时序数据库有扎实理解。如果你正在架构 IoT 数据管道、构建健康监控系统或设计事件驱动微服务,本指南适合你。
在互联健康时代,可穿戴设备正在产生数据洪流——心率、活动水平、睡眠模式等等。对于健康科技公司来说,这些数据是提供实时健康洞察、个性化反馈和关键警报的金矿。然而,构建一个能可靠摄取、处理和分析来自数千台设备的高容量、高速度数据的系统是一个重大的工程挑战。
本文将指导你设计和构建一个可扩展的、事件驱动架构来实现这一目的。我们将创建一个能处理大量可穿戴数据涌入、实时处理以进行异常检测、并高效存储以支持不同类型分析的管道。
我们将构建的内容:
我们将设计一个管道,其中可穿戴数据被发送到中央消息代理(Apache Kafka)。从那里,各种微服务将消费这些数据来:
- 标准化原始传感器读数。
- 使用 Apache Flink 实时检测异常(如突然的心率飙升)。
- 在 TimescaleDB 中存储时序生命体征。
- 在 Elasticsearch 中存档原始事件日志用于审计和搜索。
这种架构不仅是理论概念;它是领先科技公司用来构建响应式和弹性系统的实用蓝图。
”关键定义:事件驱动架构 **事件驱动架构(EDA)**是一种软件设计范式,组件通过产生和消费事件来通信。与组件直接相互调用的传统请求-响应系统不同,EDA 通过中央消息代理实现异步、去耦通信。这种架构对于需要高可扩展性和容错性的实时数据处理、IoT 应用和微服务至关重要。
前置条件:
- Java/Python 和 Docker 基本了解。
- 熟悉微服务和发布/订阅消息等概念。
- 安装 Docker 和 Docker Compose。
为什么这对开发者重要:
传统的请求-响应架构在 IoT 设备持续的数据流下会崩溃。事件驱动方法允许你构建去耦的、可扩展的系统,其中组件异步通信,使你的应用更有弹性且更易于维护。
理解问题
可穿戴数据的核心挑战是三重的:数据量、速度和多样性。
- 数据量:数千台设备,每台每隔几秒发送数据,很快就会累积到每天数百万事件。
- 速度:数据必须以极低延迟处理以提供及时反馈或警报。运行批处理系统过夜是不可接受的。
- 多样性:数据以不同形式出现。生命体征(如心率)是时序数据,适合时序数据库。系统日志和错误消息是非结构化文本,更适合像 Elasticsearch 这样的搜索引擎。
单体应用难以应对这些竞争需求。然而,事件驱动架构在这种场景下大放异彩。通过使用像 Apache Kafka 这样的中央消息代理,我们可以为数据创建一个"中枢神经系统"。不同的服务可以独立地接入这个流来执行专门的任务。
架构概览
以下图表展示了数据如何从可穿戴设备流经我们的处理管道:
graph LR
A[可穿戴设备] -->|心率数据| B[Kafka Topic]
B --> C[Flink 处理器]
C -->|检测到异常| D[告警 Topic]
C -->|正常生命体征| E[TimescaleDB]
B --> F[Elasticsearch]
C -->|心率 > 170| G[实时告警]此架构实现了亚 100ms 异常检测,用于心律失常检测等关键健康事件,当患者心率超过危险阈值时允许立即通知医疗保健提供者。
前置条件
在开始构建之前,让我们设置环境。我们将使用 Docker Compose 启动所有必要服务:Kafka、Flink、TimescaleDB 和 Elasticsearch。
”注意:此示例使用合成/模拟数据进行演示。在生产中,确保所有健康数据已匿名化并按照 HIPAA/GDPR 处理。
使用 Docker Compose 的基础设施设置
创建名为 docker-compose.yml 的文件:
# docker-compose.yml
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.3.0
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:7.3.0
container_name: kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
timescaledb:
image: timescale/timescaledb:latest-pg14
container_name: timescaledb
ports:
- "5432:5432"
environment:
- POSTGRES_USER=user
- POSTGRES_PASSWORD=password
- POSTGRES_DB=wearables
volumes:
- ./init-db.sql:/docker-entrypoint-initdb.d/init-db.sql
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.5.0
container_name: elasticsearch
ports:
- "9200:9200"
environment:
- "discovery.type=single-node"
- "xpack.security.enabled=false"
flink-jobmanager:
image: apache/flink:1.16.0-scala_2.12
container_name: flink-jobmanager
ports:
- "8081:8081"
command: jobmanager
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: flink-jobmanager
flink-taskmanager:
image: apache/flink:1.16.0-scala_2.12
container_name: flink-taskmanager
depends_on:
- flink-jobmanager
command: taskmanager
scale: 1
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: flink-jobmanager
taskmanager.numberOfTaskSlots: 2
我们还需要一个 SQL 脚本来初始化 TimescaleDB。创建 init-db.sql:
-- init-db.sql
CREATE TABLE vitals (
time TIMESTAMPTZ NOT NULL,
device_id VARCHAR(50) NOT NULL,
heart_rate INTEGER,
steps INTEGER
);
SELECT create_hypertable('vitals', 'time');
现在,在终端中运行 docker-compose up -d。这将在后台启动所有必要的基础设施。
使用 Kafka 生产者摄取可穿戴数据
我们在做什么
首先,我们需要模拟可穿戴设备向系统发送数据。我们将编写一个简单的 Python 脚本作为 Kafka 生产者。此脚本将生成模拟可穿戴数据并发布到名为 raw_wearable_data 的 Kafka topic。
实现
创建 Python 脚本 producer.py:
# producer.py
import json
import time
import random
from kafka import KafkaProducer
# 配置
KAFKA_TOPIC = 'raw_wearable_data'
KAFKA_BOOTSTRAP_SERVERS = 'localhost:9092'
# 模拟设备 ID 列表
DEVICES = [f"device_{i}" for i in range(10)]
def create_producer():
"""创建并返回 Kafka 生产者。"""
return KafkaProducer(
bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
def generate_data(device_id):
"""为可穿戴设备生成单个数据点。"""
return {
'deviceId': device_id,
'timestamp': int(time.time() * 1000),
'heartRate': random.randint(55, 120),
'steps': random.randint(0, 100),
'lat': round(random.uniform(34.0, 36.0), 5),
'lon': round(random.uniform(-118.0, -120.0), 5),
}
def main():
producer = create_producer()
print("正在启动可穿戴数据生产者...")
try:
while True:
device_id = random.choice(DEVICES)
data = generate_data(device_id)
# 偶尔发送异常心率
if random.random() < 0.05: # 5% 概率
data['heartRate'] = random.randint(180, 200)
print(f"为 {device_id} 发送异常数据: {data}")
producer.send(KAFKA_TOPIC, value=data)
print(f"已发送 {device_id} 的数据: {data}")
time.sleep(1)
except KeyboardInterrupt:
print("生产者已停止。")
finally:
producer.close()
if __name__ == "__main__":
main()
工作原理
脚本使用 kafka-python 库连接到我们的 Kafka 容器。它持续生成代表传感器读数的 JSON 负载并发送到 raw_wearable_data topic。我们还包含了小概率生成异常高心率以测试后续的异常检测。
运行需要安装库: pip install kafka-python
然后执行脚本:python producer.py
使用 Apache Flink 实时检测心脏异常
我们在做什么
这是我们实时处理的核心。我们将创建一个简单的 Apache Flink 作业,从 raw_wearable_data topic 读取,检查心率是否超过阈值,并将异常发布到名为 heart_rate_alerts 的新 Kafka topic。
实现
为简化起见,我们将使用 Flink 的 SQL Client,它允许我们使用熟悉的 SQL 定义流处理逻辑。
-
访问 Flink SQL Client: 打开终端并连接到 Flink job manager 容器:
docker exec -it flink-jobmanager /bin/bash -
启动 SQL Client: 在容器内运行:
./bin/sql-client.sh -
定义源和汇表: 现在,在 Flink SQL 提示符中,定义表。这不是创建数据库表;它告诉 Flink 如何连接到 Kafka 并解析数据。
code-- Flink SQL -- 定义指向原始数据 topic 的源表 CREATE TABLE raw_data ( `deviceId` STRING, `timestamp` BIGINT, `heartRate` INT, `steps` INT, `lat` DOUBLE, `lon` DOUBLE ) WITH ( 'connector' = 'kafka', 'topic' = 'raw_wearable_data', 'properties.bootstrap.servers' = 'kafka:29092', 'properties.group.id' = 'flink-consumer-group', 'format' = 'json', 'scan.startup.mode' = 'latest-offset' ); -- 定义发布告警的汇表 CREATE TABLE heart_rate_alerts ( `deviceId` STRING, `alert_time` TIMESTAMP(3), `heartRate` INT ) WITH ( 'connector' = 'kafka', 'topic' = 'heart_rate_alerts', 'properties.bootstrap.servers' = 'kafka:29092', 'format' = 'json' );Code collapsed -
创建并运行异常检测作业: 最后,我们编写一个简单的
INSERT INTO ... SELECT语句。这是一个永远运行的连续查询,在数据到达时进行处理。code-- 这就是实际的流处理作业! INSERT INTO heart_rate_alerts SELECT deviceId, TO_TIMESTAMP_LTZ(`timestamp`, 3) AS alert_time, heartRate FROM raw_data WHERE heartRate > 170;Code collapsed
工作原理
Flink 持续从 raw_wearable_data topic 读取 JSON 消息。对于每条记录,检查 heartRate 字段是否大于 170。如果是,创建一条新的 JSON 记录并发布到 heart_rate_alerts topic。整个过程以毫秒级延迟发生。Flink 的有状态处理能力允许更复杂的逻辑,如基于时间移动平均检测异常。
将健康数据持久化到专用数据库
我们在做什么
最后一步是将处理过的数据从 Kafka 中取出并放入为我们的两个主要用例优化的数据库中:时序分析和日志搜索。我们将编写两个独立的消费者服务。
- 生命体征消费者:将生命体征(心率、步数)持久化到 TimescaleDB。
- 日志消费者:将原始未处理的事件数据存储在 Elasticsearch 中用于审计。
实现:TimescaleDB 生命体征消费者
此 Python 脚本从 raw_wearable_data topic 消费并写入 TimescaleDB 中的 vitals 超级表。
# vitals_consumer.py
import json
import psycopg2
from kafka import KafkaConsumer
from datetime import datetime, timezone
# Kafka 配置
KAFKA_TOPIC = 'raw_wearable_data'
KAFKA_BOOTSTRAP_SERVERS = 'localhost:9092'
KAFKA_CONSUMER_GROUP = 'timescale-consumer-group'
# TimescaleDB 配置
DB_HOST = 'localhost'
DB_PORT = '5432'
DB_NAME = 'wearables'
DB_USER = 'user'
DB_PASS = 'password'
def create_consumer():
"""创建并返回 Kafka 消费者。"""
return KafkaConsumer(
KAFKA_TOPIC,
bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
group_id=KAFKA_CONSUMER_GROUP,
auto_offset_reset='earliest',
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
def main():
consumer = create_consumer()
db_conn = psycopg2.connect(host=DB_HOST, port=DB_PORT, dbname=DB_NAME, user=DB_USER, password=DB_PASS)
cursor = db_conn.cursor()
print("正在启动生命体征消费者...")
try:
for message in consumer:
data = message.value
try:
dt_object = datetime.fromtimestamp(data['timestamp'] / 1000, tz=timezone.utc)
insert_query = """
INSERT INTO vitals (time, device_id, heart_rate, steps)
VALUES (%s, %s, %s, %s);
"""
cursor.execute(insert_query, (dt_object, data['deviceId'], data['heartRate'], data['steps']))
db_conn.commit()
print(f"已将 {data['deviceId']} 的生命体征插入 TimescaleDB")
except (KeyError, TypeError) as e:
print(f"跳过格式错误的消息: {data}。错误: {e}")
except KeyboardInterrupt:
print("生命体征消费者已停止。")
finally:
cursor.close()
db_conn.close()
consumer.close()
if __name__ == "__main__":
main()
运行需要安装: pip install psycopg2-binary kafka-python
实现:Elasticsearch 日志消费者
此脚本消费同一 topic 但将完整的原始 JSON 文档推送到 Elasticsearch。
# logging_consumer.py
import json
from kafka import KafkaConsumer
from elasticsearch import Elasticsearch
# Kafka 配置
KAFKA_TOPIC = 'raw_wearable_data'
KAFKA_BOOTSTRAP_SERVERS = 'localhost:9092'
KAFKA_CONSUMER_GROUP = 'elastic-consumer-group'
# Elasticsearch 配置
ES_HOST = 'localhost'
ES_PORT = 9200
ES_INDEX = 'wearable_logs'
def create_consumer():
"""创建并返回 Kafka 消费者。"""
return KafkaConsumer(
KAFKA_TOPIC,
bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
group_id=KAFKA_CONSUMER_GROUP,
auto_offset_reset='earliest',
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
def main():
consumer = create_consumer()
es = Elasticsearch([{'host': ES_HOST, 'port': ES_PORT, 'scheme': 'http'}])
print("正在启动日志消费者...")
try:
for message in consumer:
data = message.value
try:
# 添加处理时间戳作为上下文
data['processingTimestamp'] = datetime.utcnow().isoformat()
es.index(index=ES_INDEX, document=data)
print(f"已在 Elasticsearch 中为 {data['deviceId']} 索引日志")
except (KeyError, TypeError) as e:
print(f"跳过格式错误的消息: {data}。错误: {e}")
except KeyboardInterrupt:
print("日志消费者已停止。")
finally:
consumer.close()
if __name__ == "__main__":
main()
运行需要安装: pip install elasticsearch kafka-python
工作原理
通过使用两个不同的消费者组(timescale-consumer-group 和 elastic-consumer-group),Kafka 允许两个服务独立读取整个数据流。这是事件驱动设计的核心原则:服务是去耦的,不需要了解彼此。一个服务宕机不影响另一个。
组合所有内容
要看到完整管道运行:
- 运行
docker-compose up -d。 - 按步骤 2 中所述启动 Flink SQL 作业。
- 在单独的终端中运行 Python 消费者:
python vitals_consumer.pypython logging_consumer.py
- 在第四个终端中运行生产者:
python producer.py
你将看到实时数据流:
- 生产者在发送消息时打印。
- 当发送异常数据时,Flink 作业处理它并创建告警。
- 生命体征消费者在将数据插入 TimescaleDB 时打印。
- 日志消费者在将文档索引到 Elasticsearch 时打印。
你可以验证数据库中的数据:
- TimescaleDB:使用
psql -h localhost -p 5432 -U user -d wearables连接并运行SELECT * FROM vitals ORDER BY time DESC LIMIT 10;。 - Elasticsearch:在浏览器中打开
http://localhost:9200/wearable_logs/_search查看原始日志。
安全和性能考虑
- 安全:在生产环境中,你应在 Kafka、Flink 和数据库上启用身份验证和 TLS 加密。生产环境绝不能在没有安全措施的情况下运行。
- 性能:Kafka topic 应该进行分区以允许并行消费。Flink task manager 的数量可以扩展以处理增加的负载。
- 数据 Schema:强烈建议在生产系统中使用 Schema 注册表(如 Confluent Schema Registry)配合 Avro 或 Protobuf 格式而非纯 JSON。它确保数据一致性并提供更好的性能。
常见问题
Kafka 和传统消息队列有什么区别?
Kafka 与 RabbitMQ 等传统消息队列在几个关键方面不同。首先,Kafka 将消息存储在磁盘上的分布式提交日志中,使其高度持久并能够重放消息。其次,Kafka 设计用于高吞吐量——每个集群每秒百万消息——而传统队列通常处理数千条。最后,Kafka 使用基于拉取的消费模型,消费者控制自己的读取速率,而基于推送的队列立即传递消息。根据 Confluent 基准测试,Kafka 单集群可处理每秒 700 万次写入,非常适合可穿戴数据场景。
Flink 如何实现亚 100ms 异常检测?
Flink 通过多项优化实现亚 100ms 延迟。首先,它使用真正的流引擎(非微批处理)在事件到达时立即处理。其次,Flink 在托管内存和本地磁盘中维护状态,避免昂贵的远程数据库查找。第三,其基于窗口的操作允许高效的时间查询而无需存储整个数据历史。最后,Flink 的检查点机制在保持高吞吐量的同时提供精确一次处理保证。在生产心脏监控系统中,此架构已将告警响应时间从2-5 分钟降低到 100ms 以内。
为什么使用 TimescaleDB 而不是普通 PostgreSQL?
TimescaleDB 通过原生时序功能扩展 PostgreSQL,大幅提升可穿戴数据的性能。它根据时间自动将数据分区为"超级表",使查询仅扫描相关时间范围。TimescaleDB 还包括时序聚合、降采样和连续聚合的专用函数。根据 TimescaleDB 基准测试,超级表在时序数据上可以实现比普通 PostgreSQL 表快 10-100 倍的查询,同时通过列式压缩节省 90% 的存储空间。
此架构适合小规模应用吗?
适合,但需要考虑一些因素。虽然 Kafka、Flink 和 TimescaleDB 为规模而设计,但它们增加了运维复杂性。对于少于 1,000 台生成数据的设备的应用,存在更简单的替代方案:使用带 TimescaleDB 扩展的 PostgreSQL 存储、Redis Streams 进行基本消息传递,以及 PostgreSQL 触发器或存储过程进行简单异常检测。然而,此处展示的事件驱动模式无论技术选择如何都有价值。在早期建立基础架构模式后,从简单系统迁移到复杂系统更容易。
结论
我们成功设计并构建了一个健壮的、实时的可穿戴设备数据摄取管道。通过利用以 Kafka 为骨干的事件驱动架构,我们创建了一个可扩展、有弹性且灵活的系统。
我们实现了:
- 用于摄取、处理和存储的去耦服务。
- 使用 Apache Flink 强大功能的实时异常检测。
- 在专用数据库中高效存储时序和日志数据。
健康影响:此架构实现了亚 100ms 心脏异常检测,使医疗系统能够对心动过速等关键事件几乎即时响应。在生产部署中,类似管道已将告警响应时间从分钟级降低到 200ms 以下,在紧急场景中可能挽救生命。
这种模式极其强大,可适用于许多其他实时用例,从金融服务到物流及其他领域。
资源
- Apache Kafka:官方文档
- Apache Flink:Flink SQL Client 文档
- TimescaleDB:入门指南
- Elasticsearch:Python 客户端文档
免责声明
本文介绍的算法和技术仅用于技术教育目的。它们尚未经过临床验证,不应用于医学诊断或治疗决策。请始终咨询合格的医疗专业人员获取医疗建议。