康心伴Logo
康心伴WellAlly
Data & Privacy

PostgreSQL 多租户数据库架构设计:SaaS 应用完整指南

5 分钟阅读

PostgreSQL 多租户数据库架构设计:SaaS 应用完整指南

概述

多租户架构是 SaaS 应用的核心设计决策之一。正确的数据库架构选择会影响安全性、性能、成本和可扩展性。本文将详细介绍 PostgreSQL 多租户架构的三种主要模式。

多租户隔离策略对比

策略 | 隔离级别 | 成本 | 复杂度 | 适用场景
独立数据库 | 完全隔离 | — | — | 大型企业、合规要求高
独立 Schema | 逻辑隔离 | — | — | 中型企业、需要备份隔离
共享表 | 行级隔离 | — | — | 初创公司、大量小租户

架构选择决策树

code
是否有严格合规要求(HIPAA、SOC2)?
├─ 是 → 独立数据库
└─ 否
   └─ 租户数量 < 100 且每个租户数据量 > 10GB?
       ├─ 是 → 独立 Schema
       └─ 否 → 共享表 + RLS
Code collapsed

架构一:共享表 + 行级安全(RLS)

数据库设计

code
-- =============================================================================
-- Shared-table architecture: all tenants share the same tables and every
-- tenant-scoped row carries a tenant_id. Intended for many small tenants.
-- =============================================================================

-- Tenant registry: one row per tenant, with plan limits and lifecycle status.
CREATE TABLE tenants (
    id              BIGSERIAL    PRIMARY KEY,
    uuid            UUID         NOT NULL UNIQUE DEFAULT gen_random_uuid(),
    name            VARCHAR(255) NOT NULL,
    slug            VARCHAR(100) NOT NULL UNIQUE,
    plan            VARCHAR(50)  DEFAULT 'free',
    max_users       INT          DEFAULT 5,
    max_storage_mb  INT          DEFAULT 1024,
    settings        JSONB        DEFAULT '{}',
    status          VARCHAR(20)  DEFAULT 'active', -- 'active', 'suspended', 'cancelled'
    trial_ends_at   TIMESTAMPTZ,
    created_at      TIMESTAMPTZ  DEFAULT now(),
    updated_at      TIMESTAMPTZ  DEFAULT now()
);

-- Per-tenant usage counters (exactly one row per tenant; removed with the tenant).
CREATE TABLE tenant_quotas (
    tenant_id        BIGINT      PRIMARY KEY REFERENCES tenants(id) ON DELETE CASCADE,
    users_count      INT         DEFAULT 0,
    storage_used_mb  INT         DEFAULT 0,
    api_calls_month  INT         DEFAULT 0,
    last_reset_at    TIMESTAMPTZ DEFAULT now(),
    updated_at       TIMESTAMPTZ DEFAULT now()
);

-- Users, scoped to a tenant. An email only has to be unique within its tenant.
CREATE TABLE users (
    id                 BIGSERIAL    PRIMARY KEY,
    tenant_id          BIGINT       NOT NULL REFERENCES tenants(id) ON DELETE CASCADE,
    uuid               UUID         NOT NULL UNIQUE DEFAULT gen_random_uuid(),
    email              VARCHAR(255) NOT NULL,
    password_hash      VARCHAR(255) NOT NULL,
    name               VARCHAR(255),
    avatar_url         TEXT,
    role               VARCHAR(50)  DEFAULT 'member',
    permissions        JSONB        DEFAULT '{}',
    email_verified_at  TIMESTAMPTZ,
    last_login_at      TIMESTAMPTZ,
    is_active          BOOLEAN      DEFAULT TRUE,
    created_at         TIMESTAMPTZ  DEFAULT now(),
    updated_at         TIMESTAMPTZ  DEFAULT now(),
    UNIQUE (tenant_id, email)
);

-- Enable row-level security on every tenant-scoped table.
-- NOTE: the table owner and superusers bypass RLS unless FORCE ROW LEVEL
-- SECURITY is set, so the application must connect as a separate, non-owner
-- role for the tenant policies to actually filter rows.
ALTER TABLE tenants ENABLE ROW LEVEL SECURITY;
ALTER TABLE tenant_quotas ENABLE ROW LEVEL SECURITY;
ALTER TABLE users ENABLE ROW LEVEL SECURITY;

-- Indexes
-- NOTE(review): UNIQUE(tenant_id, email) on users already creates an index on
-- the same columns, so this one looks redundant -- confirm before dropping it.
CREATE INDEX idx_users_tenant_email ON users(tenant_id, email);
CREATE INDEX idx_users_tenant_role ON users(tenant_id, role);
-- An index predicate may only call IMMUTABLE functions, so a rolling
-- "last 30 days" window built on NOW() is rejected by PostgreSQL. Index every
-- user that has ever logged in and apply the 30-day cutoff in the query itself.
CREATE INDEX idx_users_last_login ON users(last_login_at) WHERE last_login_at IS NOT NULL;
-- Slug lookups only ever target active tenants.
CREATE INDEX idx_tenants_slug ON tenants(slug) WHERE status = 'active';
CREATE INDEX idx_tenants_plan ON tenants(plan);
Code collapsed

租户上下文管理

code
-- Validate a tenant and bind it to the current database session.
--
-- Parameters:
--   tenant_uuid  public UUID of the tenant to activate.
-- Returns:
--   the tenant's internal BIGINT id.
-- Raises:
--   an exception when no tenant with that UUID exists or its status is not
--   'active'.
--
-- The function is SECURITY DEFINER, so the lookup runs with the privileges of
-- the function owner rather than the caller. Because of that, search_path is
-- pinned so that a caller cannot shadow `tenants` with an object in a schema
-- they control. This assumes the tables live in schema `public` (the usual
-- target of an unqualified CREATE TABLE); adjust the list if they do not.
CREATE OR REPLACE FUNCTION set_tenant_context(tenant_uuid UUID)
RETURNS BIGINT AS $$
DECLARE
    v_tenant_id BIGINT;
BEGIN
    -- Only active tenants may be selected.
    SELECT id INTO v_tenant_id
    FROM tenants
    WHERE uuid = tenant_uuid AND status = 'active';

    IF v_tenant_id IS NULL THEN
        RAISE EXCEPTION '租户不存在或已停用';
    END IF;

    -- is_local = FALSE: the settings last for the whole session, not just the
    -- current transaction. With a connection pool the caller must therefore
    -- set the context again (or reset it) every time a connection is reused.
    PERFORM set_config('app.current_tenant_id', v_tenant_id::text, FALSE);
    PERFORM set_config('app.current_tenant_uuid', tenant_uuid::text, FALSE);

    RETURN v_tenant_id;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER
   SET search_path = public, pg_temp;

-- Return the tenant id stored in the `app.current_tenant_id` setting.
-- Yields NULL when the setting is undefined, empty, or not a valid BIGINT.
CREATE OR REPLACE FUNCTION current_tenant_id()
RETURNS BIGINT AS $$
DECLARE
    v_raw TEXT;
BEGIN
    -- missing_ok = TRUE: an undefined setting gives NULL instead of an error.
    v_raw := current_setting('app.current_tenant_id', TRUE);

    IF v_raw = '' THEN
        RETURN NULL;
    END IF;

    RETURN v_raw::BIGINT;
EXCEPTION
    -- Any failure (e.g. a non-numeric value) is reported as "no tenant".
    WHEN OTHERS THEN
        RETURN NULL;
END;
$$ LANGUAGE plpgsql STABLE;

-- Return the tenant UUID stored in the `app.current_tenant_uuid` setting.
-- Yields NULL when the setting is undefined, empty, or not a valid UUID.
CREATE OR REPLACE FUNCTION current_tenant_uuid()
RETURNS UUID AS $$
DECLARE
    v_raw TEXT;
BEGIN
    -- missing_ok = TRUE: an undefined setting gives NULL instead of an error.
    v_raw := current_setting('app.current_tenant_uuid', TRUE);

    IF v_raw = '' THEN
        RETURN NULL;
    END IF;

    RETURN v_raw::UUID;
EXCEPTION
    -- Any failure (e.g. a malformed UUID) is reported as "no tenant".
    WHEN OTHERS THEN
        RETURN NULL;
END;
$$ LANGUAGE plpgsql STABLE;
Code collapsed

RLS 策略

code
-- =============================================================================
-- Row-level security policies
-- Each policy compares the row's tenant with current_tenant_id(). When no
-- tenant context is set that function returns NULL, the comparison is not
-- true, and no rows match.
-- =============================================================================

-- tenants: a session may read only its own tenant row.
CREATE POLICY tenants_select_own ON tenants
    FOR SELECT
    USING (id = current_tenant_id());

-- users: read, insert and update are limited to the current tenant.
-- No DELETE policy is defined for users in this section.
CREATE POLICY users_select_own_tenant ON users
    FOR SELECT
    USING (tenant_id = current_tenant_id());

CREATE POLICY users_insert_own_tenant ON users
    FOR INSERT
    WITH CHECK (tenant_id = current_tenant_id());

-- USING picks the rows that may be updated; WITH CHECK stops an update from
-- moving a row to another tenant.
CREATE POLICY users_update_own_tenant ON users
    FOR UPDATE
    USING (tenant_id = current_tenant_id())
    WITH CHECK (tenant_id = current_tenant_id());

-- tenant_quotas: read-only access to the current tenant's counters.
CREATE POLICY quotas_select_own_tenant ON tenant_quotas
    FOR SELECT
    USING (tenant_id = current_tenant_id());
Code collapsed

架构二:独立 Schema

Schema 管理系统

code
-- =============================================================================
-- Schema-per-tenant architecture (aimed at mid-sized customers)
-- =============================================================================

-- The `main` schema holds tenant metadata shared by the whole installation.
CREATE SCHEMA main;

-- Tenant registry: maps each tenant to the schema that holds its data.
CREATE TABLE main.tenants (
    id BIGSERIAL PRIMARY KEY,
    uuid UUID DEFAULT gen_random_uuid() UNIQUE NOT NULL,
    name VARCHAR(255) NOT NULL,
    slug VARCHAR(100) UNIQUE NOT NULL,
    schema_name VARCHAR(100) UNIQUE NOT NULL,
    plan VARCHAR(50) DEFAULT 'professional',
    db_size_limit_mb INTEGER DEFAULT 10240,
    status VARCHAR(20) DEFAULT 'active',
    settings JSONB DEFAULT '{}',
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    -- Lifecycle timestamps written by main.create_tenant_schema() and
    -- main.drop_tenant_schema(); without these columns both functions fail
    -- with "column does not exist". NULL until the corresponding event occurs.
    schema_created_at TIMESTAMP WITH TIME ZONE,
    schema_dropped_at TIMESTAMP WITH TIME ZONE
);

-- Template schema: its tables are cloned (CREATE TABLE ... LIKE ... INCLUDING
-- ALL) whenever a new tenant schema is created.
CREATE SCHEMA tenant_template;

-- Primary keys are identity columns rather than BIGSERIAL on purpose.
-- LIKE ... INCLUDING ALL copies a serial column's DEFAULT nextval(...)
-- verbatim, so every tenant copy would keep drawing ids from the template's
-- single sequence. For identity columns, LIKE creates a fresh sequence per
-- copied table. BY DEFAULT still allows explicit ids (e.g. when migrating
-- existing rows).
CREATE TABLE tenant_template.users (
    id BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
    uuid UUID DEFAULT gen_random_uuid() UNIQUE NOT NULL,
    email VARCHAR(255) UNIQUE NOT NULL,
    password_hash VARCHAR(255) NOT NULL,
    name VARCHAR(255),
    role VARCHAR(50) DEFAULT 'member',
    is_active BOOLEAN DEFAULT TRUE,
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

CREATE TABLE tenant_template.projects (
    id BIGINT GENERATED BY DEFAULT AS IDENTITY PRIMARY KEY,
    uuid UUID DEFAULT gen_random_uuid() UNIQUE NOT NULL,
    name VARCHAR(255) NOT NULL,
    description TEXT,
    status VARCHAR(50) DEFAULT 'active',
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);

-- Indexes (copied into each tenant schema together with the tables).
-- NOTE(review): email is already UNIQUE, which creates its own index, so
-- idx_users_email looks redundant -- confirm before dropping it.
CREATE INDEX idx_users_email ON tenant_template.users(email);
CREATE INDEX idx_projects_status ON tenant_template.projects(status);
Code collapsed

Schema 创建函数

code
-- =============================================================================
-- Schema management functions
-- =============================================================================

-- Create a tenant schema and populate it with a copy of every table found in
-- `tenant_template`.
--
-- Parameters:
--   p_tenant_id    id of the row in main.tenants to stamp as created.
--   p_schema_name  name of the schema to create (quoted with %I, so any
--                  valid identifier is accepted).
--
-- Behaviour worth knowing:
--   * CREATE SCHEMA uses IF NOT EXISTS but CREATE TABLE does not, so calling
--     this twice for the same schema fails on the first table.
--   * Relies on main.tenants having a schema_created_at column.
--   * Grants go to the role `app_user`, which must already exist.
--
-- SECURITY DEFINER: search_path is pinned so that no caller-controlled schema
-- can shadow the objects used here; everything else is schema-qualified.
CREATE OR REPLACE FUNCTION main.create_tenant_schema(
    p_tenant_id BIGINT,
    p_schema_name VARCHAR
)
RETURNS VOID AS $$
DECLARE
    v_template_table TEXT;
BEGIN
    EXECUTE format('CREATE SCHEMA IF NOT EXISTS %I', p_schema_name);

    -- Clone each template table, including defaults, constraints and indexes.
    FOR v_template_table IN
        SELECT tablename FROM pg_catalog.pg_tables
        WHERE schemaname = 'tenant_template'
        ORDER BY tablename
    LOOP
        EXECUTE format(
            'CREATE TABLE %I.%I (LIKE %I.%I INCLUDING ALL)',
            p_schema_name, v_template_table,
            'tenant_template', v_template_table
        );
    END LOOP;

    -- Schema-level privileges (USAGE / CREATE) ...
    EXECUTE format(
        'GRANT ALL ON SCHEMA %I TO app_user',
        p_schema_name
    );

    -- ... do not cover the objects inside the schema. The tables above are
    -- owned by the function owner, so grant data access on them explicitly.
    EXECUTE format(
        'GRANT SELECT, INSERT, UPDATE, DELETE ON ALL TABLES IN SCHEMA %I TO app_user',
        p_schema_name
    );
    EXECUTE format(
        'GRANT USAGE, SELECT ON ALL SEQUENCES IN SCHEMA %I TO app_user',
        p_schema_name
    );

    -- Record when the schema was provisioned.
    UPDATE main.tenants
    SET schema_created_at = NOW()
    WHERE id = p_tenant_id;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER
   SET search_path = pg_catalog, pg_temp;

-- Drop a tenant's schema together with everything in it and mark the tenant
-- as deleted.
--
-- Parameters:
--   p_tenant_id  id of the tenant in main.tenants.
-- Raises:
--   an exception when no schema name is found for that id.
--
-- This is destructive and irreversible: DROP SCHEMA ... CASCADE removes every
-- object in the schema. Relies on main.tenants having a schema_dropped_at
-- column.
--
-- SECURITY DEFINER: search_path is pinned so that no caller-controlled schema
-- can shadow the objects used here; all table references are qualified.
CREATE OR REPLACE FUNCTION main.drop_tenant_schema(p_tenant_id BIGINT)
RETURNS VOID AS $$
DECLARE
    v_schema_name VARCHAR;
BEGIN
    -- Resolve the schema that belongs to this tenant.
    SELECT schema_name INTO v_schema_name
    FROM main.tenants
    WHERE id = p_tenant_id;

    IF v_schema_name IS NULL THEN
        RAISE EXCEPTION '租户不存在';
    END IF;

    -- %I quotes the name as an identifier, so unusual names cannot break out
    -- of the statement.
    EXECUTE format('DROP SCHEMA IF EXISTS %I CASCADE', v_schema_name);

    -- Keep the registry row, flagged as deleted, for auditing.
    UPDATE main.tenants
    SET status = 'deleted', schema_dropped_at = NOW()
    WHERE id = p_tenant_id;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER
   SET search_path = pg_catalog, pg_temp;

-- Rename a tenant's schema (e.g. when the tenant itself is renamed) and keep
-- main.tenants.schema_name in sync.
--
-- Parameters:
--   p_tenant_id        id of the tenant in main.tenants.
--   p_new_schema_name  the new schema name.
-- Raises:
--   an exception when no schema name is found for that id; errors from
--   ALTER SCHEMA (for instance a name that is already taken) propagate.
--
-- SECURITY DEFINER: search_path is pinned so that no caller-controlled schema
-- can shadow the objects used here; all table references are qualified.
CREATE OR REPLACE FUNCTION main.rename_tenant_schema(
    p_tenant_id BIGINT,
    p_new_schema_name VARCHAR
)
RETURNS VOID AS $$
DECLARE
    v_old_schema_name VARCHAR;
BEGIN
    -- Look up the current schema name.
    SELECT schema_name INTO v_old_schema_name
    FROM main.tenants
    WHERE id = p_tenant_id;

    IF v_old_schema_name IS NULL THEN
        RAISE EXCEPTION '租户不存在';
    END IF;

    -- Both names are quoted as identifiers via %I.
    EXECUTE format(
        'ALTER SCHEMA %I RENAME TO %I',
        v_old_schema_name, p_new_schema_name
    );

    -- Runs in the same transaction as the rename, so the registry and the
    -- catalog cannot get out of step.
    UPDATE main.tenants
    SET schema_name = p_new_schema_name
    WHERE id = p_tenant_id;
END;
$$ LANGUAGE plpgsql SECURITY DEFINER
   SET search_path = pg_catalog, pg_temp;
Code collapsed

Node.js Schema 管理

code
// lib/db/schema-manager.ts
import { pool } from '@/lib/db';

/**
 * Helpers for the schema-per-tenant architecture: provisioning, lookup and
 * removal of a tenant's schema, all driven by the `main.tenants` registry.
 */
export class TenantSchemaManager {
  /**
   * Create the dedicated schema for a tenant.
   *
   * The registry update and the schema creation run in one transaction on a
   * single checked-out connection. (`pool.query` may hand every statement to
   * a different connection, so BEGIN/COMMIT issued through it do not form a
   * transaction.)
   *
   * @param tenantId id of the tenant row in `main.tenants`
   * @param slug     tenant slug; the schema is named `tenant_<slug>`
   * @returns the name of the created schema
   * @throws if the tenant row does not exist or any statement fails; the
   *         transaction is rolled back in that case
   */
  static async createForTenant(tenantId: number, slug: string): Promise<string> {
    const schemaName = `tenant_${slug}`;
    const client = await pool.connect();

    try {
      await client.query('BEGIN');

      // Point the registry row at the new schema.
      const updated = await client.query(
        `UPDATE main.tenants
         SET schema_name = $1
         WHERE id = $2
         RETURNING uuid`,
        [schemaName, tenantId]
      );

      // Do not create a schema nobody owns.
      if (updated.rows.length === 0) {
        throw new Error('租户不存在');
      }

      // The database function clones the template tables into the schema.
      await client.query(
        'SELECT main.create_tenant_schema($1, $2)',
        [tenantId, schemaName]
      );

      await client.query('COMMIT');

      return schemaName;
    } catch (error) {
      await client.query('ROLLBACK');
      throw error;
    } finally {
      client.release();
    }
  }

  /**
   * Look up the schema name of an active tenant.
   *
   * @param tenantUuid public UUID of the tenant
   * @returns the schema name, or `null` when no active tenant matches
   */
  static async getSchemaName(tenantUuid: string): Promise<string | null> {
    const result = await pool.query(
      `SELECT schema_name FROM main.tenants
       WHERE uuid = $1 AND status = 'active'`,
      [tenantUuid]
    );

    return result.rows[0]?.schema_name || null;
  }

  /**
   * Resolve a tenant's schema and hand its name to `callback`.
   *
   * The schema name is passed as a raw string; callers that splice it into
   * SQL text must quote it as an identifier themselves.
   *
   * @param tenantUuid public UUID of the tenant
   * @param callback   receives the schema name; its result is returned
   * @throws if the tenant does not exist or is not active
   */
  static async withTenantSchema<T>(
    tenantUuid: string,
    callback: (schemaName: string) => Promise<T>
  ): Promise<T> {
    const schemaName = await this.getSchemaName(tenantUuid);

    if (!schemaName) {
      throw new Error('租户不存在或已停用');
    }

    return await callback(schemaName);
  }

  /**
   * Drop a tenant's schema via `main.drop_tenant_schema`.
   *
   * @param tenantId id of the tenant row in `main.tenants`
   */
  static async dropForTenant(tenantId: number): Promise<void> {
    await pool.query(
      'SELECT main.drop_tenant_schema($1)',
      [tenantId]
    );
  }

  /**
   * Total on-disk size, in bytes, of all tables in a tenant's schema
   * (including their indexes and TOAST data).
   *
   * The size is summed as a number in SQL; formatting it with
   * `pg_size_pretty` first and casting the text back to bigint cannot work.
   *
   * @param tenantId id of the tenant row in `main.tenants`
   * @returns size in bytes; 0 when the tenant is unknown or has no tables
   */
  static async getTenantStorageUsage(tenantId: number): Promise<number> {
    const sizeResult = await pool.query(
      `SELECT COALESCE(
                SUM(pg_total_relation_size(
                      format('%I.%I', pt.schemaname, pt.tablename)::regclass)),
                0
              )::bigint AS size_bytes
       FROM main.tenants AS t
       INNER JOIN pg_tables AS pt
           ON pt.schemaname = t.schema_name
       WHERE t.id = $1`,
      [tenantId]
    );

    // node-postgres returns bigint columns as strings by default.
    return Number(sizeResult.rows[0]?.size_bytes ?? 0);
  }
}
Code collapsed

Schema 架构的 API 实现

code
// app/api/[tenantSlug]/projects/route.ts
import { NextRequest, NextResponse } from 'next/server';
import { pool } from '@/lib/db';
import { TenantSchemaManager } from '@/lib/db/schema-manager';

/**
 * List the projects of the tenant named in the URL, newest first.
 *
 * Responds with `{ projects }` on success and with status 500 on any failure
 * (including an unknown or inactive tenant).
 *
 * NOTE(review): `getTenantUuidFromSlug` is not defined or imported in the
 * visible part of this file -- confirm where it comes from.
 */
export async function GET(
  request: NextRequest,
  { params }: { params: { tenantSlug: string } }
) {
  try {
    const tenantSlug = params.tenantSlug;

    // Resolve the public tenant UUID from the slug in the route.
    const tenantUuid = await getTenantUuidFromSlug(tenantSlug);

    const projects = await TenantSchemaManager.withTenantSchema(
      tenantUuid,
      async (schemaName) => {
        // Identifiers cannot be bound as parameters, so quote the schema name
        // explicitly. Without quoting, a schema such as tenant_my-company is
        // a syntax error, and a crafted name could alter the statement.
        const quotedSchema = `"${schemaName.replace(/"/g, '""')}"`;

        const result = await pool.query(
          `SELECT * FROM ${quotedSchema}.projects ORDER BY created_at DESC`
        );
        return result.rows;
      }
    );

    return NextResponse.json({ projects });
  } catch (error) {
    // Keep the client response generic, but do not lose the cause.
    console.error('[projects:GET] failed to list projects', error);
    return NextResponse.json(
      { error: '获取项目失败' },
      { status: 500 }
    );
  }
}

/**
 * Create a project in the schema of the tenant named in the URL.
 *
 * Expects a JSON body with `name` (required) and `description` (optional).
 * Responds with `{ project }` and status 201 on success, 400 when `name` is
 * missing or blank, and 500 on any other failure.
 *
 * NOTE(review): `getTenantUuidFromSlug` is not defined or imported in the
 * visible part of this file -- confirm where it comes from.
 */
export async function POST(
  request: NextRequest,
  { params }: { params: { tenantSlug: string } }
) {
  try {
    const tenantSlug = params.tenantSlug;
    const { name, description } = await request.json();

    // projects.name is NOT NULL; reject bad input here instead of letting the
    // database error surface as a 500.
    if (typeof name !== 'string' || name.trim() === '') {
      return NextResponse.json(
        { error: '项目名称不能为空' },
        { status: 400 }
      );
    }

    const tenantUuid = await getTenantUuidFromSlug(tenantSlug);

    const project = await TenantSchemaManager.withTenantSchema(
      tenantUuid,
      async (schemaName) => {
        // Identifiers cannot be bound as parameters, so quote the schema name
        // explicitly; the values themselves are bound as $1 / $2.
        const quotedSchema = `"${schemaName.replace(/"/g, '""')}"`;

        const result = await pool.query(
          `INSERT INTO ${quotedSchema}.projects (name, description)
           VALUES ($1, $2)
           RETURNING *`,
          [name, description]
        );
        return result.rows[0];
      }
    );

    return NextResponse.json({ project }, { status: 201 });
  } catch (error) {
    // Keep the client response generic, but do not lose the cause.
    console.error('[projects:POST] failed to create project', error);
    return NextResponse.json(
      { error: '创建项目失败' },
      { status: 500 }
    );
  }
}
Code collapsed

架构三:独立数据库

数据库管理系统

code
-- =============================================================================
-- Tenant registry database (database-per-tenant architecture)
-- =============================================================================

-- A dedicated database that only stores tenant metadata.
CREATE DATABASE tenant_registry;

-- psql meta-command: switch the session to the registry database.
\c tenant_registry

-- One row per tenant database, including how to reach it.
CREATE TABLE tenant_databases (
    id BIGSERIAL PRIMARY KEY,
    uuid UUID DEFAULT gen_random_uuid() UNIQUE NOT NULL,
    name VARCHAR(255) NOT NULL,
    slug VARCHAR(100) UNIQUE NOT NULL,
    db_name VARCHAR(100) UNIQUE NOT NULL,
    db_host VARCHAR(255) DEFAULT 'localhost',
    db_port INTEGER DEFAULT 5432,
    plan VARCHAR(50) DEFAULT 'enterprise',
    max_storage_gb INTEGER DEFAULT 100,
    backup_retention_days INTEGER DEFAULT 30,
    status VARCHAR(20) DEFAULT 'active',
    settings JSONB DEFAULT '{}',
    created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
    -- Columns read and written by the application-side database manager
    -- (INSERT of db_user/db_password, SELECT for connection pools, UPDATE of
    -- dropped_at). They are nullable so that inserts which omit them keep
    -- working.
    db_user VARCHAR(100),
    -- NOTE(review): this holds the tenant role's password as stored by the
    -- application. Prefer an encrypted value or a reference into a secrets
    -- manager over plaintext.
    db_password TEXT,
    dropped_at TIMESTAMP WITH TIME ZONE
);

-- Usage statistics, one row per tenant database (removed with its tenant).
CREATE TABLE tenant_db_stats (
    tenant_id BIGINT PRIMARY KEY REFERENCES tenant_databases(id) ON DELETE CASCADE,
    db_size_gb DECIMAL(10, 2) DEFAULT 0,
    table_count INTEGER DEFAULT 0,
    index_count INTEGER DEFAULT 0,
    row_count_estimate BIGINT DEFAULT 0,
    last_backup_at TIMESTAMP WITH TIME ZONE,
    last_vacuum_at TIMESTAMP WITH TIME ZONE,
    last_analyze_at TIMESTAMP WITH TIME ZONE,
    updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
);
Code collapsed

自动化数据库创建

code
#!/bin/bash
# scripts/create-tenant-db.sh
#
# Provision a dedicated database and login role for one tenant, register the
# tenant in tenant_databases, and load the tenant schema into the new database.
#
# Usage: create-tenant-db.sh <tenant_slug> <tenant_name>
#
# Reads POSTGRES_USER and POSTGRES_DB from the environment.
# NOTE(review): the registry INSERT runs against $POSTGRES_DB, so that database
# must be the one containing tenant_databases -- confirm.

set -e

TENANT_SLUG=${1:-}
TENANT_NAME=${2:-}

# Validate the arguments before anything is derived from them.
if [ -z "$TENANT_SLUG" ] || [ -z "$TENANT_NAME" ]; then
    echo "用法: $0 <tenant_slug> <tenant_name>"
    exit 1
fi

DB_NAME="tenant_${TENANT_SLUG}"
DB_USER="${DB_NAME}_user"
# base64 output contains only [A-Za-z0-9+/=], so it is safe inside a
# single-quoted SQL literal.
DB_PASSWORD=$(openssl rand -base64 32)

# Caller-supplied values are handed to psql as variables and referenced as
# :"name" (quoted identifier) or :'name' (quoted literal), so psql does the
# escaping. Splicing them into the SQL text through the shell would break on
# names such as O'Brien or my-company and allow SQL injection.
# The password is kept out of the psql command line (visible in `ps`) and is
# expanded by the shell inside the here-document instead.
echo "创建数据库: $DB_NAME"
psql -v ON_ERROR_STOP=1 \
     -v tenant_slug="$TENANT_SLUG" \
     -v tenant_name="$TENANT_NAME" \
     -v db_name="$DB_NAME" \
     -v db_user="$DB_USER" \
     --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL
    -- Create the tenant database
    CREATE DATABASE :"db_name";

    -- Create the tenant login role
    CREATE USER :"db_user" WITH PASSWORD '$DB_PASSWORD';

    -- Grant the role access to its database
    GRANT ALL PRIVILEGES ON DATABASE :"db_name" TO :"db_user";

    -- Register the tenant
    INSERT INTO tenant_databases (slug, name, db_name, db_host, db_port)
    VALUES (:'tenant_slug', :'tenant_name', :'db_name', 'localhost', 5432);
EOSQL

# Load the table definitions into the new database.
echo "初始化数据库结构..."
psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$DB_NAME" < /docker-entrypoint-initdb.d/tenant-schema.sql

# NOTE(review): the generated password is printed to stdout; make sure this
# output is not captured in shared logs.
echo "数据库创建完成: $DB_NAME"
echo "用户: $DB_USER"
echo "密码: $DB_PASSWORD"
Code collapsed

Node.js 数据库管理

code
// lib/db/tenant-db-manager.ts
import { Pool } from 'pg';

/**
 * Connection settings for a single tenant database.
 * Declared here but not referenced elsewhere in the visible code.
 */
interface TenantDatabaseConfig {
  // Where the database server is reachable.
  dbHost: string;
  dbPort: number;
  // Which database to open on that server.
  dbName: string;
  // Credentials used to connect.
  user: string;
  password: string;
}

/**
 * Helpers for the database-per-tenant architecture. Tenant metadata lives in
 * the `tenant_registry` database; each tenant gets its own database and a
 * lazily created, cached connection pool.
 */
export class TenantDatabaseManager {
  /** Pool for the registry database that stores tenant metadata. */
  private static registryPool = new Pool({
    host: process.env.REGISTRY_DB_HOST,
    database: 'tenant_registry',
    user: process.env.DB_USER,
    password: process.env.DB_PASSWORD,
  });

  /** Cache of tenant pools, keyed by tenant UUID. */
  private static tenantPools: Map<string, Pool> = new Map();

  /**
   * Quote a value for use as an SQL identifier. Identifiers cannot be bound
   * as query parameters, so they have to be escaped when spliced into SQL.
   */
  private static quoteIdentifier(identifier: string): string {
    return `"${identifier.replace(/"/g, '""')}"`;
  }

  /**
   * Create a dedicated database for a new tenant and register it.
   *
   * NOTE(review): `create_database_with_user` is not defined in the visible
   * code. CREATE DATABASE cannot run inside a transaction block, so if that
   * helper issues it directly this BEGIN has to go; a ROLLBACK also does not
   * undo a database that was already created. Confirm against the helper.
   *
   * @param tenantId currently unused
   * @param slug     tenant slug; the database is named `tenant_<slug>`
   * @param name     display name stored in the registry
   * @returns the UUID of the new registry row
   */
  static async createForTenant(
    tenantId: number,
    slug: string,
    name: string
  ): Promise<string> {
    const dbName = `tenant_${slug}`;
    const dbUser = `${dbName}_user`;
    const dbPassword = this.generateSecurePassword();

    const client = await this.registryPool.connect();

    try {
      await client.query('BEGIN');

      // Create the database and its login role.
      await client.query(
        `SELECT create_database_with_user($1, $2, $3)`,
        [dbName, dbUser, dbPassword]
      );

      // Register the tenant (the registry table needs db_user / db_password
      // columns for this INSERT).
      const result = await client.query(
        `INSERT INTO tenant_databases (slug, name, db_name, db_user, db_password)
         VALUES ($1, $2, $3, $4, $5)
         RETURNING uuid`,
        [slug, name, dbName, dbUser, dbPassword]
      );

      // Load the tenant schema into the new database.
      await this.initializeDatabase(dbName, dbUser, dbPassword);

      await client.query('COMMIT');

      return result.rows[0].uuid;
    } catch (error) {
      await client.query('ROLLBACK');
      throw error;
    } finally {
      client.release();
    }
  }

  /**
   * Return the connection pool for a tenant, creating and caching it on
   * first use.
   *
   * @param tenantUuid public UUID of the tenant
   * @throws if no active tenant database is registered for that UUID
   */
  static async getTenantPool(tenantUuid: string): Promise<Pool> {
    // Reuse a cached pool when there is one.
    if (this.tenantPools.has(tenantUuid)) {
      return this.tenantPools.get(tenantUuid)!;
    }

    // Otherwise read the connection settings from the registry.
    const result = await this.registryPool.query(
      `SELECT db_name, db_host, db_port, db_user, db_password
       FROM tenant_databases
       WHERE uuid = $1 AND status = 'active'`,
      [tenantUuid]
    );

    if (result.rows.length === 0) {
      throw new Error('租户数据库不存在或已停用');
    }

    const config = result.rows[0];

    const pool = new Pool({
      host: config.db_host,
      database: config.db_name,
      port: config.db_port,
      user: config.db_user,
      password: config.db_password,
      max: 10, // per-tenant pools are kept small
    });

    this.tenantPools.set(tenantUuid, pool);

    return pool;
  }

  /**
   * Run `callback` with the tenant's connection pool and return its result.
   */
  static async withTenantDatabase<T>(
    tenantUuid: string,
    callback: (pool: Pool) => Promise<T>
  ): Promise<T> {
    const pool = await this.getTenantPool(tenantUuid);
    return await callback(pool);
  }

  /**
   * Drop a tenant's database and mark the tenant as deleted in the registry.
   *
   * The statements deliberately run outside an explicit transaction:
   * PostgreSQL rejects DROP DATABASE inside a transaction block.
   *
   * @param tenantUuid public UUID of the tenant
   * @throws if the tenant is not registered, or if any statement fails
   */
  static async dropForTenant(tenantUuid: string): Promise<void> {
    const client = await this.registryPool.connect();

    try {
      const result = await client.query(
        'SELECT db_name FROM tenant_databases WHERE uuid = $1',
        [tenantUuid]
      );

      if (result.rows.length === 0) {
        throw new Error('租户数据库不存在');
      }

      const dbName: string = result.rows[0].db_name;

      // Close our own cached pool first so it neither leaks connections nor
      // keeps the database busy.
      const cachedPool = this.tenantPools.get(tenantUuid);
      if (cachedPool) {
        this.tenantPools.delete(tenantUuid);
        await cachedPool.end();
      }

      // Disconnect every remaining session on the tenant database.
      await client.query(
        `SELECT pg_terminate_backend(pid)
         FROM pg_stat_activity
         WHERE datname = $1
           AND pid <> pg_backend_pid()`,
        [dbName]
      );

      // The database name is an identifier and has to be quoted, not bound.
      await client.query(
        `DROP DATABASE IF EXISTS ${this.quoteIdentifier(dbName)}`
      );

      // Keep the registry row, flagged as deleted (needs a dropped_at column).
      await client.query(
        `UPDATE tenant_databases
         SET status = 'deleted', dropped_at = NOW()
         WHERE uuid = $1`,
        [tenantUuid]
      );
    } finally {
      client.release();
    }
  }

  /**
   * Collect basic statistics from a tenant database.
   *
   * @returns one row with `db_size` (human-readable text), `table_count` and
   *          `index_count`; the counts exclude PostgreSQL's own catalogs
   */
  static async getTenantStats(tenantUuid: string) {
    return await this.withTenantDatabase(tenantUuid, async (pool) => {
      const result = await pool.query(`
        SELECT
          pg_size_pretty(pg_database_size(current_database())) AS db_size,
          (SELECT count(*)
             FROM pg_tables
            WHERE schemaname NOT IN ('pg_catalog', 'information_schema')) AS table_count,
          (SELECT count(*)
             FROM pg_indexes
            WHERE schemaname NOT IN ('pg_catalog', 'information_schema')) AS index_count
      `);

      return result.rows[0];
    });
  }

  /** Generate a random password: 32 random bytes, base64-encoded. */
  private static generateSecurePassword(): string {
    return require('crypto').randomBytes(32).toString('base64');
  }

  /**
   * Load the tenant schema SQL file into a freshly created database.
   *
   * NOTE(review): the host is hard-coded to localhost even though the
   * registry stores a per-tenant db_host -- confirm this is intended.
   */
  private static async initializeDatabase(
    dbName: string,
    user: string,
    password: string
  ): Promise<void> {
    const pool = new Pool({
      database: dbName,
      user,
      password,
      host: 'localhost',
    });

    try {
      const initSQL = await require('fs').promises.readFile(
        '/docker-entrypoint-initdb.d/tenant-schema.sql',
        'utf8'
      );

      await pool.query(initSQL);
    } finally {
      // Always close the one-off pool, including when reading the file or
      // running the script fails.
      await pool.end();
    }
  }
}
Code collapsed

迁移策略

从共享表迁移到独立 Schema

code
-- =============================================================================
-- Migration: shared tables -> dedicated schema
-- =============================================================================

-- Copy one tenant's rows from the shared tables into a new schema named
-- tenant_<id>, created through main.create_tenant_schema().
--
-- Parameters:
--   p_tenant_id  id of the tenant in the shared `tenants` table.
-- Raises:
--   an exception when the tenant does not exist; errors from the copy
--   statements propagate and abort the surrounding transaction.
--
-- The source rows are left in place; see the commented-out DELETEs at the end.
--
-- NOTE(review): the final UPDATE writes `architecture` and `schema_name` on
-- the shared `tenants` table, and main.create_tenant_schema() stamps
-- main.tenants. Neither is visible in the shared-table definition shown in
-- this document -- confirm both tables line up before running this.
CREATE OR REPLACE FUNCTION migrate_tenant_to_schema(
    p_tenant_id BIGINT
)
RETURNS VOID AS $$
DECLARE
    v_tenant_id   BIGINT;   -- was assigned but never declared before
    v_schema_name VARCHAR;
    v_table       TEXT;
    v_qualified   TEXT;
BEGIN
    -- Make sure the tenant exists before creating anything.
    SELECT id INTO v_tenant_id
    FROM tenants
    WHERE id = p_tenant_id;

    IF v_tenant_id IS NULL THEN
        RAISE EXCEPTION '租户不存在';
    END IF;

    -- Create the target schema from the template.
    v_schema_name := 'tenant_' || v_tenant_id;
    PERFORM main.create_tenant_schema(v_tenant_id, v_schema_name);

    -- Users: the shared table has more columns (tenant_id, avatar_url, ...)
    -- than the per-tenant table, so the columns are listed explicitly;
    -- SELECT * would fail on the column-count mismatch.
    EXECUTE format(
        'INSERT INTO %I.users
             (id, uuid, email, password_hash, name, role, is_active, created_at, updated_at)
         SELECT id, uuid, email, password_hash, name, role, is_active, created_at, updated_at
         FROM users
         WHERE tenant_id = $1',
        v_schema_name
    ) USING p_tenant_id;

    -- Projects: column list taken from tenant_template.projects.
    -- NOTE(review): assumes the shared `projects` table has the same column
    -- names plus tenant_id -- confirm against its definition.
    EXECUTE format(
        'INSERT INTO %I.projects
             (id, uuid, name, description, status, created_at, updated_at)
         SELECT id, uuid, name, description, status, created_at, updated_at
         FROM projects
         WHERE tenant_id = $1',
        v_schema_name
    ) USING p_tenant_id;

    -- Tasks.
    -- NOTE(review): no `tasks` table is visible in the template or among the
    -- shared tables, so its columns cannot be listed here. This statement only
    -- works if both tables exist with identical column layouts -- confirm.
    EXECUTE format(
        'INSERT INTO %I.tasks SELECT * FROM tasks WHERE tenant_id = $1',
        v_schema_name
    ) USING p_tenant_id;

    -- The rows above were inserted with explicit ids, which does not advance
    -- the id sequences. Move each sequence past the highest copied id so the
    -- next ordinary INSERT does not hit a duplicate key. This is a no-op when
    -- the id column has no owned sequence (setval is strict on NULL).
    FOREACH v_table IN ARRAY ARRAY['users', 'projects']
    LOOP
        v_qualified := format('%I.%I', v_schema_name, v_table);
        EXECUTE format(
            'SELECT setval(pg_get_serial_sequence(%L, ''id''),
                           COALESCE(MAX(id), 1),
                           MAX(id) IS NOT NULL)
             FROM %s',
            v_qualified, v_qualified
        );
    END LOOP;

    -- Record the new layout on the tenant.
    UPDATE tenants
    SET architecture = 'schema', schema_name = v_schema_name
    WHERE id = p_tenant_id;

    -- Optionally remove the migrated rows from the shared tables:
    -- DELETE FROM users WHERE tenant_id = p_tenant_id;
    -- DELETE FROM projects WHERE tenant_id = p_tenant_id;
    -- DELETE FROM tasks WHERE tenant_id = p_tenant_id;
END;
$$ LANGUAGE plpgsql;
Code collapsed

数据迁移工具

code
// lib/migration/tenant-migrator.ts
import { pool } from '@/lib/db';

/**
 * Application-side driver for moving tenants from the shared tables into
 * dedicated schemas.
 */
export class TenantMigrator {
  /**
   * Migrate a single tenant into its own schema.
   *
   * Runs `migrate_tenant_to_schema` inside one transaction on a dedicated
   * connection, so a failure leaves no partially copied data behind.
   *
   * @param tenantId id of the tenant in the shared `tenants` table
   * @throws if the tenant does not exist or the migration fails
   */
  static async migrateToSchema(tenantId: number): Promise<void> {
    const client = await pool.connect();

    try {
      await client.query('BEGIN');

      // Look the tenant up first; the slug is only used for the log line.
      const tenant = await client.query(
        'SELECT id, slug FROM tenants WHERE id = $1',
        [tenantId]
      );

      if (tenant.rows.length === 0) {
        throw new Error('租户不存在');
      }

      const slug = tenant.rows[0].slug;

      // The database function creates the schema and copies the rows.
      await client.query(
        'SELECT migrate_tenant_to_schema($1)',
        [tenantId]
      );

      await client.query('COMMIT');

      console.log(`租户 ${slug} 已迁移到独立 Schema`);
    } catch (error) {
      await client.query('ROLLBACK');
      throw error;
    } finally {
      client.release();
    }
  }

  /**
   * Migrate every tenant whose data exceeds `minDataSize` bytes.
   *
   * Size is estimated per tenant by summing `pg_column_size` over that
   * tenant's rows in users, projects and tasks. `pg_total_relation_size`
   * reports the size of a whole shared table, which is identical for every
   * tenant, and select-list aliases cannot be referenced in WHERE, so the
   * filter is expressed with GROUP BY / HAVING.
   *
   * The estimate scans all three tables; run it as an occasional batch job.
   * A failure for one tenant is logged and does not stop the others.
   *
   * NOTE(review): assumes shared `projects` and `tasks` tables with a
   * tenant_id column exist -- they are not defined in the visible code.
   *
   * @param minDataSize threshold in bytes (default 1 GiB)
   */
  static async migrateLargeTenants(
    minDataSize: number = 1073741824 // 1GB
  ): Promise<void> {
    const result = await pool.query(
      `SELECT tenant_id
       FROM (
         SELECT u.tenant_id, pg_column_size(u.*) AS row_bytes FROM users AS u
         UNION ALL
         SELECT p.tenant_id, pg_column_size(p.*) AS row_bytes FROM projects AS p
         UNION ALL
         SELECT t.tenant_id, pg_column_size(t.*) AS row_bytes FROM tasks AS t
       ) AS per_row
       GROUP BY tenant_id
       HAVING SUM(row_bytes) > $1
       ORDER BY tenant_id`,
      [minDataSize]
    );

    for (const row of result.rows) {
      try {
        await this.migrateToSchema(row.tenant_id);
      } catch (error) {
        console.error(`迁移租户 ${row.tenant_id} 失败:`, error);
      }
    }
  }
}
Code collapsed

安全检查清单

共享表架构

  • 所有表启用 RLS
  • 租户上下文正确设置
  • 索引包含 tenant_id
  • 跨租户查询被阻止
  • 租户数据可独立删除

独立 Schema

  • Schema 命名规范统一
  • 从模板复制结构
  • 权限正确设置
  • Schema 可独立备份
  • 支持租户间迁移

独立数据库

  • 数据库连接池管理
  • 自动化创建/删除
  • 独立备份策略
  • 资源配额限制
  • 监控和告警

参考资料


免责声明:架构选择应基于具体业务需求和规模。建议在实施前进行充分的技术评估和性能测试。

#

文章标签

postgres
multi-tenant
database
saas
architecture
schema-design

觉得这篇文章有帮助?

立即体验康心伴,开始您的健康管理之旅