PostgreSQL 多租户数据库:行级安全(RLS)完整指南
概述
行级安全(Row-Level Security, RLS)是 PostgreSQL 的强大功能,可以在数据库层面实现细粒度的访问控制。对于多租户 SaaS 应用,RLS 提供了租户数据隔离的最佳方案。
为什么选择 RLS?
| 特性 | RLS | 应用层过滤 |
|---|---|---|
| 数据隔离 | 数据库强制 | 应用代码负责 |
| 安全性 | 高(数据库强制隔离,可限制 SQL 注入的影响范围) | 中(依赖代码质量) |
| 性能 | 索引友好 | 需要额外处理 |
| 维护成本 | 低 | 高 |
| 审计合规 | 内置支持 | 需手动实现 |
RLS 核心概念
- 策略(Policy):定义访问规则的数据库对象
- 表达式(Expression):使用 `USING`(读取)和 `WITH CHECK`(写入)
- 角色(Role):策略可以基于当前用户角色
- 租户上下文:通过 `SET LOCAL` 或自定义 GUC 设置
数据库架构设计
1. 基础表结构
code
-- =============================================================================
-- Multi-tenant schema used by the row-level security examples
-- =============================================================================

-- Tenants: one row per customer account.
CREATE TABLE tenants (
    id          BIGSERIAL    PRIMARY KEY,
    name        VARCHAR(255) NOT NULL,
    slug        VARCHAR(100) NOT NULL UNIQUE,
    plan        VARCHAR(50)  DEFAULT 'free',     -- 'free', 'pro', 'enterprise'
    settings    JSONB        DEFAULT '{}',
    created_at  TIMESTAMPTZ  DEFAULT now(),
    updated_at  TIMESTAMPTZ  DEFAULT now()
);

-- Users: every user belongs to exactly one tenant; e-mail is unique per tenant.
CREATE TABLE users (
    id          BIGSERIAL    PRIMARY KEY,
    tenant_id   BIGINT       NOT NULL REFERENCES tenants (id) ON DELETE CASCADE,
    email       VARCHAR(255) NOT NULL,
    name        VARCHAR(255),
    role        VARCHAR(50)  DEFAULT 'member',   -- 'owner', 'admin', 'member'
    is_active   BOOLEAN      DEFAULT TRUE,
    created_at  TIMESTAMPTZ  DEFAULT now(),
    updated_at  TIMESTAMPTZ  DEFAULT now(),
    UNIQUE (tenant_id, email)
);

-- Projects: example of tenant-owned data.
-- NOTE(review): created_by carries no REFERENCES clause, so nothing ties it to
-- users(id) - confirm whether that is intentional.
CREATE TABLE projects (
    id          BIGSERIAL    PRIMARY KEY,
    tenant_id   BIGINT       NOT NULL REFERENCES tenants (id) ON DELETE CASCADE,
    name        VARCHAR(255) NOT NULL,
    description TEXT,
    status      VARCHAR(50)  DEFAULT 'active',
    created_by  BIGINT       NOT NULL,
    created_at  TIMESTAMPTZ  DEFAULT now(),
    updated_at  TIMESTAMPTZ  DEFAULT now()
);

-- Tasks: tenant-owned rows that also hang off a project.
-- NOTE(review): tenant_id and project_id are independent foreign keys; the
-- schema alone does not force a task and its project to share a tenant.
CREATE TABLE tasks (
    id          BIGSERIAL    PRIMARY KEY,
    tenant_id   BIGINT       NOT NULL REFERENCES tenants (id) ON DELETE CASCADE,
    project_id  BIGINT       NOT NULL REFERENCES projects (id) ON DELETE CASCADE,
    title       VARCHAR(255) NOT NULL,
    description TEXT,
    status      VARCHAR(50)  DEFAULT 'todo',
    assigned_to BIGINT       REFERENCES users (id),
    created_by  BIGINT       NOT NULL,
    created_at  TIMESTAMPTZ  DEFAULT now(),
    updated_at  TIMESTAMPTZ  DEFAULT now()
);

-- Supporting indexes for tenant filters and common lookups.
-- NOTE(review): UNIQUE (tenant_id, email) already provides an index led by
-- tenant_id, so idx_users_tenant_id may be redundant - confirm before dropping.
CREATE INDEX idx_users_tenant_id    ON users (tenant_id);
CREATE INDEX idx_users_email        ON users (email);
CREATE INDEX idx_projects_tenant_id ON projects (tenant_id);
CREATE INDEX idx_tasks_tenant_id    ON tasks (tenant_id);
CREATE INDEX idx_tasks_project_id   ON tasks (project_id);
Code collapsed
2. 启用行级安全
code
-- =============================================================================
-- Enable RLS and register the tenant-context setting
-- =============================================================================

-- Turn on row-level security for every tenant-scoped table.
-- NOTE(review): PostgreSQL does not apply policies to the table owner (or to
-- superusers / BYPASSRLS roles) unless FORCE ROW LEVEL SECURITY is set, so the
-- application should connect with a separate, non-owner role - confirm.
ALTER TABLE users ENABLE ROW LEVEL SECURITY;
ALTER TABLE projects ENABLE ROW LEVEL SECURITY;
ALTER TABLE tasks ENABLE ROW LEVEL SECURITY;

-- Tenant context lives in the custom setting app.current_tenant_id, e.g.
--   SET LOCAL app.current_tenant_id = '123'
-- The block below gives the setting a database-wide default of 0.
DO $$
BEGIN
    -- %I quotes the database name as an identifier, so names that need
    -- quoting (mixed case, dashes, ...) no longer break the statement.
    EXECUTE format(
        'ALTER DATABASE %I SET app.current_tenant_id TO 0',
        current_database()
    );
EXCEPTION
    WHEN OTHERS THEN
        -- Still best-effort (e.g. the role may lack ALTER DATABASE rights),
        -- but report the reason instead of discarding it.
        RAISE NOTICE 'could not set default for app.current_tenant_id: %', SQLERRM;
END $$;
-- Store the given tenant id in app.current_tenant_id.
--
-- Parameters:
--   tenant_id  id of the tenant whose rows subsequent statements may access.
--
-- The third argument of set_config is FALSE, so the value is session-level: it
-- stays on the connection until it is changed or reset. Callers that use a
-- connection pool must reset it before handing the connection back.
--
-- The function is SECURITY DEFINER, so search_path is pinned and set_config is
-- schema-qualified; otherwise a caller-controlled search_path could redirect
-- the call to a look-alike function that would run with the definer's rights.
CREATE OR REPLACE FUNCTION set_tenant_context(tenant_id BIGINT)
RETURNS VOID AS $$
BEGIN
    PERFORM pg_catalog.set_config('app.current_tenant_id', tenant_id::text, FALSE);
END;
$$ LANGUAGE plpgsql SECURITY DEFINER
   SET search_path = pg_catalog, pg_temp;
-- Return the tenant id stored in app.current_tenant_id.
--
-- Returns NULL when the setting is missing, empty, or not a valid BIGINT.
-- A NULL result makes every "tenant_id = current_tenant_id()" comparison
-- evaluate to NULL, so policies built on it deny access (fail closed).
CREATE OR REPLACE FUNCTION current_tenant_id()
RETURNS BIGINT AS $$
BEGIN
    -- missing_ok = TRUE: an unset parameter yields NULL instead of an error.
    RETURN NULLIF(current_setting('app.current_tenant_id', TRUE), '')::BIGINT;
EXCEPTION
    WHEN OTHERS THEN
        RETURN NULL;
END;
$$ LANGUAGE plpgsql STABLE;

-- Return the acting user's id stored in app.current_user_id.
--
-- The policies and has_permission() call current_user_id(), but no definition
-- was provided; without one those statements cannot be created. It mirrors
-- current_tenant_id(): NULL when the setting is missing, empty, or invalid.
-- The application must set app.current_user_id next to the tenant id,
-- otherwise every check that compares against current_user_id() is denied.
CREATE OR REPLACE FUNCTION current_user_id()
RETURNS BIGINT AS $$
BEGIN
    RETURN NULLIF(current_setting('app.current_user_id', TRUE), '')::BIGINT;
EXCEPTION
    WHEN OTHERS THEN
        RETURN NULL;
END;
$$ LANGUAGE plpgsql STABLE;
Code collapsed
3. 创建 RLS 策略
code
-- =============================================================================
-- Policies on users
-- =============================================================================

-- SELECT: a user row is visible only inside the current tenant.
CREATE POLICY users_select_own_tenant ON users
    FOR SELECT
    USING (tenant_id = current_tenant_id());

-- INSERT: the new row must belong to the current tenant, and the acting user
-- must be an owner or admin of that tenant.
CREATE POLICY users_insert_admin_only ON users
    FOR INSERT
    WITH CHECK (
        tenant_id = current_tenant_id()
        AND EXISTS (
            SELECT 1
            FROM users AS actor
            WHERE actor.id = current_user_id()
              AND actor.tenant_id = current_tenant_id()
              AND actor.role IN ('owner', 'admin')
        )
    );

-- UPDATE: the row must belong to the current tenant both before and after
-- the change, so a row cannot be moved to another tenant.
-- NOTE(review): there is no role condition here, so any member of the tenant
-- passes this policy for any user row of the tenant, including its role
-- column - confirm that is intended.
CREATE POLICY users_update_own_tenant ON users
    FOR UPDATE
    USING (tenant_id = current_tenant_id())
    WITH CHECK (tenant_id = current_tenant_id());

-- DELETE: only an owner of the current tenant may remove its users.
CREATE POLICY users_delete_owner_only ON users
    FOR DELETE
    USING (
        tenant_id = current_tenant_id()
        AND EXISTS (
            SELECT 1
            FROM users AS actor
            WHERE actor.id = current_user_id()
              AND actor.tenant_id = current_tenant_id()
              AND actor.role = 'owner'
        )
    );
-- =============================================================================
-- Policies on projects
-- =============================================================================

-- SELECT: projects are visible only inside the current tenant.
CREATE POLICY projects_select_own_tenant ON projects
    FOR SELECT
    USING (tenant_id = current_tenant_id());

-- INSERT: the project must be created in the current tenant and be recorded
-- as created by the acting user.
CREATE POLICY projects_insert_own_tenant ON projects
    FOR INSERT
    WITH CHECK (
        tenant_id = current_tenant_id()
        AND created_by = current_user_id()
    );

-- UPDATE: the project must stay inside the current tenant.
CREATE POLICY projects_update_own_tenant ON projects
    FOR UPDATE
    USING (tenant_id = current_tenant_id())
    WITH CHECK (tenant_id = current_tenant_id());

-- DELETE: allowed for the project's creator, or for an owner/admin of the
-- current tenant.
CREATE POLICY projects_delete_creator_or_admin ON projects
    FOR DELETE
    USING (
        tenant_id = current_tenant_id()
        AND (
            created_by = current_user_id()
            OR EXISTS (
                SELECT 1
                FROM users AS actor
                WHERE actor.id = current_user_id()
                  AND actor.tenant_id = current_tenant_id()
                  AND actor.role IN ('owner', 'admin')
            )
        )
    );
-- =============================================================================
-- Policies on tasks
-- =============================================================================

-- SELECT: tasks are visible only inside the current tenant.
CREATE POLICY tasks_select_own_tenant ON tasks
    FOR SELECT
    USING (tenant_id = current_tenant_id());

-- INSERT: the task and the project it points at must both belong to the
-- current tenant.
CREATE POLICY tasks_insert_own_tenant ON tasks
    FOR INSERT
    WITH CHECK (
        tenant_id = current_tenant_id()
        AND EXISTS (
            SELECT 1
            FROM projects AS p
            WHERE p.id = project_id
              AND p.tenant_id = current_tenant_id()
        )
    );

-- UPDATE: the task must start and end in the current tenant, and its
-- (possibly changed) project must belong to the current tenant as well.
CREATE POLICY tasks_update_own_tenant ON tasks
    FOR UPDATE
    USING (tenant_id = current_tenant_id())
    WITH CHECK (
        tenant_id = current_tenant_id()
        AND EXISTS (
            SELECT 1
            FROM projects AS p
            WHERE p.id = project_id
              AND p.tenant_id = current_tenant_id()
        )
    );

-- DELETE: allowed for the task's creator, its assignee, or an owner/admin of
-- the current tenant.
CREATE POLICY tasks_delete_authorized_users ON tasks
    FOR DELETE
    USING (
        tenant_id = current_tenant_id()
        AND (
            created_by = current_user_id()
            OR assigned_to = current_user_id()
            OR EXISTS (
                SELECT 1
                FROM users AS actor
                WHERE actor.id = current_user_id()
                  AND actor.tenant_id = current_tenant_id()
                  AND actor.role IN ('owner', 'admin')
            )
        )
    );
Code collapsed
4. 高级策略:基于角色的访问控制
code
-- =============================================================================
-- Fine-grained, role-based access control
-- =============================================================================

-- Per-tenant permission matrix: one row per (tenant, role, resource).
CREATE TABLE role_permissions (
    id          BIGSERIAL    PRIMARY KEY,
    tenant_id   BIGINT       NOT NULL REFERENCES tenants (id) ON DELETE CASCADE,
    role        VARCHAR(50)  NOT NULL,
    resource    VARCHAR(100) NOT NULL,   -- 'projects', 'tasks', 'users'
    permissions TEXT[]       NOT NULL,   -- ['read', 'create', 'update', 'delete']
    UNIQUE (tenant_id, role, resource)
);

-- The table is tenant-scoped, so it needs the same isolation as the other
-- tenant tables; without RLS any session could read or change another
-- tenant's permission matrix.
ALTER TABLE role_permissions ENABLE ROW LEVEL SECURITY;

-- Read-only for the current tenant. No INSERT/UPDATE/DELETE policy is defined,
-- so roles subject to RLS cannot modify the matrix.
-- NOTE(review): seeding and maintenance are assumed to run as the table owner
-- (which is exempt from RLS unless FORCE is set) - confirm.
CREATE POLICY role_permissions_select_own_tenant ON role_permissions
    FOR SELECT
    USING (tenant_id = current_tenant_id());
-- Seed the default permission matrix for every existing tenant.
--
-- One statement covers all three roles, so the seed is applied atomically.
-- ON CONFLICT DO NOTHING makes it safe to re-run: rows that already exist for
-- a (tenant_id, role, resource) combination are left untouched instead of
-- aborting the script on the UNIQUE constraint.
-- NOTE(review): this only covers tenants that exist when the script runs;
-- tenants created later need the same rows inserted by other means.
INSERT INTO role_permissions (tenant_id, role, resource, permissions)
SELECT
    t.id,
    d.role,
    r.resource,
    d.permissions
FROM tenants AS t
CROSS JOIN (
    VALUES
        ('owner',  ARRAY['projects', 'tasks', 'users', 'settings'], ARRAY['read', 'create', 'update', 'delete']),
        ('admin',  ARRAY['projects', 'tasks', 'users'],             ARRAY['read', 'create', 'update', 'delete']),
        ('member', ARRAY['projects', 'tasks'],                      ARRAY['read', 'create'])
) AS d (role, resources, permissions)
CROSS JOIN LATERAL unnest(d.resources) AS r (resource)
ON CONFLICT (tenant_id, role, resource) DO NOTHING;
-- Check whether the acting user holds a permission on a resource.
--
-- Parameters:
--   resource    resource name as stored in role_permissions.resource
--   permission  permission name looked up in role_permissions.permissions
--
-- Returns TRUE only when the current user has a role in the current tenant and
-- that role's row for the resource lists the permission; FALSE otherwise,
-- including when the user or the permission row does not exist.
CREATE OR REPLACE FUNCTION has_permission(
    resource VARCHAR,
    permission VARCHAR
)
RETURNS BOOLEAN AS $$
DECLARE
    user_role VARCHAR;
    user_permissions TEXT[];
BEGIN
    -- Role of the acting user inside the current tenant.
    SELECT u.role INTO user_role
    FROM users AS u
    WHERE u.id = current_user_id()
      AND u.tenant_id = current_tenant_id();

    IF user_role IS NULL THEN
        RETURN FALSE;
    END IF;

    -- The parameter shares its name with the column, so both sides are
    -- qualified: the column through the table alias, the parameter through the
    -- function name. Unqualified "resource = resource" is ambiguous.
    SELECT rp.permissions INTO user_permissions
    FROM role_permissions AS rp
    WHERE rp.tenant_id = current_tenant_id()
      AND rp.role = user_role
      AND rp.resource = has_permission.resource;

    -- "= ANY" avoids comparing TEXT[] against a VARCHAR[] with @>, and
    -- COALESCE turns "no permission row found" (NULL) into FALSE.
    RETURN COALESCE(has_permission.permission = ANY (user_permissions), FALSE);
END;
$$ LANGUAGE plpgsql STABLE;
-- Replace the hard-coded role check on project deletion with a lookup in the
-- permission matrix: the creator may always delete, anyone else needs the
-- 'delete' permission on 'projects'.
DROP POLICY IF EXISTS projects_delete_creator_or_admin ON projects;

CREATE POLICY projects_delete_with_permission ON projects
    FOR DELETE
    USING (
        tenant_id = current_tenant_id()
        AND (
            created_by = current_user_id()
            OR has_permission('projects', 'delete')
        )
    );
Code collapsed
Node.js 实现
1. 数据库连接中间件
code
// lib/db/tenant-context.ts
import { Pool, PoolClient } from 'pg';
/**
 * Runs database work on behalf of one tenant by setting the
 * `app.current_tenant_id` setting that the RLS policies read.
 */
export class TenantContextManager {
  private pool: Pool;

  constructor(pool: Pool) {
    this.pool = pool;
  }

  /**
   * Run `callback` on a pooled client with the tenant context applied.
   *
   * The work runs inside a transaction and the context is set with
   * `set_config(..., true)`, i.e. transaction-local. PostgreSQL discards the
   * value at COMMIT/ROLLBACK, so a pooled connection can never carry one
   * request's tenant into the next, and no separate cleanup query is needed.
   * (The previous cleanup query used `%s` placeholders, which node-postgres
   * does not support; it threw inside `finally`, so the client was never
   * released.)
   *
   * Because the callback runs in a transaction, its writes are committed
   * together or rolled back together.
   *
   * @param tenantId Tenant whose rows the callback may access.
   * @param callback Work to run; receives the client that has the context set.
   * @param userId   Optional acting user, stored in `app.current_user_id`.
   *                 NOTE(review): the policies call `current_user_id()`;
   *                 presumably it reads this setting - confirm.
   * @returns Whatever `callback` resolves to.
   * @throws Error when `tenantId` is not a positive integer; otherwise any
   *         error raised by the callback or the database (after rollback).
   */
  async withTenant<T>(
    tenantId: number,
    callback: (client: PoolClient) => Promise<T>,
    userId?: number
  ): Promise<T> {
    if (!Number.isSafeInteger(tenantId) || tenantId <= 0) {
      throw new Error(`Invalid tenant id: ${tenantId}`);
    }

    const client = await this.pool.connect();
    let destroyClient = false;
    try {
      await client.query('BEGIN');
      await client.query(
        "SELECT set_config('app.current_tenant_id', $1, true)",
        [String(tenantId)]
      );
      if (userId !== undefined) {
        await client.query(
          "SELECT set_config('app.current_user_id', $1, true)",
          [String(userId)]
        );
      }

      const result = await callback(client);
      await client.query('COMMIT');
      return result;
    } catch (error) {
      try {
        await client.query('ROLLBACK');
      } catch {
        // The connection is in an unknown state; do not return it to the pool.
        destroyClient = true;
      }
      throw error;
    } finally {
      // Always hand the client back (or destroy it) exactly once.
      client.release(destroyClient);
    }
  }

  /**
   * Check that an active user belongs to the tenant.
   *
   * The lookup runs inside the tenant context: with RLS enabled on `users`, a
   * query issued without a context sees no rows and would always report
   * "no access".
   */
  async validateTenantAccess(userId: number, tenantId: number): Promise<boolean> {
    return this.withTenant(tenantId, async (client) => {
      const result = await client.query(
        `SELECT 1 FROM users
         WHERE id = $1 AND tenant_id = $2 AND is_active = TRUE`,
        [userId, tenantId]
      );
      // rowCount may be null for some commands; treat that as "no rows".
      return (result.rowCount ?? 0) > 0;
    });
  }

  /**
   * Return the user's role inside the tenant, or null when the user is not
   * part of it. Runs inside the tenant context for the same reason as
   * validateTenantAccess.
   */
  async getUserRole(userId: number, tenantId: number): Promise<string | null> {
    return this.withTenant(tenantId, async (client) => {
      const result = await client.query(
        `SELECT role FROM users
         WHERE id = $1 AND tenant_id = $2`,
        [userId, tenantId]
      );
      return result.rows[0]?.role || null;
    });
  }
}
Code collapsed
2. 请求中间件
code
// middleware/tenant-middleware.ts
import { NextRequest, NextResponse } from 'next/server';
import { TenantContextManager } from '@/lib/db/tenant-context';
import { getSession } from '@/lib/auth';
// Module-level manager used by the middleware below.
// NOTE(review): `pool` is not imported in this snippet - confirm where it
// comes from (presumably the pool configuration module).
const tenantManager = new TenantContextManager(pool);
/**
 * Work out which tenant a request targets.
 *
 * Sources are tried in order: the first label of the Host header (resolved
 * through getTenantIdFromSlug), the X-Tenant-ID header, then the third path
 * segment (e.g. /t/:tenantId/...).
 *
 * Numeric sources must consist of digits only and denote a positive integer.
 * Anything else is ignored, so the function never returns NaN and never
 * accepts values such as "12abc".
 *
 * NOTE(review): getTenantIdFromSlug is not defined in this snippet; it is
 * called synchronously here - confirm it does not return a Promise.
 *
 * @returns The tenant id, or null when no source yields a valid one.
 */
function extractTenantId(request: NextRequest): number | null {
  // Strict parser shared by the header and path sources.
  const toTenantId = (raw: string | null | undefined): number | null => {
    if (!raw || !/^\d+$/.test(raw)) return null;
    const id = Number.parseInt(raw, 10);
    return Number.isSafeInteger(id) && id > 0 ? id : null;
  };

  // Source 1: subdomain (tenant.example.com)
  const host = request.headers.get('host');
  if (host) {
    const subdomain = host.split('.')[0];
    const tenantId = getTenantIdFromSlug(subdomain);
    if (tenantId) return tenantId;
  }

  // Source 2: X-Tenant-ID request header
  const headerTenantId = toTenantId(request.headers.get('X-Tenant-ID'));
  if (headerTenantId !== null) {
    return headerTenantId;
  }

  // Source 3: path segment (/t/:tenantId/...)
  const url = new URL(request.url);
  return toTenantId(url.pathname.split('/')[2]);
}
/**
 * Tenant middleware.
 *
 * Resolves the tenant for the request, requires a logged-in user, and checks
 * that the user belongs to the tenant.
 *
 * @returns A JSON error response (400 no tenant, 401 no session, 403 no
 *          access), or a pass-through response that forwards the request with
 *          the validated tenant id in the X-Tenant-ID header.
 *
 * The success path used to build the rewritten headers and then return null,
 * so the header was never forwarded and getTenantId() could only see whatever
 * the client had sent. Returning NextResponse.next with the request headers
 * attaches them to the forwarded request and overwrites any client-supplied
 * X-Tenant-ID value.
 */
export async function tenantMiddleware(
  request: NextRequest
): Promise<NextResponse | null> {
  // Resolve the tenant
  const tenantId = extractTenantId(request);
  if (!tenantId) {
    return NextResponse.json(
      { error: '租户 ID 缺失' },
      { status: 400 }
    );
  }

  // Resolve the current user
  const session = await getSession(request);
  if (!session?.userId) {
    return NextResponse.json(
      { error: '未授权' },
      { status: 401 }
    );
  }

  // The user must belong to the tenant
  const hasAccess = await tenantManager.validateTenantAccess(
    session.userId,
    tenantId
  );
  if (!hasAccess) {
    return NextResponse.json(
      { error: '无权访问此租户' },
      { status: 403 }
    );
  }

  // Forward the request with the validated tenant id for downstream handlers
  const requestHeaders = new Headers(request.headers);
  requestHeaders.set('X-Tenant-ID', tenantId.toString());
  return NextResponse.next({ request: { headers: requestHeaders } });
}
/**
 * Read the tenant id from the X-Tenant-ID request header.
 *
 * @returns The tenant id as a positive integer.
 * @throws Error when the header is missing or is not a positive integer;
 *         previously a malformed header was returned as NaN.
 */
export function getTenantId(request: NextRequest): number {
  const tenantId = request.headers.get('X-Tenant-ID');
  if (!tenantId) {
    throw new Error('租户 ID 未设置');
  }

  const parsed = /^\d+$/.test(tenantId) ? Number.parseInt(tenantId, 10) : NaN;
  if (!Number.isSafeInteger(parsed) || parsed <= 0) {
    throw new Error(`Invalid X-Tenant-ID header: ${tenantId}`);
  }
  return parsed;
}
Code collapsed
3. API 路由示例
code
// app/api/[tenantId]/projects/route.ts
import { NextRequest, NextResponse } from 'next/server';
import { tenantManager } from '@/lib/db/tenant-context';
import { getTenantId } from '@/middleware/tenant-middleware';
/**
 * List the projects of a tenant, newest first, with the creator's name.
 *
 * The query has no tenant filter of its own: it runs through
 * tenantManager.withTenant and relies on the RLS policies for isolation.
 *
 * Responds 400 when the tenant id in the path is not a positive integer and
 * 500 (after logging the cause) when the query fails.
 */
export async function GET(
  request: NextRequest,
  { params }: { params: { tenantId: string } }
) {
  const tenantId = Number.parseInt(params.tenantId, 10);
  if (!Number.isSafeInteger(tenantId) || tenantId <= 0) {
    return NextResponse.json(
      { error: '租户 ID 无效' },
      { status: 400 }
    );
  }

  try {
    // Run the query with the tenant context applied
    const projects = await tenantManager.withTenant(tenantId, async (client) => {
      const result = await client.query(
        `SELECT p.*, u.name as creator_name
         FROM projects p
         LEFT JOIN users u ON p.created_by = u.id
         ORDER BY p.created_at DESC`
      );
      return result.rows;
    });

    return NextResponse.json({ projects });
  } catch (error) {
    // Keep the cause for operators; the client only sees a generic message.
    console.error('获取项目失败:', error);
    return NextResponse.json(
      { error: '获取项目失败' },
      { status: 500 }
    );
  }
}
/**
 * Create a project in a tenant on behalf of the logged-in user.
 *
 * Responds 401 when there is no session (previously a null session caused a
 * TypeError and a 500), 400 when the tenant id or the project name is invalid,
 * 201 with the new row on success, and 500 (after logging) on failure.
 *
 * NOTE(review): getSession is used here but not imported in this snippet.
 * NOTE(review): the projects INSERT policy compares created_by with
 * current_user_id(); nothing in this handler sets a user context - confirm
 * how it is provided.
 */
export async function POST(
  request: NextRequest,
  { params }: { params: { tenantId: string } }
) {
  try {
    const tenantId = Number.parseInt(params.tenantId, 10);
    if (!Number.isSafeInteger(tenantId) || tenantId <= 0) {
      return NextResponse.json(
        { error: '租户 ID 无效' },
        { status: 400 }
      );
    }

    const session = await getSession(request);
    if (!session?.userId) {
      return NextResponse.json(
        { error: '未授权' },
        { status: 401 }
      );
    }

    const { name, description } = await request.json();
    // projects.name is NOT NULL; reject bad input here instead of surfacing
    // the constraint violation as a 500.
    if (typeof name !== 'string' || name.trim() === '') {
      return NextResponse.json(
        { error: '项目名称不能为空' },
        { status: 400 }
      );
    }

    const project = await tenantManager.withTenant(tenantId, async (client) => {
      const result = await client.query(
        `INSERT INTO projects (tenant_id, name, description, created_by)
         VALUES ($1, $2, $3, $4)
         RETURNING *`,
        [tenantId, name, description, session.userId]
      );
      return result.rows[0];
    });

    return NextResponse.json({ project }, { status: 201 });
  } catch (error) {
    console.error('创建项目失败:', error);
    return NextResponse.json(
      { error: '创建项目失败' },
      { status: 500 }
    );
  }
}
Code collapsed
4. 任务管理 API
code
// app/api/[tenantId]/tasks/route.ts
import { NextRequest, NextResponse } from 'next/server';
import { tenantManager } from '@/lib/db/tenant-context';
/**
 * List tasks of a tenant with optional filters and pagination.
 *
 * Query parameters:
 *   project_id  only tasks of this project
 *   status      only tasks with this status
 *   page        1-based page number (default 1; invalid values fall back to 1)
 *   limit       page size (default 20; invalid values fall back to 20;
 *               capped at MAX_LIMIT so a client cannot request an unbounded
 *               result set)
 *
 * The filter is built once and shared by the list and the count query, so the
 * reported total always matches the filter applied to the rows.
 * Tenant isolation comes from the RLS policies via tenantManager.withTenant.
 */
export async function GET(
  request: NextRequest,
  { params }: { params: { tenantId: string } }
) {
  // Upper bound for page size; chosen here, adjust to the product's needs.
  const MAX_LIMIT = 100;

  try {
    const tenantId = parseInt(params.tenantId, 10);
    const url = new URL(request.url);
    const projectId = url.searchParams.get('project_id');
    const status = url.searchParams.get('status');

    // NaN, zero and negative values would produce invalid LIMIT/OFFSET.
    const requestedPage = Number.parseInt(url.searchParams.get('page') || '1', 10);
    const requestedLimit = Number.parseInt(url.searchParams.get('limit') || '20', 10);
    const page = Number.isFinite(requestedPage) && requestedPage >= 1 ? requestedPage : 1;
    const limit =
      Number.isFinite(requestedLimit) && requestedLimit >= 1
        ? Math.min(requestedLimit, MAX_LIMIT)
        : 20;
    const offset = (page - 1) * limit;

    const tasks = await tenantManager.withTenant(tenantId, async (client) => {
      // Shared filter: each value gets the next positional placeholder.
      const conditions: string[] = [];
      const filterValues: any[] = [];
      if (projectId) {
        filterValues.push(projectId);
        conditions.push(`t.project_id = $${filterValues.length}`);
      }
      if (status) {
        filterValues.push(status);
        conditions.push(`t.status = $${filterValues.length}`);
      }
      const whereClause =
        conditions.length > 0 ? `WHERE ${conditions.join(' AND ')}` : '';

      const limitPlaceholder = filterValues.length + 1;
      const offsetPlaceholder = filterValues.length + 2;
      const result = await client.query(
        `
        SELECT t.*, p.name as project_name,
               u.name as assigned_to_name,
               c.name as created_by_name
        FROM tasks t
        JOIN projects p ON t.project_id = p.id
        LEFT JOIN users u ON t.assigned_to = u.id
        LEFT JOIN users c ON t.created_by = c.id
        ${whereClause}
        ORDER BY t.created_at DESC
        LIMIT $${limitPlaceholder} OFFSET $${offsetPlaceholder}
        `,
        [...filterValues, limit, offset]
      );

      // Total number of matching tasks (same filter, no pagination)
      const countResult = await client.query(
        `SELECT COUNT(*) FROM tasks t ${whereClause}`,
        filterValues
      );

      return {
        tasks: result.rows,
        total: parseInt(countResult.rows[0].count, 10),
        page,
        limit,
      };
    });

    return NextResponse.json(tasks);
  } catch (error) {
    console.error('获取任务失败:', error);
    return NextResponse.json(
      { error: '获取任务失败' },
      { status: 500 }
    );
  }
}
/**
 * Raised for problems caused by the request itself (unknown project or
 * assignee). Its message is safe to return to the client.
 */
class TaskInputError extends Error {}

/**
 * Create a task in a tenant on behalf of the logged-in user.
 *
 * The project and the optional assignee are looked up first; because the
 * lookups run under RLS, rows of other tenants are invisible and are reported
 * as "not found".
 *
 * Responds 401 when there is no session, 400 for an unknown project or
 * assignee, 201 with the new row on success, and 500 with a generic message
 * otherwise. Unexpected errors are logged and no longer echoed to the client,
 * and the status is chosen by error type rather than by matching the message.
 *
 * NOTE(review): getSession is used here but not imported in this snippet.
 */
export async function POST(
  request: NextRequest,
  { params }: { params: { tenantId: string } }
) {
  try {
    const tenantId = parseInt(params.tenantId, 10);
    const { project_id, title, description, assigned_to } = await request.json();

    const session = await getSession(request);
    if (!session?.userId) {
      return NextResponse.json(
        { error: '未授权' },
        { status: 401 }
      );
    }

    const task = await tenantManager.withTenant(tenantId, async (client) => {
      // The project must be visible to the current tenant (enforced by RLS)
      const projectCheck = await client.query(
        'SELECT id FROM projects WHERE id = $1',
        [project_id]
      );
      if (projectCheck.rowCount === 0) {
        throw new TaskInputError('项目不存在或无权访问');
      }

      // An assignee, when given, must be visible to the current tenant too
      if (assigned_to) {
        const userCheck = await client.query(
          'SELECT id FROM users WHERE id = $1',
          [assigned_to]
        );
        if (userCheck.rowCount === 0) {
          throw new TaskInputError('受指派用户不存在或无权访问');
        }
      }

      // Create the task
      const result = await client.query(
        `INSERT INTO tasks (tenant_id, project_id, title, description, assigned_to, created_by)
         VALUES ($1, $2, $3, $4, $5, $6)
         RETURNING *`,
        [tenantId, project_id, title, description, assigned_to, session.userId]
      );
      return result.rows[0];
    });

    return NextResponse.json({ task }, { status: 201 });
  } catch (error: any) {
    if (error instanceof TaskInputError) {
      return NextResponse.json(
        { error: error.message },
        { status: 400 }
      );
    }
    console.error('创建任务失败:', error);
    return NextResponse.json(
      { error: '创建任务失败' },
      { status: 500 }
    );
  }
}
Code collapsed
性能优化
1. 索引策略
code
-- tenant_id is the leading filter of every query under RLS, so composite
-- indexes put tenant_id first.

-- Tasks by status, limited to the open states that are queried most.
CREATE INDEX idx_tasks_tenant_status
    ON tasks (tenant_id, status)
    WHERE status IN ('todo', 'in_progress');

-- Tasks by project.
CREATE INDEX idx_tasks_tenant_project
    ON tasks (tenant_id, project_id);

-- Tasks by creator.
CREATE INDEX idx_tasks_tenant_creator
    ON tasks (tenant_id, created_by);

-- Projects of a tenant, newest first.
-- This was declared as a partial index with
--   WHERE tenant_id IN (SELECT id FROM tenants WHERE status = 'active')
-- which PostgreSQL rejects: an index predicate cannot contain a subquery, and
-- tenants has no status column. It is now a plain composite index; the name is
-- kept so existing references still resolve.
CREATE INDEX idx_projects_active_tenants
    ON projects (tenant_id, created_at DESC);
Code collapsed
2. 查询优化
code
// lib/db/query-optimizer.ts
/**
 * Query helpers that fetch related tenant data in as few round trips as
 * possible.
 *
 * NOTE(review): tenantManager is used here but not imported in this snippet.
 */
export class OptimizedQueries {
  /**
   * Load tasks together with their project and assignee in a single query.
   *
   * @param tenantId Tenant whose context the query runs in.
   * @param taskIds  Ids of the tasks to load.
   * @returns An array with one object per task found (id, title, status,
   *          project, assignee). Always an array: jsonb_agg over zero rows
   *          yields NULL, which previously surfaced as null when no task
   *          matched or the id list was empty.
   */
  static async getTasksWithRelations(tenantId: number, taskIds: number[]) {
    // Nothing to look up; skip the round trip.
    if (taskIds.length === 0) {
      return [];
    }

    return await tenantManager.withTenant(tenantId, async (client) => {
      // One query returns the tasks and their related rows
      const result = await client.query(`
        WITH task_data AS (
          SELECT * FROM tasks WHERE id = ANY($1)
        ),
        project_data AS (
          SELECT p.* FROM projects p
          JOIN task_data t ON t.project_id = p.id
        ),
        user_data AS (
          SELECT DISTINCT u.*
          FROM users u
          JOIN task_data t ON t.assigned_to = u.id OR t.created_by = u.id
        )
        SELECT
          jsonb_build_object(
            'tasks', COALESCE(jsonb_agg(jsonb_build_object(
              'id', t.id,
              'title', t.title,
              'status', t.status,
              'project', (SELECT row_to_json(p) FROM project_data p WHERE p.id = t.project_id),
              'assignee', (SELECT row_to_json(u) FROM user_data u WHERE u.id = t.assigned_to)
            )), '[]'::jsonb)
          ) as data
        FROM task_data t
      `, [taskIds]);

      return result.rows[0].data.tasks;
    });
  }

  /**
   * Read the precomputed statistics row of a tenant from tenant_stats_mv.
   *
   * NOTE(review): tenant_stats_mv is not defined in this snippet. The explicit
   * tenant_id filter is what scopes the result here - confirm that the view
   * is not otherwise readable across tenants.
   *
   * @returns The statistics row, or undefined when the view has none for the
   *          tenant.
   */
  static async getTenantStats(tenantId: number) {
    return await tenantManager.withTenant(tenantId, async (client) => {
      // Use the precomputed statistics
      const result = await client.query(`
        SELECT * FROM tenant_stats_mv
        WHERE tenant_id = $1
      `, [tenantId]);

      return result.rows[0];
    });
  }
}
Code collapsed
3. 连接池配置
code
// lib/db/pool-config.ts
import { Pool } from 'pg';
/**
 * Shared node-postgres connection pool, configured from DB_* environment
 * variables.
 *
 * The former `prepare: true` entry was removed: it is not a node-postgres
 * option, so it had no effect at runtime and is rejected by the typed config.
 */
export const pool = new Pool({
  host: process.env.DB_HOST,
  port: Number.parseInt(process.env.DB_PORT || '5432', 10),
  database: process.env.DB_NAME,
  user: process.env.DB_USER,
  password: process.env.DB_PASSWORD,

  // Pool sizing
  max: 20, // maximum number of clients in the pool
  idleTimeoutMillis: 30000,
  connectionTimeoutMillis: 2000,

  // Abort statements that run longer than 10 s (value in milliseconds)
  statement_timeout: 10000,
});

// Pool monitoring
pool.on('connect', () => {
  console.log('新建数据库连接');
});

pool.on('remove', () => {
  console.log('释放数据库连接');
});

pool.on('error', (err) => {
  console.error('数据库连接池错误:', err);
});
Code collapsed
安全检查清单
RLS 配置
- 所有租户相关表启用 RLS
- 策略覆盖所有 SQL 操作(SELECT、INSERT、UPDATE、DELETE)
- 租户 ID 不可伪造(由应用层验证)
- 没有 `FOR ALL` 或默认允许的策略
数据访问
- 使用租户上下文而非拼接 SQL
- 验证用户对租户的访问权限
- 敏感操作记录审计日志
- 实施数据加密(静态和传输)
性能监控
- 监控 RLS 策略性能影响
- 使用 EXPLAIN ANALYZE 优化查询
- 定期审查和优化索引
- 监控连接池使用情况
租户隔离
- 租户间数据完全隔离
- 跨租户查询被阻止
- 租户删除时数据正确清理
- 支持租户数据导出/删除
参考资料
免责声明:RLS 提供了强大的安全层,但仍应结合应用层安全措施。在实施前请进行全面的安全测试。