康心伴Logo
康心伴WellAlly
Data & Privacy

CQRS 模式实践:构建可扩展的实时分析系统

5 分钟阅读

CQRS 模式实践:构建可扩展的实时分析系统

概述

CQRS(Command Query Responsibility Segregation,命令查询责任分离)是一种将系统读写操作分离的架构模式。对于需要处理大量数据分析的健康平台,CQRS 可以显著提升性能和可扩展性。

为什么使用 CQRS?

传统架构CQRS 架构
读写使用相同模型读写使用不同模型
单一数据存储读写分离存储
查询影响写性能读写独立优化
难以应对复杂查询专用读模型
扩展受限独立扩展读写

适用场景

  • 高读写比:读取远多于写入
  • 复杂查询:需要聚合、分组、分析
  • 实时分析:需要即时统计和报告
  • 多数据源:整合来自不同服务的数据
  • 性能要求高:读写性能要求不同

1. CQRS 基础架构

核心组件

code
// cqrs/core/command.ts

/**
 * 命令:表示写操作的意图
 */
export interface ICommand<T extends ICommandResult> {
  id: string;
  aggregateId: string;
  timestamp: Date;
  expectedVersion?: number;
}

export interface ICommandResult {
  success: boolean;
  events: DomainEvent[];
  errors?: string[];
}

/**
 * 命令处理器接口
 */
export interface ICommandHandler<T extends ICommand<any>> {
  handle(command: T): Promise<ICommandResult>;
}

/**
 * 命令总线
 */
export class CommandBus {
  private handlers: Map<string, ICommandHandler<any>> = new Map();

  register<T extends ICommand<any>>(
    commandName: string,
    handler: ICommandHandler<T>
  ): void {
    this.handlers.set(commandName, handler);
  }

  async dispatch<T extends ICommand<any>>(command: T): Promise<ICommandResult> {
    const commandName = command.constructor.name;
    const handler = this.handlers.get(commandName);

    if (!handler) {
      throw new Error(`未找到命令处理器: ${commandName}`);
    }

    return await handler.handle(command);
  }

  async dispatchBatch<T extends ICommand<any>>(
    commands: T[]
  ): Promise<ICommandResult[]> {
    return await Promise.all(
      commands.map(cmd => this.dispatch(cmd))
    );
  }
}

// cqrs/core/query.ts

/**
 * 查询:表示读操作的请求
 */
export interface IQuery<T> {
  params: any;
}

/**
 * 查询处理器接口
 */
export interface IQueryHandler<T extends IQuery<any>, R> {
  handle(query: T): Promise<R>;
}

/**
 * 查询总线
 */
export class QueryBus {
  private handlers: Map<string, IQueryHandler<any, any>> = new Map();

  register<T extends IQuery<any>, R>(
    queryName: string,
    handler: IQueryHandler<T, R>
  ): void {
    this.handlers.set(queryName, handler);
  }

  async execute<T extends IQuery<any>, R>(
    queryName: string,
    query: T
  ): Promise<R> {
    const handler = this.handlers.get(queryName);

    if (!handler) {
      throw new Error(`未找到查询处理器: ${queryName}`);
    }

    return await handler.handle(query);
  }

  async executeBatch<T extends IQuery<any>, R>(
    queries: Array<{ name: string; query: T }>
  ): Promise<R[]> {
    return await Promise.all(
      queries.map(({ name, query }) => this.execute(name, query))
    );
  }
}
Code collapsed

2. 命令端实现

命令定义

code
// health-records/commands/create-health-record.command.ts

export interface CreateHealthRecordCommand extends ICommand<CreateHealthRecordResult> {
  id: string;
  aggregateId: string; // userId
  recordType: HealthRecordType;
  data: HealthDataInput[];
  source: DataSource;
  recordedAt?: Date;
}

export interface CreateHealthRecordResult extends ICommandResult {
  recordId: string;
}

// health-records/commands/update-health-data.command.ts

export interface UpdateHealthDataCommand extends ICommand<UpdateHealthDataResult> {
  id: string;
  aggregateId: string; // recordId
  data: HealthDataInput[];
  updatedAt?: Date;
}

export interface UpdateHealthDataResult extends ICommandResult {
  recordId: string;
  previousData: HealthData[];
  newData: HealthData[];
}

// health-records/commands/delete-health-record.command.ts

export interface DeleteHealthRecordCommand extends ICommand<DeleteHealthRecordResult> {
  id: string;
  aggregateId: string; // recordId
  reason: string;
  deletedBy: string;
}

export interface DeleteHealthRecordResult extends ICommandResult {
  recordId: string;
  deletedAt: Date;
}
Code collapsed

命令处理器

code
// health-records/command-handlers/create-health-record.handler.ts

export class CreateHealthRecordCommandHandler
  implements ICommandHandler<CreateHealthRecordCommand>
{
  constructor(
    private eventStore: IEventStore,
    private repository: HealthRecordRepository
  ) {}

  async handle(command: CreateHealthRecordCommand): Promise<CreateHealthRecordResult> {
    try {
      // 1. 加载聚合(用户健康记录聚合)
      const aggregate = await this.repository.loadUserHealthAggregate(command.aggregateId);

      // 2. 执行业务逻辑
      const result = aggregate.createRecord({
        recordType: command.recordType,
        data: command.data.map(d => new HealthData(d.type, d.value, d.unit)),
        source: command.source,
        recordedAt: command.recordedAt || new Date(),
      });

      // 3. 保存事件
      await this.eventStore.appendEvents(command.aggregateId, result.events);

      // 4. 发布事件到事件总线
      for (const event of result.events) {
        await this.eventBus.publish(event);
      }

      return {
        success: true,
        events: result.events,
        recordId: result.recordId,
      };
    } catch (error) {
      return {
        success: false,
        events: [],
        errors: [error.message],
      };
    }
  }
}

// health-records/command-handlers/update-health-data.handler.ts

export class UpdateHealthDataCommandHandler
  implements ICommandHandler<UpdateHealthDataCommand>
{
  constructor(
    private eventStore: IEventStore,
    private repository: HealthRecordRepository,
    private validator: HealthDataValidator
  ) {}

  async handle(command: UpdateHealthDataCommand): Promise<UpdateHealthDataResult> {
    try {
      // 1. 验证数据
      const validationResult = await this.validator.validate(command.data);
      if (!validationResult.isValid) {
        return {
          success: false,
          events: [],
          errors: validationResult.errors,
        };
      }

      // 2. 加载聚合
      const aggregate = await this.repository.load(command.aggregateId);

      // 3. 执行更新
      const result = aggregate.updateData({
        data: command.data.map(d => new HealthData(d.type, d.value, d.unit)),
        updatedAt: command.updatedAt || new Date(),
      });

      // 4. 保存事件
      await this.eventStore.appendEvents(command.aggregateId, result.events);

      // 5. 发布事件
      for (const event of result.events) {
        await this.eventBus.publish(event);
      }

      return {
        success: true,
        events: result.events,
        recordId: command.aggregateId,
        previousData: result.previousData,
        newData: result.newData,
      };
    } catch (error) {
      return {
        success: false,
        events: [],
        errors: [error.message],
      };
    }
  }
}
Code collapsed

3. 查询端实现

读模型设计

code
-- =============================================================================
-- CQRS 读模型:物化视图
-- =============================================================================

-- 用户健康数据摘要
CREATE MATERIALIZED VIEW user_health_summary_mv AS
SELECT
    u.id as user_id,
    u.tenant_id,
    COUNT(DISTINCT hr.id) as total_records,
    MAX(hr.recorded_at) as last_record_at,
    COUNT(DISTINCT CASE WHEN hr.record_type = 'blood_pressure' THEN hr.id END) as blood_pressure_count,
    COUNT(DISTINCT CASE WHEN hr.record_type = 'heart_rate' THEN hr.id END) as heart_rate_count,
    COUNT(DISTINCT CASE WHEN hr.record_type = 'blood_glucose' THEN hr.id END) as blood_glucose_count,
    COUNT(DISTINCT CASE WHEN hr.record_type = 'weight' THEN hr.id END) as weight_count,
    -- 最新读数
    (SELECT json_agg(data)
     FROM health_record_data hrd
     WHERE hrd.record_id = (SELECT id FROM health_records hr2 WHERE hr2.user_id = u.id ORDER BY hr2.recorded_at DESC LIMIT 1)
    ) as latest_readings
FROM users u
LEFT JOIN health_records hr ON hr.user_id = u.id AND hr.deleted_at IS NULL
GROUP BY u.id, u.tenant_id;

-- 健康趋势分析(按时间聚合)
CREATE MATERIALIZED VIEW health_trends_mv AS
SELECT
    user_id,
    tenant_id,
    record_type,
    data_type,
    DATE_TRUNC('day', recorded_at) as date,
    AVG(data_value::numeric) as avg_value,
    MIN(data_value::numeric) as min_value,
    MAX(data_value::numeric) as max_value,
    STDDEV(data_value::numeric) as stddev_value,
    COUNT(*) as record_count
FROM (
    SELECT
        hr.user_id,
        hr.tenant_id,
        hr.record_type,
        hrd.data_type,
        hrd.data_value,
        hr.recorded_at
    FROM health_records hr
    JOIN health_record_data hrd ON hrd.record_id = hr.id
    WHERE hr.deleted_at IS NULL
) raw_data
GROUP BY user_id, tenant_id, record_type, data_type, DATE_TRUNC('day', recorded_at');

-- 健康告警统计
CREATE MATERIALIZED VIEW health_alerts_summary_mv AS
SELECT
    user_id,
    tenant_id,
    DATE_TRUNC('day', created_at) as date,
    COUNT(*) FILTER (WHERE severity = 'critical') as critical_count,
    COUNT(*) FILTER (WHERE severity = 'warning') as warning_count,
    COUNT(*) FILTER (WHERE severity = 'info') as info_count,
    array_agg(DISTINCT alert_type) as alert_types
FROM health_alerts
WHERE resolved_at IS NULL
GROUP BY user_id, tenant_id, DATE_TRUNC('day', created_at);

-- 创建索引加速查询
CREATE INDEX idx_user_health_summary_user ON user_health_summary_mv(user_id);
CREATE INDEX idx_user_health_summary_tenant ON user_health_summary_mv(tenant_id);
CREATE INDEX idx_health_trends_user_date ON health_trends_mv(user_id, date DESC);
CREATE INDEX idx_health_alerts_summary_user_date ON health_alerts_summary_mv(user_id, date DESC);

-- 定期刷新物化视图
CREATE OR REPLACE FUNCTION refresh_read_models()
RETURNS VOID AS $$
BEGIN
    REFRESH MATERIALIZED VIEW CONCURRENTLY user_health_summary_mv;
    REFRESH MATERIALIZED VIEW CONCURRENTLY health_trends_mv;
    REFRESH MATERIALIZED VIEW CONCURRENTLY health_alerts_summary_mv;
END;
$$ LANGUAGE plpgsql;

-- 设置定时任务(使用 pg_cron 或外部调度器)
-- SELECT cron.schedule('refresh-read-models', '*/5 * * * *', 'SELECT refresh_read_models()');
Code collapsed

查询处理器

code
// analytics/query-handlers/get-user-health-summary.handler.ts

export interface GetUserHealthSummaryQuery extends IQuery<UserHealthSummary> {
  params: {
    userId: string;
    includeTrends?: boolean;
    includeAlerts?: boolean;
  };
}

export interface UserHealthSummary {
  userId: string;
  tenantId: string;
  totalRecords: number;
  lastRecordAt: Date;
  recordCounts: Record<string, number>;
  latestReadings: HealthDataReading[];
  trends?: HealthTrend[];
  alerts?: HealthAlertSummary;
}

export class GetUserHealthSummaryQueryHandler
  implements IQueryHandler<GetUserHealthSummaryQuery, UserHealthSummary>
{
  constructor(
    private readDb: Database
  ) {}

  async handle(query: GetUserHealthSummaryQuery): Promise<UserHealthSummary> {
    const { userId, includeTrends, includeAlerts } = query.params;

    // 从物化视图获取基本摘要
    const summary = await this.readDb.oneOrNone(
      `SELECT * FROM user_health_summary_mv WHERE user_id = $1`,
      [userId]
    );

    if (!summary) {
      throw new Error('用户健康摘要不存在');
    }

    const result: UserHealthSummary = {
      userId: summary.user_id,
      tenantId: summary.tenant_id,
      totalRecords: summary.total_records,
      lastRecordAt: summary.last_record_at,
      recordCounts: {
        bloodPressure: summary.blood_pressure_count,
        heartRate: summary.heart_rate_count,
        bloodGlucose: summary.blood_glucose_count,
        weight: summary.weight_count,
      },
      latestReadings: summary.latest_readings || [],
    };

    // 可选:获取趋势数据
    if (includeTrends) {
      result.trends = await this.getTrends(userId);
    }

    // 可选:获取告警数据
    if (includeAlerts) {
      result.alerts = await this.getAlerts(userId);
    }

    return result;
  }

  private async getTrends(userId: string): Promise<HealthTrend[]> {
    return await this.readDb.any(
      `SELECT
        data_type,
        date,
        avg_value,
        min_value,
        max_value
       FROM health_trends_mv
       WHERE user_id = $1
       ORDER BY date DESC
       LIMIT 30`,
      [userId]
    );
  }

  private async getAlerts(userId: string): Promise<HealthAlertSummary> {
    return await this.readDb.one(
      `SELECT
        critical_count,
        warning_count,
        info_count,
        alert_types
       FROM health_alerts_summary_mv
       WHERE user_id = $1
       ORDER BY date DESC
       LIMIT 1`,
      [userId]
    );
  }
}

// analytics/query-handlers/get-health-analytics.handler.ts

export interface GetHealthAnalyticsQuery extends IQuery<HealthAnalytics> {
  params: {
    userId: string;
    period: 'week' | 'month' | 'quarter' | 'year';
    metrics: HealthMetricType[];
  };
}

export interface HealthAnalytics {
  userId: string;
  period: AnalyticsPeriod;
  metrics: MetricAnalytics[];
  insights: AnalyticsInsight[];
  generatedAt: Date;
}

export class GetHealthAnalyticsQueryHandler
  implements IQueryHandler<GetHealthAnalyticsQuery, HealthAnalytics>
{
  constructor(
    private readDb: Database,
    private analyzer: HealthAnalyzer
  ) {}

  async handle(query: GetHealthAnalyticsQuery): Promise<HealthAnalytics> {
    const { userId, period, metrics } = query.params;
    const { startDate, endDate } = this.getPeriodDates(period);

    // 并行获取所有指标数据
    const metricsData = await Promise.all(
      metrics.map(metric => this.getMetricData(userId, metric, startDate, endDate))
    );

    // 生成洞察
    const insights = await this.analyzer.generateInsights(metricsData);

    return {
      userId,
      period: { type: period, startDate, endDate },
      metrics: metricsData,
      insights,
      generatedAt: new Date(),
    };
  }

  private async getMetricData(
    userId: string,
    metric: HealthMetricType,
    startDate: Date,
    endDate: Date
  ): Promise<MetricAnalytics> {
    const result = await this.readDb.oneOrNone(
      `SELECT
        data_type,
        COUNT(*) as record_count,
        AVG(data_value::numeric) as avg_value,
        MIN(data_value::numeric) as min_value,
        MAX(data_value::numeric) as max_value,
        STDDEV(data_value::numeric) as stddev_value,
        PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY data_value::numeric) as median_value
       FROM (
         SELECT hr.recorded_at, hrd.data_type, hrd.data_value
         FROM health_records hr
         JOIN health_record_data hrd ON hrd.record_id = hr.id
         WHERE hr.user_id = $1
           AND hrd.data_type = $2
           AND hr.recorded_at BETWEEN $3 AND $4
           AND hr.deleted_at IS NULL
       ) data
       GROUP BY data_type`,
      [userId, metric, startDate, endDate]
    );

    return {
      metric,
      recordCount: parseInt(result.record_count),
      statistics: {
        average: parseFloat(result.avg_value),
        min: parseFloat(result.min_value),
        max: parseFloat(result.max_value),
        stdDev: parseFloat(result.stddev_value),
        median: parseFloat(result.median_value),
      },
    };
  }

  private getPeriodDates(period: string): { startDate: Date; endDate: Date } {
    const now = new Date();
    const endDate = new Date(now.getFullYear(), now.getMonth(), now.getDate(), 23, 59, 59);

    let startDate: Date;

    switch (period) {
      case 'week':
        startDate = new Date(now);
        startDate.setDate(now.getDate() - 7);
        break;
      case 'month':
        startDate = new Date(now);
        startDate.setMonth(now.getMonth() - 1);
        break;
      case 'quarter':
        startDate = new Date(now);
        startDate.setMonth(now.getMonth() - 3);
        break;
      case 'year':
        startDate = new Date(now);
        startDate.setFullYear(now.getFullYear() - 1);
        break;
      default:
        startDate = new Date(now);
        startDate.setDate(now.getDate() - 30);
    }

    startDate.setHours(0, 0, 0, 0);

    return { startDate, endDate };
  }
}
Code collapsed

复杂分析查询

code
// analytics/query-handlers/get-comparative-analytics.handler.ts

export interface GetComparativeAnalyticsQuery extends IQuery<ComparativeAnalytics> {
  params: {
    userIds: string[];
    metricTypes: HealthMetricType[];
    period: AnalyticsPeriod;
    groupBy?: 'age' | 'gender' | 'region';
  };
}

export interface ComparativeAnalytics {
  period: AnalyticsPeriod;
  users: UserAnalytics[];
  aggregates: {
    averages: Record<string, number>;
    distributions: Record<string, DistributionData>;
  };
  comparisons: ComparisonResult[];
  generatedAt: Date;
}

export class GetComparativeAnalyticsQueryHandler
  implements IQueryHandler<GetComparativeAnalyticsQuery, ComparativeAnalytics>
{
  constructor(
    private readDb: Database,
    private cache: ICacheService
  ) {}

  async handle(query: GetComparativeAnalyticsQuery): Promise<ComparativeAnalytics> {
    const { userIds, metricTypes, period, groupBy } = query.params;

    // 检查缓存
    const cacheKey = this.generateCacheKey(query.params);
    const cached = await this.cache.get<ComparativeAnalytics>(cacheKey);

    if (cached) {
      return cached;
    }

    const { startDate, endDate } = this.getPeriodDates(period);

    // 并行执行查询
    const [userAnalytics, aggregates, comparisons] = await Promise.all([
      this.getUserAnalytics(userIds, metricTypes, startDate, endDate),
      this.getAggregateAnalytics(userIds, metricTypes, startDate, endDate, groupBy),
      this.getComparisons(userIds, metricTypes, startDate, endDate),
    ]);

    const result: ComparativeAnalytics = {
      period: { type: period, startDate, endDate },
      users: userAnalytics,
      aggregates,
      comparisons,
      generatedAt: new Date(),
    };

    // 缓存结果(5 分钟)
    await this.cache.set(cacheKey, result, 300);

    return result;
  }

  private async getUserAnalytics(
    userIds: string[],
    metricTypes: HealthMetricType[],
    startDate: Date,
    endDate: Date
  ): Promise<UserAnalytics[]> {
    const query = `
      WITH user_metrics AS (
        SELECT
          u.id as user_id,
          u.age,
          u.gender,
          u.region,
          hrd.data_type,
          AVG(hrd.data_value::numeric) as avg_value,
          COUNT(*) as record_count
        FROM users u
        JOIN health_records hr ON hr.user_id = u.id
        JOIN health_record_data hrd ON hrd.record_id = hr.id
        WHERE u.id = ANY($1)
          AND hrd.data_type = ANY($2)
          AND hr.recorded_at BETWEEN $3 AND $4
          AND hr.deleted_at IS NULL
        GROUP BY u.id, u.age, u.gender, u.region, hrd.data_type
      )
      SELECT
        user_id,
        age,
        gender,
        region,
        json_object_agg(data_type, json_build_object(
          'average', avg_value,
          'recordCount', record_count
        )) as metrics
      FROM user_metrics
      GROUP BY user_id, age, gender, region
    `;

    return await this.readDb.any(query, [
      userIds,
      metricTypes,
      startDate,
      endDate,
    ]);
  }

  private async getAggregateAnalytics(
    userIds: string[],
    metricTypes: HealthMetricType[],
    startDate: Date,
    endDate: Date,
    groupBy?: string
  ): Promise<ComparativeAnalytics['aggregates']> {
    const groupClause = groupBy ? `, u.${groupBy}` : '';
    const groupSelect = groupBy ? `, ${groupBy}` : '';

    const query = `
      SELECT
        hrd.data_type${groupSelect},
        AVG(hrd.data_value::numeric) as avg_value,
        PERCENTILE_CONT(0.25) WITHIN GROUP (ORDER BY hrd.data_value::numeric) as p25,
        PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY hrd.data_value::numeric) as p50,
        PERCENTILE_CONT(0.75) WITHIN GROUP (ORDER BY hrd.data_value::numeric) as p75,
        STDDEV(hrd.data_value::numeric) as stddev_value
      FROM users u
      JOIN health_records hr ON hr.user_id = u.id
      JOIN health_record_data hrd ON hrd.record_id = hr.id
      WHERE u.id = ANY($1)
        AND hrd.data_type = ANY($2)
        AND hr.recorded_at BETWEEN $3 AND $4
        AND hr.deleted_at IS NULL
      GROUP BY hrd.data_type${groupClause}
    `;

    const results = await this.readDb.any(query, [
      userIds,
      metricTypes,
      startDate,
      endDate,
    ]);

    // 处理结果
    const averages: Record<string, number> = {};
    const distributions: Record<string, DistributionData> = {};

    for (const row of results) {
      const key = groupBy ? `${row.data_type}_${row[groupBy]}` : row.data_type;

      averages[key] = parseFloat(row.avg_value);
      distributions[key] = {
        min: parseFloat(row.p25),
        q1: parseFloat(row.p25),
        median: parseFloat(row.p50),
        q3: parseFloat(row.p75),
        max: parseFloat(row.p75),
        stdDev: parseFloat(row.stddev_value),
      };
    }

    return { averages, distributions };
  }

  private async getComparisons(
    userIds: string[],
    metricTypes: HealthMetricType[],
    startDate: Date,
    endDate: Date
  ): Promise<ComparisonResult[]> {
    const query = `
      WITH user_averages AS (
        SELECT
          u.id as user_id,
          hrd.data_type,
          AVG(hrd.data_value::numeric) as user_avg
        FROM users u
        JOIN health_records hr ON hr.user_id = u.id
        JOIN health_record_data hrd ON hrd.record_id = hr.id
        WHERE u.id = ANY($1)
          AND hrd.data_type = ANY($2)
          AND hr.recorded_at BETWEEN $3 AND $4
          AND hr.deleted_at IS NULL
        GROUP BY u.id, hrd.data_type
      ),
      overall_averages AS (
        SELECT
          data_type,
          AVG(user_avg) as overall_avg
        FROM user_averages
        GROUP BY data_type
      )
      SELECT
        ua.user_id,
        ua.data_type,
        ua.user_avg,
        oa.overall_avg,
        ((ua.user_avg - oa.overall_avg) / NULLIF(oa.overall_avg, 0)) * 100 as percent_diff
      FROM user_averages ua
      JOIN overall_averages oa ON oa.data_type = ua.data_type
      ORDER BY data_type, ABS(percent_diff) DESC
    `;

    return await this.readDb.any(query, [
      userIds,
      metricTypes,
      startDate,
      endDate,
    ]);
  }

  private generateCacheKey(params: any): string {
    return `comparative-analytics:${hashObject(params)}`;
  }

  private getPeriodDates(period: string): { startDate: Date; endDate: Date } {
    // 实现同上
    const now = new Date();
    const endDate = new Date(now.getFullYear(), now.getMonth(), now.getDate());
    let startDate = new Date(now);
    startDate.setDate(now.getDate() - 30);
    return { startDate, endDate };
  }
}
Code collapsed

4. 事件溯源集成

事件存储

code
// infrastructure/event-sourcing/event-store.ts

export interface IEventStore {
  appendEvents(aggregateId: string, events: DomainEvent[]): Promise<void>;
  getEvents(aggregateId: string): Promise<DomainEvent[]>;
  getEventsFromVersion(aggregateId: string, version: number): Promise<DomainEvent[]>;
  getAllEvents(): AsyncIterable<DomainEvent>;
}

export class PostgresEventStore implements IEventStore {
  constructor(private db: Database) {}

  async appendEvents(aggregateId: string, events: DomainEvent[]): Promise<void> {
    const client = await this.db.connect();

    try {
      await client.query('BEGIN');

      for (const event of events) {
        await client.query(
          `INSERT INTO event_store (event_id, aggregate_id, event_type, event_data, version, occurred_at)
           VALUES ($1, $2, $3, $4, $5, $6)`,
          [
            event.eventId,
            aggregateId,
            event.getEventType(),
            JSON.stringify(event),
            event.aggregateVersion,
            event.occurredAt,
          ]
        );
      }

      await client.query('COMMIT');
    } catch (error) {
      await client.query('ROLLBACK');
      throw error;
    } finally {
      client.release();
    }
  }

  async getEvents(aggregateId: string): Promise<DomainEvent[]> {
    const result = await this.db.any(
      `SELECT event_id, event_type, event_data, version, occurred_at
       FROM event_store
       WHERE aggregate_id = $1
       ORDER BY version ASC`,
      [aggregateId]
    );

    return result.map(row => this.deserializeEvent(row));
  }

  async getEventsFromVersion(
    aggregateId: string,
    version: number
  ): Promise<DomainEvent[]> {
    const result = await this.db.any(
      `SELECT event_id, event_type, event_data, version, occurred_at
       FROM event_store
       WHERE aggregate_id = $1 AND version >= $2
       ORDER BY version ASC`,
      [aggregateId, version]
    );

    return result.map(row => this.deserializeEvent(row));
  }

  async *getAllEvents(): AsyncIterable<DomainEvent> {
    let lastId = '';

    while (true) {
      const result = await this.db.any(
        `SELECT event_id, event_type, event_data, version, occurred_at
         FROM event_store
         WHERE event_id > $1
         ORDER BY event_id
         LIMIT 1000`,
        [lastId]
      );

      if (result.length === 0) {
        // 等待新事件
        await new Promise(resolve => setTimeout(resolve, 1000));
        continue;
      }

      for (const row of result) {
        yield this.deserializeEvent(row);
        lastId = row.event_id;
      }
    }
  }

  private deserializeEvent(row: any): DomainEvent {
    const eventData = JSON.parse(row.event_data);
    // 根据事件类型反序列化为具体的事件类
    return eventData as DomainEvent;
  }
}
Code collapsed

投影器(Projector)

code
// analytics/projectors/health-summary-projector.ts

/**
 * 投影器:将事件转换为读模型
 */
export class HealthSummaryProjector {
  constructor(
    private eventStore: IEventStore,
    private readDb: Database
  ) {}

  /**
   * 处理事件并更新读模型
   */
  async processEvents(): Promise<void> {
    for await (const event of this.eventStore.getAllEvents()) {
      try {
        await this.processEvent(event);
      } catch (error) {
        console.error(`处理事件 ${event.eventId} 失败:`, error);
      }
    }
  }

  /**
   * 处理单个事件
   */
  private async processEvent(event: DomainEvent): Promise<void> {
    switch (event.getEventType()) {
      case 'HealthRecordCreated':
        await this.handleHealthRecordCreated(event as HealthRecordCreatedEvent);
        break;

      case 'HealthRecordUpdated':
        await this.handleHealthRecordUpdated(event as HealthRecordUpdatedEvent);
        break;

      case 'CriticalHealthValueDetected':
        await this.handleCriticalHealthValue(event as CriticalHealthValueDetectedEvent);
        break;

      // 其他事件类型...
    }
  }

  /**
   * 处理健康记录创建事件
   */
  private async handleHealthRecordCreated(
    event: HealthRecordCreatedEvent
  ): Promise<void> {
    // 更新用户健康摘要
    await this.readDb.none(
      `INSERT INTO user_health_summary (user_id, tenant_id, total_records, last_record_at)
       VALUES ($1, $2, 1, $3)
       ON CONFLICT (user_id)
       DO UPDATE SET
         total_records = user_health_summary.total_records + 1,
         last_record_at = GREATEST(user_health_summary.last_record_at, $3)`,
      [event.userId, event.tenantId, event.recordedAt]
    );

    // 更新健康趋势数据
    for (const data of event.data) {
      await this.readDb.none(
        `INSERT INTO health_trends (user_id, tenant_id, record_type, data_type, date, avg_value, min_value, max_value, record_count)
         VALUES ($1, $2, $3, $4, $5, $6, $6, $6, 1)
         ON CONFLICT (user_id, record_type, data_type, date)
         DO UPDATE SET
           avg_value = (health_trends.avg_value * health_trends.record_count + $6) / (health_trends.record_count + 1),
           min_value = LEAST(health_trends.min_value, $6),
           max_value = GREATEST(health_trends.max_value, $6),
           record_count = health_trends.record_count + 1`,
        [
          event.userId,
          event.tenantId,
          event.recordType,
          data.type,
          event.recordedAt,
          data.value,
        ]
      );
    }
  }

  /**
   * 处理健康记录更新事件
   */
  private async handleHealthRecordUpdated(
    event: HealthRecordUpdatedEvent
  ): Promise<void> {
    // 实现略
  }

  /**
   * 处理临界健康值检测事件
   */
  private async handleCriticalHealthValue(
    event: CriticalHealthValueDetectedEvent
  ): Promise<void> {
    // 更新告警摘要
    await this.readDb.none(
      `INSERT INTO health_alerts_summary (user_id, tenant_id, date, critical_count, warning_count, info_count, alert_types)
       VALUES ($1, $2, $3, $4, 0, 0, ARRAY[$5])
       ON CONFLICT (user_id, date)
       DO UPDATE SET
         critical_count = health_alerts_summary.critical_count + $4,
         alert_types = array_append(health_alerts_summary.alert_types, $5)`,
      [
        event.userId,
        event.tenantId,
        event.detectedAt,
        event.severity === 'critical' ? 1 : 0,
        event.valueType,
      ]
    );
  }
}
Code collapsed

5. 性能优化

缓存策略

code
// analytics/cache/strategies.ts

/**
 * 多层缓存策略
 */
export class MultiLevelCache {
  private memoryCache: Map<string, { value: any; expiresAt: number }>;
  private redis: Redis;
  private memoryTTL: number = 5000; // 5 秒
  private redisTTL: number = 300;   // 5 分钟

  constructor() {
    this.memoryCache = new Map();
    this.redis = new Redis(process.env.REDIS_URL);
  }

  async get<T>(key: string): Promise<T | null> {
    // L1: 内存缓存
    const memCached = this.memoryCache.get(key);
    if (memCached && memCached.expiresAt > Date.now()) {
      return memCached.value as T;
    }

    // L2: Redis 缓存
    const redisCached = await this.redis.get(key);
    if (redisCached) {
      const value = JSON.parse(redisCached) as T;

      // 回填内存缓存
      this.memoryCache.set(key, {
        value,
        expiresAt: Date.now() + this.memoryTTL,
      });

      return value;
    }

    return null;
  }

  async set<T>(key: string, value: T, ttl?: number): Promise<void> {
    const effectiveTTL = ttl || this.redisTTL;

    // 写入内存缓存
    this.memoryCache.set(key, {
      value,
      expiresAt: Date.now() + this.memoryTTL,
    });

    // 写入 Redis 缓存
    await this.redis.setex(key, effectiveTTL, JSON.stringify(value));
  }

  async invalidate(pattern: string): Promise<void> {
    // 清除内存缓存
    for (const key of this.memoryCache.keys()) {
      if (key.match(pattern)) {
        this.memoryCache.delete(key);
      }
    }

    // 清除 Redis 缓存
    const keys = await this.redis.keys(pattern);
    if (keys.length > 0) {
      await this.redis.del(...keys);
    }
  }
}

/**
 * 缓存装饰器
 */
export function Cacheable(ttl?: number) {
  return function (
    target: any,
    propertyKey: string,
    descriptor: PropertyDescriptor
  ) {
    const originalMethod = descriptor.value;
    const cache = new MultiLevelCache();

    descriptor.value = async function (...args: any[]) {
      const cacheKey = `${propertyKey}:${hashArgs(args)}`;

      // 尝试从缓存获取
      const cached = await cache.get(cacheKey);
      if (cached !== null) {
        return cached;
      }

      // 执行原方法
      const result = await originalMethod.apply(this, args);

      // 写入缓存
      await cache.set(cacheKey, result, ttl);

      return result;
    };

    return descriptor;
  };
}
Code collapsed

批处理和预聚合

code
// analytics/batch/batch-processor.ts

/**
 * 批处理处理器
 */
export class BatchProcessor {
  private queue: Map<string, any[]> = new Map();
  private processing: boolean = false;

  constructor(
    private batchSize: number = 100,
    private batchTimeout: number = 5000
  ) {}

  /**
   * 添加到批处理队列
   */
  async add(topic: string, item: any): Promise<void> {
    if (!this.queue.has(topic)) {
      this.queue.set(topic, []);
    }

    this.queue.get(topic)!.push(item);

    // 检查是否达到批处理大小
    if (this.queue.get(topic)!.length >= this.batchSize) {
      await this.processBatch(topic);
    }
  }

  /**
   * 定期刷新批处理
   */
  startPeriodicFlush(): void {
    setInterval(async () => {
      for (const topic of this.queue.keys()) {
        if (this.queue.get(topic)!.length > 0) {
          await this.processBatch(topic);
        }
      }
    }, this.batchTimeout);
  }

  /**
   * 处理批次
   */
  private async processBatch(topic: string): Promise<void> {
    if (this.processing) return;

    this.processing = true;

    try {
      const items = this.queue.get(topic) || [];
      if (items.length === 0) return;

      // 处理批次
      switch (topic) {
        case 'health-events':
          await this.processHealthEvents(items);
          break;

        case 'user-events':
          await this.processUserEvents(items);
          break;

        // 其他主题...
      }

      // 清空队列
      this.queue.set(topic, []);
    } finally {
      this.processing = false;
    }
  }

  /**
   * 处理健康事件批次
   */
  private async processHealthEvents(events: DomainEvent[]): Promise<void> {
    // 批量插入到数据库
    await this.db.none(
      `INSERT INTO event_store (event_id, aggregate_id, event_type, event_data, version, occurred_at)
       SELECT * FROM json_populate_recordset(null::event_store, $1)`,
      [JSON.stringify(events.map(e => ({
        event_id: e.eventId,
        aggregate_id: e.aggregateId,
        event_type: e.getEventType(),
        event_data: JSON.stringify(e),
        version: e.aggregateVersion,
        occurred_at: e.occurredAt,
      })))]
    );

    // 批量更新物化视图
    await this.updateReadModels(events);
  }

  /**
   * 更新读模型
   */
  private async updateReadModels(events: DomainEvent[]): Promise<void> {
    // 按用户分组
    const userEvents = new Map<string, DomainEvent[]>();

    for (const event of events) {
      const userId = this.extractUserId(event);
      if (!userEvents.has(userId)) {
        userEvents.set(userId, []);
      }
      userEvents.get(userId)!.push(event);
    }

    // 批量更新每个用户的摘要
    for (const [userId, userEvents] of userEvents.entries()) {
      await this.updateUserSummary(userId, userEvents);
    }
  }

  private async updateUserSummary(
    userId: string,
    events: DomainEvent[]
  ): Promise<void> {
    // 实现略
  }

  private extractUserId(event: DomainEvent): string {
    // 从事件中提取用户 ID
    return (event as any).userId || '';
  }
}
Code collapsed

合规检查清单

数据安全

  • 读写访问控制分离
  • 敏感数据加密存储
  • 审计日志完整记录
  • 数据最小化原则

性能优化

  • 物化视图定期刷新
  • 多层缓存策略
  • 批处理优化
  • 查询索引优化

数据一致性

  • 最终一致性保证
  • 事件顺序保证
  • 补偿事务机制
  • 数据同步监控

参考资料


免责声明:CQRS 增加了系统复杂度。在采用前请评估实际需求,确保团队有能力维护这种架构。

#

文章标签

cqrs
event-sourcing
analytics
scalability
architecture
postgres

觉得这篇文章有帮助?

立即体验康心伴,开始您的健康管理之旅