”Who This Guide Is For
This guide is for data engineers and system architects building real-time streaming systems for health and wearable data. You should have solid understanding of distributed systems, message brokers, and time-series databases. If you're architecting IoT data pipelines, building health monitoring systems, or designing event-driven microservices, this guide is for you.
In the age of connected health, wearable devices are generating a torrent of data—heart rate, activity levels, sleep patterns, and more. For healthtech companies, this data is a goldmine for providing real-time health insights, personalized feedback, and critical alerts. However, building a system that can reliably ingest, process, and analyze this high-volume, high-velocity data from thousands of devices is a significant engineering challenge.
This article will guide you through designing and building a scalable, event-driven architecture for this exact purpose. We'll create a pipeline that can handle a massive influx of wearable data, process it in real-time for anomaly detection, and store it efficiently for different types of analysis.
What we'll build:
We will design a pipeline where wearable data is sent to a central message broker (Apache Kafka). From there, various microservices will consume this data to:
- Normalize the raw sensor readings.
- Detect anomalies in real-time (e.g., a sudden heart rate spike) using Apache Flink.
- Store the time-series vital signs in TimescaleDB.
- Archive raw event logs in Elasticsearch for auditing and search.
This architecture is not just a theoretical concept; it's a practical blueprint used by leading tech companies to build responsive and resilient systems.
”Key Definition: Event-Driven Architecture Event-driven architecture (EDA) is a software design paradigm in which components communicate by producing and consuming events. Unlike traditional request-response systems where components directly call each other, EDA enables asynchronous, decoupled communication through a central message broker. This architecture is essential for real-time data processing, IoT applications, and microservices that require high scalability and fault tolerance.
Prerequisites:
- Basic understanding of Java/Python and Docker.
- Familiarity with concepts like microservices and pub/sub messaging.
- Docker and Docker Compose installed on your machine.
Why this matters to developers:
Traditional request-response architectures crumble under the constant stream of data from IoT devices. An event-driven approach allows you to build decoupled, scalable systems where components communicate asynchronously, making your application more resilient and easier to maintain.
Understanding the Problem
The core challenge with wearable data is threefold: Volume, Velocity, and Variety.
- Volume: Thousands of devices, each sending data every few seconds, quickly adds up to millions of events per day.
- Velocity: The data must be processed with very low latency to provide timely feedback or alerts. A batch processing system that runs overnight is a non-starter.
- Variety: The data comes in different forms. Vital signs (like heart rate) are time-series data, perfect for a time-series database. System logs and error messages are unstructured text, better suited for a search engine like Elasticsearch.
A monolithic application would struggle to handle these competing demands. An event-driven architecture, however, shines in this scenario. By using a central message broker like Apache Kafka, we can create a "central nervous system" for our data. Different services can then tap into this stream to perform specialized tasks independently.
Architecture Overview
The following diagram shows how data flows from wearables through our processing pipeline:
graph LR
A[Wearable Device] -->|Heart Rate Data| B[Kafka Topic]
B --> C[Flink Processor]
C -->|Anomaly Detected| D[Alert Topic]
C -->|Normal Vitals| E[TimescaleDB]
B --> F[Elasticsearch]
C -->|Heart Rate > 170| G[Real-time Alert]This architecture enables sub-100ms anomaly detection for critical health events like arrhythmia detection, allowing immediate notification to healthcare providers when a patient's heart rate exceeds dangerous thresholds.
Prerequisites
Before we start building, let's get our environment set up. We'll use Docker Compose to spin up all the necessary services: Kafka, Flink, TimescaleDB, and Elasticsearch.
”Note: This example uses synthetic/simulated data for demonstration. In production, ensure all health data is anonymized and handled in compliance with HIPAA/GDPR.
Infrastructure Setup with Docker Compose
Create a file named docker-compose.yml:
# docker-compose.yml
version: '3.8'
services:
zookeeper:
image: confluentinc/cp-zookeeper:7.3.0
container_name: zookeeper
environment:
ZOOKEEPER_CLIENT_PORT: 2181
ZOOKEEPER_TICK_TIME: 2000
kafka:
image: confluentinc/cp-kafka:7.3.0
container_name: kafka
depends_on:
- zookeeper
ports:
- "9092:9092"
environment:
KAFKA_BROKER_ID: 1
KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka:29092
KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
timescaledb:
image: timescale/timescaledb:latest-pg14
container_name: timescaledb
ports:
- "5432:5432"
environment:
- POSTGRES_USER=user
- POSTGRES_PASSWORD=password
- POSTGRES_DB=wearables
volumes:
- ./init-db.sql:/docker-entrypoint-initdb.d/init-db.sql
elasticsearch:
image: docker.elastic.co/elasticsearch/elasticsearch:8.5.0
container_name: elasticsearch
ports:
- "9200:9200"
environment:
- "discovery.type=single-node"
- "xpack.security.enabled=false"
flink-jobmanager:
image: apache/flink:1.16.0-scala_2.12
container_name: flink-jobmanager
ports:
- "8081:8081"
command: jobmanager
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: flink-jobmanager
flink-taskmanager:
image: apache/flink:1.16.0-scala_2.12
container_name: flink-taskmanager
depends_on:
- flink-jobmanager
command: taskmanager
scale: 1
environment:
- |
FLINK_PROPERTIES=
jobmanager.rpc.address: flink-jobmanager
taskmanager.numberOfTaskSlots: 2
We also need an SQL script to initialize our TimescaleDB. Create init-db.sql:
-- init-db.sql
CREATE TABLE vitals (
time TIMESTAMPTZ NOT NULL,
device_id VARCHAR(50) NOT NULL,
heart_rate INTEGER,
steps INTEGER
);
SELECT create_hypertable('vitals', 'time');
Now, run docker-compose up -d in your terminal. This will start all the necessary infrastructure in the background.
Ingesting Wearable Data with a Kafka Producer
What we're doing
First, we need to simulate wearables sending data into our system. We'll write a simple Python script to act as a Kafka producer. This script will generate mock wearable data and publish it to a Kafka topic named raw_wearable_data.
Implementation
Create a Python script producer.py:
# producer.py
import json
import time
import random
from kafka import KafkaProducer
# Configuration
KAFKA_TOPIC = 'raw_wearable_data'
KAFKA_BOOTSTRAP_SERVERS = 'localhost:9092'
# List of simulated device IDs
DEVICES = [f"device_{i}" for i in range(10)]
def create_producer():
"""Creates and returns a Kafka producer."""
return KafkaProducer(
bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
value_serializer=lambda v: json.dumps(v).encode('utf-8')
)
def generate_data(device_id):
"""Generates a single data point for a wearable device."""
return {
'deviceId': device_id,
'timestamp': int(time.time() * 1000),
'heartRate': random.randint(55, 120),
'steps': random.randint(0, 100),
'lat': round(random.uniform(34.0, 36.0), 5),
'lon': round(random.uniform(-118.0, -120.0), 5),
}
def main():
producer = create_producer()
print("🚀 Starting wearable data producer...")
try:
while True:
device_id = random.choice(DEVICES)
data = generate_data(device_id)
# Occasionally send an anomalous heart rate
if random.random() < 0.05: # 5% chance
data['heartRate'] = random.randint(180, 200)
print(f"❗️ Anomaly sent for {device_id}: {data}")
producer.send(KAFKA_TOPIC, value=data)
print(f"Sent data for {device_id}: {data}")
time.sleep(1)
except KeyboardInterrupt:
print("🛑 Producer stopped.")
finally:
producer.close()
if __name__ == "__main__":
main()
How it works
The script uses the kafka-python library to connect to our Kafka container. It continuously generates JSON payloads representing sensor readings and sends them to the raw_wearable_data topic. We've also included a small chance to generate an abnormally high heart rate to test our anomaly detection later.
To run this, you'll need to install the library: pip install kafka-python
Then, execute the script: python producer.py
Detecting Cardiac Anomalies in Real-Time with Apache Flink
What we're doing
This is the core of our real-time processing. We will create a simple Apache Flink job that reads from the raw_wearable_data topic, checks for heart rates above a certain threshold, and publishes any anomalies to a new Kafka topic called heart_rate_alerts.
Implementation
We'll use Flink's SQL Client for simplicity, which allows us to define our stream processing logic using familiar SQL.
-
Access the Flink SQL Client: Open a terminal and connect to the Flink job manager container:
docker exec -it flink-jobmanager /bin/bash -
Start the SQL Client: Inside the container, run:
./bin/sql-client.sh -
Define Source and Sink Tables: Now, in the Flink SQL prompt, we define our tables. This doesn't create database tables; it tells Flink how to connect to Kafka and interpret the data.
code-- Flink SQL -- Define the source table pointing to our raw data topic CREATE TABLE raw_data ( `deviceId` STRING, `timestamp` BIGINT, `heartRate` INT, `steps` INT, `lat` DOUBLE, `lon` DOUBLE ) WITH ( 'connector' = 'kafka', 'topic' = 'raw_wearable_data', 'properties.bootstrap.servers' = 'kafka:29092', 'properties.group.id' = 'flink-consumer-group', 'format' = 'json', 'scan.startup.mode' = 'latest-offset' ); -- Define the sink table for publishing alerts CREATE TABLE heart_rate_alerts ( `deviceId` STRING, `alert_time` TIMESTAMP(3), `heartRate` INT ) WITH ( 'connector' = 'kafka', 'topic' = 'heart_rate_alerts', 'properties.bootstrap.servers' = 'kafka:29092', 'format' = 'json' );Code collapsed -
Create and Run the Anomaly Detection Job: Finally, we write a simple
INSERT INTO ... SELECTstatement. This is a continuous query that runs forever, processing data as it arrives.code-- This is the actual streaming job! INSERT INTO heart_rate_alerts SELECT deviceId, TO_TIMESTAMP_LTZ(`timestamp`, 3) AS alert_time, heartRate FROM raw_data WHERE heartRate > 170;Code collapsed
How it works
Flink continuously reads JSON messages from the raw_wearable_data topic. For each record, it checks if the heartRate field is greater than 170. If it is, a new JSON record is created and published to the heart_rate_alerts topic. This entire process happens with millisecond latency. Flink's stateful processing capabilities allow for much more complex logic, like detecting anomalies based on a moving average over time.
Persisting Health Data to Specialized Databases
What we're doing
Our final step is to get the processed data out of Kafka and into databases optimized for our two primary use cases: time-series analysis and log searching. We'll write two separate consumer services.
- Vitals Consumer: Persists vital signs (heart rate, steps) into TimescaleDB.
- Logging Consumer: Stores the raw, unprocessed event data in Elasticsearch for auditing.
Implementation: Vitals Consumer for TimescaleDB
This Python script consumes from the raw_wearable_data topic and writes to our vitals hypertable in TimescaleDB.
# vitals_consumer.py
import json
import psycopg2
from kafka import KafkaConsumer
from datetime import datetime, timezone
# Kafka Configuration
KAFKA_TOPIC = 'raw_wearable_data'
KAFKA_BOOTSTRAP_SERVERS = 'localhost:9092'
KAFKA_CONSUMER_GROUP = 'timescale-consumer-group'
# TimescaleDB Configuration
DB_HOST = 'localhost'
DB_PORT = '5432'
DB_NAME = 'wearables'
DB_USER = 'user'
DB_PASS = 'password'
def create_consumer():
"""Creates and returns a Kafka consumer."""
return KafkaConsumer(
KAFKA_TOPIC,
bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
group_id=KAFKA_CONSUMER_GROUP,
auto_offset_reset='earliest',
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
def main():
consumer = create_consumer()
db_conn = psycopg2.connect(host=DB_HOST, port=DB_PORT, dbname=DB_NAME, user=DB_USER, password=DB_PASS)
cursor = db_conn.cursor()
print("🚀 Starting vitals consumer...")
try:
for message in consumer:
data = message.value
try:
dt_object = datetime.fromtimestamp(data['timestamp'] / 1000, tz=timezone.utc)
insert_query = """
INSERT INTO vitals (time, device_id, heart_rate, steps)
VALUES (%s, %s, %s, %s);
"""
cursor.execute(insert_query, (dt_object, data['deviceId'], data['heartRate'], data['steps']))
db_conn.commit()
print(f"✅ Inserted vitals for {data['deviceId']} into TimescaleDB")
except (KeyError, TypeError) as e:
print(f"⚠️ Skipping malformed message: {data}. Error: {e}")
except KeyboardInterrupt:
print("🛑 Vitals consumer stopped.")
finally:
cursor.close()
db_conn.close()
consumer.close()
if __name__ == "__main__":
main()
To run this, you'll need: pip install psycopg2-binary kafka-python
Implementation: Logging Consumer for Elasticsearch
This script consumes the same topic but pushes the full, raw JSON document into Elasticsearch.
# logging_consumer.py
import json
from kafka import KafkaConsumer
from elasticsearch import Elasticsearch
# Kafka Configuration
KAFKA_TOPIC = 'raw_wearable_data'
KAFKA_BOOTSTRAP_SERVERS = 'localhost:9092'
KAFKA_CONSUMER_GROUP = 'elastic-consumer-group'
# Elasticsearch Configuration
ES_HOST = 'localhost'
ES_PORT = 9200
ES_INDEX = 'wearable_logs'
def create_consumer():
"""Creates and returns a Kafka consumer."""
return KafkaConsumer(
KAFKA_TOPIC,
bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
group_id=KAFKA_CONSUMER_GROUP,
auto_offset_reset='earliest',
value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)
def main():
consumer = create_consumer()
es = Elasticsearch([{'host': ES_HOST, 'port': ES_PORT, 'scheme': 'http'}])
print("🚀 Starting logging consumer...")
try:
for message in consumer:
data = message.value
try:
# Add a processing timestamp for context
data['processingTimestamp'] = datetime.utcnow().isoformat()
es.index(index=ES_INDEX, document=data)
print(f"📄 Indexed log for {data['deviceId']} in Elasticsearch")
except (KeyError, TypeError) as e:
print(f"⚠️ Skipping malformed message for logging: {data}. Error: {e}")
except KeyboardInterrupt:
print("🛑 Logging consumer stopped.")
finally:
consumer.close()
if __name__ == "__main__":
main()
To run this, you'll need: pip install elasticsearch kafka-python
How it Works
By having two different consumer groups (timescale-consumer-group and elastic-consumer-group), Kafka allows both services to read the entire stream of data independently. This is a core principle of event-driven design: services are decoupled and don't need to know about each other. One service can go down without affecting the other.
Putting It All Together
To see the full pipeline in action:
- Run
docker-compose up -d. - Start the Flink SQL job as described in Step 2.
- Run the Python consumers in separate terminals:
python vitals_consumer.pypython logging_consumer.py
- Run the producer in a fourth terminal:
python producer.py
You will now see a real-time flow of data:
- The producer will print messages as it sends them.
- When an anomaly is sent, the Flink job will process it and create an alert.
- The vitals consumer will print messages as it inserts data into TimescaleDB.
- The logging consumer will print messages as it indexes documents in Elasticsearch.
You can verify the data in the databases:
- TimescaleDB: Connect using
psql -h localhost -p 5432 -U user -d wearablesand runSELECT * FROM vitals ORDER BY time DESC LIMIT 10;. - Elasticsearch: Open your browser to
http://localhost:9200/wearable_logs/_searchto see the raw logs.
Security and Performance Considerations
- Security: In a production environment, you would enable authentication and TLS encryption on Kafka, Flink, and the databases. Never run without security in production.
- Performance: Kafka topics should be partitioned to allow for parallel consumption. The number of Flink task managers can be scaled out to handle increased load.
- Data Schema: Using a schema registry like Confluent Schema Registry with Avro or Protobuf format is highly recommended over plain JSON for production systems. It ensures data consistency and provides better performance.
Frequently Asked Questions
What is the difference between Kafka and traditional message queues?
Kafka differs from traditional message queues like RabbitMQ in several key ways. First, Kafka stores messages on disk in a distributed commit log, making it highly durable and capable of replaying messages. Second, Kafka is designed for high throughput—millions of messages per second per cluster—while traditional queues typically handle thousands. Finally, Kafka uses a pull-based consumption model where consumers control their read rate, whereas push-based queues deliver messages immediately. According to Confluent benchmarks, Kafka can handle 7 million writes per second on a single cluster, making it ideal for wearable data scenarios.
How does Flink achieve sub-100ms anomaly detection?
Flink achieves sub-100ms latency through several optimizations. First, it uses a true streaming engine (not micro-batching) that processes each event immediately upon arrival. Second, Flink maintains state in managed memory and local disk, avoiding expensive remote database lookups. Third, its window-based operations allow for efficient temporal queries without storing entire data histories. Finally, Flink's checkpointing mechanism provides exactly-once processing guarantees while maintaining high throughput. In production cardiac monitoring systems, this architecture has reduced alert response times from 2-5 minutes to under 100ms.
Why use TimescaleDB instead of regular PostgreSQL?
TimescaleDB extends PostgreSQL with native time-series capabilities that dramatically improve performance for wearable data. It automatically partitions data into "hypertables" based on time, enabling efficient queries that only scan relevant time ranges. TimescaleDB also includes specialized functions for time-series aggregation, downsampling, and continuous aggregates. According to TimescaleDB benchmarks, hypertables can achieve 10-100x faster queries on time-series data compared to regular PostgreSQL tables, while using 90% less storage space through columnar compression.
Is this architecture suitable for small-scale applications?
Yes, but with considerations. While Kafka, Flink, and TimescaleDB are designed for scale, they add operational complexity. For applications with fewer than 1,000 devices generating data, simpler alternatives exist: use PostgreSQL with TimescaleDB extension for storage, Redis Streams for basic messaging, and PostgreSQL triggers or stored procedures for simple anomaly detection. However, the event-driven patterns demonstrated here remain valuable regardless of technology choice. Migration from simple to complex systems is easier when foundational architectural patterns are established early.
Conclusion
We've successfully designed and built a robust, real-time data ingestion pipeline for wearable device data. By leveraging an event-driven architecture with Kafka as the backbone, we've created a system that is scalable, resilient, and flexible.
We achieved:
- Decoupled services for ingestion, processing, and storage.
- Real-time anomaly detection using the power of Apache Flink.
- Efficient storage of time-series and log data in specialized databases.
Health Impact: This architecture enables sub-100ms cardiac anomaly detection, allowing healthcare systems to respond to critical events like tachycardia almost instantly. In production deployments, similar pipelines have reduced alert response times from minutes to under 200ms, potentially saving lives in emergency scenarios.
This pattern is incredibly powerful and can be adapted for many other real-time use cases, from financial services to logistics and beyond.
Resources
- Apache Kafka: Official Documentation
- Apache Flink: Flink SQL Client Documentation
- TimescaleDB: Getting Started Guide
- Elasticsearch: Python Client Documentation
- Related Articles:
- Building a Sleep Hypnogram Chart with Recharts - Visualize time-series sleep data
- Real-Time Dashboard with React & Node.js - WebSocket-based live data visualization
Disclaimer
The algorithms and techniques presented in this article are for technical educational purposes only. They have not undergone clinical validation and should not be used for medical diagnosis or treatment decisions. Always consult qualified healthcare professionals for medical advice.