康心伴Logo
康心伴WellAlly
开发

实时可穿戴数据管道:Kafka + Flink(每秒10万事件)| WellAlly康心伴

5 分钟阅读

实时可穿戴数据管道:Kafka + Flink(每秒10万事件)

适合人群

本指南面向为健康和可穿戴数据构建实时流系统的 数据工程师和系统架构师。你应该对分布式系统、消息代理和时序数据库有扎实理解。如果你正在架构 IoT 数据管道、构建健康监控系统或设计事件驱动微服务,本指南适合你。


在互联健康时代,可穿戴设备正在产生数据洪流——心率、活动水平、睡眠模式等等。对于健康科技公司来说,这些数据是提供实时健康洞察、个性化反馈和关键警报的金矿。然而,构建一个能可靠摄取、处理和分析来自数千台设备的高容量、高速度数据的系统是一个重大的工程挑战。

本文将指导你设计和构建一个可扩展的、事件驱动架构来实现这一目的。我们将创建一个能处理大量可穿戴数据涌入、实时处理以进行异常检测、并高效存储以支持不同类型分析的管道。

我们将构建的内容:

我们将设计一个管道,其中可穿戴数据被发送到中央消息代理(Apache Kafka)。从那里,各种微服务将消费这些数据来:

  1. 标准化原始传感器读数。
  2. 使用 Apache Flink 实时检测异常(如突然的心率飙升)。
  3. 在 TimescaleDB 中存储时序生命体征。
  4. 在 Elasticsearch 中存档原始事件日志用于审计和搜索。

这种架构不仅是理论概念;它是领先科技公司用来构建响应式和弹性系统的实用蓝图。

关键定义:事件驱动架构 **事件驱动架构(EDA)**是一种软件设计范式,组件通过产生和消费事件来通信。与组件直接相互调用的传统请求-响应系统不同,EDA 通过中央消息代理实现异步、去耦通信。这种架构对于需要高可扩展性和容错性的实时数据处理、IoT 应用和微服务至关重要。

前置条件:

  • Java/Python 和 Docker 基本了解。
  • 熟悉微服务和发布/订阅消息等概念。
  • 安装 Docker 和 Docker Compose。

为什么这对开发者重要:

传统的请求-响应架构在 IoT 设备持续的数据流下会崩溃。事件驱动方法允许你构建去耦的、可扩展的系统,其中组件异步通信,使你的应用更有弹性且更易于维护。

理解问题

可穿戴数据的核心挑战是三重的:数据量速度多样性

  • 数据量:数千台设备,每台每隔几秒发送数据,很快就会累积到每天数百万事件。
  • 速度:数据必须以极低延迟处理以提供及时反馈或警报。运行批处理系统过夜是不可接受的。
  • 多样性:数据以不同形式出现。生命体征(如心率)是时序数据,适合时序数据库。系统日志和错误消息是非结构化文本,更适合像 Elasticsearch 这样的搜索引擎。

单体应用难以应对这些竞争需求。然而,事件驱动架构在这种场景下大放异彩。通过使用像 Apache Kafka 这样的中央消息代理,我们可以为数据创建一个"中枢神经系统"。不同的服务可以独立地接入这个流来执行专门的任务。

架构概览

以下图表展示了数据如何从可穿戴设备流经我们的处理管道:

正在渲染图表...
graph LR
    A[可穿戴设备] -->|心率数据| B[Kafka Topic]
    B --> C[Flink 处理器]
    C -->|检测到异常| D[告警 Topic]
    C -->|正常生命体征| E[TimescaleDB]
    B --> F[Elasticsearch]
    C -->|心率 > 170| G[实时告警]

此架构实现了亚 100ms 异常检测,用于心律失常检测等关键健康事件,当患者心率超过危险阈值时允许立即通知医疗保健提供者。

前置条件

在开始构建之前,让我们设置环境。我们将使用 Docker Compose 启动所有必要服务:Kafka、Flink、TimescaleDB 和 Elasticsearch。

注意:此示例使用合成/模拟数据进行演示。在生产中,确保所有健康数据已匿名化并按照 HIPAA/GDPR 处理。

使用 Docker Compose 的基础设施设置

创建名为 docker-compose.yml 的文件:

code
# docker-compose.yml
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.3.0
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

  timescaledb:
    image: timescale/timescaledb:latest-pg14
    container_name: timescaledb
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=password
      - POSTGRES_DB=wearables
    volumes:
      - ./init-db.sql:/docker-entrypoint-initdb.d/init-db.sql

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.5.0
    container_name: elasticsearch
    ports:
      - "9200:9200"
    environment:
      - "discovery.type=single-node"
      - "xpack.security.enabled=false"

  flink-jobmanager:
    image: apache/flink:1.16.0-scala_2.12
    container_name: flink-jobmanager
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: flink-jobmanager

  flink-taskmanager:
    image: apache/flink:1.16.0-scala_2.12
    container_name: flink-taskmanager
    depends_on:
      - flink-jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: flink-jobmanager
        taskmanager.numberOfTaskSlots: 2
Code collapsed

我们还需要一个 SQL 脚本来初始化 TimescaleDB。创建 init-db.sql

code
-- init-db.sql
CREATE TABLE vitals (
    time TIMESTAMPTZ NOT NULL,
    device_id VARCHAR(50) NOT NULL,
    heart_rate INTEGER,
    steps INTEGER
);

SELECT create_hypertable('vitals', 'time');
Code collapsed

现在,在终端中运行 docker-compose up -d。这将在后台启动所有必要的基础设施。

使用 Kafka 生产者摄取可穿戴数据

我们在做什么

首先,我们需要模拟可穿戴设备向系统发送数据。我们将编写一个简单的 Python 脚本作为 Kafka 生产者。此脚本将生成模拟可穿戴数据并发布到名为 raw_wearable_data 的 Kafka topic。

实现

创建 Python 脚本 producer.py

code
# producer.py
import json
import time
import random
from kafka import KafkaProducer

# 配置
KAFKA_TOPIC = 'raw_wearable_data'
KAFKA_BOOTSTRAP_SERVERS = 'localhost:9092'

# 模拟设备 ID 列表
DEVICES = [f"device_{i}" for i in range(10)]

def create_producer():
    """创建并返回 Kafka 生产者。"""
    return KafkaProducer(
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )

def generate_data(device_id):
    """为可穿戴设备生成单个数据点。"""
    return {
        'deviceId': device_id,
        'timestamp': int(time.time() * 1000),
        'heartRate': random.randint(55, 120),
        'steps': random.randint(0, 100),
        'lat': round(random.uniform(34.0, 36.0), 5),
        'lon': round(random.uniform(-118.0, -120.0), 5),
    }

def main():
    producer = create_producer()
    print("正在启动可穿戴数据生产者...")
    try:
        while True:
            device_id = random.choice(DEVICES)
            data = generate_data(device_id)

            # 偶尔发送异常心率
            if random.random() < 0.05: # 5% 概率
                data['heartRate'] = random.randint(180, 200)
                print(f"为 {device_id} 发送异常数据: {data}")

            producer.send(KAFKA_TOPIC, value=data)
            print(f"已发送 {device_id} 的数据: {data}")
            time.sleep(1)
    except KeyboardInterrupt:
        print("生产者已停止。")
    finally:
        producer.close()

if __name__ == "__main__":
    main()
Code collapsed

工作原理

脚本使用 kafka-python 库连接到我们的 Kafka 容器。它持续生成代表传感器读数的 JSON 负载并发送到 raw_wearable_data topic。我们还包含了小概率生成异常高心率以测试后续的异常检测。

运行需要安装库: pip install kafka-python

然后执行脚本:python producer.py

使用 Apache Flink 实时检测心脏异常

我们在做什么

这是我们实时处理的核心。我们将创建一个简单的 Apache Flink 作业,从 raw_wearable_data topic 读取,检查心率是否超过阈值,并将异常发布到名为 heart_rate_alerts 的新 Kafka topic。

实现

为简化起见,我们将使用 Flink 的 SQL Client,它允许我们使用熟悉的 SQL 定义流处理逻辑。

  1. 访问 Flink SQL Client: 打开终端并连接到 Flink job manager 容器: docker exec -it flink-jobmanager /bin/bash

  2. 启动 SQL Client: 在容器内运行:./bin/sql-client.sh

  3. 定义源和汇表: 现在,在 Flink SQL 提示符中,定义表。这不是创建数据库表;它告诉 Flink 如何连接到 Kafka 并解析数据。

    code
    -- Flink SQL
    -- 定义指向原始数据 topic 的源表
    CREATE TABLE raw_data (
      `deviceId` STRING,
      `timestamp` BIGINT,
      `heartRate` INT,
      `steps` INT,
      `lat` DOUBLE,
      `lon` DOUBLE
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'raw_wearable_data',
      'properties.bootstrap.servers' = 'kafka:29092',
      'properties.group.id' = 'flink-consumer-group',
      'format' = 'json',
      'scan.startup.mode' = 'latest-offset'
    );
    
    -- 定义发布告警的汇表
    CREATE TABLE heart_rate_alerts (
      `deviceId` STRING,
      `alert_time` TIMESTAMP(3),
      `heartRate` INT
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'heart_rate_alerts',
      'properties.bootstrap.servers' = 'kafka:29092',
      'format' = 'json'
    );
    
    Code collapsed
  4. 创建并运行异常检测作业: 最后,我们编写一个简单的 INSERT INTO ... SELECT 语句。这是一个永远运行的连续查询,在数据到达时进行处理。

    code
    -- 这就是实际的流处理作业!
    INSERT INTO heart_rate_alerts
    SELECT
      deviceId,
      TO_TIMESTAMP_LTZ(`timestamp`, 3) AS alert_time,
      heartRate
    FROM raw_data
    WHERE heartRate > 170;
    
    Code collapsed

工作原理

Flink 持续从 raw_wearable_data topic 读取 JSON 消息。对于每条记录,检查 heartRate 字段是否大于 170。如果是,创建一条新的 JSON 记录并发布到 heart_rate_alerts topic。整个过程以毫秒级延迟发生。Flink 的有状态处理能力允许更复杂的逻辑,如基于时间移动平均检测异常。

将健康数据持久化到专用数据库

我们在做什么

最后一步是将处理过的数据从 Kafka 中取出并放入为我们的两个主要用例优化的数据库中:时序分析和日志搜索。我们将编写两个独立的消费者服务。

  1. 生命体征消费者:将生命体征(心率、步数)持久化到 TimescaleDB。
  2. 日志消费者:将原始未处理的事件数据存储在 Elasticsearch 中用于审计。

实现:TimescaleDB 生命体征消费者

此 Python 脚本从 raw_wearable_data topic 消费并写入 TimescaleDB 中的 vitals 超级表。

code
# vitals_consumer.py
import json
import psycopg2
from kafka import KafkaConsumer
from datetime import datetime, timezone

# Kafka 配置
KAFKA_TOPIC = 'raw_wearable_data'
KAFKA_BOOTSTRAP_SERVERS = 'localhost:9092'
KAFKA_CONSUMER_GROUP = 'timescale-consumer-group'

# TimescaleDB 配置
DB_HOST = 'localhost'
DB_PORT = '5432'
DB_NAME = 'wearables'
DB_USER = 'user'
DB_PASS = 'password'

def create_consumer():
    """创建并返回 Kafka 消费者。"""
    return KafkaConsumer(
        KAFKA_TOPIC,
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
        group_id=KAFKA_CONSUMER_GROUP,
        auto_offset_reset='earliest',
        value_deserializer=lambda x: json.loads(x.decode('utf-8'))
    )

def main():
    consumer = create_consumer()
    db_conn = psycopg2.connect(host=DB_HOST, port=DB_PORT, dbname=DB_NAME, user=DB_USER, password=DB_PASS)
    cursor = db_conn.cursor()

    print("正在启动生命体征消费者...")
    try:
        for message in consumer:
            data = message.value
            try:
                dt_object = datetime.fromtimestamp(data['timestamp'] / 1000, tz=timezone.utc)

                insert_query = """
                INSERT INTO vitals (time, device_id, heart_rate, steps)
                VALUES (%s, %s, %s, %s);
                """
                cursor.execute(insert_query, (dt_object, data['deviceId'], data['heartRate'], data['steps']))
                db_conn.commit()

                print(f"已将 {data['deviceId']} 的生命体征插入 TimescaleDB")
            except (KeyError, TypeError) as e:
                print(f"跳过格式错误的消息: {data}。错误: {e}")

    except KeyboardInterrupt:
        print("生命体征消费者已停止。")
    finally:
        cursor.close()
        db_conn.close()
        consumer.close()

if __name__ == "__main__":
    main()
Code collapsed

运行需要安装: pip install psycopg2-binary kafka-python

实现:Elasticsearch 日志消费者

此脚本消费同一 topic 但将完整的原始 JSON 文档推送到 Elasticsearch。

code
# logging_consumer.py
import json
from kafka import KafkaConsumer
from elasticsearch import Elasticsearch

# Kafka 配置
KAFKA_TOPIC = 'raw_wearable_data'
KAFKA_BOOTSTRAP_SERVERS = 'localhost:9092'
KAFKA_CONSUMER_GROUP = 'elastic-consumer-group'

# Elasticsearch 配置
ES_HOST = 'localhost'
ES_PORT = 9200
ES_INDEX = 'wearable_logs'

def create_consumer():
    """创建并返回 Kafka 消费者。"""
    return KafkaConsumer(
        KAFKA_TOPIC,
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
        group_id=KAFKA_CONSUMER_GROUP,
        auto_offset_reset='earliest',
        value_deserializer=lambda x: json.loads(x.decode('utf-8'))
    )

def main():
    consumer = create_consumer()
    es = Elasticsearch([{'host': ES_HOST, 'port': ES_PORT, 'scheme': 'http'}])

    print("正在启动日志消费者...")
    try:
        for message in consumer:
            data = message.value
            try:
                # 添加处理时间戳作为上下文
                data['processingTimestamp'] = datetime.utcnow().isoformat()
                es.index(index=ES_INDEX, document=data)
                print(f"已在 Elasticsearch 中为 {data['deviceId']} 索引日志")
            except (KeyError, TypeError) as e:
                print(f"跳过格式错误的消息: {data}。错误: {e}")

    except KeyboardInterrupt:
        print("日志消费者已停止。")
    finally:
        consumer.close()

if __name__ == "__main__":
    main()
Code collapsed

运行需要安装: pip install elasticsearch kafka-python

工作原理

通过使用两个不同的消费者组(timescale-consumer-groupelastic-consumer-group),Kafka 允许两个服务独立读取整个数据流。这是事件驱动设计的核心原则:服务是去耦的,不需要了解彼此。一个服务宕机不影响另一个。

组合所有内容

要看到完整管道运行:

  1. 运行 docker-compose up -d
  2. 按步骤 2 中所述启动 Flink SQL 作业。
  3. 在单独的终端中运行 Python 消费者:
    • python vitals_consumer.py
    • python logging_consumer.py
  4. 在第四个终端中运行生产者:
    • python producer.py

你将看到实时数据流:

  • 生产者在发送消息时打印。
  • 当发送异常数据时,Flink 作业处理它并创建告警。
  • 生命体征消费者在将数据插入 TimescaleDB 时打印。
  • 日志消费者在将文档索引到 Elasticsearch 时打印。

你可以验证数据库中的数据:

  • TimescaleDB:使用 psql -h localhost -p 5432 -U user -d wearables 连接并运行 SELECT * FROM vitals ORDER BY time DESC LIMIT 10;
  • Elasticsearch:在浏览器中打开 http://localhost:9200/wearable_logs/_search 查看原始日志。

安全和性能考虑

  • 安全:在生产环境中,你应在 Kafka、Flink 和数据库上启用身份验证和 TLS 加密。生产环境绝不能在没有安全措施的情况下运行。
  • 性能:Kafka topic 应该进行分区以允许并行消费。Flink task manager 的数量可以扩展以处理增加的负载。
  • 数据 Schema:强烈建议在生产系统中使用 Schema 注册表(如 Confluent Schema Registry)配合 Avro 或 Protobuf 格式而非纯 JSON。它确保数据一致性并提供更好的性能。

常见问题

Kafka 和传统消息队列有什么区别?

Kafka 与 RabbitMQ 等传统消息队列在几个关键方面不同。首先,Kafka 将消息存储在磁盘上的分布式提交日志中,使其高度持久并能够重放消息。其次,Kafka 设计用于高吞吐量——每个集群每秒百万消息——而传统队列通常处理数千条。最后,Kafka 使用基于拉取的消费模型,消费者控制自己的读取速率,而基于推送的队列立即传递消息。根据 Confluent 基准测试,Kafka 单集群可处理每秒 700 万次写入,非常适合可穿戴数据场景。

Flink 如何实现亚 100ms 异常检测?

Flink 通过多项优化实现亚 100ms 延迟。首先,它使用真正的流引擎(非微批处理)在事件到达时立即处理。其次,Flink 在托管内存和本地磁盘中维护状态,避免昂贵的远程数据库查找。第三,其基于窗口的操作允许高效的时间查询而无需存储整个数据历史。最后,Flink 的检查点机制在保持高吞吐量的同时提供精确一次处理保证。在生产心脏监控系统中,此架构已将告警响应时间从2-5 分钟降低到 100ms 以内

为什么使用 TimescaleDB 而不是普通 PostgreSQL?

TimescaleDB 通过原生时序功能扩展 PostgreSQL,大幅提升可穿戴数据的性能。它根据时间自动将数据分区为"超级表",使查询仅扫描相关时间范围。TimescaleDB 还包括时序聚合、降采样和连续聚合的专用函数。根据 TimescaleDB 基准测试,超级表在时序数据上可以实现比普通 PostgreSQL 表快 10-100 倍的查询,同时通过列式压缩节省 90% 的存储空间

此架构适合小规模应用吗?

适合,但需要考虑一些因素。虽然 Kafka、Flink 和 TimescaleDB 为规模而设计,但它们增加了运维复杂性。对于少于 1,000 台生成数据的设备的应用,存在更简单的替代方案:使用带 TimescaleDB 扩展的 PostgreSQL 存储、Redis Streams 进行基本消息传递,以及 PostgreSQL 触发器或存储过程进行简单异常检测。然而,此处展示的事件驱动模式无论技术选择如何都有价值。在早期建立基础架构模式后,从简单系统迁移到复杂系统更容易。

结论

我们成功设计并构建了一个健壮的、实时的可穿戴设备数据摄取管道。通过利用以 Kafka 为骨干的事件驱动架构,我们创建了一个可扩展、有弹性且灵活的系统。

我们实现了:

  • 用于摄取、处理和存储的去耦服务。
  • 使用 Apache Flink 强大功能的实时异常检测。
  • 在专用数据库中高效存储时序和日志数据。

健康影响:此架构实现了亚 100ms 心脏异常检测,使医疗系统能够对心动过速等关键事件几乎即时响应。在生产部署中,类似管道已将告警响应时间从分钟级降低到 200ms 以下,在紧急场景中可能挽救生命。

这种模式极其强大,可适用于许多其他实时用例,从金融服务到物流及其他领域。

资源


免责声明

本文介绍的算法和技术仅用于技术教育目的。它们尚未经过临床验证,不应用于医学诊断或治疗决策。请始终咨询合格的医疗专业人员获取医疗建议。

#

文章标签

系统设计
架构
Kafka
健康科技

觉得这篇文章有帮助?

立即体验康心伴,开始您的健康管理之旅