使用Node.js和RabbitMQ构建事件驱动健身系统
现代健身应用需要处理多种实时事件——运动数据上传、成就解锁、社交互动等。事件驱动架构(EDA)能够优雅地处理这些异步场景。本文将教你如何使用Node.js和RabbitMQ构建一个可扩展的事件驱动健身系统。
为什么使用事件驱动架构?
| 特性 | 传统架构 | 事件驱动架构 |
|---|---|---|
| 耦合度 | 紧耦合 | 松耦合 |
| 扩展性 | 垂直扩展 | 水平扩展 |
| 响应性 | 同步阻塞 | 异步非阻塞 |
| 可靠性 | 单点故障 | 消息持久化 |
健身应用场景:
- 运动数据上传后触发多个处理流程
- 成就系统需要监听各种事件
- 通知系统需要异步发送
- 数据分析需要离线处理
项目架构
code
fitness-events/
├── services/
│ ├── api-gateway/ # API网关
│ │ ├── src/
│ │ │ ├── routes/
│ │ │ ├── controllers/
│ │ │ └── events/ # 事件发布者
│ │ └── package.json
│ ├── workout-processor/ # 运动数据处理
│ │ ├── src/
│ │ │ ├── consumers/ # 消息消费者
│ │ │ ├── processors/ # 业务处理
│ │ │ └── publishers/ # 结果发布
│ │ └── package.json
│ ├── achievement-service/ # 成就系统
│ │ ├── src/
│ │ │ ├── consumers/
│ │ │ ├── evaluators/ # 成就评估
│ │ │ └── notifiers/
│ │ └── package.json
│ └── notification-service/ # 通知服务
│ ├── src/
│ │ ├── consumers/
│ │ ├── senders/ # 消息发送
│ │ └── templates/ # 消息模板
│ └── package.json
├── shared/
│ ├── events/ # 事件定义
│ ├── utils/ # 共享工具
│ └── config/ # 共享配置
├── docker/
│ ├── docker-compose.yml
│ └── rabbitmq/
│ └── definitions.json # RabbitMQ配置
└── README.md
Code collapsed
1. 事件定义
shared/events/types.js
code
// 事件类型定义
module.exports = {
// 运动相关事件
WORKOUT: {
STARTED: 'workout.started',
COMPLETED: 'workout.completed',
DATA_UPLOADED: 'workout.data_uploaded',
UPDATED: 'workout.updated',
DELETED: 'workout.deleted',
},
// 用户相关事件
USER: {
REGISTERED: 'user.registered',
PROFILE_UPDATED: 'user.profile_updated',
GOAL_SET: 'user.goal_set',
GOAL_ACHIEVED: 'user.goal_achieved',
},
// 成就相关事件
ACHIEVEMENT: {
UNLOCKED: 'achievement.unlocked',
PROGRESS_UPDATED: 'achievement.progress_updated',
MILESTONE_REACHED: 'achievement.milestone_reached',
},
// 社交相关事件
SOCIAL: {
FOLLOW: 'social.follow',
UNFOLLOW: 'social.unfollow',
COMMENT: 'social.comment',
LIKE: 'social.like',
},
// 通知相关事件
NOTIFICATION: {
SEND: 'notification.send',
SENT: 'notification.sent',
FAILED: 'notification.failed',
},
// 系统事件
SYSTEM: {
ERROR: 'system.error',
WARNING: 'system.warning',
METRICS: 'system.metrics',
},
};
// 事件基类
class BaseEvent {
constructor(type, data, metadata = {}) {
this.type = type;
this.data = data;
this.metadata = {
id: generateEventId(),
timestamp: new Date().toISOString(),
version: '1.0',
correlationId: metadata.correlationId || generateCorrelationId(),
...metadata,
};
}
toJSON() {
return {
type: this.type,
data: this.data,
metadata: this.metadata,
};
}
}
// 具体事件类
class WorkoutStartedEvent extends BaseEvent {
constructor(workoutId, userId, workoutType, metadata = {}) {
super(
EVENTS.WORKOUT.STARTED,
{ workoutId, userId, workoutType },
metadata
);
}
}
class WorkoutCompletedEvent extends BaseEvent {
constructor(workoutId, userId, stats, metadata = {}) {
super(
EVENTS.WORKOUT.COMPLETED,
{
workoutId,
userId,
duration: stats.duration,
calories: stats.calories,
distance: stats.distance,
heartRate: stats.heartRate,
},
metadata
);
}
}
class AchievementUnlockedEvent extends BaseEvent {
constructor(userId, achievementId, achievementName, metadata = {}) {
super(
EVENTS.ACHIEVEMENT.UNLOCKED,
{ userId, achievementId, achievementName },
metadata
);
}
}
function generateEventId() {
return `evt_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}
function generateCorrelationId() {
return `corr_${Date.now()}_${Math.random().toString(36).substr(2, 9)}`;
}
module.exports = {
BaseEvent,
WorkoutStartedEvent,
WorkoutCompletedEvent,
AchievementUnlockedEvent,
};
Code collapsed
2. RabbitMQ连接管理
shared/utils/rabbitmq.js
code
const amqp = require('amqplib');
class RabbitMQConnection {
constructor(config) {
this.config = config;
this.connection = null;
this.channel = null;
this.connecting = false;
this.reconnectDelay = 5000;
}
async connect() {
if (this.connecting || this.connection) {
return;
}
this.connecting = true;
try {
this.connection = await amqp.connect(this.config.url);
this.channel = await this.connection.createChannel();
// 设置错误处理
this.connection.on('error', (err) => {
console.error('RabbitMQ连接错误:', err);
});
this.connection.on('close', () => {
console.warn('RabbitMQ连接已关闭,尝试重连...');
setTimeout(() => this.connect(), this.reconnectDelay);
});
// 设置QoS
await this.channel.prefetch(this.config.prefetch || 10);
console.log('RabbitMQ连接成功');
this.connecting = false;
} catch (error) {
console.error('RabbitMQ连接失败:', error);
this.connecting = false;
throw error;
}
}
async createExchange(name, type = 'topic', options = {}) {
if (!this.channel) {
throw new Error('RabbitMQ未连接');
}
await this.channel.assertExchange(name, type, {
durable: options.durable !== false,
...options,
});
console.log(`Exchange创建成功: ${name}`);
}
async createQueue(name, options = {}) {
if (!this.channel) {
throw new Error('RabbitMQ未连接');
}
const queue = await this.channel.assertQueue(name, {
durable: options.durable !== false,
...options,
});
console.log(`Queue创建成功: ${name}`);
return queue;
}
async bindQueue(queue, exchange, routingKey) {
if (!this.channel) {
throw new Error('RabbitMQ未连接');
}
await this.channel.bindQueue(queue, exchange, routingKey);
console.log(`绑定成功: ${queue} -> ${exchange} (${routingKey})`);
}
async publish(exchange, routingKey, message, options = {}) {
if (!this.channel) {
throw new Error('RabbitMQ未连接');
}
const content = Buffer.from(JSON.stringify(message));
const result = this.channel.publish(exchange, routingKey, content, {
persistent: true,
contentType: 'application/json',
...options,
});
if (!result) {
console.warn('消息发送失败,缓冲区已满');
}
return result;
}
async consume(queue, handler, options = {}) {
if (!this.channel) {
throw new Error('RabbitMQ未连接');
}
await this.channel.consume(
queue,
async (msg) => {
if (!msg) {
return;
}
try {
const content = JSON.parse(msg.content.toString());
await handler(content, msg);
// 确认消息
this.channel.ack(msg);
} catch (error) {
console.error('消息处理失败:', error);
// 拒绝消息并重新入队
if (options.requeue !== false) {
this.channel.nack(msg, false, true);
} else {
this.channel.ack(msg);
}
}
},
{
noAck: false,
...options,
}
);
}
async close() {
try {
if (this.channel) {
await this.channel.close();
}
if (this.connection) {
await this.connection.close();
}
console.log('RabbitMQ连接已关闭');
} catch (error) {
console.error('关闭RabbitMQ连接失败:', error);
}
}
}
module.exports = RabbitMQConnection;
Code collapsed
3. 事件发布者
services/api-gateway/src/events/publisher.js
code
const RabbitMQConnection = require('../../shared/utils/rabbitmq');
const { WorkoutStartedEvent, WorkoutCompletedEvent } = require('../../shared/events/types');
class EventPublisher {
constructor(config) {
this.rabbitmq = new RabbitMQConnection(config.rabbitmq);
this.exchange = 'fitness.events';
}
async initialize() {
await this.rabbitmq.connect();
await this.rabbitmq.createExchange(this.exchange, 'topic', { durable: true });
}
async publishWorkoutStarted(workoutId, userId, workoutType) {
const event = new WorkoutStartedEvent(workoutId, userId, workoutType);
return await this.rabbitmq.publish(
this.exchange,
'workout.started',
event.toJSON()
);
}
async publishWorkoutCompleted(workoutId, userId, stats) {
const event = new WorkoutCompletedEvent(workoutId, userId, stats);
return await this.rabbitmq.publish(
this.exchange,
'workout.completed',
event.toJSON()
);
}
async publishAchievementUnlocked(userId, achievementId, achievementName) {
const event = new AchievementUnlockedEvent(userId, achievementId, achievementName);
return await this.rabbitmq.publish(
this.exchange,
'achievement.unlocked',
event.toJSON()
);
}
}
module.exports = EventPublisher;
Code collapsed
4. 运动数据处理服务
services/workout-processor/src/consumers/workout.js
code
const RabbitMQConnection = require('../../shared/utils/rabbitmq');
class WorkoutConsumer {
constructor(config, processor) {
this.rabbitmq = new RabbitMQConnection(config.rabbitmq);
this.processor = processor;
this.exchange = 'fitness.events';
this.queue = 'workout.processor';
}
async initialize() {
await this.rabbitmq.connect();
// 创建exchange
await this.rabbitmq.createExchange(this.exchange, 'topic');
// 创建queue
await this.rabbitmq.createQueue(this.queue, {
durable: true,
deadLetterExchange: `${this.exchange}.dlx`,
});
// 绑定routing key
await this.rabbitmq.bindQueue(this.queue, this.exchange, 'workout.#');
await this.rabbitmq.bindQueue(this.queue, this.exchange, 'user.goal_achieved');
}
async start() {
await this.rabbitmq.consume(this.queue, async (message) => {
console.log(`收到消息: ${message.type}`);
switch (message.type) {
case 'workout.started':
await this.handleWorkoutStarted(message);
break;
case 'workout.completed':
await this.handleWorkoutCompleted(message);
break;
case 'workout.data_uploaded':
await this.handleDataUploaded(message);
break;
default:
console.warn(`未知事件类型: ${message.type}`);
}
});
}
async handleWorkoutStarted(event) {
await this.processor.processWorkoutStarted(event.data);
}
async handleWorkoutCompleted(event) {
await this.processor.processWorkoutCompleted(event.data);
}
async handleDataUploaded(event) {
await this.processor.processDataUpload(event.data);
}
}
module.exports = WorkoutConsumer;
Code collapsed
services/workout-processor/src/processors/workout.js
code
const WorkoutPublisher = require('../publishers/workout');
class WorkoutProcessor {
constructor(database, publisher) {
this.database = database;
this.publisher = publisher;
}
async processWorkoutStarted(data) {
console.log(`处理运动开始事件: ${data.workoutId}`);
// 1. 保存运动记录
await this.database.workouts.create({
id: data.workoutId,
userId: data.userId,
type: data.workoutType,
status: 'started',
startTime: new Date(),
});
// 2. 初始化实时统计
await this.database.workoutStats.create({
workoutId: data.workoutId,
userId: data.userId,
metrics: {},
});
// 3. 发布用户通知
await this.publisher.publishUserNotification(data.userId, {
type: 'workout_started',
message: '开始新的运动!',
});
}
async processWorkoutCompleted(data) {
console.log(`处理运动完成事件: ${data.workoutId}`);
// 1. 更新运动记录
await this.database.workouts.update(data.workoutId, {
status: 'completed',
endTime: new Date(),
duration: data.duration,
calories: data.calories,
distance: data.distance,
});
// 2. 计算运动指标
const metrics = await this.calculateMetrics(data);
// 3. 保存详细数据
await this.database.workoutStats.update(data.workoutId, {
metrics: metrics,
heartRate: data.heartRate,
});
// 4. 更新用户统计
await this.updateUserStats(data.userId, metrics);
// 5. 检查目标完成情况
await this.checkGoalAchievement(data.userId, metrics);
// 6. 发布运动完成事件
await this.publisher.publishWorkoutProcessed(data.workoutId, metrics);
}
async processDataUpload(data) {
console.log(`处理运动数据上传: ${data.workoutId}`);
// 1. 解析和验证数据
const parsedData = await this.parseWorkoutData(data.rawData);
// 2. 存储详细数据
await this.database.workoutDetails.bulkCreate(parsedData);
// 3. 触发数据分析
await this.publisher.publishDataAnalysisRequest(data.workoutId, parsedData);
}
async calculateMetrics(workoutData) {
// 计算各种运动指标
return {
averageHeartRate: this.calculateAverage(workoutData.heartRate),
maxHeartRate: Math.max(...workoutData.heartRate),
pace: this.calculatePace(workoutData.distance, workoutData.duration),
intensity: this.calculateIntensity(workoutData),
// ... 更多指标
};
}
async updateUserStats(userId, metrics) {
// 更新用户的累积统计
await this.database.userStats.upsert({
userId,
totalWorkouts: Sequelize.literal('total_workouts + 1'),
totalCalories: Sequelize.literal('total_calories + ' + metrics.calories),
totalDistance: Sequelize.literal('total_distance + ' + metrics.distance),
lastWorkoutAt: new Date(),
});
}
async checkGoalAchievement(userId, metrics) {
// 检查用户是否达成目标
const goals = await this.database.userGoals.findAll({
where: { userId, status: 'active' },
});
for (const goal of goals) {
if (this.isGoalAchieved(goal, metrics)) {
await this.publisher.publishGoalAchieved(userId, goal.id);
}
}
}
}
module.exports = WorkoutProcessor;
Code collapsed
5. 成就系统
services/achievement-service/src/consumers/achievement.js
code
const RabbitMQConnection = require('../../shared/utils/rabbitmq');
class AchievementConsumer {
constructor(config, evaluator) {
this.rabbitmq = new RabbitMQConnection(config.rabbitmq);
this.evaluator = evaluator;
this.exchange = 'fitness.events';
this.queue = 'achievement.evaluator';
}
async initialize() {
await this.rabbitmq.connect();
await this.rabbitmq.createExchange(this.exchange, 'topic');
await this.rabbitmq.createQueue(this.queue, { durable: true });
// 订阅所有可能影响成就的事件
const routingKeys = [
'workout.completed',
'workout.data_uploaded',
'user.goal_achieved',
'social.follow',
'social.like',
];
for (const routingKey of routingKeys) {
await this.rabbitmq.bindQueue(this.queue, this.exchange, routingKey);
}
}
async start() {
await this.rabbitmq.consume(this.queue, async (message) => {
console.log(`成就系统收到事件: ${message.type}`);
// 获取用户的所有活跃成就目标
const userAchievements = await this.evaluator.getActiveAchievements(
message.data.userId
);
// 评估每个成就
for (const achievement of userAchievements) {
const result = await this.evaluator.evaluate(
achievement,
message
);
if (result.unlocked) {
await this.handleAchievementUnlocked(
message.data.userId,
achievement,
result
);
} else if (result.progressUpdated) {
await this.handleProgressUpdated(
message.data.userId,
achievement,
result.progress
);
}
}
});
}
async handleAchievementUnlocked(userId, achievement, result) {
console.log(`成就解锁: ${userId} -> ${achievement.name}`);
// 1. 更新数据库
await this.database.userAchievements.create({
userId,
achievementId: achievement.id,
unlockedAt: new Date(),
metadata: result.metadata,
});
// 2. 发布成就解锁事件
await this.publisher.publishAchievementUnlocked(
userId,
achievement.id,
achievement.name
);
// 3. 发送通知
await this.publisher.sendNotification(userId, {
type: 'achievement',
title: '成就解锁!',
message: `恭喜解锁成就: ${achievement.name}`,
icon: achievement.icon,
});
}
async handleProgressUpdated(userId, achievement, progress) {
console.log(`成就进度更新: ${userId} -> ${achievement.name}: ${progress}%`);
// 更新进度
await this.database.userAchievementProgress.upsert({
userId,
achievementId: achievement.id,
progress,
updatedAt: new Date(),
});
}
}
module.exports = AchievementConsumer;
Code collapsed
services/achievement-service/src/evaluators/fitness.js
code
class FitnessAchievementEvaluator {
constructor(database) {
this.database = database;
}
async getActiveAchievements(userId) {
return await this.database.achievements.findAll({
where: { active: true },
include: [{
model: this.database.userAchievementProgress,
where: { userId },
required: false,
}],
});
}
async evaluate(achievement, event) {
switch (achievement.type) {
case 'total_workouts':
return await this.evaluateTotalWorkouts(achievement, event);
case 'consecutive_days':
return await this.evaluateConsecutiveDays(achievement, event);
case 'total_distance':
return await this.evaluateTotalDistance(achievement, event);
case 'calorie_burn':
return await this.evaluateCalorieBurn(achievement, event);
default:
return { unlocked: false };
}
}
async evaluateTotalWorkouts(achievement, event) {
if (event.type !== 'workout.completed') {
return { unlocked: false };
}
const userStats = await this.database.userStats.findOne({
where: { userId: event.data.userId },
});
const currentProgress = userStats.totalWorkouts || 0;
const targetProgress = achievement.requirement.totalWorkouts;
const unlocked = currentProgress >= targetProgress;
const progress = Math.min(100, (currentProgress / targetProgress) * 100);
return {
unlocked,
progressUpdated: true,
progress,
metadata: { currentProgress, targetProgress },
};
}
async evaluateConsecutiveDays(achievement, event) {
if (event.type !== 'workout.completed') {
return { unlocked: false };
}
const userId = event.data.userId;
// 获取连续运动天数
const streak = await this.calculateStreak(userId);
const targetStreak = achievement.requirement.consecutiveDays;
const unlocked = streak >= targetStreak;
const progress = Math.min(100, (streak / targetStreak) * 100);
return {
unlocked,
progressUpdated: true,
progress,
metadata: { streak, targetStreak },
};
}
async calculateStreak(userId) {
// 计算用户连续运动天数
const workouts = await this.database.workouts.findAll({
where: {
userId,
status: 'completed',
},
order: [['startTime', 'DESC']],
limit: 365, // 最多检查一年
});
if (workouts.length === 0) return 0;
let streak = 1;
let currentDate = new Date(workouts[0].startTime);
currentDate.setHours(0, 0, 0, 0);
for (let i = 1; i < workouts.length; i++) {
const workoutDate = new Date(workouts[i].startTime);
workoutDate.setHours(0, 0, 0, 0);
const dayDiff = (currentDate - workoutDate) / (1000 * 60 * 60 * 24);
if (dayDiff === 1) {
streak++;
currentDate = workoutDate;
} else {
break;
}
}
return streak;
}
}
module.exports = FitnessAchievementEvaluator;
Code collapsed
6. Docker部署配置
docker/docker-compose.yml
code
version: '3.8'
services:
rabbitmq:
image: rabbitmq:3-management-alpine
ports:
- "5672:5672"
- "15672:15672"
environment:
- RABBITMQ_DEFAULT_USER=admin
- RABBITMQ_DEFAULT_PASS=admin123
volumes:
- ./rabbitmq/definitions.json:/etc/rabbitmq/definitions.json:ro
- rabbitmq_data:/var/lib/rabbitmq
api-gateway:
build: ../services/api-gateway
ports:
- "3000:3000"
environment:
- RABBITMQ_URL=amqp://admin:admin123@rabbitmq:5672
- PORT=3000
depends_on:
- rabbitmq
workout-processor:
build: ../services/workout-processor
environment:
- RABBITMQ_URL=amqp://admin:admin123@rabbitmq:5672
depends_on:
- rabbitmq
deploy:
replicas: 2
achievement-service:
build: ../services/achievement-service
environment:
- RABBITMQ_URL=amqp://admin:admin123@rabbitmq:5672
depends_on:
- rabbitmq
notification-service:
build: ../services/notification-service
environment:
- RABBITMQ_URL=amqp://admin:admin123@rabbitmq:5672
depends_on:
- rabbitmq
volumes:
rabbitmq_data:
Code collapsed
docker/rabbitmq/definitions.json
code
{
"users": [
{
"name": "admin",
"password": "admin123",
"tags": "administrator"
}
],
"vhosts": [
{ "name": "/" }
],
"exchanges": [
{
"name": "fitness.events",
"vhost": "/",
"type": "topic",
"durable": true
}
],
"queues": [
{
"name": "workout.processor",
"vhost": "/",
"durable": true,
"arguments": {
"x-dead-letter-exchange": "fitness.events.dlx"
}
}
]
}
Code collapsed
架构优势
通过事件驱动架构,这个健身系统实现了:
- 解耦: 各服务独立开发和部署
- 可扩展: 可以单独扩展高负载服务
- 容错: 单个服务故障不影响整体
- 灵活: 容易添加新的事件消费者
- 可追溯: 事件流可以完整追踪业务流程
这种架构特别适合需要处理多种异步事件的健身应用场景。