
From Wearable to Web: A Real-Time Data Pipeline with IoT, MQTT, and Node.js

A deep-dive tutorial on building a complete, real-time IoT data pipeline. Stream sensor data from an ESP32 wearable through an MQTT broker to a Node.js backend and visualize it live on a web app using WebSockets.

2025-12-12

Introduction

Ever wondered how your fitness tracker sends your heart rate to your phone in real-time? Or how smart home sensors instantly update a dashboard? The magic lies in a robust, real-time data pipeline. For developers, building these pipelines is a core skill in the rapidly growing world of the Internet of Things (IoT).

In this deep-dive tutorial, we're going to build a complete, end-to-end data pipeline. We'll simulate a wearable device using a popular and affordable ESP32 microcontroller, stream its sensor data over the lightweight MQTT protocol, process it with a Node.js backend, and visualize it live on a web dashboard using WebSockets.

This project isn't just a theoretical exercise; it's a blueprint for building your own IoT applications, from personal health monitors to industrial sensor networks. You'll learn how to handle real-time data efficiently, ensuring low latency and high reliability.

Prerequisites:

  • Basic understanding of JavaScript (Node.js) and C++ (Arduino).
  • Node.js and npm installed on your machine.
  • Arduino IDE set up for ESP32 development.
  • An ESP32 development board (any model with Wi-Fi will do).
  • A sensor for your ESP32 (we'll use a BME280 for temperature and humidity, but you can easily adapt the code for any sensor).

Understanding the Problem: The IoT Data Challenge

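Streaming data from millions of devices is complex. A naive approach of sending an HTTP request from each device to a central server doesn't scale well: every request repeats connection setup and header overhead, each device must know exactly which server to talk to (tight coupling), and a dropped connection on an unreliable network, a common scenario for IoT devices, loses the reading unless the device implements its own retry logic.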

This is where the publish/subscribe (pub/sub) model, and specifically the MQTT protocol, shines.

  • Decoupling: Devices (publishers) send messages to a central hub called a broker on specific "topics" without knowing who will receive them. Applications (subscribers) listen to these topics to get the data they need.
  • Lightweight: MQTT is designed for constrained devices and low-bandwidth networks, making it perfect for battery-powered wearables.
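  • Reliability: MQTT defines three Quality of Service (QoS) levels: 0 (at most once), 1 (at least once), and 2 (exactly once). Higher levels trade extra network round trips for stronger delivery guarantees over spotty connections.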
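
To make the topic-based routing above concrete, here is a minimal sketch of how a topic filter can be matched against a published topic. The function name topicMatches is our own, and the logic is a simplification: it covers the + (single-level) and # (multi-level) wildcards but ignores the special handling of topics that start with $.

```javascript
// Simplified MQTT topic-filter matching (ignores the special rules for "$" topics).
function topicMatches(filter, topic) {
  const filterLevels = filter.split('/');
  const topicLevels = topic.split('/');

  for (let i = 0; i < filterLevels.length; i++) {
    const level = filterLevels[i];
    if (level === '#') return true; // multi-level wildcard: accepts all remaining levels
    if (i >= topicLevels.length) return false; // filter is longer than the topic
    if (level !== '+' && level !== topicLevels[i]) return false; // '+' accepts exactly one level
  }
  // Every filter level matched; the topic must not have extra levels left over.
  return filterLevels.length === topicLevels.length;
}

console.log(topicMatches('wearable/device1/temperature', 'wearable/device1/temperature')); // true
console.log(topicMatches('wearable/+/temperature', 'wearable/device2/temperature')); // true
console.log(topicMatches('wearable/#', 'wearable/device1/humidity')); // true
console.log(topicMatches('wearable/+', 'wearable/device1/humidity')); // false
```

A subscriber that uses wearable/+/temperature would therefore receive temperature readings from every device without the devices knowing it exists.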

Our architecture will look like this:

ESP32 (Wearable) -> MQTT Broker -> Node.js Backend -> WebSocket -> Web Browser

This approach is scalable and resilient, and MQTT is an OASIS standard that many IoT platforms build on.

Prerequisites & Setup

Before we start coding, let's get our environment ready.

1. The MQTT Broker

The broker is the central message hub. For this tutorial, we have two great options:

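  • Easiest (Cloud-based): We'll use the free, public MQTT broker from EMQX at broker.emqx.io. It requires no setup and is perfect for quick development. Keep in mind that anyone can publish or subscribe to any topic on a public broker, so never send sensitive data through it.
  • Local & Secure: For a more production-like setup, you can run your own Mosquitto broker. You can do this easily with Docker. The run command below mounts a mosquitto.conf file from the current directory, so create that file first; Mosquitto 2.x only accepts connections from other machines if the config defines a listener: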
code
# Pull the Mosquitto image
docker pull eclipse-mosquitto

# Run the broker
docker run -it -p 1883:1883 -p 9001:9001 -v $(pwd)/mosquitto.conf:/mosquitto/config/mosquitto.conf eclipse-mosquitto
Code collapsed

(For this tutorial, all examples will use the public broker.emqx.io for simplicity.)

2. Node.js Project Setup

On your computer, create a new project directory and initialize a Node.js project.

code
mkdir wearable-pipeline-backend
cd wearable-pipeline-backend
npm init -y
Code collapsed

Now, install the necessary libraries: mqtt for connecting to our broker and ws for creating the WebSocket server.

code
npm install mqtt ws
Code collapsed

After you set "main" to server.js and add a "start" script, your package.json should look something like this:

code
{
  "name": "wearable-pipeline-backend",
  "version": "1.0.0",
  "description": "",
  "main": "server.js",
  "scripts": {
    "start": "node server.js"
  },
  "keywords": [],
  "author": "",
  "license": "ISC",
  "dependencies": {
    "mqtt": "^5.3.5",
    "ws": "^8.16.0"
  }
}
Code collapsed

3. Arduino IDE Setup for ESP32

Make sure your Arduino IDE is configured for your ESP32 board. You'll also need the PubSubClient library for MQTT, the ArduinoJson library for building JSON payloads, and the necessary libraries for your sensor (e.g., Adafruit BME280 library).

You can install these through the Arduino IDE's Library Manager:

  1. Go to Sketch -> Include Library -> Manage Libraries...
  2. Search for and install PubSubClient.
  3. Search for and install ArduinoJson. The sketch below uses the StaticJsonDocument API from ArduinoJson 6; version 7 still compiles it but flags it as deprecated.
  4. Search for and install Adafruit BME280 Library and its dependency, Adafruit Unified Sensor.

Step 1: The Wearable Device (ESP32)

What we're doing

We'll program the ESP32 to read sensor data periodically and publish it as a JSON payload to our MQTT broker on a specific topic.

Implementation

Fire up the Arduino IDE and paste in the following code. Remember to change the Wi-Fi credentials to your own!

code
// src/esp32_mqtt_publisher/esp32_mqtt_publisher.ino

#include <WiFi.h>
#include <PubSubClient.h>
#include <Adafruit_BME280.h>
#include <ArduinoJson.h>

// --- Wi-Fi and MQTT Configuration ---
const char* ssid = "YOUR_WIFI_SSID";
const char* password = "YOUR_WIFI_PASSWORD";
const char* mqtt_server = "broker.emqx.io";
const int mqtt_port = 1883;

// --- MQTT Topics ---
const char* temp_topic = "wearable/device1/temperature";
const char* humidity_topic = "wearable/device1/humidity";

// --- Globals ---
WiFiClient espClient;
PubSubClient client(espClient);
Adafruit_BME280 bme; // I2C
unsigned long lastMsg = 0; // millis() value at the time of the last publish

void setup_wifi() {
  delay(10);
  Serial.println();
  Serial.print("Connecting to ");
  Serial.println(ssid);
  WiFi.begin(ssid, password);
  while (WiFi.status() != WL_CONNECTED) {
    delay(500);
    Serial.print(".");
  }
  Serial.println("");
  Serial.println("WiFi connected");
  Serial.println("IP address: ");
  Serial.println(WiFi.localIP());
}

void reconnect() {
  // Loop until we're reconnected
  while (!client.connected()) {
    Serial.print("Attempting MQTT connection...");
    // Create a random client ID
    String clientId = "ESP32Client-";
    clientId += String(random(0xffff), HEX);
    // Attempt to connect
    if (client.connect(clientId.c_str())) {
      Serial.println("connected");
    } else {
      Serial.print("failed, rc=");
      Serial.print(client.state());
      Serial.println(" try again in 5 seconds");
      // Wait 5 seconds before retrying
      delay(5000);
    }
  }
}

void setup() {
  Serial.begin(115200);
  setup_wifi();
  client.setServer(mqtt_server, mqtt_port);
  
  // 0x76 is a common BME280 I2C address; some breakout boards use 0x77 instead
  if (!bme.begin(0x76)) {
    Serial.println("Could not find a valid BME280 sensor, check wiring!");
    while (1);
  }
}

void loop() {
  if (!client.connected()) {
    reconnect();
  }
  client.loop();

  unsigned long now = millis();
  if (now - lastMsg > 5000) { // Publish every 5 seconds
    lastMsg = now;

    float temperature = bme.readTemperature(); // Celsius
    float humidity = bme.readHumidity();

    if (isnan(temperature) || isnan(humidity)) {
        Serial.println("Failed to read from BME sensor!");
        return;
    }

    // Prepare JSON payload for temperature
    StaticJsonDocument<128> tempDoc;
    tempDoc["deviceId"] = "device1";
    tempDoc["value"] = temperature;
    tempDoc["unit"] = "C";
    tempDoc["timestamp"] = now; // milliseconds since boot, not wall-clock time
    char tempOutput[128];
    serializeJson(tempDoc, tempOutput);

    // Publish temperature (publish() returns false if the message could not be sent)
    if (client.publish(temp_topic, tempOutput)) {
      Serial.print("Published to ");
      Serial.print(temp_topic);
      Serial.print(": ");
      Serial.println(tempOutput);
    } else {
      Serial.println("Failed to publish temperature");
    }

    // Prepare JSON payload for humidity
    StaticJsonDocument<128> humidityDoc;
    humidityDoc["deviceId"] = "device1";
    humidityDoc["value"] = humidity;
    humidityDoc["unit"] = "%";
    humidityDoc["timestamp"] = now; // same uptime value, so both readings can be paired
    char humidityOutput[128];
    serializeJson(humidityDoc, humidityOutput);

    // Publish humidity
    if (client.publish(humidity_topic, humidityOutput)) {
      Serial.print("Published to ");
      Serial.print(humidity_topic);
      Serial.print(": ");
      Serial.println(humidityOutput);
    } else {
      Serial.println("Failed to publish humidity");
    }
  }
}
Code collapsed

How it works

  1. setup_wifi(): Connects the ESP32 to your local Wi-Fi network.
  2. reconnect(): This function handles connecting to the MQTT broker. If the connection drops, it retries every 5 seconds until it succeeds, blocking loop() in the meantime. This is crucial for device resilience.
  3. setup(): Initializes the serial monitor, Wi-Fi, MQTT server connection, and the BME280 sensor.
  4. loop(): The main logic. It ensures the MQTT client is connected, then every 5 seconds, it reads temperature and humidity. It formats this data into a simple JSON object using ArduinoJson and publishes each reading to its respective topic (wearable/device1/temperature and wearable/device1/humidity) using client.publish().
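
If you want to check what the device sends without flashing hardware, the payload format can be reproduced in Node.js. The sketch below mirrors the JSON fields from the Arduino code and estimates the size of the resulting MQTT PUBLISH packet. The helper names are our own, and the 256-byte limit is an assumption about the client library's buffer, so check the documentation of the PubSubClient version you installed. Number formatting on the ESP32 may also differ slightly from what JSON.stringify produces.

```javascript
// Mirrors the fields the Arduino sketch writes with ArduinoJson.
function buildPayload(deviceId, value, unit, uptimeMs) {
  return JSON.stringify({ deviceId, value, unit, timestamp: uptimeMs });
}

// Approximate size of an MQTT 3.1.1 QoS 0 PUBLISH packet:
// 1 header byte + 1 to 4 "remaining length" bytes + 2-byte topic length + topic + payload.
function publishPacketSize(topic, payload) {
  const remaining = 2 + Buffer.byteLength(topic) + Buffer.byteLength(payload);
  let lengthBytes = 1;
  if (remaining > 127) lengthBytes = 2;
  if (remaining > 16383) lengthBytes = 3;
  if (remaining > 2097151) lengthBytes = 4;
  return 1 + lengthBytes + remaining;
}

// Assumption: the device-side MQTT client buffers at most this many bytes per packet.
const ASSUMED_CLIENT_BUFFER = 256;

const payload = buildPayload('device1', 23.47, 'C', 15000);
const size = publishPacketSize('wearable/device1/temperature', payload);
console.log(payload);
console.log(`${size} bytes, fits in buffer: ${size <= ASSUMED_CLIENT_BUFFER}`);
```

If you later add more fields to the JSON document, rerunning this check is a quick way to notice when a message is approaching the buffer limit.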

Common Pitfalls

  • Wrong Wi-Fi Credentials: Double-check your SSID and password. The Serial Monitor will hang on "Connecting to..." if they are incorrect.
  • Firewall Issues: Ensure your network doesn't block port 1883, the standard MQTT port.
  • Sensor Wiring: If you see "Could not find a valid BME280 sensor," check your I2C wiring (SDA/SCL pins).
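
One more resilience consideration: the reconnect() function in the sketch waits a fixed 5 seconds between attempts. A common alternative, which is not what the sketch does, is exponential backoff, where the wait doubles after each failure up to a cap. The sketch below shows the arithmetic in JavaScript to keep the examples in one language; the function names, base delay, and cap are our own choices, and porting the calculation to the Arduino code is straightforward.

```javascript
// Exponential backoff with an upper bound; `attempt` starts at 0.
function backoffDelayMs(attempt, baseMs = 1000, maxMs = 60000) {
  const delay = baseMs * 2 ** attempt;
  return Math.min(delay, maxMs);
}

// Optional jitter: pick a delay between half and all of the computed value,
// so that many devices do not retry at exactly the same moment.
function withJitter(delayMs, random = Math.random) {
  return Math.round(delayMs / 2 + random() * (delayMs / 2));
}

for (let attempt = 0; attempt < 8; attempt++) {
  const delay = backoffDelayMs(attempt);
  console.log(`attempt ${attempt}: wait about ${withJitter(delay)} ms (cap ${delay} ms)`);
}
```

With many devices behind one broker, spreading retries out like this avoids a burst of simultaneous reconnects after an outage.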

Upload the code and open the Serial Monitor. You should see it connect to Wi-Fi and start publishing data! ✨

Step 2: The Node.js Backend (MQTT-to-WebSocket Bridge)

What we're doing

This is the core of our pipeline. We'll create a Node.js server that:

  1. Connects to the MQTT broker and subscribes to the topics our ESP32 is publishing to.
  2. Creates a WebSocket server to accept connections from web browsers.
  3. When it receives an MQTT message, it immediately broadcasts that message to all connected WebSocket clients.

Implementation

In your wearable-pipeline-backend directory, create a file named server.js.

code
// server.js

const mqtt = require('mqtt');
const { WebSocketServer } = require('ws');

// --- MQTT Configuration ---
const MQTT_BROKER = 'mqtt://broker.emqx.io';
const TOPICS = ['wearable/device1/temperature', 'wearable/device1/humidity'];

// --- WebSocket Server Configuration ---
const WSS_PORT = 8080;

// 1. Create a WebSocket server
const wss = new WebSocketServer({ port: WSS_PORT });
console.log(`WebSocket server started on port ${WSS_PORT}`);

wss.on('connection', (ws) => {
  console.log('Client connected to WebSocket');
  ws.on('close', () => console.log('Client disconnected'));
});

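// Function to broadcast to all clients
function broadcast(topic, message) {
  const text = message.toString();
  let data;
  try {
    data = JSON.parse(text);
  } catch (err) {
    // Anyone can publish to a public broker, so skip payloads that are not valid JSON
    console.warn(`Ignoring non-JSON payload on ${topic}:`, text);
    return;
  }
  const payload = JSON.stringify({ topic, data });
  console.log(`Broadcasting to ${wss.clients.size} clients:`, topic, text);
  wss.clients.forEach((wsClient) => {
    if (wsClient.readyState === 1) { // 1 === WebSocket.OPEN
      wsClient.send(payload);
    }
  });
}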

// 2. Connect to MQTT Broker
const client = mqtt.connect(MQTT_BROKER);

client.on('connect', () => {
  console.log('Connected to MQTT broker');
  
  // 3. Subscribe to all topics
  TOPICS.forEach(topic => {
    client.subscribe(topic, (err) => {
      if (err) {
        console.error(`Failed to subscribe to ${topic}:`, err.message);
        return;
      }
      console.log(`Subscribed to topic: ${topic}`);
    });
  });
});

// 4. Handle incoming messages
client.on('message', (topic, message) => {
  console.log(`Received message from topic ${topic}: ${message.toString()}`);
  
  // Broadcast the message to all WebSocket clients
  broadcast(topic, message);
});

client.on('error', (err) => {
    // Log only: calling client.end() here would stop mqtt.js from reconnecting automatically
    console.error('MQTT Error:', err);
});
Code collapsed

How it works

  1. WebSocket Server Setup: We create a new WebSocketServer instance on port 8080. We add a simple log to see when clients connect and disconnect.
  2. MQTT Client Setup: We use mqtt.connect() to connect to the public broker.
  3. Subscription: In the client.on('connect', ...) callback, we loop through our TOPICS array and subscribe to each one. This tells the broker we want to receive messages published on these topics.
  4. Message Handling: The client.on('message', ...) event is the magic link. When a message arrives from the broker, this function is triggered. We then call our broadcast function.
  5. Broadcasting: The broadcast function iterates over all connected WebSocket clients (wss.clients) and sends them the data. We wrap the topic and the parsed message data into a new JSON object for easy handling on the frontend.
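
The message-handling and broadcasting steps can also be tried in isolation, without a broker or a real WebSocket server. The following is a minimal sketch under our own assumptions: toEnvelope and broadcastEnvelope are hypothetical helper names, the receivedAt field is an extra we add because the device timestamp is uptime rather than wall-clock time, and the fake clients only imitate the readyState and send() members of a real socket.

```javascript
// Turn a raw MQTT message into the JSON string sent to browsers, or null if it is unusable.
function toEnvelope(topic, message, now = Date.now) {
  let data;
  try {
    data = JSON.parse(message.toString());
  } catch (err) {
    return null; // not JSON: drop it instead of letting the exception crash the server
  }
  if (data === null || typeof data !== 'object' || typeof data.value !== 'number') {
    return null; // valid JSON, but not the shape the dashboard expects
  }
  // receivedAt is a hypothetical extra field stamped by the backend.
  return JSON.stringify({ topic, data, receivedAt: now() });
}

// Send one envelope to every open client; returns how many clients received it.
function broadcastEnvelope(clients, envelope) {
  let sent = 0;
  for (const client of clients) {
    if (client.readyState === 1) { // 1 is the OPEN state in the WebSocket API
      client.send(envelope);
      sent++;
    }
  }
  return sent;
}

// Stand-in clients so the sketch runs without a real WebSocket server.
const fakeClients = new Set([
  { readyState: 1, send: (text) => console.log('client A got', text) },
  { readyState: 3, send: () => console.log('never called: socket is closed') },
]);

const good = toEnvelope(
  'wearable/device1/temperature',
  Buffer.from('{"deviceId":"device1","value":22.5,"unit":"C","timestamp":5000}')
);
const bad = toEnvelope('wearable/device1/temperature', Buffer.from('hello'));
if (good !== null) broadcastEnvelope(fakeClients, good);
console.log('bad payload ignored:', bad === null);
```

Keeping parsing and validation in a small pure function like this makes the bridge easy to unit test, and it keeps one malformed message from taking down every connected dashboard.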

Run the server from your terminal:

code
node server.js
Code collapsed

You should see it connect to the MQTT broker and subscribe to the topics. If your ESP32 is still running, you'll see the incoming messages logged to your terminal!

Step 3: The Web App (Frontend Visualization)

What we're doing

Finally, let's build the user-facing part: a simple HTML page with JavaScript that connects to our Node.js WebSocket server and visualizes the incoming data in real-time using Chart.js.

Implementation

In your backend project folder, create a new file named index.html.

code
<!-- index.html -->
<!DOCTYPE html>
<html lang="en">
<head>
    <meta charset="UTF-8">
    <meta name="viewport" content="width=device-width, initial-scale=1.0">
    <title>Real-Time Wearable Data</title>
    <script src="https://cdn.jsdelivr.net/npm/chart.js"></script>
    <style>
        body { font-family: sans-serif; display: flex; justify-content: center; align-items: center; height: 100vh; background-color: #f0f2f5; }
        .dashboard { display: flex; gap: 20px; }
        .chart-container { width: 500px; padding: 20px; background: white; border-radius: 8px; box-shadow: 0 2px 4px rgba(0,0,0,0.1); }
        h1 { text-align: center; }
    </style>
</head>
<body>
    <div class="dashboard">
        <div class="chart-container">
            <h1>Temperature (°C)</h1>
            <canvas id="tempChart"></canvas>
        </div>
        <div class="chart-container">
            <h1>Humidity (%)</h1>
            <canvas id="humidityChart"></canvas>
        </div>
    </div>

<script>
    const MAX_DATA_POINTS = 20;

    // --- Chart.js Setup for Temperature ---
    const tempCtx = document.getElementById('tempChart').getContext('2d');
    const tempChart = new Chart(tempCtx, {
        type: 'line',
        data: {
            labels: [],
            datasets: [{
                label: 'Temperature',
                data: [],
                borderColor: 'rgba(255, 99, 132, 1)',
                backgroundColor: 'rgba(255, 99, 132, 0.2)',
                borderWidth: 2,
                fill: true
            }]
        },
        options: {
            scales: {
                x: { title: { display: true, text: 'Time' } },
                y: { title: { display: true, text: 'Value' } }
            },
            animation: { duration: 200 }
        }
    });

    // --- Chart.js Setup for Humidity ---
    const humidityCtx = document.getElementById('humidityChart').getContext('2d');
    const humidityChart = new Chart(humidityCtx, {
        type: 'line',
        data: {
            labels: [],
            datasets: [{
                label: 'Humidity',
                data: [],
                borderColor: 'rgba(54, 162, 235, 1)',
                backgroundColor: 'rgba(54, 162, 235, 0.2)',
                borderWidth: 2,
                fill: true
            }]
        },
        options: {
            scales: {
                x: { title: { display: true, text: 'Time' } },
                y: { title: { display: true, text: 'Value' } }
            },
            animation: { duration: 200 }
        }
    });

    function addData(chart, label, data) {
        chart.data.labels.push(label);
        chart.data.datasets.forEach((dataset) => {
            dataset.data.push(data);
        });

        // Limit the number of data points
        if (chart.data.labels.length > MAX_DATA_POINTS) {
            chart.data.labels.shift();
            chart.data.datasets.forEach((dataset) => {
                dataset.data.shift();
            });
        }
        chart.update();
    }

    // --- WebSocket Connection ---
    const socket = new WebSocket('ws://localhost:8080');

    socket.onopen = () => {
        console.log('WebSocket connection established');
    };

    socket.onmessage = (event) => {
        const message = JSON.parse(event.data);
        const topic = message.topic;
        const data = message.data;
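        const now = new Date();
        // Zero-padded HH:MM:SS label, e.g. 09:05:03 instead of 9:5:3
        const timeLabel = [now.getHours(), now.getMinutes(), now.getSeconds()]
            .map((n) => String(n).padStart(2, '0'))
            .join(':');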

        console.log(`Received from ${topic}:`, data);

        if (topic.includes('temperature')) {
            addData(tempChart, timeLabel, data.value);
        } else if (topic.includes('humidity')) {
            addData(humidityChart, timeLabel, data.value);
        }
    };

    socket.onclose = () => {
        console.log('WebSocket connection closed');
    };

    socket.onerror = (error) => {
        console.error('WebSocket error:', error);
    };

</script>
</body>
</html>
Code collapsed

How it works

  1. Chart Setup: We use the Chart.js library to create two line charts, one for temperature and one for humidity. We configure them with some basic styling and labels.
  2. WebSocket Client: We create a new WebSocket instance, connecting to our backend server at ws://localhost:8080.
  3. socket.onmessage: This is where we receive the data pushed from our Node.js server. We parse the incoming JSON string.
  4. Data Routing: We check the topic from the message to decide which chart to update.
  5. addData() Function: This helper function pushes the new data point and a timestamp label to the correct chart. To keep the chart readable, it also removes the oldest data point if we exceed MAX_DATA_POINTS, creating a "sliding window" effect.
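
The sliding-window and routing logic does not depend on Chart.js, so it can be exercised on its own in Node.js. This is a small sketch with helper names of our own (pushWithLimit, formatTimeLabel, metricFromTopic); it also shows a zero-padded time label and a way to pick the chart from the last topic level rather than a substring search.

```javascript
// Append an item and drop the oldest entries once the array exceeds maxLength (mutates the array).
function pushWithLimit(items, item, maxLength) {
  items.push(item);
  while (items.length > maxLength) {
    items.shift();
  }
  return items;
}

// Zero-padded HH:MM:SS label in local time.
function formatTimeLabel(date) {
  const pad = (n) => String(n).padStart(2, '0');
  return `${pad(date.getHours())}:${pad(date.getMinutes())}:${pad(date.getSeconds())}`;
}

// Use the last topic level ("temperature" or "humidity") to decide which chart to update.
function metricFromTopic(topic) {
  return topic.split('/').pop();
}

const points = [];
for (let i = 1; i <= 5; i++) pushWithLimit(points, i, 3);
console.log(points); // [ 3, 4, 5 ]
console.log(formatTimeLabel(new Date(2025, 11, 12, 9, 5, 3))); // 09:05:03
console.log(metricFromTopic('wearable/device1/humidity')); // humidity
```

Matching on the last topic level is a little stricter than includes(): a device whose ID happened to contain the word "temperature" would not be routed to the wrong chart.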

To see it in action, keep server.js running and open the index.html file in your web browser. The charts will start updating in real-time as data arrives from your ESP32!

Security Best Practices

Our current setup is great for development, but for a production environment, security is non-negotiable. Here are the critical next steps.

  • Disable Anonymous Access on your Broker: In a production Mosquitto setup, you must disable anonymous connections and create user accounts with passwords.
    • Use mosquitto_passwd to create a password file.
    • In your mosquitto.conf, set allow_anonymous false and password_file /path/to/passwordfile.
  • Use TLS/SSL Encryption: MQTT and WebSocket traffic should be encrypted.
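    • Configure your MQTT broker to use TLS on port 8883. Your ESP32 and Node.js clients will need to be configured to use a secure connection. In Node.js that means mqtts:// and wss:// URLs; on the ESP32, swap WiFiClient for WiFiClientSecure and supply the broker's CA certificate.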
    • This prevents man-in-the-middle attacks where an attacker could sniff your sensor data or credentials.
  • Implement Authorization (ACLs): Use Access Control Lists (ACLs) to enforce the principle of least privilege.
    • A specific device (device1) should only be allowed to publish to its own topics (wearable/device1/+).
    • Your backend server should only be allowed to subscribe to the topics it needs (wearable/+/+).
    • This prevents a compromised device from publishing malicious data to other topics or listening in on data it shouldn't have access to.
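
To illustrate the least-privilege idea behind ACLs, here is a toy permission check. It is only a sketch of the concept: a real broker enforces this through its own ACL configuration, and the acl table, user names, and isAllowed function below are hypothetical. The topic matcher is a simplified one that handles the + and # wildcards.

```javascript
// Simplified MQTT topic-filter matching ('+' is one level, '#' is all remaining levels).
function topicMatches(filter, topic) {
  const filterLevels = filter.split('/');
  const topicLevels = topic.split('/');
  for (let i = 0; i < filterLevels.length; i++) {
    if (filterLevels[i] === '#') return true;
    if (i >= topicLevels.length) return false;
    if (filterLevels[i] !== '+' && filterLevels[i] !== topicLevels[i]) return false;
  }
  return filterLevels.length === topicLevels.length;
}

// Hypothetical ACL table: user name -> allowed topic filters per action.
const acl = {
  device1: { publish: ['wearable/device1/+'], subscribe: [] },
  backend: { publish: [], subscribe: ['wearable/+/+'] },
};

function isAllowed(user, action, topic) {
  const rules = acl[user];
  if (!rules) return false; // unknown users get nothing (deny by default)
  return (rules[action] || []).some((filter) => topicMatches(filter, topic));
}

console.log(isAllowed('device1', 'publish', 'wearable/device1/temperature')); // true
console.log(isAllowed('device1', 'publish', 'wearable/device2/temperature')); // false
console.log(isAllowed('device1', 'subscribe', 'wearable/device2/humidity')); // false
console.log(isAllowed('backend', 'subscribe', 'wearable/device2/humidity')); // true
```

The important property is the default: anything not explicitly granted is denied, so a compromised device1 cannot write to or read from another device's topics.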

Conclusion

Congratulations! You've successfully built a complete, real-time IoT data pipeline. We've gone from a physical sensor on an ESP32, through a scalable MQTT broker, to a reactive Node.js backend, and finally to a live-updating dashboard in a web browser.

You now have a solid foundation to build upon. You can expand this project by:

  • Adding more sensors and devices.
  • Storing the data in a time-series database like InfluxDB or TimescaleDB for historical analysis.
  • Adding alerting rules in the backend to send notifications when sensor values cross a certain threshold.
  • Building a more sophisticated frontend dashboard with React, Vue, or Svelte.
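
As a starting point for the alerting idea, here is a minimal sketch of a threshold check that could sit in the backend's message handler. The function name and the 30 °C limit are hypothetical. It reports only when a value crosses the threshold, so a reading that stays high does not trigger a notification every 5 seconds.

```javascript
// Returns a checker that reports 'above' or 'below' only when the value crosses the threshold.
function createThresholdAlert(threshold) {
  let wasAbove = false; // assume we start below the threshold
  return function check(value) {
    const isAbove = value > threshold;
    const event = isAbove === wasAbove ? null : (isAbove ? 'above' : 'below');
    wasAbove = isAbove;
    return event;
  };
}

const checkTemperature = createThresholdAlert(30); // hypothetical limit in °C
for (const value of [25, 31, 32, 29]) {
  const event = checkTemperature(value);
  if (event) console.log(`temperature ${value} crossed ${event} the limit`);
}
```

In the real backend, the console.log call would be replaced by whatever notification channel you choose.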

The pub/sub architecture using MQTT and WebSockets is incredibly powerful and forms the backbone of countless professional IoT systems. Go ahead and start building!


WellAlly's core development team, comprised of healthcare professionals, software engineers, and UX designers committed to revolutionizing digital health management.
