
Designing an Event-Driven Architecture for Real-Time Wearable Data Ingestion

A guide to building a system for handling high-volume data from wearables. Learn how to use Kafka for ingestion, Flink for real-time anomaly detection, and specialized databases like TimescaleDB and Elasticsearch for storage.

2025-12-11
9 min read

In the age of connected health, wearable devices are generating a torrent of data—heart rate, activity levels, sleep patterns, and more. For healthtech companies, this data is a goldmine for providing real-time health insights, personalized feedback, and critical alerts. However, building a system that can reliably ingest, process, and analyze this high-volume, high-velocity data from thousands of devices is a significant engineering challenge.

This article will guide you through designing and building a scalable, event-driven architecture for this exact purpose. We'll create a pipeline that can handle a massive influx of wearable data, process it in real-time for anomaly detection, and store it efficiently for different types of analysis.

What we'll build:

We will design a pipeline where wearable data is sent to a central message broker (Apache Kafka). From there, various microservices will consume this data to:

  1. Normalize the raw sensor readings.
  2. Detect anomalies in real-time (e.g., a sudden heart rate spike) using Apache Flink.
  3. Store the time-series vital signs in TimescaleDB.
  4. Archive raw event logs in Elasticsearch for auditing and search.

This architecture is not just a theoretical concept; it's a practical blueprint used by leading tech companies to build responsive and resilient systems.

Prerequisites:

  • Basic understanding of Java/Python and Docker.
  • Familiarity with concepts like microservices and pub/sub messaging.
  • Docker and Docker Compose installed on your machine.

Why this matters to developers:

Traditional request-response architectures crumble under the constant stream of data from IoT devices. An event-driven approach allows you to build decoupled, scalable systems where components communicate asynchronously, making your application more resilient and easier to maintain.

Understanding the Problem

The core challenge with wearable data is threefold: Volume, Velocity, and Variety.

  • Volume: Thousands of devices, each sending data every few seconds, quickly add up to millions of events per day.
  • Velocity: The data must be processed with very low latency to provide timely feedback or alerts. A batch processing system that runs overnight is a non-starter.
  • Variety: The data comes in different forms. Vital signs (like heart rate) are time-series data, perfect for a time-series database. System logs and error messages are unstructured text, better suited for a search engine like Elasticsearch.

A monolithic application would struggle to handle these competing demands. An event-driven architecture, however, shines in this scenario. By using a central message broker like Apache Kafka, we can create a "central nervous system" for our data. Different services can then tap into this stream to perform specialized tasks independently.

Setting Up the Environment

Before we start building, let's get our environment set up. We'll use Docker Compose to spin up all the necessary services: Kafka, Flink, TimescaleDB, and Elasticsearch.

Create a file named docker-compose.yml:

code
# docker-compose.yml
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:7.3.0
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1

  timescaledb:
    image: timescale/timescaledb:latest-pg14
    container_name: timescaledb
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=password
      - POSTGRES_DB=wearables
    volumes:
      - ./init-db.sql:/docker-entrypoint-initdb.d/init-db.sql

  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.5.0
    container_name: elasticsearch
    ports:
      - "9200:9200"
    environment:
      - "discovery.type=single-node"
      - "xpack.security.enabled=false"

  flink-jobmanager:
    image: apache/flink:1.16.0-scala_2.12
    container_name: flink-jobmanager
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: flink-jobmanager

  flink-taskmanager:
    image: apache/flink:1.16.0-scala_2.12
    container_name: flink-taskmanager
    depends_on:
      - flink-jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: flink-jobmanager
        taskmanager.numberOfTaskSlots: 2
Code collapsed

We also need an SQL script to initialize our TimescaleDB. Create init-db.sql:

code
-- init-db.sql
CREATE TABLE vitals (
    time TIMESTAMPTZ NOT NULL,
    device_id VARCHAR(50) NOT NULL,
    heart_rate INTEGER,
    steps INTEGER
);

SELECT create_hypertable('vitals', 'time');
Code collapsed

Now, run docker-compose up -d in your terminal. This will start all the necessary infrastructure in the background.
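
Kafka's topic auto-creation is usually on by default in this image, but you may prefer to create the topics explicitly so you control partition counts. Below is an optional helper sketch (the file name create_topics.py and the partition count of 3 are our own choices for local testing) using kafka-python's admin client; it targets the two topic names used later in this article.

code
# create_topics.py (optional helper)
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError

admin = KafkaAdminClient(bootstrap_servers='localhost:9092')

topics = [
    NewTopic(name='raw_wearable_data', num_partitions=3, replication_factor=1),
    NewTopic(name='heart_rate_alerts', num_partitions=3, replication_factor=1),
]

for topic in topics:
    try:
        # Create each topic individually so one failure doesn't block the rest
        admin.create_topics([topic])
        print(f"Created topic: {topic.name}")
    except TopicAlreadyExistsError:
        print(f"Topic already exists: {topic.name}")

admin.close()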

Step 1: Ingesting Raw Data with a Kafka Producer

What we're doing

First, we need to simulate wearables sending data into our system. We'll write a simple Python script to act as a Kafka producer. This script will generate mock wearable data and publish it to a Kafka topic named raw_wearable_data.

Implementation

Create a Python script producer.py:

code
# producer.py
import json
import time
import random
from kafka import KafkaProducer

# Configuration
KAFKA_TOPIC = 'raw_wearable_data'
KAFKA_BOOTSTRAP_SERVERS = 'localhost:9092'

# List of simulated device IDs
DEVICES = [f"device_{i}" for i in range(10)]

def create_producer():
    """Creates and returns a Kafka producer."""
    return KafkaProducer(
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )

def generate_data(device_id):
    """Generates a single data point for a wearable device."""
    return {
        'deviceId': device_id,
        'timestamp': int(time.time() * 1000),
        'heartRate': random.randint(55, 120),
        'steps': random.randint(0, 100),
        'lat': round(random.uniform(34.0, 36.0), 5),
        'lon': round(random.uniform(-120.0, -118.0), 5),
    }

def main():
    producer = create_producer()
    print("🚀 Starting wearable data producer...")
    try:
        while True:
            device_id = random.choice(DEVICES)
            data = generate_data(device_id)

            # Occasionally send an anomalous heart rate
            if random.random() < 0.05: # 5% chance
                data['heartRate'] = random.randint(180, 200)
                print(f"❗️ Anomaly sent for {device_id}: {data}")

            producer.send(KAFKA_TOPIC, value=data)
            print(f"Sent data for {device_id}: {data}")
            time.sleep(1)
    except KeyboardInterrupt:
        print("🛑 Producer stopped.")
    finally:
        producer.close()

if __name__ == "__main__":
    main()
Code collapsed

How it works

The script uses the kafka-python library to connect to our Kafka container. It continuously generates JSON payloads representing sensor readings and sends them to the raw_wearable_data topic. We've also included a small chance to generate an abnormally high heart rate to test our anomaly detection later.

To run this, you'll need to install the library: pip install kafka-python

Then, execute the script: python producer.py
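
Before wiring up the rest of the pipeline, it can be reassuring to confirm that messages are actually landing on the topic. Here is a minimal, throwaway sketch (the file name peek_topic.py is our own) that reads raw_wearable_data from the beginning and exits after five seconds of inactivity.

code
# peek_topic.py (optional sanity check)
import json
from kafka import KafkaConsumer

consumer = KafkaConsumer(
    'raw_wearable_data',
    bootstrap_servers='localhost:9092',
    auto_offset_reset='earliest',
    consumer_timeout_ms=5000,  # stop iterating after 5s with no new messages
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

for message in consumer:
    print(f"partition={message.partition} offset={message.offset} value={message.value}")

consumer.close()
print("Done.")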

Step 2: Real-Time Anomaly Detection with Apache Flink

What we're doing

This is the core of our real-time processing. We will create a simple Apache Flink job that reads from the raw_wearable_data topic, checks for heart rates above a certain threshold, and publishes any anomalies to a new Kafka topic called heart_rate_alerts.

Implementation

We'll use Flink's SQL Client for simplicity, which allows us to define our stream processing logic using familiar SQL. One caveat: the base apache/flink image does not bundle the Kafka SQL connector, so you'll need to place a flink-sql-connector-kafka JAR matching your Flink version into /opt/flink/lib of both the jobmanager and taskmanager containers (for example via a volume mount) before the Kafka tables below will resolve.

  1. Access the Flink SQL Client: Open a terminal and connect to the Flink job manager container: docker exec -it flink-jobmanager /bin/bash

  2. Start the SQL Client: Inside the container, run: ./bin/sql-client.sh

  3. Define Source and Sink Tables: Now, in the Flink SQL prompt, we define our tables. This doesn't create database tables; it tells Flink how to connect to Kafka and interpret the data.

    code
    -- Flink SQL
    -- Define the source table pointing to our raw data topic
    CREATE TABLE raw_data (
      `deviceId` STRING,
      `timestamp` BIGINT,
      `heartRate` INT,
      `steps` INT,
      `lat` DOUBLE,
      `lon` DOUBLE
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'raw_wearable_data',
      'properties.bootstrap.servers' = 'kafka:29092',
      'properties.group.id' = 'flink-consumer-group',
      'format' = 'json',
      'scan.startup.mode' = 'latest-offset'
    );
    
    -- Define the sink table for publishing alerts
    CREATE TABLE heart_rate_alerts (
      `deviceId` STRING,
      `alert_time` TIMESTAMP(3),
      `heartRate` INT
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'heart_rate_alerts',
      'properties.bootstrap.servers' = 'kafka:29092',
      'format' = 'json'
    );
    
    Code collapsed
  4. Create and Run the Anomaly Detection Job: Finally, we write a simple INSERT INTO ... SELECT statement. This is a continuous query that runs forever, processing data as it arrives.

    code
    -- This is the actual streaming job!
    INSERT INTO heart_rate_alerts
    SELECT
      deviceId,
      TO_TIMESTAMP_LTZ(`timestamp`, 3) AS alert_time,
      heartRate
    FROM raw_data
    WHERE heartRate > 170;
    
    Code collapsed

How it works

Flink continuously reads JSON messages from the raw_wearable_data topic. For each record, it checks if the heartRate field is greater than 170. If it is, a new JSON record is created and published to the heart_rate_alerts topic. This entire process happens with millisecond latency. Flink's stateful processing capabilities allow for much more complex logic, like detecting anomalies based on a moving average over time.
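
The heart_rate_alerts topic is where a downstream notification service would plug in. The article doesn't prescribe one, but as a hedged sketch, a minimal Python alert consumer could look like the following; the notify() function is a placeholder for whatever channel (push notification, SMS, pager) you actually use, and the file name is our own.

code
# alerts_consumer.py (illustrative sketch, not part of the core pipeline)
import json
from kafka import KafkaConsumer

def notify(device_id, heart_rate, alert_time):
    """Placeholder: swap in a real notification channel (push, SMS, email)."""
    print(f"🚨 ALERT: {device_id} reported heart rate {heart_rate} at {alert_time}")

consumer = KafkaConsumer(
    'heart_rate_alerts',
    bootstrap_servers='localhost:9092',
    group_id='alerting-consumer-group',
    auto_offset_reset='latest',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

print("🚀 Starting alerts consumer...")
for message in consumer:
    alert = message.value
    # Field names match the Flink sink table: deviceId, alert_time, heartRate
    notify(alert['deviceId'], alert['heartRate'], alert['alert_time'])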

Step 3: Sinking Data into Specialized Databases

What we're doing

Our final step is to get the processed data out of Kafka and into databases optimized for our two primary use cases: time-series analysis and log searching. We'll write two separate consumer services.

  1. Vitals Consumer: Persists vital signs (heart rate, steps) into TimescaleDB.
  2. Logging Consumer: Stores the raw, unprocessed event data in Elasticsearch for auditing.

Implementation: Vitals Consumer for TimescaleDB

This Python script consumes from the raw_wearable_data topic and writes to our vitals hypertable in TimescaleDB.

code
# vitals_consumer.py
import json
import psycopg2
from kafka import KafkaConsumer
from datetime import datetime, timezone

# Kafka Configuration
KAFKA_TOPIC = 'raw_wearable_data'
KAFKA_BOOTSTRAP_SERVERS = 'localhost:9092'
KAFKA_CONSUMER_GROUP = 'timescale-consumer-group'

# TimescaleDB Configuration
DB_HOST = 'localhost'
DB_PORT = '5432'
DB_NAME = 'wearables'
DB_USER = 'user'
DB_PASS = 'password'

def create_consumer():
    """Creates and returns a Kafka consumer."""
    return KafkaConsumer(
        KAFKA_TOPIC,
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
        group_id=KAFKA_CONSUMER_GROUP,
        auto_offset_reset='earliest',
        value_deserializer=lambda x: json.loads(x.decode('utf-8'))
    )

def main():
    consumer = create_consumer()
    db_conn = psycopg2.connect(host=DB_HOST, port=DB_PORT, dbname=DB_NAME, user=DB_USER, password=DB_PASS)
    cursor = db_conn.cursor()

    print("🚀 Starting vitals consumer...")
    try:
        for message in consumer:
            data = message.value
            try:
                dt_object = datetime.fromtimestamp(data['timestamp'] / 1000, tz=timezone.utc)

                insert_query = """
                INSERT INTO vitals (time, device_id, heart_rate, steps)
                VALUES (%s, %s, %s, %s);
                """
                cursor.execute(insert_query, (dt_object, data['deviceId'], data['heartRate'], data['steps']))
                db_conn.commit()

                print(f"✅ Inserted vitals for {data['deviceId']} into TimescaleDB")
            except (KeyError, TypeError) as e:
                print(f"⚠️ Skipping malformed message: {data}. Error: {e}")

    except KeyboardInterrupt:
        print("🛑 Vitals consumer stopped.")
    finally:
        cursor.close()
        db_conn.close()
        consumer.close()

if __name__ == "__main__":
    main()
Code collapsed

To run this, you'll need: pip install psycopg2-binary kafka-python

Implementation: Logging Consumer for Elasticsearch

This script consumes the same topic but pushes the full, raw JSON document into Elasticsearch.

code
# logging_consumer.py
import json
from datetime import datetime, timezone
from kafka import KafkaConsumer
from elasticsearch import Elasticsearch

# Kafka Configuration
KAFKA_TOPIC = 'raw_wearable_data'
KAFKA_BOOTSTRAP_SERVERS = 'localhost:9092'
KAFKA_CONSUMER_GROUP = 'elastic-consumer-group'

# Elasticsearch Configuration
ES_HOST = 'localhost'
ES_PORT = 9200
ES_INDEX = 'wearable_logs'

def create_consumer():
    """Creates and returns a Kafka consumer."""
    return KafkaConsumer(
        KAFKA_TOPIC,
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
        group_id=KAFKA_CONSUMER_GROUP,
        auto_offset_reset='earliest',
        value_deserializer=lambda x: json.loads(x.decode('utf-8'))
    )

def main():
    consumer = create_consumer()
    es = Elasticsearch([{'host': ES_HOST, 'port': ES_PORT, 'scheme': 'http'}])

    print("🚀 Starting logging consumer...")
    try:
        for message in consumer:
            data = message.value
            try:
                # Add a processing timestamp for context
                data['processingTimestamp'] = datetime.now(timezone.utc).isoformat()
                es.index(index=ES_INDEX, document=data)
                print(f"📄 Indexed log for {data['deviceId']} in Elasticsearch")
            except (KeyError, TypeError) as e:
                print(f"⚠️ Skipping malformed message for logging: {data}. Error: {e}")

    except KeyboardInterrupt:
        print("🛑 Logging consumer stopped.")
    finally:
        consumer.close()

if __name__ == "__main__":
    main()
Code collapsed

To run this, you'll need: pip install elasticsearch kafka-python

How it works

By having two different consumer groups (timescale-consumer-group and elastic-consumer-group), Kafka allows both services to read the entire stream of data independently. This is a core principle of event-driven design: services are decoupled and don't need to know about each other. One service can go down without affecting the other.

Putting It All Together

To see the full pipeline in action:

  1. Run docker-compose up -d.
  2. Start the Flink SQL job as described in Step 2.
  3. Run the Python consumers in separate terminals:
    • python vitals_consumer.py
    • python logging_consumer.py
  4. Run the producer in a fourth terminal:
    • python producer.py

You will now see a real-time flow of data:

  • The producer will print messages as it sends them.
  • When an anomaly is sent, the Flink job will process it and create an alert.
  • The vitals consumer will print messages as it inserts data into TimescaleDB.
  • The logging consumer will print messages as it indexes documents in Elasticsearch.

You can verify the data in the databases directly (or with the combined check script shown after this list):

  • TimescaleDB: Connect using psql -h localhost -p 5432 -U user -d wearables and run SELECT * FROM vitals ORDER BY time DESC LIMIT 10;.
  • Elasticsearch: Open your browser to http://localhost:9200/wearable_logs/_search to see the raw logs.
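
If you'd rather check both stores from one place, here is a small optional script (our own addition) that counts rows in TimescaleDB and documents in Elasticsearch, assuming the connection settings used throughout this article.

code
# verify_pipeline.py (optional, reuses the connection settings from above)
import psycopg2
from elasticsearch import Elasticsearch

# Count vitals rows in TimescaleDB
conn = psycopg2.connect(host='localhost', port=5432, dbname='wearables',
                        user='user', password='password')
with conn, conn.cursor() as cur:
    cur.execute("SELECT COUNT(*) FROM vitals;")
    print(f"TimescaleDB vitals rows: {cur.fetchone()[0]}")
conn.close()

# Count indexed documents in Elasticsearch (raises NotFoundError if the index
# hasn't been created yet, i.e. no documents have been indexed)
es = Elasticsearch("http://localhost:9200")
print(f"Elasticsearch wearable_logs docs: {es.count(index='wearable_logs')['count']}")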

Security and Performance Considerations

  • Security: In a production environment, you would enable authentication and TLS encryption on Kafka, Flink, and the databases. Never run without security in production.
  • Performance: Kafka topics should be partitioned to allow for parallel consumption. The number of Flink task managers can be scaled out to handle increased load.
  • Data Schema: Using a schema registry like Confluent Schema Registry with Avro or Protobuf format is highly recommended over plain JSON for production systems. It ensures data consistency and provides better performance.

Conclusion

We've successfully designed and built a robust, real-time data ingestion pipeline for wearable device data. By leveraging an event-driven architecture with Kafka as the backbone, we've created a system that is scalable, resilient, and flexible.

We achieved:

  • Decoupled services for ingestion, processing, and storage.
  • Real-time anomaly detection using the power of Apache Flink.
  • Efficient storage of time-series and log data in specialized databases.

This pattern is incredibly powerful and can be adapted for many other real-time use cases, from financial services to logistics and beyond.


Article Tags

system design, architecture, kafka, healthtech

WellAlly's core development team, composed of healthcare professionals, software engineers, and UX designers committed to revolutionizing digital health management.

Expertise

Healthcare Technology, Software Development, User Experience, AI & Machine Learning
