康心伴Logo
康心伴WellAlly
Backend Development

使用Node.js构建实时排行榜(PostgreSQL+Redis)

5 分钟阅读

使用Node.js构建实时排行榜 (PostgreSQL + Redis)

在健身应用、游戏或竞赛平台中,实时排行榜是提高用户参与度的关键功能。本文将教你如何使用Node.js、PostgreSQL和Redis构建一个高性能的实时排行榜系统。

为什么使用PostgreSQL + Redis?

方面PostgreSQLRedis
用途持久化存储、复杂查询高速缓存、排名计算
优势ACID保证、关系数据内存操作、Sorted Set
角色数据主存储、历史记录实时排名、临时数据

组合优势:

  • PostgreSQL保证数据持久性和一致性
  • Redis提供毫秒级排名查询
  • 两者结合实现性能与可靠性的平衡

项目架构

code
leaderboard-system/
├── src/
│   ├── config/
│   │   ├── database.js         # 数据库配置
│   │   └── redis.js            # Redis配置
│   ├── models/
│   │   ├── User.js             # 用户模型
│   │   └── Score.js            # 分数模型
│   ├── services/
│   │   ├── leaderboard.service.js   # 排行榜服务
│   │   └── cache.service.js         # 缓存服务
│   ├── routes/
│   │   └── leaderboard.js      # API路由
│   ├── websocket/
│   │   └── leaderboard.handler.js   # WebSocket处理
│   └── app.js                  # 应用入口
├── tests/
├── migrations/
├── .env.example
├── package.json
└── README.md
Code collapsed

1. 项目初始化

package.json

code
{
  "name": "realtime-leaderboard",
  "version": "1.0.0",
  "description": "实时排行榜系统",
  "main": "src/app.js",
  "scripts": {
    "start": "node src/app.js",
    "dev": "nodemon src/app.js",
    "test": "jest",
    "migrate": "node migrations/run.js"
  },
  "dependencies": {
    "express": "^4.18.2",
    "pg": "^8.11.3",
    "redis": "^4.6.10",
    "ws": "^8.14.2",
    "dotenv": "^16.3.1",
    "joi": "^17.11.0",
    "winston": "^3.11.0"
  },
  "devDependencies": {
    "nodemon": "^3.0.1",
    "jest": "^29.7.0",
    "supertest": "^6.3.3"
  }
}
Code collapsed

.env.example

code
# 应用配置
PORT=3000
NODE_ENV=development

# PostgreSQL配置
DB_HOST=localhost
DB_PORT=5432
DB_NAME=leaderboard
DB_USER=postgres
DB_PASSWORD=your_password

# Redis配置
REDIS_HOST=localhost
REDIS_PORT=6379
REDIS_PASSWORD=
REDIS_DB=0

# WebSocket配置
WS_PORT=3001
Code collapsed

2. 数据库设计

migrations/001_create_tables.sql

code
-- 用户表
CREATE TABLE IF NOT EXISTS users (
    id SERIAL PRIMARY KEY,
    username VARCHAR(50) UNIQUE NOT NULL,
    display_name VARCHAR(100),
    avatar_url VARCHAR(255),
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);

-- 分数历史表
CREATE TABLE IF NOT EXISTS scores (
    id SERIAL PRIMARY KEY,
    user_id INTEGER REFERENCES users(id) ON DELETE CASCADE,
    score INTEGER NOT NULL,
    category VARCHAR(50) DEFAULT 'default',
    achieved_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    metadata JSONB,
    INDEX idx_user_scores (user_id, achieved_at DESC),
    INDEX idx_category_scores (category, score DESC)
);

-- 排行榜快照表(每日备份)
CREATE TABLE IF NOT EXISTS leaderboard_snapshots (
    id SERIAL PRIMARY KEY,
    category VARCHAR(50) NOT NULL,
    snapshot_date DATE NOT NULL,
    rankings JSONB NOT NULL,
    created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
    UNIQUE(category, snapshot_date)
);

-- 创建更新时间触发器
CREATE OR REPLACE FUNCTION update_updated_at_column()
RETURNS TRIGGER AS $$
BEGIN
    NEW.updated_at = CURRENT_TIMESTAMP;
    RETURN NEW;
END;
$$ language 'plpgsql';

CREATE TRIGGER update_users_updated_at
    BEFORE UPDATE ON users
    FOR EACH ROW
    EXECUTE FUNCTION update_updated_at_column();
Code collapsed

3. 数据库连接层

src/config/database.js

code
const { Pool } = require('pg');
const logger = require('../utils/logger');

class Database {
    constructor() {
        this.pool = new Pool({
            host: process.env.DB_HOST,
            port: process.env.DB_PORT,
            database: process.env.DB_NAME,
            user: process.env.DB_USER,
            password: process.env.DB_PASSWORD,
            max: 20,
            idleTimeoutMillis: 30000,
            connectionTimeoutMillis: 2000,
        });

        this.pool.on('error', (err) => {
            logger.error('Unexpected database error', err);
            process.exit(-1);
        });
    }

    async query(text, params) {
        const start = Date.now();
        try {
            const result = await this.pool.query(text, params);
            const duration = Date.now() - start;
            logger.debug('Executed query', { text, duration, rows: result.rowCount });
            return result;
        } catch (error) {
            logger.error('Database query error', { text, error: error.message });
            throw error;
        }
    }

    async transaction(callback) {
        const client = await this.pool.connect();
        try {
            await client.query('BEGIN');
            const result = await callback(client);
            await client.query('COMMIT');
            return result;
        } catch (error) {
            await client.query('ROLLBACK');
            throw error;
        } finally {
            client.release();
        }
    }

    async close() {
        await this.pool.end();
        logger.info('Database connection closed');
    }
}

module.exports = new Database();
Code collapsed

src/config/redis.js

code
const redis = require('redis');
const logger = require('../utils/logger');

class RedisClient {
    constructor() {
        this.client = null;
        this.subscriber = null;
    }

    async connect() {
        this.client = redis.createClient({
            socket: {
                host: process.env.REDIS_HOST,
                port: process.env.REDIS_PORT,
            },
            password: process.env.REDIS_PASSWORD,
            database: process.env.REDIS_DB || 0,
        });

        this.subscriber = this.client.duplicate();

        await Promise.all([
            this.client.connect(),
            this.subscriber.connect()
        ]);

        this.client.on('error', (err) => {
            logger.error('Redis client error', err);
        });

        logger.info('Redis connected successfully');
    }

    async disconnect() {
        await Promise.all([
            this.client.quit(),
            this.subscriber.quit()
        ]);
        logger.info('Redis disconnected');
    }

    // 排行榜操作
    async addToLeaderboard(key, score, member) {
        return await this.client.zAdd(key, {
            score,
            value: member
        });
    }

    async getRank(key, member) {
        // zRevRank返回从0开始的排名(降序)
        const rank = await this.client.zRevRank(key, member);
        return rank !== null ? rank + 1 : null;
    }

    async getScore(key, member) {
        return await this.client.zScore(key, member);
    }

    async getTopMembers(key, start = 0, stop = 9) {
        return await this.client.zRevRangeWithScores(
            key,
            start,
            stop
        );
    }

    async getMemberRange(key, member, withScores = false) {
        const rank = await this.getRank(key, member);
        if (rank === null) return null;

        const start = Math.max(0, rank - 2);
        const stop = rank + 2;
        return await this.client.zRevRangeWithScores(key, start, stop);
    }

    async getMemberCount(key) {
        return await this.client.zCard(key);
    }

    async removeMember(key, member) {
        return await this.client.zRem(key, member);
    }

    // 缓存操作
    async set(key, value, ttl = 3600) {
        const serialized = JSON.stringify(value);
        if (ttl) {
            await this.client.setEx(key, ttl, serialized);
        } else {
            await this.client.set(key, serialized);
        }
    }

    async get(key) {
        const data = await this.client.get(key);
        return data ? JSON.parse(data) : null;
    }

    async del(key) {
        return await this.client.del(key);
    }

    // 发布/订阅
    async publish(channel, message) {
        return await this.client.publish(channel, JSON.stringify(message));
    }

    async subscribe(channel, callback) {
        await this.subscriber.subscribe(channel, (message) => {
            try {
                const data = JSON.parse(message);
                callback(data);
            } catch (error) {
                logger.error('Failed to parse subscribed message', error);
            }
        });
    }
}

module.exports = new RedisClient();
Code collapsed

4. 排行榜服务

src/services/leaderboard.service.js

code
const db = require('../config/database');
const redis = require('../config/redis');
const logger = require('../utils/logger');

class LeaderboardService {
    constructor() {
        this.CACHE_PREFIX = 'leaderboard';
        this.UPDATE_CHANNEL = 'leaderboard:updates';
    }

    /**
     * 提交分数
     */
    async submitScore(userId, score, category = 'default', metadata = {}) {
        return await db.transaction(async (client) => {
            // 1. 获取用户信息
            const userResult = await client.query(
                'SELECT id, username, display_name, avatar_url FROM users WHERE id = $1',
                [userId]
            );

            if (userResult.rows.length === 0) {
                throw new Error('User not found');
            }

            const user = userResult.rows[0];

            // 2. 保存分数到PostgreSQL
            const scoreResult = await client.query(
                `INSERT INTO scores (user_id, score, category, metadata)
                 VALUES ($1, $2, $3, $4)
                 RETURNING id, achieved_at`,
                [userId, score, category, metadata]
            );

            // 3. 更新Redis排行榜
            const leaderboardKey = this.getLeaderboardKey(category);
            await redis.addToLeaderboard(
                leaderboardKey,
                score,
                userId.toString()
            );

            // 4. 获取新排名
            const newRank = await redis.getRank(leaderboardKey, userId.toString());

            // 5. 获取周围排名
            const neighbors = await this._getNeighborRanks(
                leaderboardKey,
                userId.toString(),
                user
            );

            // 6. 发布更新事件
            await redis.publish(this.UPDATE_CHANNEL, {
                type: 'score_updated',
                category,
                userId,
                score,
                rank: newRank,
                neighbors
            });

            // 7. 清除缓存
            await this._clearCache(leaderboardKey);

            return {
                success: true,
                scoreId: scoreResult.rows[0].id,
                achievedAt: scoreResult.rows[0].achieved_at,
                rank: newRank,
                neighbors
            };
        });
    }

    /**
     * 获取排行榜TOP N
     */
    async getTopScores(category = 'default', limit = 10) {
        const cacheKey = `${this.CACHE_PREFIX}:${category}:top:${limit}`;

        // 尝试从缓存获取
        const cached = await redis.get(cacheKey);
        if (cached) {
            return cached;
        }

        const leaderboardKey = this.getLeaderboardKey(category);
        const members = await redis.getTopMembers(leaderboardKey, 0, limit - 1);

        if (members.length === 0) {
            return { rankings: [], total: 0 };
        }

        // 批量获取用户信息
        const userIds = members.map(m => parseInt(m.value));
        const users = await this._getUsersByIds(userIds);

        // 组装排名数据
        const rankings = members.map((member, index) => {
            const user = users.get(parseInt(member.value));
            return {
                rank: index + 1,
                score: member.score,
                user: user ? {
                    id: user.id,
                    username: user.username,
                    displayName: user.display_name,
                    avatarUrl: user.avatar_url
                } : null
            };
        });

        const result = {
            rankings,
            total: await redis.getMemberCount(leaderboardKey)
        };

        // 缓存结果(30秒)
        await redis.set(cacheKey, result, 30);

        return result;
    }

    /**
     * 获取用户的排名和周围玩家
     */
    async getUserRanking(userId, category = 'default') {
        const leaderboardKey = this.getLeaderboardKey(category);

        // 获取用户分数和排名
        const [score, rank] = await Promise.all([
            redis.getScore(leaderboardKey, userId.toString()),
            redis.getRank(leaderboardKey, userId.toString())
        ]);

        if (score === null || rank === null) {
            return null;
        }

        // 获取用户信息
        const userResult = await db.query(
            'SELECT id, username, display_name, avatar_url FROM users WHERE id = $1',
            [userId]
        );

        if (userResult.rows.length === 0) {
            return null;
        }

        const user = userResult.rows[0];

        // 获取周围排名
        const neighbors = await this._getNeighborRanks(
            leaderboardKey,
            userId.toString(),
            user
        );

        return {
            rank,
            score,
            user,
            neighbors
        };
    }

    /**
     * 获取分数历史
     */
    async getUserScoreHistory(userId, category = 'default', limit = 10) {
        const result = await db.query(
            `SELECT score, achieved_at, metadata
             FROM scores
             WHERE user_id = $1 AND category = $2
             ORDER BY achieved_at DESC
             LIMIT $3`,
            [userId, category, limit]
        );

        return result.rows;
    }

    /**
     * 创建每日快照
     */
    async createDailySnapshot(category = 'default') {
        const today = new Date().toISOString().split('T')[0];
        const leaderboardKey = this.getLeaderboardKey(category);
        const members = await redis.getTopMembers(leaderboardKey, 0, -1);

        const userIds = members.map(m => parseInt(m.value));
        const users = await this._getUsersByIds(userIds);

        const rankings = members.map((member, index) => {
            const user = users.get(parseInt(member.value));
            return {
                rank: index + 1,
                userId: parseInt(member.value),
                username: user?.username,
                score: member.score
            };
        });

        await db.query(
            `INSERT INTO leaderboard_snapshots (category, snapshot_date, rankings)
             VALUES ($1, $2, $3)
             ON CONFLICT (category, snapshot_date)
             DO UPDATE SET rankings = $3`,
            [category, today, JSON.stringify(rankings)]
        );

        logger.info(`Created daily snapshot for category: ${category}`);
    }

    /**
     * 获取辅助方法
     */
    getLeaderboardKey(category) {
        return `leaderboard:${category}`;
    }

    async _getNeighborRanks(leaderboardKey, userIdStr, user) {
        const range = await redis.getMemberRange(leaderboardKey, userIdStr);
        if (!range) return [];

        const userIds = range.map(m => parseInt(m.value));
        const users = await this._getUsersByIds(userIds);

        return range.map((member) => {
            const memberUser = users.get(parseInt(member.value));
            return {
                userId: parseInt(member.value),
                username: memberUser?.username,
                displayName: memberUser?.display_name,
                score: member.score
            };
        });
    }

    async _getUsersByIds(userIds) {
        if (userIds.length === 0) return new Map();

        const result = await db.query(
            `SELECT id, username, display_name, avatar_url
             FROM users
             WHERE id = ANY($1)`,
            [userIds]
        );

        return new Map(result.rows.map(row => [row.id, row]));
    }

    async _clearCache(leaderboardKey) {
        // 清除相关缓存
        const pattern = `${this.CACHE_PREFIX}:${leaderboardKey.split(':')[1]}:*`;
        // 实际项目中可能需要SCAN命令来查找匹配的键
        await redis.del(`${leaderboardKey}:top:10`);
        await redis.del(`${leaderboardKey}:top:50`);
    }
}

module.exports = new LeaderboardService();
Code collapsed

5. API路由

src/routes/leaderboard.js

code
const express = require('express');
const Joi = require('joi');
const leaderboardService = require('../services/leaderboard.service');
const logger = require('../utils/logger');

const router = express.Router();

// 验证中间件
const validate = (schema) => (req, res, next) => {
    const { error } = schema.validate(req.body);
    if (error) {
        return res.status(400).json({
            success: false,
            error: error.details[0].message
        });
    }
    next();
};

// 提交分数
router.post('/scores', validate({
    userId: Joi.number().integer().positive().required(),
    score: Joi.number().integer().required(),
    category: Joi.string().default('default'),
    metadata: Joi.object().default({})
}), async (req, res) => {
    try {
        const { userId, score, category, metadata } = req.body;

        const result = await leaderboardService.submitScore(
            userId,
            score,
            category,
            metadata
        );

        res.json({
            success: true,
            data: result
        });
    } catch (error) {
        logger.error('Error submitting score', error);
        res.status(500).json({
            success: false,
            error: error.message
        });
    }
});

// 获取排行榜
router.get('/leaderboard', async (req, res) => {
    try {
        const category = req.query.category || 'default';
        const limit = Math.min(parseInt(req.query.limit) || 10, 100);

        const result = await leaderboardService.getTopScores(category, limit);

        res.json({
            success: true,
            data: result
        });
    } catch (error) {
        logger.error('Error fetching leaderboard', error);
        res.status(500).json({
            success: false,
            error: error.message
        });
    }
});

// 获取用户排名
router.get('/users/:userId/ranking', async (req, res) => {
    try {
        const userId = parseInt(req.params.userId);
        const category = req.query.category || 'default';

        const result = await leaderboardService.getUserRanking(userId, category);

        if (!result) {
            return res.status(404).json({
                success: false,
                error: 'User ranking not found'
            });
        }

        res.json({
            success: true,
            data: result
        });
    } catch (error) {
        logger.error('Error fetching user ranking', error);
        res.status(500).json({
            success: false,
            error: error.message
        });
    }
});

// 获取用户历史
router.get('/users/:userId/history', async (req, res) => {
    try {
        const userId = parseInt(req.params.userId);
        const category = req.query.category || 'default';
        const limit = Math.min(parseInt(req.query.limit) || 10, 100);

        const history = await leaderboardService.getUserScoreHistory(
            userId,
            category,
            limit
        );

        res.json({
            success: true,
            data: history
        });
    } catch (error) {
        logger.error('Error fetching score history', error);
        res.status(500).json({
            success: false,
            error: error.message
        });
    }
});

module.exports = router;
Code collapsed

6. WebSocket实时更新

src/websocket/leaderboard.handler.js

code
const WebSocket = require('ws');
const redis = require('../config/redis');
const leaderboardService = require('../services/leaderboard.service');
const logger = require('../utils/logger');

class LeaderboardWebSocket {
    constructor(port = 3001) {
        this.wss = new WebSocket.Server({ port });
        this.clients = new Set();

        this.wss.on('connection', (ws) => {
            this.handleConnection(ws);
        });

        // 订阅Redis更新事件
        this.subscribeToUpdates();

        logger.info(`WebSocket server started on port ${port}`);
    }

    handleConnection(ws) {
        this.clients.add(ws);
        logger.info('Client connected', { clientCount: this.clients.size });

        ws.on('message', async (message) => {
            try {
                const data = JSON.parse(message);
                await this.handleMessage(ws, data);
            } catch (error) {
                logger.error('Error handling WebSocket message', error);
            }
        });

        ws.on('close', () => {
            this.clients.delete(ws);
            logger.info('Client disconnected', { clientCount: this.clients.size });
        });

        // 发送欢迎消息
        ws.send(JSON.stringify({
            type: 'connected',
            message: 'Connected to leaderboard updates'
        }));
    }

    async handleMessage(ws, data) {
        switch (data.type) {
            case 'subscribe':
                // 订阅特定分类的更新
                ws.category = data.category || 'default';
                ws.send(JSON.stringify({
                    type: 'subscribed',
                    category: ws.category
                }));
                break;

            case 'get_ranking':
                if (data.userId) {
                    const ranking = await leaderboardService.getUserRanking(
                        data.userId,
                        data.category || 'default'
                    );
                    ws.send(JSON.stringify({
                        type: 'ranking',
                        data: ranking
                    }));
                }
                break;
        }
    }

    async subscribeToUpdates() {
        await redis.subscribe(
            leaderboardService.UPDATE_CHANNEL,
            (message) => {
                this.broadcast(message);
            }
        );
    }

    broadcast(message) {
        const data = JSON.stringify(message);

        this.clients.forEach((client) => {
            // 发送给订阅了该分类的客户端
            if (!client.category || client.category === message.category) {
                if (client.readyState === WebSocket.OPEN) {
                    client.send(data);
                }
            }
        });
    }
}

module.exports = LeaderboardWebSocket;
Code collapsed

7. 应用入口

src/app.js

code
require('dotenv').config();
const express = require('express');
const db = require('./config/database');
const redis = require('./config/redis');
const leaderboardRoutes = require('./routes/leaderboard');
const LeaderboardWebSocket = require('./websocket/leaderboard.handler');
const logger = require('./utils/logger');

const app = express();
const PORT = process.env.PORT || 3000;

// 中间件
app.use(express.json());
app.use(express.urlencoded({ extended: true }));

// 路由
app.use('/api', leaderboardRoutes);

// 健康检查
app.get('/health', async (req, res) => {
    try {
        await db.query('SELECT 1');
        await redis.client.ping();
        res.json({
            status: 'healthy',
            database: 'connected',
            redis: 'connected',
            timestamp: new Date().toISOString()
        });
    } catch (error) {
        res.status(503).json({
            status: 'unhealthy',
            error: error.message
        });
    }
});

// 启动服务器
async function start() {
    try {
        // 连接数据库
        logger.info('Connecting to PostgreSQL...');
        await db.query('SELECT 1');

        // 连接Redis
        logger.info('Connecting to Redis...');
        await redis.connect();

        // 启动HTTP服务器
        app.listen(PORT, () => {
            logger.info(`HTTP server started on port ${PORT}`);
        });

        // 启动WebSocket服务器
        new LeaderboardWebSocket(process.env.WS_PORT);

        // 定时创建快照(每天凌晨)
        setInterval(() => {
            const now = new Date();
            if (now.getHours() === 0 && now.getMinutes() === 0) {
                leaderboardService.createDailySnapshot();
            }
        }, 60000);

        logger.info('All services started successfully');
    } catch (error) {
        logger.error('Failed to start server', error);
        process.exit(1);
    }
}

// 优雅关闭
process.on('SIGTERM', async () => {
    logger.info('SIGTERM received, shutting down gracefully...');
    await redis.disconnect();
    await db.close();
    process.exit(0);
});

start();
Code collapsed

API端点文档

提交分数

code
POST /api/scores
Content-Type: application/json

{
  "userId": 1,
  "score": 1000,
  "category": "daily_challenge",
  "metadata": {
    "level": 5,
    "timeSpent": 300
  }
}
Code collapsed

获取排行榜

code
GET /api/leaderboard?category=daily_challenge&limit=10
Code collapsed

获取用户排名

code
GET /api/users/1/ranking?category=daily_challenge
Code collapsed

获取分数历史

code
GET /api/users/1/history?category=daily_challenge&limit=10
Code collapsed

性能优化建议

  1. Redis优化:

    • 使用Pipeline批量操作
    • 设置合理的maxmemory-policy
    • 考虑使用Redis Cluster做水平扩展
  2. PostgreSQL优化:

    • 为score表添加合适的索引
    • 使用连接池管理连接
    • 定期VACUUM和ANALYZE
  3. 应用层优化:

    • 实现请求限流
    • 使用HTTP缓存头
    • 考虑CDN分发静态资源

通过本教程,你已掌握使用Node.js、PostgreSQL和Redis构建高性能实时排行榜系统的核心技术。这个架构可轻松扩展到各种竞赛、游戏或健身应用场景。

#

文章标签

Node.js
PostgreSQL
Redis
排行榜
实时应用
游戏开发

觉得这篇文章有帮助?

立即体验康心伴,开始您的健康管理之旅