使用Node.js构建实时排行榜 (PostgreSQL + Redis)
在健身应用、游戏或竞赛平台中,实时排行榜是提高用户参与度的关键功能。本文将教你如何使用Node.js、PostgreSQL和Redis构建一个高性能的实时排行榜系统。
为什么使用PostgreSQL + Redis?
| 方面 | PostgreSQL | Redis |
|---|---|---|
| 用途 | 持久化存储、复杂查询 | 高速缓存、排名计算 |
| 优势 | ACID保证、关系数据 | 内存操作、Sorted Set |
| 角色 | 数据主存储、历史记录 | 实时排名、临时数据 |
组合优势:
- PostgreSQL保证数据持久性和一致性
- Redis提供毫秒级排名查询
- 两者结合实现性能与可靠性的平衡
项目架构
code
leaderboard-system/
├── src/
│ ├── config/
│ │ ├── database.js # 数据库配置
│ │ └── redis.js # Redis配置
│ ├── models/
│ │ ├── User.js # 用户模型
│ │ └── Score.js # 分数模型
│ ├── services/
│ │ ├── leaderboard.service.js # 排行榜服务
│ │ └── cache.service.js # 缓存服务
│ ├── routes/
│ │ └── leaderboard.js # API路由
│ ├── websocket/
│ │ └── leaderboard.handler.js # WebSocket处理
│ └── app.js # 应用入口
├── tests/
├── migrations/
├── .env.example
├── package.json
└── README.md
Code collapsed
1. 项目初始化
package.json
code
{
"name": "realtime-leaderboard",
"version": "1.0.0",
"description": "实时排行榜系统",
"main": "src/app.js",
"scripts": {
"start": "node src/app.js",
"dev": "nodemon src/app.js",
"test": "jest",
"migrate": "node migrations/run.js"
},
"dependencies": {
"express": "^4.18.2",
"pg": "^8.11.3",
"redis": "^4.6.10",
"ws": "^8.14.2",
"dotenv": "^16.3.1",
"joi": "^17.11.0",
"winston": "^3.11.0"
},
"devDependencies": {
"nodemon": "^3.0.1",
"jest": "^29.7.0",
"supertest": "^6.3.3"
}
}
Code collapsed
.env.example
code
# 应用配置
PORT=3000
NODE_ENV=development
# PostgreSQL配置
DB_HOST=localhost
DB_PORT=5432
DB_NAME=leaderboard
DB_USER=postgres
DB_PASSWORD=your_password
# Redis配置
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=
REDIS_DB=0
# WebSocket配置
WS_PORT=3001
Code collapsed
2. 数据库设计
migrations/001_create_tables.sql
code
-- 用户表
CREATE TABLE IF NOT EXISTS users (
id SERIAL PRIMARY KEY,
username VARCHAR(50) UNIQUE NOT NULL,
display_name VARCHAR(100),
avatar_url VARCHAR(255),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
-- 分数历史表
CREATE TABLE IF NOT EXISTS scores (
id SERIAL PRIMARY KEY,
user_id INTEGER REFERENCES users(id) ON DELETE CASCADE,
score INTEGER NOT NULL,
category VARCHAR(50) DEFAULT 'default',
achieved_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
metadata JSONB,
INDEX idx_user_scores (user_id, achieved_at DESC),
INDEX idx_category_scores (category, score DESC)
);
-- 排行榜快照表(每日备份)
CREATE TABLE IF NOT EXISTS leaderboard_snapshots (
id SERIAL PRIMARY KEY,
category VARCHAR(50) NOT NULL,
snapshot_date DATE NOT NULL,
rankings JSONB NOT NULL,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
UNIQUE(category, snapshot_date)
);
-- 创建更新时间触发器
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
NEW.updated_at = CURRENT_TIMESTAMP;
RETURN NEW;
END;
$$ language 'plpgsql';
CREATE TRIGGER update_users_updated_at
BEFORE UPDATE ON users
FOR EACH ROW
EXECUTE FUNCTION update_updated_at_column();
Code collapsed
3. 数据库连接层
src/config/database.js
code
const { Pool } = require('pg');
const logger = require('../utils/logger');
class Database {
constructor() {
this.pool = new Pool({
host: process.env.DB_HOST,
port: process.env.DB_PORT,
database: process.env.DB_NAME,
user: process.env.DB_USER,
password: process.env.DB_PASSWORD,
max: 20,
idleTimeoutMillis: 30000,
connectionTimeoutMillis: 2000,
});
this.pool.on('error', (err) => {
logger.error('Unexpected database error', err);
process.exit(-1);
});
}
async query(text, params) {
const start = Date.now();
try {
const result = await this.pool.query(text, params);
const duration = Date.now() - start;
logger.debug('Executed query', { text, duration, rows: result.rowCount });
return result;
} catch (error) {
logger.error('Database query error', { text, error: error.message });
throw error;
}
}
async transaction(callback) {
const client = await this.pool.connect();
try {
await client.query('BEGIN');
const result = await callback(client);
await client.query('COMMIT');
return result;
} catch (error) {
await client.query('ROLLBACK');
throw error;
} finally {
client.release();
}
}
async close() {
await this.pool.end();
logger.info('Database connection closed');
}
}
module.exports = new Database();
Code collapsed
src/config/redis.js
code
const redis = require('redis');
const logger = require('../utils/logger');
class RedisClient {
constructor() {
this.client = null;
this.subscriber = null;
}
async connect() {
this.client = redis.createClient({
socket: {
host: process.env.REDIS_HOST,
port: process.env.REDIS_PORT,
},
password: process.env.REDIS_PASSWORD,
database: process.env.REDIS_DB || 0,
});
this.subscriber = this.client.duplicate();
await Promise.all([
this.client.connect(),
this.subscriber.connect()
]);
this.client.on('error', (err) => {
logger.error('Redis client error', err);
});
logger.info('Redis connected successfully');
}
async disconnect() {
await Promise.all([
this.client.quit(),
this.subscriber.quit()
]);
logger.info('Redis disconnected');
}
// 排行榜操作
async addToLeaderboard(key, score, member) {
return await this.client.zAdd(key, {
score,
value: member
});
}
async getRank(key, member) {
// zRevRank返回从0开始的排名(降序)
const rank = await this.client.zRevRank(key, member);
return rank !== null ? rank + 1 : null;
}
async getScore(key, member) {
return await this.client.zScore(key, member);
}
async getTopMembers(key, start = 0, stop = 9) {
return await this.client.zRevRangeWithScores(
key,
start,
stop
);
}
async getMemberRange(key, member, withScores = false) {
const rank = await this.getRank(key, member);
if (rank === null) return null;
const start = Math.max(0, rank - 2);
const stop = rank + 2;
return await this.client.zRevRangeWithScores(key, start, stop);
}
async getMemberCount(key) {
return await this.client.zCard(key);
}
async removeMember(key, member) {
return await this.client.zRem(key, member);
}
// 缓存操作
async set(key, value, ttl = 3600) {
const serialized = JSON.stringify(value);
if (ttl) {
await this.client.setEx(key, ttl, serialized);
} else {
await this.client.set(key, serialized);
}
}
async get(key) {
const data = await this.client.get(key);
return data ? JSON.parse(data) : null;
}
async del(key) {
return await this.client.del(key);
}
// 发布/订阅
async publish(channel, message) {
return await this.client.publish(channel, JSON.stringify(message));
}
async subscribe(channel, callback) {
await this.subscriber.subscribe(channel, (message) => {
try {
const data = JSON.parse(message);
callback(data);
} catch (error) {
logger.error('Failed to parse subscribed message', error);
}
});
}
}
module.exports = new RedisClient();
Code collapsed
4. 排行榜服务
src/services/leaderboard.service.js
code
const db = require('../config/database');
const redis = require('../config/redis');
const logger = require('../utils/logger');
class LeaderboardService {
constructor() {
this.CACHE_PREFIX = 'leaderboard';
this.UPDATE_CHANNEL = 'leaderboard:updates';
}
/**
* 提交分数
*/
async submitScore(userId, score, category = 'default', metadata = {}) {
return await db.transaction(async (client) => {
// 1. 获取用户信息
const userResult = await client.query(
'SELECT id, username, display_name, avatar_url FROM users WHERE id = $1',
[userId]
);
if (userResult.rows.length === 0) {
throw new Error('User not found');
}
const user = userResult.rows[0];
// 2. 保存分数到PostgreSQL
const scoreResult = await client.query(
`INSERT INTO scores (user_id, score, category, metadata)
VALUES ($1, $2, $3, $4)
RETURNING id, achieved_at`,
[userId, score, category, metadata]
);
// 3. 更新Redis排行榜
const leaderboardKey = this.getLeaderboardKey(category);
await redis.addToLeaderboard(
leaderboardKey,
score,
userId.toString()
);
// 4. 获取新排名
const newRank = await redis.getRank(leaderboardKey, userId.toString());
// 5. 获取周围排名
const neighbors = await this._getNeighborRanks(
leaderboardKey,
userId.toString(),
user
);
// 6. 发布更新事件
await redis.publish(this.UPDATE_CHANNEL, {
type: 'score_updated',
category,
userId,
score,
rank: newRank,
neighbors
});
// 7. 清除缓存
await this._clearCache(leaderboardKey);
return {
success: true,
scoreId: scoreResult.rows[0].id,
achievedAt: scoreResult.rows[0].achieved_at,
rank: newRank,
neighbors
};
});
}
/**
* 获取排行榜TOP N
*/
async getTopScores(category = 'default', limit = 10) {
const cacheKey = `${this.CACHE_PREFIX}:${category}:top:${limit}`;
// 尝试从缓存获取
const cached = await redis.get(cacheKey);
if (cached) {
return cached;
}
const leaderboardKey = this.getLeaderboardKey(category);
const members = await redis.getTopMembers(leaderboardKey, 0, limit - 1);
if (members.length === 0) {
return { rankings: [], total: 0 };
}
// 批量获取用户信息
const userIds = members.map(m => parseInt(m.value));
const users = await this._getUsersByIds(userIds);
// 组装排名数据
const rankings = members.map((member, index) => {
const user = users.get(parseInt(member.value));
return {
rank: index + 1,
score: member.score,
user: user ? {
id: user.id,
username: user.username,
displayName: user.display_name,
avatarUrl: user.avatar_url
} : null
};
});
const result = {
rankings,
total: await redis.getMemberCount(leaderboardKey)
};
// 缓存结果(30秒)
await redis.set(cacheKey, result, 30);
return result;
}
/**
* 获取用户的排名和周围玩家
*/
async getUserRanking(userId, category = 'default') {
const leaderboardKey = this.getLeaderboardKey(category);
// 获取用户分数和排名
const [score, rank] = await Promise.all([
redis.getScore(leaderboardKey, userId.toString()),
redis.getRank(leaderboardKey, userId.toString())
]);
if (score === null || rank === null) {
return null;
}
// 获取用户信息
const userResult = await db.query(
'SELECT id, username, display_name, avatar_url FROM users WHERE id = $1',
[userId]
);
if (userResult.rows.length === 0) {
return null;
}
const user = userResult.rows[0];
// 获取周围排名
const neighbors = await this._getNeighborRanks(
leaderboardKey,
userId.toString(),
user
);
return {
rank,
score,
user,
neighbors
};
}
/**
* 获取分数历史
*/
async getUserScoreHistory(userId, category = 'default', limit = 10) {
const result = await db.query(
`SELECT score, achieved_at, metadata
FROM scores
WHERE user_id = $1 AND category = $2
ORDER BY achieved_at DESC
LIMIT $3`,
[userId, category, limit]
);
return result.rows;
}
/**
* 创建每日快照
*/
async createDailySnapshot(category = 'default') {
const today = new Date().toISOString().split('T')[0];
const leaderboardKey = this.getLeaderboardKey(category);
const members = await redis.getTopMembers(leaderboardKey, 0, -1);
const userIds = members.map(m => parseInt(m.value));
const users = await this._getUsersByIds(userIds);
const rankings = members.map((member, index) => {
const user = users.get(parseInt(member.value));
return {
rank: index + 1,
userId: parseInt(member.value),
username: user?.username,
score: member.score
};
});
await db.query(
`INSERT INTO leaderboard_snapshots (category, snapshot_date, rankings)
VALUES ($1, $2, $3)
ON CONFLICT (category, snapshot_date)
DO UPDATE SET rankings = $3`,
[category, today, JSON.stringify(rankings)]
);
logger.info(`Created daily snapshot for category: ${category}`);
}
/**
* 获取辅助方法
*/
getLeaderboardKey(category) {
return `leaderboard:${category}`;
}
async _getNeighborRanks(leaderboardKey, userIdStr, user) {
const range = await redis.getMemberRange(leaderboardKey, userIdStr);
if (!range) return [];
const userIds = range.map(m => parseInt(m.value));
const users = await this._getUsersByIds(userIds);
return range.map((member) => {
const memberUser = users.get(parseInt(member.value));
return {
userId: parseInt(member.value),
username: memberUser?.username,
displayName: memberUser?.display_name,
score: member.score
};
});
}
async _getUsersByIds(userIds) {
if (userIds.length === 0) return new Map();
const result = await db.query(
`SELECT id, username, display_name, avatar_url
FROM users
WHERE id = ANY($1)`,
[userIds]
);
return new Map(result.rows.map(row => [row.id, row]));
}
async _clearCache(leaderboardKey) {
// 清除相关缓存
const pattern = `${this.CACHE_PREFIX}:${leaderboardKey.split(':')[1]}:*`;
// 实际项目中可能需要SCAN命令来查找匹配的键
await redis.del(`${leaderboardKey}:top:10`);
await redis.del(`${leaderboardKey}:top:50`);
}
}
module.exports = new LeaderboardService();
Code collapsed
5. API路由
src/routes/leaderboard.js
code
const express = require('express');
const Joi = require('joi');
const leaderboardService = require('../services/leaderboard.service');
const logger = require('../utils/logger');
const router = express.Router();
// 验证中间件
const validate = (schema) => (req, res, next) => {
const { error } = schema.validate(req.body);
if (error) {
return res.status(400).json({
success: false,
error: error.details[0].message
});
}
next();
};
// 提交分数
router.post('/scores', validate({
userId: Joi.number().integer().positive().required(),
score: Joi.number().integer().required(),
category: Joi.string().default('default'),
metadata: Joi.object().default({})
}), async (req, res) => {
try {
const { userId, score, category, metadata } = req.body;
const result = await leaderboardService.submitScore(
userId,
score,
category,
metadata
);
res.json({
success: true,
data: result
});
} catch (error) {
logger.error('Error submitting score', error);
res.status(500).json({
success: false,
error: error.message
});
}
});
// 获取排行榜
router.get('/leaderboard', async (req, res) => {
try {
const category = req.query.category || 'default';
const limit = Math.min(parseInt(req.query.limit) || 10, 100);
const result = await leaderboardService.getTopScores(category, limit);
res.json({
success: true,
data: result
});
} catch (error) {
logger.error('Error fetching leaderboard', error);
res.status(500).json({
success: false,
error: error.message
});
}
});
// 获取用户排名
router.get('/users/:userId/ranking', async (req, res) => {
try {
const userId = parseInt(req.params.userId);
const category = req.query.category || 'default';
const result = await leaderboardService.getUserRanking(userId, category);
if (!result) {
return res.status(404).json({
success: false,
error: 'User ranking not found'
});
}
res.json({
success: true,
data: result
});
} catch (error) {
logger.error('Error fetching user ranking', error);
res.status(500).json({
success: false,
error: error.message
});
}
});
// 获取用户历史
router.get('/users/:userId/history', async (req, res) => {
try {
const userId = parseInt(req.params.userId);
const category = req.query.category || 'default';
const limit = Math.min(parseInt(req.query.limit) || 10, 100);
const history = await leaderboardService.getUserScoreHistory(
userId,
category,
limit
);
res.json({
success: true,
data: history
});
} catch (error) {
logger.error('Error fetching score history', error);
res.status(500).json({
success: false,
error: error.message
});
}
});
module.exports = router;
Code collapsed
6. WebSocket实时更新
src/websocket/leaderboard.handler.js
code
const WebSocket = require('ws');
const redis = require('../config/redis');
const leaderboardService = require('../services/leaderboard.service');
const logger = require('../utils/logger');
class LeaderboardWebSocket {
constructor(port = 3001) {
this.wss = new WebSocket.Server({ port });
this.clients = new Set();
this.wss.on('connection', (ws) => {
this.handleConnection(ws);
});
// 订阅Redis更新事件
this.subscribeToUpdates();
logger.info(`WebSocket server started on port ${port}`);
}
handleConnection(ws) {
this.clients.add(ws);
logger.info('Client connected', { clientCount: this.clients.size });
ws.on('message', async (message) => {
try {
const data = JSON.parse(message);
await this.handleMessage(ws, data);
} catch (error) {
logger.error('Error handling WebSocket message', error);
}
});
ws.on('close', () => {
this.clients.delete(ws);
logger.info('Client disconnected', { clientCount: this.clients.size });
});
// 发送欢迎消息
ws.send(JSON.stringify({
type: 'connected',
message: 'Connected to leaderboard updates'
}));
}
async handleMessage(ws, data) {
switch (data.type) {
case 'subscribe':
// 订阅特定分类的更新
ws.category = data.category || 'default';
ws.send(JSON.stringify({
type: 'subscribed',
category: ws.category
}));
break;
case 'get_ranking':
if (data.userId) {
const ranking = await leaderboardService.getUserRanking(
data.userId,
data.category || 'default'
);
ws.send(JSON.stringify({
type: 'ranking',
data: ranking
}));
}
break;
}
}
async subscribeToUpdates() {
await redis.subscribe(
leaderboardService.UPDATE_CHANNEL,
(message) => {
this.broadcast(message);
}
);
}
broadcast(message) {
const data = JSON.stringify(message);
this.clients.forEach((client) => {
// 发送给订阅了该分类的客户端
if (!client.category || client.category === message.category) {
if (client.readyState === WebSocket.OPEN) {
client.send(data);
}
}
});
}
}
module.exports = LeaderboardWebSocket;
Code collapsed
7. 应用入口
src/app.js
code
require('dotenv').config();
const express = require('express');
const db = require('./config/database');
const redis = require('./config/redis');
const leaderboardRoutes = require('./routes/leaderboard');
const LeaderboardWebSocket = require('./websocket/leaderboard.handler');
const logger = require('./utils/logger');
const app = express();
const PORT = process.env.PORT || 3000;
// 中间件
app.use(express.json());
app.use(express.urlencoded({ extended: true }));
// 路由
app.use('/api', leaderboardRoutes);
// 健康检查
app.get('/health', async (req, res) => {
try {
await db.query('SELECT 1');
await redis.client.ping();
res.json({
status: 'healthy',
database: 'connected',
redis: 'connected',
timestamp: new Date().toISOString()
});
} catch (error) {
res.status(503).json({
status: 'unhealthy',
error: error.message
});
}
});
// 启动服务器
async function start() {
try {
// 连接数据库
logger.info('Connecting to PostgreSQL...');
await db.query('SELECT 1');
// 连接Redis
logger.info('Connecting to Redis...');
await redis.connect();
// 启动HTTP服务器
app.listen(PORT, () => {
logger.info(`HTTP server started on port ${PORT}`);
});
// 启动WebSocket服务器
new LeaderboardWebSocket(process.env.WS_PORT);
// 定时创建快照(每天凌晨)
setInterval(() => {
const now = new Date();
if (now.getHours() === 0 && now.getMinutes() === 0) {
leaderboardService.createDailySnapshot();
}
}, 60000);
logger.info('All services started successfully');
} catch (error) {
logger.error('Failed to start server', error);
process.exit(1);
}
}
// 优雅关闭
process.on('SIGTERM', async () => {
logger.info('SIGTERM received, shutting down gracefully...');
await redis.disconnect();
await db.close();
process.exit(0);
});
start();
Code collapsed
API端点文档
提交分数
code
POST /api/scores
Content-Type: application/json
{
"userId": 1,
"score": 1000,
"category": "daily_challenge",
"metadata": {
"level": 5,
"timeSpent": 300
}
}
Code collapsed
获取排行榜
code
GET /api/leaderboard?category=daily_challenge&limit=10
Code collapsed
获取用户排名
code
GET /api/users/1/ranking?category=daily_challenge
Code collapsed
获取分数历史
code
GET /api/users/1/history?category=daily_challenge&limit=10
Code collapsed
性能优化建议
-
Redis优化:
- 使用Pipeline批量操作
- 设置合理的maxmemory-policy
- 考虑使用Redis Cluster做水平扩展
-
PostgreSQL优化:
- 为score表添加合适的索引
- 使用连接池管理连接
- 定期VACUUM和ANALYZE
-
应用层优化:
- 实现请求限流
- 使用HTTP缓存头
- 考虑CDN分发静态资源
通过本教程,你已掌握使用Node.js、PostgreSQL和Redis构建高性能实时排行榜系统的核心技术。这个架构可轻松扩展到各种竞赛、游戏或健身应用场景。