In the age of connected health, wearable devices are generating a torrent of data—heart rate, activity levels, sleep patterns, and more. For healthtech companies, this data is a goldmine for providing real-time health insights, personalized feedback, and critical alerts. However, building a system that can reliably ingest, process, and analyze this high-volume, high-velocity data from thousands of devices is a significant engineering challenge.
This article will guide you through designing and building a scalable, event-driven architecture for this exact purpose. We'll create a pipeline that can handle a massive influx of wearable data, process it in real-time for anomaly detection, and store it efficiently for different types of analysis.
What we'll build:
We will design a pipeline where wearable data is sent to a central message broker (Apache Kafka). From there, various microservices will consume this data to:
- Normalize the raw sensor readings.
- Detect anomalies in real-time (e.g., a sudden heart rate spike) using Apache Flink.
- Store the time-series vital signs in TimescaleDB.
- Archive raw event logs in Elasticsearch for auditing and search.
This architecture is not just a theoretical concept; it's a practical blueprint used by leading tech companies to build responsive and resilient systems.
Prerequisites:
- Basic understanding of Java/Python and Docker.
- Familiarity with concepts like microservices and pub/sub messaging.
- Docker and Docker Compose installed on your machine.
Why this matters to developers:
Traditional request-response architectures crumble under the constant stream of data from IoT devices. An event-driven approach allows you to build decoupled, scalable systems where components communicate asynchronously, making your application more resilient and easier to maintain.
Understanding the Problem
The core challenge with wearable data is threefold: Volume, Velocity, and Variety.
- Volume: Thousands of devices, each sending data every few seconds, quickly add up to millions of events per day.
- Velocity: The data must be processed with very low latency to provide timely feedback or alerts. A batch processing system that runs overnight is a non-starter.
- Variety: The data comes in different forms. Vital signs (like heart rate) are time-series data, perfect for a time-series database. System logs and error messages are unstructured text, better suited for a search engine like Elasticsearch.
A monolithic application would struggle to handle these competing demands. An event-driven architecture, however, shines in this scenario. By using a central message broker like Apache Kafka, we can create a "central nervous system" for our data. Different services can then tap into this stream to perform specialized tasks independently.
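To put rough numbers on "Volume", here is a quick back-of-envelope estimate; the fleet size, reporting interval, and payload size below are illustrative assumptions, not measurements from a real deployment.

# capacity_estimate.py (illustrative assumptions only)
DEVICES = 10_000          # assumed fleet size
INTERVAL_S = 5            # assumed seconds between readings per device
PAYLOAD_BYTES = 200       # assumed size of one JSON reading

events_per_day = DEVICES * (86_400 // INTERVAL_S)
bytes_per_day = events_per_day * PAYLOAD_BYTES

print(f"{events_per_day:,} events/day")               # 172,800,000 events/day
print(f"{bytes_per_day / 1_000_000_000:.1f} GB/day")   # ~34.6 GB/day

Even a mid-sized fleet lands in the hundreds of millions of events and tens of gigabytes per day, which is exactly the kind of load the broker-centric design is meant to absorb.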
Setting Up the Environment
Before we start building, let's get our environment set up. We'll use Docker Compose to spin up all the necessary services: Kafka, Flink, TimescaleDB, and Elasticsearch.
Create a file named docker-compose.yml:
# docker-compose.yml
version: '3.8'
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    container_name: zookeeper
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
  kafka:
    image: confluentinc/cp-kafka:7.3.0
    container_name: kafka
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: 'zookeeper:2181'
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_INTERNAL:PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092,PLAINTEXT_INTERNAL://kafka:29092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
  timescaledb:
    image: timescale/timescaledb:latest-pg14
    container_name: timescaledb
    ports:
      - "5432:5432"
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=password
      - POSTGRES_DB=wearables
    volumes:
      - ./init-db.sql:/docker-entrypoint-initdb.d/init-db.sql
  elasticsearch:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.5.0
    container_name: elasticsearch
    ports:
      - "9200:9200"
    environment:
      - "discovery.type=single-node"
      - "xpack.security.enabled=false"
  flink-jobmanager:
    image: apache/flink:1.16.0-scala_2.12
    container_name: flink-jobmanager
    ports:
      - "8081:8081"
    command: jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: flink-jobmanager
  flink-taskmanager:
    image: apache/flink:1.16.0-scala_2.12
    container_name: flink-taskmanager
    depends_on:
      - flink-jobmanager
    command: taskmanager
    scale: 1
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: flink-jobmanager
        taskmanager.numberOfTaskSlots: 2
We also need an SQL script to initialize our TimescaleDB. Create init-db.sql:
-- init-db.sql
CREATE TABLE vitals (
    time        TIMESTAMPTZ NOT NULL,
    device_id   VARCHAR(50) NOT NULL,
    heart_rate  INTEGER,
    steps       INTEGER
);

SELECT create_hypertable('vitals', 'time');
Now, run docker-compose up -d in your terminal. This will start all the necessary infrastructure in the background.
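With the infrastructure running, you can optionally pre-create the two topics the pipeline uses. Topic auto-creation is left at its default (enabled) in this compose file, so this step isn't required, but creating the topics explicitly lets you pick the partition count up front. A minimal sketch using kafka-python's admin client (the script name and partition counts are illustrative):

# create_topics.py (optional; illustrative partition counts)
from kafka.admin import KafkaAdminClient, NewTopic
from kafka.errors import TopicAlreadyExistsError

admin = KafkaAdminClient(bootstrap_servers='localhost:9092')

topics = [
    NewTopic(name='raw_wearable_data', num_partitions=3, replication_factor=1),
    NewTopic(name='heart_rate_alerts', num_partitions=1, replication_factor=1),
]

try:
    admin.create_topics(new_topics=topics)
    print("Topics created.")
except TopicAlreadyExistsError:
    print("Topics already exist; nothing to do.")  # safe to ignore on re-runs
finally:
    admin.close()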
Step 1: Ingesting Raw Data with a Kafka Producer
What we're doing
First, we need to simulate wearables sending data into our system. We'll write a simple Python script to act as a Kafka producer. This script will generate mock wearable data and publish it to a Kafka topic named raw_wearable_data.
Implementation
Create a Python script producer.py:
# producer.py
import json
import time
import random
from kafka import KafkaProducer

# Configuration
KAFKA_TOPIC = 'raw_wearable_data'
KAFKA_BOOTSTRAP_SERVERS = 'localhost:9092'

# List of simulated device IDs
DEVICES = [f"device_{i}" for i in range(10)]

def create_producer():
    """Creates and returns a Kafka producer."""
    return KafkaProducer(
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
        value_serializer=lambda v: json.dumps(v).encode('utf-8')
    )

def generate_data(device_id):
    """Generates a single data point for a wearable device."""
    return {
        'deviceId': device_id,
        'timestamp': int(time.time() * 1000),
        'heartRate': random.randint(55, 120),
        'steps': random.randint(0, 100),
        'lat': round(random.uniform(34.0, 36.0), 5),
        'lon': round(random.uniform(-120.0, -118.0), 5),
    }

def main():
    producer = create_producer()
    print("🚀 Starting wearable data producer...")
    try:
        while True:
            device_id = random.choice(DEVICES)
            data = generate_data(device_id)

            # Occasionally send an anomalous heart rate
            if random.random() < 0.05:  # 5% chance
                data['heartRate'] = random.randint(180, 200)
                print(f"❗️ Anomaly sent for {device_id}: {data}")

            producer.send(KAFKA_TOPIC, value=data)
            print(f"Sent data for {device_id}: {data}")
            time.sleep(1)
    except KeyboardInterrupt:
        print("🛑 Producer stopped.")
    finally:
        producer.close()

if __name__ == "__main__":
    main()
How it works
The script uses the kafka-python library to connect to our Kafka container. It continuously generates JSON payloads representing sensor readings and sends them to the raw_wearable_data topic. We've also included a small chance to generate an abnormally high heart rate to test our anomaly detection later.
To run this, you'll need to install the library: pip install kafka-python
Then, execute the script: python producer.py
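Before wiring up Flink, it's handy to confirm that events are actually landing in Kafka. The throwaway script below (a hypothetical check_topic.py, using the same kafka-python library) just prints whatever arrives on a topic; later, pointing it at heart_rate_alerts is an easy way to watch the output of Step 2.

# check_topic.py (quick verification script)
import json
import sys
from kafka import KafkaConsumer

# Topic name can be passed as the first argument; defaults to the raw data topic.
topic = sys.argv[1] if len(sys.argv) > 1 else 'raw_wearable_data'

consumer = KafkaConsumer(
    topic,
    bootstrap_servers='localhost:9092',
    group_id='topic-peek',
    auto_offset_reset='latest',
    value_deserializer=lambda x: json.loads(x.decode('utf-8'))
)

print(f"👀 Watching topic '{topic}' (Ctrl+C to stop)...")
try:
    for message in consumer:
        print(f"partition={message.partition} offset={message.offset} value={message.value}")
except KeyboardInterrupt:
    pass
finally:
    consumer.close()

Run it with python check_topic.py while the producer is running; you should see the same JSON payloads the producer prints.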
Step 2: Real-Time Anomaly Detection with Apache Flink
What we're doing
This is the core of our real-time processing. We will create a simple Apache Flink job that reads from the raw_wearable_data topic, checks for heart rates above a certain threshold, and publishes any anomalies to a new Kafka topic called heart_rate_alerts.
Implementation
We'll use Flink's SQL Client for simplicity, which allows us to define our stream processing logic using familiar SQL. One caveat: the stock apache/flink image does not bundle the Kafka connector, so copy the flink-sql-connector-kafka JAR matching your Flink version into /opt/flink/lib on the jobmanager and taskmanager containers (or bake it into a custom image) before running the job.
- Access the Flink SQL Client: Open a terminal and connect to the Flink job manager container:
docker exec -it flink-jobmanager /bin/bash
- Start the SQL Client: Inside the container, run:
./bin/sql-client.sh
- Define Source and Sink Tables: Now, in the Flink SQL prompt, we define our tables. This doesn't create database tables; it tells Flink how to connect to Kafka and interpret the data.
-- Flink SQL
-- Define the source table pointing to our raw data topic
CREATE TABLE raw_data (
    `deviceId` STRING,
    `timestamp` BIGINT,
    `heartRate` INT,
    `steps` INT,
    `lat` DOUBLE,
    `lon` DOUBLE
) WITH (
    'connector' = 'kafka',
    'topic' = 'raw_wearable_data',
    'properties.bootstrap.servers' = 'kafka:29092',
    'properties.group.id' = 'flink-consumer-group',
    'format' = 'json',
    'scan.startup.mode' = 'latest-offset'
);

-- Define the sink table for publishing alerts
CREATE TABLE heart_rate_alerts (
    `deviceId` STRING,
    `alert_time` TIMESTAMP(3),
    `heartRate` INT
) WITH (
    'connector' = 'kafka',
    'topic' = 'heart_rate_alerts',
    'properties.bootstrap.servers' = 'kafka:29092',
    'format' = 'json'
);
- Create and Run the Anomaly Detection Job: Finally, we write a simple INSERT INTO ... SELECT statement. This is a continuous query that runs forever, processing data as it arrives.
-- This is the actual streaming job!
INSERT INTO heart_rate_alerts
SELECT
    deviceId,
    TO_TIMESTAMP_LTZ(`timestamp`, 3) AS alert_time,
    heartRate
FROM raw_data
WHERE heartRate > 170;
How it works
Flink continuously reads JSON messages from the raw_wearable_data topic. For each record, it checks whether the heartRate field is greater than 170. If it is, a new JSON record is published to the heart_rate_alerts topic, typically within milliseconds of the event arriving. Flink's stateful processing capabilities allow for much more complex logic, such as detecting anomalies against a per-device moving average over time, as sketched below.
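As a taste of that direction, the sketch below computes a one-minute sliding average of heart rate per device using Flink's windowing table-valued functions, written against the PyFlink Table API instead of the SQL Client. Treat it strictly as a sketch: it assumes pip install apache-flink, that the Kafka connector JAR is available to the local PyFlink runtime (for example via pipeline.jars), and the window sizes are arbitrary.

# windowed_average.py (sketch: sliding-window heart rate average per device)
from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
# If running locally, point PyFlink at the Kafka connector JAR, e.g.:
# t_env.get_config().set("pipeline.jars", "file:///path/to/flink-sql-connector-kafka.jar")

# Source table with an event-time attribute and watermark, which windowing requires.
t_env.execute_sql("""
    CREATE TABLE raw_data (
        `deviceId` STRING,
        `timestamp` BIGINT,
        `heartRate` INT,
        `event_time` AS TO_TIMESTAMP_LTZ(`timestamp`, 3),
        WATERMARK FOR `event_time` AS `event_time` - INTERVAL '5' SECOND
    ) WITH (
        'connector' = 'kafka',
        'topic' = 'raw_wearable_data',
        'properties.bootstrap.servers' = 'localhost:9092',
        'properties.group.id' = 'flink-windowed-group',
        'format' = 'json',
        'json.ignore-parse-errors' = 'true',
        'scan.startup.mode' = 'latest-offset'
    )
""")

# One-minute sliding window per device, advancing every 10 seconds.
t_env.execute_sql("""
    SELECT deviceId, window_start, window_end, AVG(heartRate) AS avg_heart_rate
    FROM TABLE(
        HOP(TABLE raw_data, DESCRIPTOR(event_time), INTERVAL '10' SECOND, INTERVAL '1' MINUTE))
    GROUP BY deviceId, window_start, window_end
""").print()

Comparing each new reading against a windowed average like this is a more robust anomaly signal than a fixed threshold, since baselines vary per person.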
Step 3: Sinking Data into Specialized Databases
What we're doing
Our final step is to get data out of Kafka and into databases optimized for our two primary use cases: time-series analysis and log searching. We'll write two separate consumer services.
- Vitals Consumer: Persists vital signs (heart rate, steps) into TimescaleDB.
- Logging Consumer: Stores the raw, unprocessed event data in Elasticsearch for auditing.
Implementation: Vitals Consumer for TimescaleDB
This Python script consumes from the raw_wearable_data topic and writes to our vitals hypertable in TimescaleDB.
# vitals_consumer.py
import json
import psycopg2
from kafka import KafkaConsumer
from datetime import datetime, timezone

# Kafka Configuration
KAFKA_TOPIC = 'raw_wearable_data'
KAFKA_BOOTSTRAP_SERVERS = 'localhost:9092'
KAFKA_CONSUMER_GROUP = 'timescale-consumer-group'

# TimescaleDB Configuration
DB_HOST = 'localhost'
DB_PORT = '5432'
DB_NAME = 'wearables'
DB_USER = 'user'
DB_PASS = 'password'

def create_consumer():
    """Creates and returns a Kafka consumer."""
    return KafkaConsumer(
        KAFKA_TOPIC,
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
        group_id=KAFKA_CONSUMER_GROUP,
        auto_offset_reset='earliest',
        value_deserializer=lambda x: json.loads(x.decode('utf-8'))
    )

def main():
    consumer = create_consumer()
    db_conn = psycopg2.connect(host=DB_HOST, port=DB_PORT, dbname=DB_NAME, user=DB_USER, password=DB_PASS)
    cursor = db_conn.cursor()
    print("🚀 Starting vitals consumer...")
    try:
        for message in consumer:
            data = message.value
            try:
                dt_object = datetime.fromtimestamp(data['timestamp'] / 1000, tz=timezone.utc)
                insert_query = """
                    INSERT INTO vitals (time, device_id, heart_rate, steps)
                    VALUES (%s, %s, %s, %s);
                """
                cursor.execute(insert_query, (dt_object, data['deviceId'], data['heartRate'], data['steps']))
                db_conn.commit()
                print(f"✅ Inserted vitals for {data['deviceId']} into TimescaleDB")
            except (KeyError, TypeError) as e:
                print(f"⚠️ Skipping malformed message: {data}. Error: {e}")
    except KeyboardInterrupt:
        print("🛑 Vitals consumer stopped.")
    finally:
        cursor.close()
        db_conn.close()
        consumer.close()

if __name__ == "__main__":
    main()
To run this, you'll need: pip install psycopg2-binary kafka-python
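The consumer above commits one row per message, which is easy to follow but chatty under real load. A common optimization is to poll records in batches and write them with a single statement. The sketch below shows the idea using kafka-python's poll() and psycopg2's execute_values; it reuses create_consumer() and the database connection from vitals_consumer.py, and the batch size and timeout are arbitrary.

# batched_inserts.py (sketch: batch writes instead of per-message commits)
from datetime import datetime, timezone

from psycopg2.extras import execute_values

def consume_in_batches(consumer, db_conn, batch_size=500):
    cursor = db_conn.cursor()
    while True:
        # poll() returns {TopicPartition: [records]}, or an empty dict if nothing arrived.
        batches = consumer.poll(timeout_ms=1000, max_records=batch_size)
        rows = []
        for records in batches.values():
            for record in records:
                data = record.value
                rows.append((
                    datetime.fromtimestamp(data['timestamp'] / 1000, tz=timezone.utc),
                    data['deviceId'],
                    data['heartRate'],
                    data['steps'],
                ))
        if rows:
            # One round trip to TimescaleDB per batch instead of per message.
            execute_values(
                cursor,
                "INSERT INTO vitals (time, device_id, heart_rate, steps) VALUES %s",
                rows,
            )
            db_conn.commit()
            print(f"✅ Inserted batch of {len(rows)} rows")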
Implementation: Logging Consumer for Elasticsearch
This script consumes the same topic but pushes the full, raw JSON document into Elasticsearch.
# logging_consumer.py
import json
from datetime import datetime
from kafka import KafkaConsumer
from elasticsearch import Elasticsearch

# Kafka Configuration
KAFKA_TOPIC = 'raw_wearable_data'
KAFKA_BOOTSTRAP_SERVERS = 'localhost:9092'
KAFKA_CONSUMER_GROUP = 'elastic-consumer-group'

# Elasticsearch Configuration
ES_HOST = 'localhost'
ES_PORT = 9200
ES_INDEX = 'wearable_logs'

def create_consumer():
    """Creates and returns a Kafka consumer."""
    return KafkaConsumer(
        KAFKA_TOPIC,
        bootstrap_servers=KAFKA_BOOTSTRAP_SERVERS,
        group_id=KAFKA_CONSUMER_GROUP,
        auto_offset_reset='earliest',
        value_deserializer=lambda x: json.loads(x.decode('utf-8'))
    )

def main():
    consumer = create_consumer()
    es = Elasticsearch([{'host': ES_HOST, 'port': ES_PORT, 'scheme': 'http'}])
    print("🚀 Starting logging consumer...")
    try:
        for message in consumer:
            data = message.value
            try:
                # Add a processing timestamp for context
                data['processingTimestamp'] = datetime.utcnow().isoformat()
                es.index(index=ES_INDEX, document=data)
                print(f"📄 Indexed log for {data['deviceId']} in Elasticsearch")
            except (KeyError, TypeError) as e:
                print(f"⚠️ Skipping malformed message for logging: {data}. Error: {e}")
    except KeyboardInterrupt:
        print("🛑 Logging consumer stopped.")
    finally:
        consumer.close()

if __name__ == "__main__":
    main()
To run this, you'll need: pip install elasticsearch kafka-python
How it works
By having two different consumer groups (timescale-consumer-group and elastic-consumer-group), Kafka allows both services to read the entire stream of data independently. This is a core principle of event-driven design: services are decoupled and don't need to know about each other. One service can go down without affecting the other.
Putting It All Together
To see the full pipeline in action:
- Run docker-compose up -d.
- Start the Flink SQL job as described in Step 2.
- Run the Python consumers in separate terminals: python vitals_consumer.py and python logging_consumer.py.
- Run the producer in a fourth terminal: python producer.py.
You will now see a real-time flow of data:
- The producer will print messages as it sends them.
- When an anomaly is sent, the Flink job will process it and create an alert.
- The vitals consumer will print messages as it inserts data into TimescaleDB.
- The logging consumer will print messages as it indexes documents in Elasticsearch.
You can verify the data in the databases:
- TimescaleDB: Connect using psql -h localhost -p 5432 -U user -d wearables and run SELECT * FROM vitals ORDER BY time DESC LIMIT 10;. For a richer time-series query, see the sketch after this list.
- Elasticsearch: Open your browser to http://localhost:9200/wearable_logs/_search to see the raw logs.
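Because vitals is a hypertable, TimescaleDB's time-series functions are also available for analysis. The sketch below (a hypothetical vitals_report.py) uses time_bucket to compute per-device, per-minute averages; the bucket width and look-back window are arbitrary.

# vitals_report.py (sketch: per-minute averages per device via time_bucket)
import psycopg2

conn = psycopg2.connect(host='localhost', port=5432, dbname='wearables',
                        user='user', password='password')
cursor = conn.cursor()

cursor.execute("""
    SELECT time_bucket('1 minute', time) AS minute,
           device_id,
           ROUND(AVG(heart_rate)) AS avg_heart_rate,
           SUM(steps) AS total_steps
    FROM vitals
    WHERE time > NOW() - INTERVAL '15 minutes'
    GROUP BY minute, device_id
    ORDER BY minute DESC, device_id;
""")

for minute, device_id, avg_hr, steps in cursor.fetchall():
    print(f"{minute} {device_id}: avg HR {avg_hr}, steps {steps}")

cursor.close()
conn.close()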
Security and Performance Considerations
- Security: In a production environment, enable authentication (e.g., SASL/SCRAM) and TLS encryption on Kafka, Flink, and the databases; never run unsecured endpoints in production.
- Performance: Kafka topics should be partitioned to allow parallel consumption, and records should be keyed so that related events stay together and in order (see the sketch after this list). The number of Flink task managers can be scaled out to handle increased load.
- Data Schema: Using a schema registry like Confluent Schema Registry with Avro or Protobuf is highly recommended over plain JSON for production systems. It enforces data consistency across producers and consumers and yields smaller, faster-to-parse payloads.
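On the partitioning point above: once a topic has more than one partition, keying each record by device ID keeps a given device's readings in order on a single partition and lets you run several instances of the same consumer group in parallel, each owning a subset of partitions. A minimal sketch of the change to the producer (same kafka-python API, only the key handling is new):

# keyed_producer.py (sketch: key records by device ID to preserve per-device order)
import json

from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='localhost:9092',
    key_serializer=lambda k: k.encode('utf-8'),
    value_serializer=lambda v: json.dumps(v).encode('utf-8'),
)

# Same call as in producer.py, with the device ID added as the message key.
producer.send('raw_wearable_data', key='device_3', value={'deviceId': 'device_3', 'heartRate': 72})
producer.flush()
producer.close()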
Conclusion
We've successfully designed and built a robust, real-time data ingestion pipeline for wearable device data. By leveraging an event-driven architecture with Kafka as the backbone, we've created a system that is scalable, resilient, and flexible.
We achieved:
- Decoupled services for ingestion, processing, and storage.
- Real-time anomaly detection using the power of Apache Flink.
- Efficient storage of time-series and log data in specialized databases.
This pattern is incredibly powerful and can be adapted for many other real-time use cases, from financial services to logistics and beyond.
Resources
- Apache Kafka: Official Documentation
- Apache Flink: Flink SQL Client Documentation
- TimescaleDB: Getting Started Guide
- Elasticsearch: Python Client Documentation