康心伴Logo
康心伴WellAlly
Backend Development

使用Node.js和RabbitMQ构建事件驱动健身系统

5 分钟阅读

使用Node.js和RabbitMQ构建事件驱动健身系统

现代健身应用需要处理多种实时事件——运动数据上传、成就解锁、社交互动等。事件驱动架构(EDA)能够优雅地处理这些异步场景。本文将教你如何使用Node.js和RabbitMQ构建一个可扩展的事件驱动健身系统。

为什么使用事件驱动架构?

| 特性 | 传统架构 | 事件驱动架构 |
| --- | --- | --- |
| 耦合度 | 紧耦合 | 松耦合 |
| 扩展性 | 垂直扩展 | 水平扩展 |
| 响应性 | 同步阻塞 | 异步非阻塞 |
| 可靠性 | 单点故障 | 消息持久化 |

健身应用场景:

  • 运动数据上传后触发多个处理流程
  • 成就系统需要监听各种事件
  • 通知系统需要异步发送
  • 数据分析需要离线处理

项目架构

code
fitness-events/
├── services/
│   ├── api-gateway/               # API网关
│   │   ├── src/
│   │   │   ├── routes/
│   │   │   ├── controllers/
│   │   │   └── events/            # 事件发布者
│   │   └── package.json
│   ├── workout-processor/         # 运动数据处理
│   │   ├── src/
│   │   │   ├── consumers/         # 消息消费者
│   │   │   ├── processors/        # 业务处理
│   │   │   └── publishers/        # 结果发布
│   │   └── package.json
│   ├── achievement-service/       # 成就系统
│   │   ├── src/
│   │   │   ├── consumers/
│   │   │   ├── evaluators/        # 成就评估
│   │   │   └── notifiers/
│   │   └── package.json
│   └── notification-service/      # 通知服务
│       ├── src/
│       │   ├── consumers/
│       │   ├── senders/           # 消息发送
│       │   └── templates/         # 消息模板
│       └── package.json
├── shared/
│   ├── events/                    # 事件定义
│   ├── utils/                     # 共享工具
│   └── config/                    # 共享配置
├── docker/
│   ├── docker-compose.yml
│   └── rabbitmq/
│       └── definitions.json       # RabbitMQ配置
└── README.md
Code collapsed

1. 事件定义

shared/events/types.js

code
// 事件类型定义
module.exports = {
  // 运动相关事件
  WORKOUT: {
    STARTED: 'workout.started',
    COMPLETED: 'workout.completed',
    DATA_UPLOADED: 'workout.data_uploaded',
    UPDATED: 'workout.updated',
    DELETED: 'workout.deleted',
  },

  // 用户相关事件
  USER: {
    REGISTERED: 'user.registered',
    PROFILE_UPDATED: 'user.profile_updated',
    GOAL_SET: 'user.goal_set',
    GOAL_ACHIEVED: 'user.goal_achieved',
  },

  // 成就相关事件
  ACHIEVEMENT: {
    UNLOCKED: 'achievement.unlocked',
    PROGRESS_UPDATED: 'achievement.progress_updated',
    MILESTONE_REACHED: 'achievement.milestone_reached',
  },

  // 社交相关事件
  SOCIAL: {
    FOLLOW: 'social.follow',
    UNFOLLOW: 'social.unfollow',
    COMMENT: 'social.comment',
    LIKE: 'social.like',
  },

  // 通知相关事件
  NOTIFICATION: {
    SEND: 'notification.send',
    SENT: 'notification.sent',
    FAILED: 'notification.failed',
  },

  // 系统事件
  SYSTEM: {
    ERROR: 'system.error',
    WARNING: 'system.warning',
    METRICS: 'system.metrics',
  },
};

/**
 * Base class for events: a type, a payload and a metadata envelope.
 */
class BaseEvent {
  /**
   * @param {string} type - Event type name, e.g. 'workout.started'.
   * @param {*} data - Event payload, stored as-is.
   * @param {object} [metadata] - Extra metadata. Its keys override the
   *   generated `id`, `timestamp` and `version`; a non-empty `correlationId`
   *   is kept, otherwise a new one is generated.
   */
  constructor(type, data, metadata = {}) {
    this.type = type;
    this.data = data;
    this.metadata = {
      id: generateEventId(),
      timestamp: new Date().toISOString(),
      version: '1.0',
      ...metadata,
      // Set after the spread: previously `{ correlationId: undefined }` in
      // `metadata` overwrote the freshly generated id with `undefined`.
      correlationId: metadata.correlationId || generateCorrelationId(),
    };
  }

  /**
   * @returns {{type: string, data: *, metadata: object}} The fields that are
   *   serialized when the event is passed to JSON.stringify.
   */
  toJSON() {
    return {
      type: this.type,
      data: this.data,
      metadata: this.metadata,
    };
  }
}

// Concrete event classes

/**
 * Event for the start of a workout. The payload carries the workout id, the
 * user id and the workout type.
 */
class WorkoutStartedEvent extends BaseEvent {
  constructor(workoutId, userId, workoutType, metadata = {}) {
    const payload = { workoutId, userId, workoutType };
    super(EVENTS.WORKOUT.STARTED, payload, metadata);
  }
}

/**
 * Event for a finished workout. The payload copies `duration`, `calories`,
 * `distance` and `heartRate` from `stats`, next to the workout and user ids.
 */
class WorkoutCompletedEvent extends BaseEvent {
  constructor(workoutId, userId, stats, metadata = {}) {
    const { duration, calories, distance, heartRate } = stats;
    const payload = { workoutId, userId, duration, calories, distance, heartRate };
    super(EVENTS.WORKOUT.COMPLETED, payload, metadata);
  }
}

/**
 * Event for an unlocked achievement. The payload carries the user id, the
 * achievement id and the achievement name.
 */
class AchievementUnlockedEvent extends BaseEvent {
  constructor(userId, achievementId, achievementName, metadata = {}) {
    const payload = { userId, achievementId, achievementName };
    super(EVENTS.ACHIEVEMENT.UNLOCKED, payload, metadata);
  }
}

/**
 * Builds an event id of the form `evt_<epoch ms>_<up to 9 base-36 chars>`.
 *
 * The random suffix comes from Math.random(), so ids are neither guaranteed
 * unique nor unpredictable.
 *
 * @returns {string}
 */
function generateEventId() {
  // slice(2, 11) takes the same 9 characters the deprecated substr(2, 9) did.
  return `evt_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
}

/**
 * Builds a correlation id of the form `corr_<epoch ms>_<up to 9 base-36 chars>`.
 *
 * The random suffix comes from Math.random(), so ids are neither guaranteed
 * unique nor unpredictable.
 *
 * @returns {string}
 */
function generateCorrelationId() {
  // slice(2, 11) takes the same 9 characters the deprecated substr(2, 9) did.
  return `corr_${Date.now()}_${Math.random().toString(36).slice(2, 11)}`;
}

module.exports = {
  BaseEvent,
  WorkoutStartedEvent,
  WorkoutCompletedEvent,
  AchievementUnlockedEvent,
};
Code collapsed

2. RabbitMQ连接管理

shared/utils/rabbitmq.js

code
const amqp = require('amqplib');

/**
 * Wrapper around one amqplib connection and one channel.
 *
 * It declares exchanges and queues, publishes JSON messages, consumes with
 * manual acknowledgement, and reconnects after an unexpected connection close.
 *
 * NOTE(review): a reconnect only re-creates the connection and channel.
 * Exchanges, queues, bindings and consumers registered earlier are not
 * restored here; callers have to set them up again.
 */
class RabbitMQConnection {
  /**
   * @param {{url: string, prefetch?: number}} config - `url` is passed to
   *   amqp.connect; `prefetch` falls back to 10 when falsy.
   */
  constructor(config) {
    this.config = config;
    this.connection = null;
    this.channel = null;
    this.connecting = false;
    // True once close() has been called, so the 'close' event that follows
    // is not treated as a connection loss.
    this.closing = false;
    this.reconnectDelay = 5000;
  }

  /**
   * Opens the connection and channel. Does nothing when already connected or
   * while another connect() call is in progress.
   *
   * @returns {Promise<void>}
   * @throws Rethrows whatever amqp.connect, createChannel or prefetch reject with.
   */
  async connect() {
    if (this.connecting || this.connection) {
      return;
    }

    this.connecting = true;
    this.closing = false;
    let connection = null;

    try {
      connection = await amqp.connect(this.config.url);

      // Error handling
      connection.on('error', (err) => {
        console.error('RabbitMQ连接错误:', err);
      });

      connection.on('close', () => {
        if (this.connection !== connection) {
          // Never became, or no longer is, the active connection.
          return;
        }

        // Clear the stale handles first. Previously they stayed set, so the
        // guard at the top of connect() turned every reconnect into a no-op.
        this.connection = null;
        this.channel = null;

        if (this.closing) {
          return;
        }

        console.warn('RabbitMQ连接已关闭,尝试重连...');
        this.scheduleReconnect();
      });

      const channel = await connection.createChannel();

      // An emitter without an 'error' listener throws when it emits 'error'.
      channel.on('error', (err) => {
        console.error('RabbitMQ连接错误:', err);
      });

      // QoS
      await channel.prefetch(this.config.prefetch || 10);

      this.connection = connection;
      this.channel = channel;
      console.log('RabbitMQ连接成功');
    } catch (error) {
      console.error('RabbitMQ连接失败:', error);

      if (connection) {
        // Best effort: release the half-open connection. The original error
        // is the one worth reporting, so a failure here is ignored.
        connection.close().catch(() => {});
      }

      throw error;
    } finally {
      this.connecting = false;
    }
  }

  /**
   * Retries connect() after `reconnectDelay` ms and keeps retrying on
   * failure, until close() is called. Used by the 'close' handler.
   */
  scheduleReconnect() {
    setTimeout(() => {
      if (this.closing) {
        return;
      }

      // connect() logs its own failure; handling the rejection here keeps it
      // from becoming an unhandled rejection.
      this.connect().catch(() => {
        if (!this.closing) {
          this.scheduleReconnect();
        }
      });
    }, this.reconnectDelay);
  }

  /**
   * Declares an exchange. It is durable unless `options.durable === false`.
   *
   * @param {string} name
   * @param {string} [type='topic']
   * @param {object} [options] - Passed through to assertExchange.
   * @throws {Error} When not connected.
   */
  async createExchange(name, type = 'topic', options = {}) {
    if (!this.channel) {
      throw new Error('RabbitMQ未连接');
    }

    await this.channel.assertExchange(name, type, {
      ...options,
      // After the spread, so an explicit `durable: undefined` cannot erase the default.
      durable: options.durable !== false,
    });

    console.log(`Exchange创建成功: ${name}`);
  }

  /**
   * Declares a queue. It is durable unless `options.durable === false`.
   *
   * @param {string} name
   * @param {object} [options] - Passed through to assertQueue.
   * @returns {Promise<object>} The value assertQueue resolves with.
   * @throws {Error} When not connected.
   */
  async createQueue(name, options = {}) {
    if (!this.channel) {
      throw new Error('RabbitMQ未连接');
    }

    const queue = await this.channel.assertQueue(name, {
      ...options,
      // After the spread, so an explicit `durable: undefined` cannot erase the default.
      durable: options.durable !== false,
    });

    console.log(`Queue创建成功: ${name}`);
    return queue;
  }

  /**
   * Binds a queue to an exchange with a routing key.
   *
   * @throws {Error} When not connected.
   */
  async bindQueue(queue, exchange, routingKey) {
    if (!this.channel) {
      throw new Error('RabbitMQ未连接');
    }

    await this.channel.bindQueue(queue, exchange, routingKey);
    console.log(`绑定成功: ${queue} -> ${exchange} (${routingKey})`);
  }

  /**
   * Serializes `message` as JSON and publishes it. Messages are persistent
   * with content type application/json unless `options` overrides that.
   *
   * @param {string} exchange
   * @param {string} routingKey
   * @param {*} message - Anything JSON.stringify accepts.
   * @param {object} [options] - Extra publish options.
   * @returns {Promise<boolean>} The return value of channel.publish.
   * @throws {Error} When not connected.
   */
  async publish(exchange, routingKey, message, options = {}) {
    if (!this.channel) {
      throw new Error('RabbitMQ未连接');
    }

    const content = Buffer.from(JSON.stringify(message));
    const result = this.channel.publish(exchange, routingKey, content, {
      persistent: true,
      contentType: 'application/json',
      ...options,
    });

    if (!result) {
      // NOTE(review): per the amqplib docs a false return signals a full
      // write buffer (wait for 'drain'), not necessarily a lost message.
      console.warn('消息发送失败,缓冲区已满');
    }

    return result;
  }

  /**
   * Consumes `queue` with manual acknowledgement.
   *
   * Outcome per message:
   * - handler resolves: ack;
   * - handler rejects: nack, requeued unless `options.requeue === false`;
   * - body is not valid JSON: nack without requeue, since parsing can never
   *   succeed on redelivery.
   * A nack without requeue lets the broker dead-letter the message when the
   * queue has a dead-letter exchange.
   *
   * @param {string} queue
   * @param {(content: *, msg: object) => Promise<void>|void} handler
   * @param {object} [options] - `requeue` is read here; the remaining keys
   *   are passed to channel.consume.
   * @throws {Error} When not connected.
   */
  async consume(queue, handler, options = {}) {
    if (!this.channel) {
      throw new Error('RabbitMQ未连接');
    }

    const { requeue, ...consumeOptions } = options;
    // Ack/nack must go to the channel that delivered the message;
    // `this.channel` may be a different channel after a reconnect.
    const channel = this.channel;

    await channel.consume(
      queue,
      async (msg) => {
        if (!msg) {
          return;
        }

        let content;
        try {
          content = JSON.parse(msg.content.toString());
        } catch (error) {
          console.error('消息处理失败:', error);
          channel.nack(msg, false, false);
          return;
        }

        try {
          await handler(content, msg);

          // Acknowledge
          channel.ack(msg);
        } catch (error) {
          console.error('消息处理失败:', error);

          // Reject; requeue unless the caller opted out
          channel.nack(msg, false, requeue !== false);
        }
      },
      {
        noAck: false,
        ...consumeOptions,
      }
    );
  }

  /**
   * Closes the channel and the connection without triggering a reconnect.
   * Errors are logged, not thrown.
   */
  async close() {
    this.closing = true;
    const { channel, connection } = this;

    try {
      if (channel) {
        await channel.close();
      }
      if (connection) {
        await connection.close();
      }
      console.log('RabbitMQ连接已关闭');
    } catch (error) {
      console.error('关闭RabbitMQ连接失败:', error);
    } finally {
      this.channel = null;
      this.connection = null;
    }
  }
}

module.exports = RabbitMQConnection;
Code collapsed

3. 事件发布者

services/api-gateway/src/events/publisher.js

code
const RabbitMQConnection = require('../../shared/utils/rabbitmq');
const {
  WorkoutStartedEvent,
  WorkoutCompletedEvent,
  AchievementUnlockedEvent,
} = require('../../shared/events/types');

/**
 * Publishes events from the API gateway to the `fitness.events` topic
 * exchange. Each publish method builds the matching event object and sends
 * its JSON form under a fixed routing key.
 */
class EventPublisher {
  /**
   * @param {{rabbitmq: object}} config - `config.rabbitmq` is handed to RabbitMQConnection.
   */
  constructor(config) {
    this.rabbitmq = new RabbitMQConnection(config.rabbitmq);
    this.exchange = 'fitness.events';
  }

  /** Connects and declares the exchange as a durable topic exchange. */
  async initialize() {
    const { rabbitmq, exchange } = this;
    await rabbitmq.connect();
    await rabbitmq.createExchange(exchange, 'topic', { durable: true });
  }

  /** @returns {Promise<*>} Whatever the underlying publish call resolves with. */
  async publishWorkoutStarted(workoutId, userId, workoutType) {
    const payload = new WorkoutStartedEvent(workoutId, userId, workoutType).toJSON();
    return this.rabbitmq.publish(this.exchange, 'workout.started', payload);
  }

  /** @returns {Promise<*>} Whatever the underlying publish call resolves with. */
  async publishWorkoutCompleted(workoutId, userId, stats) {
    const payload = new WorkoutCompletedEvent(workoutId, userId, stats).toJSON();
    return this.rabbitmq.publish(this.exchange, 'workout.completed', payload);
  }

  /** @returns {Promise<*>} Whatever the underlying publish call resolves with. */
  async publishAchievementUnlocked(userId, achievementId, achievementName) {
    const payload = new AchievementUnlockedEvent(userId, achievementId, achievementName).toJSON();
    return this.rabbitmq.publish(this.exchange, 'achievement.unlocked', payload);
  }
}

module.exports = EventPublisher;
Code collapsed

4. 运动数据处理服务

services/workout-processor/src/consumers/workout.js

code
const RabbitMQConnection = require('../../shared/utils/rabbitmq');

/**
 * Consumes the `workout.processor` queue and forwards each event's `data` to
 * the matching method of the injected processor.
 */
class WorkoutConsumer {
  /**
   * @param {{rabbitmq: object}} config - `config.rabbitmq` is handed to RabbitMQConnection.
   * @param {object} processor - Must provide processWorkoutStarted,
   *   processWorkoutCompleted and processDataUpload.
   */
  constructor(config, processor) {
    this.rabbitmq = new RabbitMQConnection(config.rabbitmq);
    this.processor = processor;
    this.exchange = 'fitness.events';
    this.queue = 'workout.processor';
  }

  /** Connects, declares the exchange and queue, and binds the routing keys. */
  async initialize() {
    const { rabbitmq, exchange, queue } = this;

    await rabbitmq.connect();

    // Exchange
    await rabbitmq.createExchange(exchange, 'topic');

    // Queue, with a dead-letter exchange named after the main exchange
    await rabbitmq.createQueue(queue, {
      durable: true,
      deadLetterExchange: `${exchange}.dlx`,
    });

    // Routing-key bindings
    for (const routingKey of ['workout.#', 'user.goal_achieved']) {
      await rabbitmq.bindQueue(queue, exchange, routingKey);
    }
  }

  /**
   * Starts consuming. Event types without a handler are logged with a
   * warning and otherwise ignored.
   */
  async start() {
    // Event type -> name of the method on this consumer that handles it.
    const handlerNames = new Map([
      ['workout.started', 'handleWorkoutStarted'],
      ['workout.completed', 'handleWorkoutCompleted'],
      ['workout.data_uploaded', 'handleDataUploaded'],
    ]);

    await this.rabbitmq.consume(this.queue, async (message) => {
      console.log(`收到消息: ${message.type}`);

      const handlerName = handlerNames.get(message.type);
      if (handlerName === undefined) {
        console.warn(`未知事件类型: ${message.type}`);
        return;
      }

      await this[handlerName](message);
    });
  }

  async handleWorkoutStarted(event) {
    await this.processor.processWorkoutStarted(event.data);
  }

  async handleWorkoutCompleted(event) {
    await this.processor.processWorkoutCompleted(event.data);
  }

  async handleDataUploaded(event) {
    await this.processor.processDataUpload(event.data);
  }
}

module.exports = WorkoutConsumer;
Code collapsed

services/workout-processor/src/processors/workout.js

code
const WorkoutPublisher = require('../publishers/workout');

/**
 * Business logic for workout events: persists workout records, derives
 * metrics, updates per-user totals and publishes follow-up events.
 *
 * NOTE(review): this class calls `calculateAverage`, `calculatePace`,
 * `calculateIntensity`, `parseWorkoutData` and `isGoalAchieved` on `this`,
 * and reads a `Sequelize` binding. None of them is defined in the code shown
 * here, so they have to be supplied elsewhere.
 */
class WorkoutProcessor {
  /**
   * @param {object} database - Data-access object exposing the models used
   *   below (workouts, workoutStats, workoutDetails, userStats, userGoals).
   * @param {object} publisher - Publisher for follow-up events and notifications.
   */
  constructor(database, publisher) {
    this.database = database;
    this.publisher = publisher;
  }

  /**
   * Handles a workout-started payload.
   * @param {{workoutId: *, userId: *, workoutType: *}} data
   */
  async processWorkoutStarted(data) {
    console.log(`处理运动开始事件: ${data.workoutId}`);

    // 1. Save the workout record
    await this.database.workouts.create({
      id: data.workoutId,
      userId: data.userId,
      type: data.workoutType,
      status: 'started',
      startTime: new Date(),
    });

    // 2. Initialise the stats row
    await this.database.workoutStats.create({
      workoutId: data.workoutId,
      userId: data.userId,
      metrics: {},
    });

    // 3. Notify the user
    await this.publisher.publishUserNotification(data.userId, {
      type: 'workout_started',
      message: '开始新的运动!',
    });
  }

  /**
   * Handles a workout-completed payload.
   * @param {{workoutId: *, userId: *, duration: *, calories: *, distance: *, heartRate: *}} data
   */
  async processWorkoutCompleted(data) {
    console.log(`处理运动完成事件: ${data.workoutId}`);

    // 1. Update the workout record
    await this.database.workouts.update(data.workoutId, {
      status: 'completed',
      endTime: new Date(),
      duration: data.duration,
      calories: data.calories,
      distance: data.distance,
    });

    // 2. Derive metrics
    const metrics = await this.calculateMetrics(data);

    // 3. Save the detailed stats
    await this.database.workoutStats.update(data.workoutId, {
      metrics: metrics,
      heartRate: data.heartRate,
    });

    // 4. Update the user's totals. calculateMetrics() returns no `calories`
    //    or `distance`, so the workout's own values are passed along;
    //    previously the totals were built from `undefined`.
    await this.updateUserStats(data.userId, {
      calories: data.calories,
      distance: data.distance,
      ...metrics,
    });

    // 5. Check goal completion
    await this.checkGoalAchievement(data.userId, metrics);

    // 6. Publish the processed event
    await this.publisher.publishWorkoutProcessed(data.workoutId, metrics);
  }

  /**
   * Handles an uploaded-data payload.
   * @param {{workoutId: *, rawData: *}} data
   */
  async processDataUpload(data) {
    console.log(`处理运动数据上传: ${data.workoutId}`);

    // 1. Parse and validate
    const parsedData = await this.parseWorkoutData(data.rawData);

    // 2. Store the detailed records
    await this.database.workoutDetails.bulkCreate(parsedData);

    // 3. Request data analysis
    await this.publisher.publishDataAnalysisRequest(data.workoutId, parsedData);
  }

  /**
   * Derives metrics from a completed workout.
   * @param {object} workoutData
   * @returns {Promise<{averageHeartRate: *, maxHeartRate: (number|null), pace: *, intensity: *}>}
   *   `maxHeartRate` is null when there are no heart-rate samples.
   */
  async calculateMetrics(workoutData) {
    // Presumably heartRate is an array of numeric samples; confirm against the producer.
    // Array.from tolerates a missing value, and reduce avoids spreading a
    // very long array into Math.max arguments.
    const samples = Array.from(workoutData.heartRate || []);
    const maxHeartRate = samples.length > 0
      ? samples.reduce((highest, sample) => Math.max(highest, sample), -Infinity)
      : null;

    return {
      averageHeartRate: this.calculateAverage(workoutData.heartRate),
      maxHeartRate,
      pace: this.calculatePace(workoutData.distance, workoutData.duration),
      intensity: this.calculateIntensity(workoutData),
      // ... more metrics
    };
  }

  /**
   * Adds one workout to the user's totals.
   * @param {*} userId
   * @param {{calories?: *, distance?: *}} metrics - Values that are not
   *   finite numbers count as 0.
   */
  async updateUserStats(userId, metrics) {
    // Only finite numbers may be embedded in the SQL fragments below.
    const toIncrement = (value) => {
      const parsed = Number(value);
      return Number.isFinite(parsed) ? parsed : 0;
    };
    const calories = toIncrement(metrics.calories);
    const distance = toIncrement(metrics.distance);

    await this.database.userStats.upsert({
      userId,
      totalWorkouts: Sequelize.literal('total_workouts + 1'),
      totalCalories: Sequelize.literal(`total_calories + ${calories}`),
      totalDistance: Sequelize.literal(`total_distance + ${distance}`),
      lastWorkoutAt: new Date(),
    });
  }

  /**
   * Publishes a goal-achieved event for every active goal of the user that
   * `isGoalAchieved` accepts for the given metrics.
   */
  async checkGoalAchievement(userId, metrics) {
    const goals = await this.database.userGoals.findAll({
      where: { userId, status: 'active' },
    });

    for (const goal of goals) {
      if (this.isGoalAchieved(goal, metrics)) {
        await this.publisher.publishGoalAchieved(userId, goal.id);
      }
    }
  }
}

module.exports = WorkoutProcessor;
Code collapsed

5. 成就系统

services/achievement-service/src/consumers/achievement.js

code
const RabbitMQConnection = require('../../shared/utils/rabbitmq');

/**
 * Consumes events that can affect achievements, evaluates the user's active
 * achievements against each event, and records unlocks and progress.
 */
class AchievementConsumer {
  /**
   * @param {{rabbitmq: object}} config - `config.rabbitmq` is handed to RabbitMQConnection.
   * @param {object} evaluator - Must provide getActiveAchievements(userId)
   *   and evaluate(achievement, event).
   * @param {object} [database] - Store for unlocks and progress. Falls back
   *   to `evaluator.database` when omitted.
   * @param {object} [publisher] - Must provide publishAchievementUnlocked and
   *   sendNotification; needed as soon as an achievement unlocks.
   */
  constructor(config, evaluator, database, publisher) {
    this.rabbitmq = new RabbitMQConnection(config.rabbitmq);
    this.evaluator = evaluator;
    // The handlers below use this.database and this.publisher, which were
    // never assigned before. The two extra parameters are optional, so
    // existing two-argument callers keep working.
    this.database = database || (evaluator && evaluator.database) || null;
    this.publisher = publisher || null;
    this.exchange = 'fitness.events';
    this.queue = 'achievement.evaluator';
  }

  /** Connects, declares the exchange and queue, and binds the routing keys. */
  async initialize() {
    await this.rabbitmq.connect();
    await this.rabbitmq.createExchange(this.exchange, 'topic');
    await this.rabbitmq.createQueue(this.queue, { durable: true });

    // Subscribe to every event that may affect an achievement
    const routingKeys = [
      'workout.completed',
      'workout.data_uploaded',
      'user.goal_achieved',
      'social.follow',
      'social.like',
    ];

    for (const routingKey of routingKeys) {
      await this.rabbitmq.bindQueue(this.queue, this.exchange, routingKey);
    }
  }

  /**
   * Starts consuming. Events whose payload has no `userId` are skipped with
   * a warning.
   */
  async start() {
    await this.rabbitmq.consume(this.queue, async (message) => {
      console.log(`成就系统收到事件: ${message.type}`);

      const userId = message.data ? message.data.userId : undefined;
      if (userId === undefined || userId === null) {
        // Nothing to evaluate against; dereferencing would throw.
        console.warn(`事件缺少userId,已跳过: ${message.type}`);
        return;
      }

      // Load the user's active achievements
      const userAchievements = await this.evaluator.getActiveAchievements(userId);

      // Evaluate each achievement
      for (const achievement of userAchievements) {
        const result = await this.evaluator.evaluate(achievement, message);

        if (result.unlocked) {
          await this.handleAchievementUnlocked(userId, achievement, result);
        } else if (result.progressUpdated) {
          await this.handleProgressUpdated(userId, achievement, result.progress);
        }
      }
    });
  }

  /**
   * Stores the unlock, publishes the unlocked event and notifies the user.
   * @throws {Error} When the database or publisher dependency is missing.
   */
  async handleAchievementUnlocked(userId, achievement, result) {
    if (!this.database || !this.publisher) {
      throw new Error('AchievementConsumer需要database和publisher依赖');
    }

    console.log(`成就解锁: ${userId} -> ${achievement.name}`);

    // 1. Store the unlock
    await this.database.userAchievements.create({
      userId,
      achievementId: achievement.id,
      unlockedAt: new Date(),
      metadata: result.metadata,
    });

    // 2. Publish the unlocked event
    await this.publisher.publishAchievementUnlocked(
      userId,
      achievement.id,
      achievement.name
    );

    // 3. Notify the user
    await this.publisher.sendNotification(userId, {
      type: 'achievement',
      title: '成就解锁!',
      message: `恭喜解锁成就: ${achievement.name}`,
      icon: achievement.icon,
    });
  }

  /**
   * Stores the latest progress for one achievement.
   * @throws {Error} When the database dependency is missing.
   */
  async handleProgressUpdated(userId, achievement, progress) {
    if (!this.database) {
      throw new Error('AchievementConsumer需要database依赖');
    }

    console.log(`成就进度更新: ${userId} -> ${achievement.name}: ${progress}%`);

    await this.database.userAchievementProgress.upsert({
      userId,
      achievementId: achievement.id,
      progress,
      updatedAt: new Date(),
    });
  }
}

module.exports = AchievementConsumer;
Code collapsed

services/achievement-service/src/evaluators/fitness.js

code
/**
 * Evaluates fitness achievements against incoming events.
 *
 * Every evaluator resolves with `{ unlocked: false }` when the event is not
 * relevant, or with `{ unlocked, progressUpdated, progress, metadata }`
 * where `progress` is a percentage capped at 100.
 */
class FitnessAchievementEvaluator {
  /**
   * @param {object} database - Data-access object exposing the models used
   *   below (achievements, userAchievementProgress, userStats, workouts).
   */
  constructor(database) {
    this.database = database;
  }

  /**
   * Loads all active achievements, each with the given user's progress row
   * joined in when one exists.
   */
  async getActiveAchievements(userId) {
    return await this.database.achievements.findAll({
      where: { active: true },
      include: [{
        model: this.database.userAchievementProgress,
        where: { userId },
        required: false,
      }],
    });
  }

  /**
   * Dispatches to the evaluator for `achievement.type`.
   *
   * Unknown types resolve with `{ unlocked: false }`. So do known types
   * whose evaluator method does not exist on this instance: the
   * total_distance and calorie_burn evaluators are not defined in this
   * class, and calling them threw a TypeError for every event.
   */
  async evaluate(achievement, event) {
    let evaluatorName;
    switch (achievement.type) {
      case 'total_workouts':
        evaluatorName = 'evaluateTotalWorkouts';
        break;
      case 'consecutive_days':
        evaluatorName = 'evaluateConsecutiveDays';
        break;
      case 'total_distance':
        evaluatorName = 'evaluateTotalDistance';
        break;
      case 'calorie_burn':
        evaluatorName = 'evaluateCalorieBurn';
        break;
      default:
        return { unlocked: false };
    }

    if (typeof this[evaluatorName] !== 'function') {
      console.warn(`成就评估器未实现: ${achievement.type}`);
      return { unlocked: false };
    }

    return await this[evaluatorName](achievement, event);
  }

  /**
   * Builds an evaluator result from a current value and its target.
   * A target that is not a positive number cannot be evaluated and yields
   * `{ unlocked: false }` instead of a NaN or Infinity percentage.
   */
  buildProgressResult(current, target, metadata) {
    if (!(target > 0)) {
      return { unlocked: false };
    }

    return {
      unlocked: current >= target,
      progressUpdated: true,
      progress: Math.min(100, (current / target) * 100),
      metadata,
    };
  }

  /** Compares the user's total workout count with the required count. */
  async evaluateTotalWorkouts(achievement, event) {
    if (event.type !== 'workout.completed') {
      return { unlocked: false };
    }

    const userStats = await this.database.userStats.findOne({
      where: { userId: event.data.userId },
    });

    // A user without a stats row simply has no workouts yet.
    const currentProgress = (userStats && userStats.totalWorkouts) || 0;
    const targetProgress = (achievement.requirement || {}).totalWorkouts;

    return this.buildProgressResult(currentProgress, targetProgress, {
      currentProgress,
      targetProgress,
    });
  }

  /** Compares the user's current streak with the required number of days. */
  async evaluateConsecutiveDays(achievement, event) {
    if (event.type !== 'workout.completed') {
      return { unlocked: false };
    }

    const streak = await this.calculateStreak(event.data.userId);
    const targetStreak = (achievement.requirement || {}).consecutiveDays;

    return this.buildProgressResult(streak, targetStreak, { streak, targetStreak });
  }

  /**
   * Counts the consecutive calendar days (local time) with a completed
   * workout, walking back from the user's most recent completed workout.
   *
   * @returns {Promise<number>} 0 when the user has no completed workout.
   */
  async calculateStreak(userId) {
    const workouts = await this.database.workouts.findAll({
      where: {
        userId,
        status: 'completed',
      },
      order: [['startTime', 'DESC']],
      limit: 365, // look at no more than 365 records
    });

    if (workouts.length === 0) return 0;

    const MS_PER_DAY = 24 * 60 * 60 * 1000;
    const startOfDay = (value) => {
      const day = new Date(value);
      day.setHours(0, 0, 0, 0);
      return day;
    };

    let streak = 1;
    let currentDay = startOfDay(workouts[0].startTime);

    for (const workout of workouts.slice(1)) {
      const workoutDay = startOfDay(workout.startTime);

      // Rounded because local days are 23 or 25 hours long around DST changes.
      const dayDiff = Math.round((currentDay - workoutDay) / MS_PER_DAY);

      if (dayDiff === 0) {
        // A second workout on the same day keeps the streak going;
        // previously it ended the count.
        continue;
      }
      if (dayDiff !== 1) {
        break;
      }

      streak++;
      currentDay = workoutDay;
    }

    return streak;
  }
}

module.exports = FitnessAchievementEvaluator;
Code collapsed

6. Docker部署配置

docker/docker-compose.yml

code
# Compose stack: a RabbitMQ broker plus the four Node.js services.
#
# NOTE(review): the admin/admin123 credentials are hard-coded in this file and
# in every RABBITMQ_URL; replace them outside local development.
version: '3.8'

services:
  rabbitmq:
    image: rabbitmq:3-management-alpine
    ports:
      - "5672:5672"
      - "15672:15672"
    environment:
      - RABBITMQ_DEFAULT_USER=admin
      - RABBITMQ_DEFAULT_PASS=admin123
    volumes:
      # NOTE(review): this only mounts the file. Nothing in this compose file
      # tells RabbitMQ to import it; presumably a load_definitions setting is
      # still needed - confirm.
      - ./rabbitmq/definitions.json:/etc/rabbitmq/definitions.json:ro
      - rabbitmq_data:/var/lib/rabbitmq
    # Healthy only once the broker's listeners accept connections, so the
    # services below can wait for it.
    healthcheck:
      test: ["CMD", "rabbitmq-diagnostics", "-q", "check_port_connectivity"]
      interval: 10s
      timeout: 10s
      retries: 10

  api-gateway:
    build: ../services/api-gateway
    ports:
      - "3000:3000"
    environment:
      - RABBITMQ_URL=amqp://admin:admin123@rabbitmq:5672
      - PORT=3000
    # The short `depends_on` form only orders container start-up; the long
    # form waits until the broker reports healthy.
    depends_on:
      rabbitmq:
        condition: service_healthy

  workout-processor:
    build: ../services/workout-processor
    environment:
      - RABBITMQ_URL=amqp://admin:admin123@rabbitmq:5672
    depends_on:
      rabbitmq:
        condition: service_healthy
    deploy:
      replicas: 2

  achievement-service:
    build: ../services/achievement-service
    environment:
      - RABBITMQ_URL=amqp://admin:admin123@rabbitmq:5672
    depends_on:
      rabbitmq:
        condition: service_healthy

  notification-service:
    build: ../services/notification-service
    environment:
      - RABBITMQ_URL=amqp://admin:admin123@rabbitmq:5672
    depends_on:
      rabbitmq:
        condition: service_healthy

volumes:
  rabbitmq_data:
Code collapsed

docker/rabbitmq/definitions.json

code
{
  "users": [
    {
      "name": "admin",
      "password": "admin123",
      "tags": "administrator"
    }
  ],
  "vhosts": [
    { "name": "/" }
  ],
  "exchanges": [
    {
      "name": "fitness.events",
      "vhost": "/",
      "type": "topic",
      "durable": true
    }
  ],
  "queues": [
    {
      "name": "workout.processor",
      "vhost": "/",
      "durable": true,
      "arguments": {
        "x-dead-letter-exchange": "fitness.events.dlx"
      }
    }
  ]
}
Code collapsed

架构优势

通过事件驱动架构,这个健身系统实现了:

  1. 解耦: 各服务独立开发和部署
  2. 可扩展: 可以单独扩展高负载服务
  3. 容错: 单个服务故障不影响整体
  4. 灵活: 容易添加新的事件消费者
  5. 可追溯: 事件流可以完整追踪业务流程

这种架构特别适合需要处理多种异步事件的健身应用场景。

#

文章标签

Node.js
RabbitMQ
事件驱动
微服务
健身应用
消息队列

觉得这篇文章有帮助?

立即体验康心伴,开始您的健康管理之旅