In the world of health and wellness applications, data is king. Users log workouts, track meals, and monitor their sleep, generating a constant stream of data. On the flip side, they expect to see beautiful, insightful dashboards showing their progress over weeks, months, or even years. This creates a classic architectural challenge: your database must be optimized for both frequent, simple writes (e.g., "log my 10km run") and complex, analytical reads (e.g., "show my average running pace trend, segmented by day of the week, for the past year").
A traditional monolithic architecture, where a single database handles both tasks, will inevitably hit a bottleneck. The data model that's perfect for quick writes (normalized) is often terrible for fast analytical queries, which may require complex joins and aggregations. This is where the Command Query Responsibility Segregation (CQRS) pattern comes in.
CQRS is an architectural pattern that separates the models for reading and writing data. Instead of one model handling everything, we'll have two:
- A Command model for handling state changes (writes, updates, deletes).
- A Query model for retrieving data.
In this tutorial, we'll build a simplified wellness analytics system that demonstrates the power of CQRS. We'll create a "write" side with a PostgreSQL database and a "read" side with a denormalized ClickHouse database, all connected by a Kafka event bus. This architecture allows us to scale each part of our system independently, ensuring both fast workout logging and lightning-fast analytics dashboards. ✨
Understanding the Problem
Imagine a typical wellness app. The write operations are simple and frequent:
- LogWorkout (userId, type: 'run', duration: 3600, distance: 10)
- AddMeal (userId, name: 'Salad', calories: 450)
These are best handled by a transactional, normalized database like PostgreSQL to ensure data integrity.
The read operations, however, are a different beast. A user wants to see their dashboard with queries like:
- "What's my total distance run per month for the last year?"
- "Show me a weekly average of my caloric intake."
- "Compare my workout durations on weekdays vs. weekends."
Running these queries against a normalized transactional database can be slow and resource-intensive, often requiring multiple joins and on-the-fly calculations. As your user base and data volume grow, these queries will grind your application to a halt.
By separating these concerns, CQRS allows us to use the best tool for each job. We can have a write model optimized for transactional consistency and a read model that is denormalized and tailored for the specific queries our analytics dashboard will make.
Prerequisites
Before we start, you should have a basic understanding of:
- REST APIs (we'll use Node.js with Express for our examples)
- Docker and Docker Compose (for running our infrastructure)
- Basic SQL
You'll need the following installed:
- Node.js (v18 or later)
- Docker
Step 1: Setting Up the "Write" Side with PostgreSQL
Our write side, or "Command" side, will consist of a simple API to log new workouts and a PostgreSQL database to store them.
What we're doing
We'll define a simple, normalized schema in PostgreSQL for our workouts table and create an API endpoint to insert new records.
Implementation
First, let's create a docker-compose.yml file to run our infrastructure: PostgreSQL, Kafka, and ClickHouse.
# docker-compose.yml
version: '3.8'
services:
  postgres:
    image: postgres:15
    restart: always
    environment:
      - POSTGRES_USER=user
      - POSTGRES_PASSWORD=password
      - POSTGRES_DB=wellness_write_db
    ports:
      - '5432:5432'
    volumes:
      - postgres_data:/var/lib/postgresql/data

  zookeeper:
    image: confluentinc/cp-zookeeper:7.3.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    image: confluentinc/cp-kafka:7.3.0
    depends_on: [zookeeper]
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # kafka:29092 is the listener for containers on the Docker network (e.g. ClickHouse);
      # localhost:9092 is for clients running on the host (e.g. our Node.js services).
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1

  clickhouse:
    image: clickhouse/clickhouse-server:latest
    ports:
      - "8123:8123"
      - "9000:9000"
    volumes:
      - clickhouse_data:/var/lib/clickhouse

volumes:
  postgres_data:
  clickhouse_data:
Run docker-compose up -d to start all the services.
Now, let's set up a simple Node.js application for our Command API.
mkdir wellness-app
cd wellness-app
npm init -y
npm install express pg kafkajs
Here's the code for our "write" service.
// command-service/index.js
const express = require('express');
const { Pool } = require('pg');
const { Kafka } = require('kafkajs');

const app = express();
app.use(express.json());

// PostgreSQL setup
const pool = new Pool({
  user: 'user',
  host: 'localhost',
  database: 'wellness_write_db',
  password: 'password',
  port: 5432,
});

// Kafka setup
const kafka = new Kafka({
  clientId: 'wellness-app',
  brokers: ['localhost:9092'],
});
const producer = kafka.producer();

// Create table if it doesn't exist
const initDb = async () => {
  await pool.query(`
    CREATE TABLE IF NOT EXISTS workouts (
      id SERIAL PRIMARY KEY,
      user_id VARCHAR(50) NOT NULL,
      type VARCHAR(50) NOT NULL,
      duration_seconds INT NOT NULL,
      distance_km FLOAT,
      logged_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
    );
  `);
};

app.post('/log-workout', async (req, res) => {
  try {
    const { userId, type, durationSeconds, distanceKm } = req.body;

    // 1. Write to the transactional database
    const result = await pool.query(
      'INSERT INTO workouts(user_id, type, duration_seconds, distance_km) VALUES($1, $2, $3, $4) RETURNING *',
      [userId, type, durationSeconds, distanceKm]
    );
    const newWorkout = result.rows[0];
    console.log('Workout logged to PostgreSQL:', newWorkout);

    // 2. Publish an event to Kafka.
    // logged_at is serialized as 'YYYY-MM-DD HH:MM:SS' (UTC) so ClickHouse's DateTime column can parse it.
    const event = {
      ...newWorkout,
      logged_at: newWorkout.logged_at.toISOString().slice(0, 19).replace('T', ' '),
    };
    await producer.send({
      topic: 'workout-logged',
      messages: [{ value: JSON.stringify(event) }],
    });
    console.log('Event published to Kafka topic: workout-logged');

    res.status(201).json(newWorkout);
  } catch (error) {
    console.error('Failed to log workout:', error);
    res.status(500).send('Internal Server Error');
  }
});

const start = async () => {
  await initDb();
  await producer.connect();
  app.listen(3000, () => {
    console.log('Command service listening on port 3000');
  });
};

start();
How it works
- API Endpoint: We have a single endpoint, /log-workout, that accepts a new workout record.
- Transactional Write: The data is first written to our PostgreSQL workouts table. This is our "source of truth". The operation is atomic and ensures data integrity.
- Event Publishing: Upon a successful database write, we publish an event to a Kafka topic named workout-logged. This event contains the full details of the newly created workout. This is a crucial step for decoupling our write and read sides. (If you want to watch these events arrive, see the throwaway consumer sketch below.)
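To sanity-check the eventing leg on its own, you can run a throwaway consumer against the workout-logged topic and watch events arrive as you call the API. This is just a debugging aid, not part of the architecture: the file name and groupId below are arbitrary, and it reuses the kafkajs package we already installed.
// debug-consumer.js (a throwaway debugging aid, not part of the architecture)
const { Kafka } = require('kafkajs');

const kafka = new Kafka({ clientId: 'debug-consumer', brokers: ['localhost:9092'] });
const consumer = kafka.consumer({ groupId: 'debug-consumer' });

const run = async () => {
  await consumer.connect();
  // Read from the beginning so previously published workouts show up too
  await consumer.subscribe({ topics: ['workout-logged'], fromBeginning: true });
  await consumer.run({
    eachMessage: async ({ topic, partition, message }) => {
      console.log(`Event on ${topic}[${partition}]:`, message.value.toString());
    },
  });
};

run().catch(console.error);
Run it with node debug-consumer.js in a separate terminal while you POST workouts to the command service.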
Step 2: Creating a Denormalized Read Model in ClickHouse
Now for the fun part: the "read" side. Our goal is to create a data model in ClickHouse that's optimized for the exact queries our analytics dashboard will need.
What we're doing
We will create a table in ClickHouse designed for fast analytical queries. ClickHouse is a columnar database, which makes it exceptionally fast for OLAP (Online Analytical Processing) workloads. We will also set up a mechanism to ingest data from our Kafka topic into this ClickHouse table.
Implementation
ClickHouse has a native Kafka engine that can directly subscribe to a Kafka topic. This makes the integration incredibly seamless.
First, connect to the ClickHouse instance. You can use the command-line client via Docker:
docker-compose exec clickhouse clickhouse-client
Inside the ClickHouse client, run the following SQL commands:
-- 1. Create a dedicated database and the target table for our denormalized analytics data.
-- This is where the data will be permanently stored for fast queries.
CREATE DATABASE IF NOT EXISTS wellness_analytics;

CREATE TABLE IF NOT EXISTS wellness_analytics.workouts_view
(
    `id` UInt32,
    `user_id` String,
    `type` String,
    `duration_seconds` UInt32,
    `distance_km` Float32,
    `logged_at` DateTime,
    `day_of_week` String,
    `is_weekend` UInt8
)
ENGINE = MergeTree()
PARTITION BY toYYYYMM(logged_at)
ORDER BY (user_id, logged_at);

-- 2. Create a Kafka engine table.
-- This table acts as a live stream from our Kafka topic. It doesn't store data itself.
CREATE TABLE IF NOT EXISTS wellness_analytics.workouts_queue
(
    `id` UInt32,
    `user_id` String,
    `type` String,
    `duration_seconds` UInt32,
    `distance_km` Float32,
    `logged_at` DateTime
)
ENGINE = Kafka()
SETTINGS
    kafka_broker_list = 'kafka:29092',
    kafka_topic_list = 'workout-logged',
    kafka_group_name = 'clickhouse_wellness_consumer',
    kafka_format = 'JSONEachRow';

-- 3. Create a Materialized View.
-- This is the magic that connects the Kafka stream to our target analytics table.
-- It automatically listens for new messages in workouts_queue and inserts them
-- into workouts_view, performing transformations along the way.
CREATE MATERIALIZED VIEW IF NOT EXISTS wellness_analytics.workouts_consumer TO wellness_analytics.workouts_view
AS SELECT
    id,
    user_id,
    type,
    duration_seconds,
    distance_km,
    logged_at,
    formatDateTime(logged_at, '%W') AS day_of_week,
    if(toDayOfWeek(logged_at) IN (6, 7), 1, 0) AS is_weekend
FROM wellness_analytics.workouts_queue;
How it works
- workouts_view Table: This is our main analytics table. We've used the MergeTree engine, which is ClickHouse's powerhouse for analytics. Notice we've added pre-calculated, denormalized columns like day_of_week and is_weekend. This avoids doing these calculations at query time, making our dashboards faster.
- workouts_queue Table: This table uses the Kafka engine to connect to our Kafka broker and subscribe to the workout-logged topic. It essentially provides a live, streaming view of the messages in the topic.
- workouts_consumer Materialized View: This is the ETL (Extract, Transform, Load) pipeline. It automatically pulls new records from workouts_queue, applies simple transformations (like calculating day_of_week), and loads them into our workouts_view table. You can exercise this ingestion path on its own with the small test script below.
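If you'd like to verify the Kafka-to-ClickHouse leg before wiring up the command service, you can publish a hand-crafted event straight to the topic. Below is a minimal sketch using the same kafkajs package from Step 1; the script name is made up, and the field names and 'YYYY-MM-DD HH:MM:SS' timestamp deliberately mirror the columns of workouts_queue so the JSONEachRow parser accepts them.
// publish-test-event.js (hypothetical helper for testing the ingestion path)
const { Kafka } = require('kafkajs');

const kafka = new Kafka({ clientId: 'test-producer', brokers: ['localhost:9092'] });
const producer = kafka.producer();

const run = async () => {
  await producer.connect();
  // Field names and types must line up with the columns of workouts_queue
  const event = {
    id: 999,
    user_id: 'user123',
    type: 'run',
    duration_seconds: 1800,
    distance_km: 5.0,
    logged_at: '2024-01-15 07:30:00',
  };
  await producer.send({
    topic: 'workout-logged',
    messages: [{ value: JSON.stringify(event) }],
  });
  console.log('Test event published');
  await producer.disconnect();
};

run().catch(console.error);
After running node publish-test-event.js, the row should appear in wellness_analytics.workouts_view within a few seconds.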
Step 3: Putting It All Together and Querying Analytics
With our write and read sides set up, let's test the entire flow.
Post a new workout
Use a tool like curl or Postman to send a POST request to our command service:
curl -X POST http://localhost:3000/log-workout \
-H "Content-Type: application/json" \
-d '{
"userId": "user123",
"type": "run",
"durationSeconds": 3600,
"distanceKm": 10.5
}'
You should see logs in your command service console indicating that the workout was logged to PostgreSQL and an event was published to Kafka.
Verify data in ClickHouse
Now, let's query our ClickHouse read model. Go back to your ClickHouse client:
SELECT * FROM wellness_analytics.workouts_view;
Within moments, you should see the new workout record, complete with the added day_of_week and is_weekend fields!
Run complex analytical queries
This is where our architecture shines. Let's run a query that would have been slow on our transactional database. For example, "What is the total distance and average duration of workouts per user, grouped by workout type?"
SELECT
user_id,
type,
sum(distance_km) as total_distance,
avg(duration_seconds) / 60 as avg_duration_minutes
FROM wellness_analytics.workouts_view
GROUP BY user_id, type;
This query will be incredibly fast in ClickHouse, even with millions or billions of rows, because the data is stored in a columnar format optimized for such aggregations.
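To put such queries behind an API for a dashboard, a thin read-side service can simply forward them to ClickHouse. Here's a minimal sketch using ClickHouse's HTTP interface on port 8123 (which we exposed in docker-compose) and Node 18's built-in fetch; the route, port 3001, and file name are illustrative choices, not part of the setup above.
// query-service/index.js (a minimal read-side sketch)
const express = require('express');

const app = express();
const CLICKHOUSE_URL = 'http://localhost:8123';

app.get('/analytics/workout-summary/:userId', async (req, res) => {
  try {
    // Aggregate per workout type for one user; FORMAT JSONEachRow returns one JSON object per row
    const sql = `
      SELECT type,
             sum(distance_km) AS total_distance,
             avg(duration_seconds) / 60 AS avg_duration_minutes
      FROM wellness_analytics.workouts_view
      WHERE user_id = {userId:String}
      GROUP BY type
      FORMAT JSONEachRow
    `;
    const response = await fetch(
      `${CLICKHOUSE_URL}/?param_userId=${encodeURIComponent(req.params.userId)}`,
      { method: 'POST', body: sql }
    );
    const text = await response.text();
    const rows = text.trim() === '' ? [] : text.trim().split('\n').map(JSON.parse);
    res.json(rows);
  } catch (error) {
    console.error('Analytics query failed:', error);
    res.status(500).send('Internal Server Error');
  }
});

app.listen(3001, () => console.log('Query service listening on port 3001'));
Binding the value through the param_userId URL parameter and the {userId:String} placeholder keeps user input out of the SQL string itself.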
Performance Considerations
- Independent Scaling: The biggest performance win is the ability to scale reads and writes independently. If your analytics dashboards become popular, you can add more ClickHouse nodes without affecting the performance of your workout logging service.
- ClickHouse Optimization: The MergeTree engine in ClickHouse is highly tunable. Partitioning your data (as we did by month) is crucial for performance, as it allows ClickHouse to skip reading irrelevant data.
- Eventual Consistency: It's important to understand that this architecture is eventually consistent. There will be a slight delay (usually milliseconds to seconds) between a workout being logged and it appearing in the analytics dashboard. For most wellness analytics use cases, this is a perfectly acceptable trade-off for the massive performance gains. The sketch below shows one way to account for this delay in an end-to-end test.
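Because of that replication lag, end-to-end tests shouldn't assert against the read model immediately after a write. One pragmatic approach, sketched below, is to poll ClickHouse until the expected row appears or a timeout expires; the helper name, attempt count, and delay are arbitrary.
// wait-for-read-model.js (hypothetical test helper)
const CLICKHOUSE_URL = 'http://localhost:8123';

// Polls the read model until a workout with the given id is visible, or gives up.
async function waitForWorkout(workoutId, { attempts = 20, delayMs = 500 } = {}) {
  // count() over the analytics table; {id:UInt32} is bound via the param_id URL parameter
  const sql = 'SELECT count() FROM wellness_analytics.workouts_view WHERE id = {id:UInt32}';
  for (let i = 0; i < attempts; i += 1) {
    const response = await fetch(`${CLICKHOUSE_URL}/?param_id=${workoutId}`, {
      method: 'POST',
      body: sql,
    });
    const count = parseInt((await response.text()).trim(), 10);
    if (count > 0) return true;
    await new Promise((resolve) => setTimeout(resolve, delayMs));
  }
  return false;
}

module.exports = { waitForWorkout };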
Alternative Approaches
- Apache Druid: For use cases requiring even lower latency real-time ingestion and dashboards, Apache Druid is a strong alternative to ClickHouse. Druid's architecture is more distributed and can be more complex to manage, but it excels at real-time, event-driven scenarios.
- Change Data Capture (CDC): Instead of the application publishing events, you could use a CDC tool like Debezium to stream changes directly from PostgreSQL's write-ahead log (WAL) to Kafka. This further decouples the write service from the eventing system.
Conclusion
We've successfully built a scalable analytics backend using the CQRS pattern. By separating our write model (PostgreSQL) from our read model (ClickHouse) and connecting them with an event bus (Kafka), we've created a system that is:
- Performant: Optimized databases for both transactional writes and analytical reads.
- Scalable: We can scale the read and write sides independently to meet demand.
- Maintainable: The separation of concerns makes the system easier to understand, develop, and maintain.
This pattern is incredibly powerful for any application with mismatched read and write workloads, from e-commerce platforms to IoT systems and, of course, wellness analytics.
Next Steps
- Build a Query API: Create a new service that exposes endpoints to query the ClickHouse workouts_view table and serves data to a frontend dashboard (a minimal version is sketched at the end of Step 3).
- Explore Richer Analytics: Use ClickHouse's powerful SQL dialect and functions to build more complex analytics, like time-series analysis or funnel analysis.
- Error Handling: Implement a dead-letter queue in Kafka to handle events that fail to be processed by the ClickHouse consumer; a sketch of the pattern follows below.
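On that last point: in this architecture ClickHouse itself is the Kafka consumer, so malformed messages are typically handled with engine settings such as kafka_skip_broken_messages. If you later put a Node-based consumer in front of ClickHouse (for validation or enrichment, say), a common dead-letter pattern with kafkajs is to wrap processing in try/catch and forward failures to a separate topic, as in this rough sketch; the workout-logged-dlq topic name is made up for the example.
// enrichment-consumer.js (hypothetical, only needed if you add a Node consumer in front of ClickHouse)
const { Kafka } = require('kafkajs');

const kafka = new Kafka({ clientId: 'enrichment-consumer', brokers: ['localhost:9092'] });
const consumer = kafka.consumer({ groupId: 'enrichment-consumer' });
const producer = kafka.producer();

const run = async () => {
  await Promise.all([consumer.connect(), producer.connect()]);
  await consumer.subscribe({ topics: ['workout-logged'] });
  await consumer.run({
    eachMessage: async ({ message }) => {
      try {
        const event = JSON.parse(message.value.toString());
        // ...validate or enrich the event, then forward it onwards...
      } catch (error) {
        // Park the bad message on a dead-letter topic instead of blocking the pipeline
        await producer.send({
          topic: 'workout-logged-dlq',
          messages: [{ value: message.value, headers: { error: String(error.message) } }],
        });
      }
    },
  });
};

run().catch(console.error);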