康心伴Logo
康心伴WellAlly
Data & Privacy

单体到微服务重构指南:领域驱动设计(DDD)实战

5 分钟阅读

单体到微服务重构指南:领域驱动设计(DDD)实战

概述

将大型单体应用拆分为微服务是一个复杂的架构决策。领域驱动设计(DDD)提供了科学的方法来识别服务边界,确保每个微服务都有清晰的职责和独立的业务价值。

为什么选择 DDD 进行拆分?

传统拆分方式 | DDD 拆分方式
按技术层次(UI、业务、数据) | 按业务领域
按数据库表 | 按聚合根和实体
随意划分 | 明确的边界上下文
导致服务间紧耦合 | 保持服务松耦合

拆分收益

  • 独立部署:每个服务可独立发布
  • 技术异构:不同服务使用不同技术栈
  • 弹性扩展:按需扩展特定服务
  • 团队自治:小团队负责特定领域
  • 故障隔离:单个服务故障不影响全局

1. 领域建模

识别边界上下文

code
/**
 * Catalogue of the health platform's business domains.
 *
 * Every member pairs a bounded-context name with the entities listed for it.
 * Both are written as literal types (a tuple of string literals and a single
 * string literal), so a value of this type has to match them exactly.
 */
interface DomainContexts {
  // User management context
  userContext: {
    entities: ['User', 'Profile', 'Preferences'];
    boundedContext: 'Identity & Access';
  };

  // Health record context
  healthRecordContext: {
    entities: ['HealthRecord', 'LabResult', 'Prescription'];
    boundedContext: 'Health Records Management';
  };

  // Device management context
  deviceContext: {
    entities: ['Device', 'DeviceReading', 'DeviceCalibration'];
    boundedContext: 'IoT Device Management';
  };

  // Alerting and notification context
  notificationContext: {
    entities: ['Alert', 'Notification', 'NotificationTemplate'];
    boundedContext: 'Alert & Notification';
  };

  // Reporting and analytics context
  analyticsContext: {
    entities: ['HealthReport', 'TrendAnalysis', 'Insight'];
    boundedContext: 'Health Analytics';
  };
}
Code collapsed

定义聚合和聚合根

code
// health-record/domain/health-record-aggregate.ts

/**
 * Aggregate: health records (aggregate root: HealthRecord).
 *
 * Keeps an in-memory map of records keyed by record id. Each command method
 * applies the change to the matching record and returns a plain object
 * describing what happened.
 */
export class HealthRecordAggregate {
  private readonly records: Map<string, HealthRecord> = new Map();

  /**
   * Creates a record from the command and stores it under its id.
   * `recordedAt` falls back to the current time when the command omits it.
   *
   * @returns the "HealthRecordCreated" event for the new record.
   */
  createRecord(command: CreateHealthRecordCommand): HealthRecordCreatedEvent {
    const created = HealthRecord.create({
      userId: command.userId,
      recordType: command.recordType,
      data: command.data,
      recordedAt: command.recordedAt || new Date(),
      source: command.source,
    });
    this.records.set(created.id, created);

    const event = {
      eventType: 'HealthRecordCreated',
      aggregateId: created.id,
      userId: created.userId,
      recordType: created.recordType,
      data: created.data,
      timestamp: new Date(),
    };
    return event;
  }

  /**
   * Replaces the data of an existing record.
   *
   * @throws Error when the record is unknown or belongs to another user.
   * @returns the "HealthRecordUpdated" event.
   */
  updateRecord(command: UpdateHealthRecordCommand): HealthRecordUpdatedEvent {
    const target = this.lookupRecord(command.recordId);

    // Only the owner of a record may change it.
    const isOwner = target.userId === command.userId;
    if (!isOwner) {
      throw new Error('无权修改此记录');
    }

    target.update({ data: command.data, updatedAt: new Date() });

    const event = {
      eventType: 'HealthRecordUpdated',
      aggregateId: target.id,
      userId: target.userId,
      data: target.data,
      timestamp: new Date(),
    };
    return event;
  }

  /**
   * Soft-deletes a record; the record stays in the map.
   *
   * @throws Error when the record is unknown.
   * @returns the "HealthRecordDeleted" event.
   */
  deleteRecord(command: DeleteHealthRecordCommand): HealthRecordDeletedEvent {
    const target = this.lookupRecord(command.recordId);
    target.softDelete(command.reason, command.deletedBy);

    const event = {
      eventType: 'HealthRecordDeleted',
      aggregateId: target.id,
      userId: target.userId,
      reason: command.reason,
      timestamp: new Date(),
    };
    return event;
  }

  /**
   * Attaches a file description to an existing record.
   *
   * @throws Error when the record is unknown.
   * @returns the "AttachmentAdded" event carrying the new attachment id.
   */
  addAttachment(command: AddAttachmentCommand): AttachmentAddedEvent {
    const target = this.lookupRecord(command.recordId);
    const { fileType, fileName, fileUrl, fileSize } = command;
    const added = target.addAttachment({ fileType, fileName, fileUrl, fileSize });

    const event = {
      eventType: 'AttachmentAdded',
      aggregateId: target.id,
      userId: target.userId,
      attachmentId: added.id,
      timestamp: new Date(),
    };
    return event;
  }

  /** Returns the stored record for `recordId`, or throws when there is none. */
  private lookupRecord(recordId: string): HealthRecord {
    const found = this.records.get(recordId);
    if (!found) {
      throw new Error('记录不存在');
    }
    return found;
  }
}

/**
 * Value object: one health measurement (type, numeric value, unit) with an
 * optional reference range used to classify the value.
 */
export class HealthData {
  constructor(
    public readonly type: HealthDataType,
    public readonly value: number,
    public readonly unit: string,
    public readonly referenceRange?: ReferenceRange
  ) {}

  /**
   * Reports whether the value lies inside [min, max] of the reference range.
   * A measurement without a reference range counts as normal.
   */
  isNormal(): boolean {
    const range = this.referenceRange;
    if (!range) {
      return true;
    }

    const atLeastMin = this.value >= range.min;
    return atLeastMin && this.value <= range.max;
  }

  /**
   * Classifies the value: outside the critical bounds is 'critical', outside
   * [min, max] is 'warning', anything else (or no range at all) is 'normal'.
   */
  getAbnormalLevel(): 'normal' | 'warning' | 'critical' {
    const range = this.referenceRange;
    if (!range) {
      return 'normal';
    }

    const current = this.value;
    // The critical bounds are checked first so they win over the warning band.
    const beyondCritical = current < range.criticalMin || current > range.criticalMax;
    if (beyondCritical) {
      return 'critical';
    }

    const beyondNormal = current < range.min || current > range.max;
    return beyondNormal ? 'warning' : 'normal';
  }

  /** Compares type, value and unit; the reference range is not compared. */
  equals(other: HealthData): boolean {
    if (this.type !== other.type) {
      return false;
    }
    if (this.value !== other.value) {
      return false;
    }
    return this.unit === other.unit;
  }
}

/**
 * Entity: a single health record.
 *
 * Instances are built through {@link HealthRecord.create}; the constructor is
 * private. Identity, owner, record type and recording time never change after
 * construction, while data, attachments and the deletion marker do.
 */
export class HealthRecord {
  private readonly _id: string;
  private readonly _userId: string;
  private readonly _recordType: HealthRecordType;
  private _data: HealthData[];
  private readonly _recordedAt: Date;
  private _updatedAt: Date;
  private _deletedAt: Date | null;
  private _deletionReason: string | null = null;
  private _deletedBy: string | null = null;
  private readonly _attachments: Attachment[];

  private constructor(props: HealthRecordProps) {
    this._id = props.id;
    this._userId = props.userId;
    this._recordType = props.recordType;
    this._data = props.data;
    this._recordedAt = props.recordedAt;
    this._updatedAt = props.updatedAt;
    this._deletedAt = props.deletedAt;
    this._attachments = props.attachments || [];
  }

  /**
   * Builds a new, non-deleted record with a freshly generated id and
   * `updatedAt` set to the current time.
   */
  static create(props: CreateHealthRecordProps): HealthRecord {
    return new HealthRecord({
      id: generateId(),
      userId: props.userId,
      recordType: props.recordType,
      data: props.data,
      recordedAt: props.recordedAt,
      updatedAt: new Date(),
      deletedAt: null,
    });
  }

  get id(): string {
    return this._id;
  }

  get userId(): string {
    return this._userId;
  }

  /** Record type given at creation; the aggregate reads it to build events. */
  get recordType(): HealthRecordType {
    return this._recordType;
  }

  /** Time the measurement was recorded; used to order records chronologically. */
  get recordedAt(): Date {
    return this._recordedAt;
  }

  /** Returns a shallow copy so callers cannot mutate the internal list. */
  get data(): HealthData[] {
    return [...this._data];
  }

  /** Reason passed to the most recent {@link softDelete}, or null. */
  get deletionReason(): string | null {
    return this._deletionReason;
  }

  /** Actor passed to the most recent {@link softDelete}, or null. */
  get deletedBy(): string | null {
    return this._deletedBy;
  }

  /**
   * Replaces the record data (when provided) and refreshes `updatedAt`.
   *
   * @throws Error when the record has been soft-deleted.
   */
  update(props: Partial<UpdateProps>): void {
    this.assertNotDeleted();

    if (props.data) {
      this._data = props.data;
    }
    this._updatedAt = new Date();
  }

  /**
   * Marks the record as deleted and remembers why and by whom, so the
   * arguments are no longer silently discarded.
   */
  softDelete(reason: string, deletedBy: string): void {
    this._deletedAt = new Date();
    this._deletionReason = reason;
    this._deletedBy = deletedBy;
  }

  /**
   * Creates an attachment linked to this record and appends it.
   *
   * @throws Error when the record has been soft-deleted, matching `update`.
   * @returns the newly created attachment.
   */
  addAttachment(props: AttachmentProps): Attachment {
    this.assertNotDeleted();

    const attachment = Attachment.create({
      healthRecordId: this.id,
      ...props,
    });

    this._attachments.push(attachment);
    return attachment;
  }

  isDeleted(): boolean {
    return this._deletedAt !== null;
  }

  /** Guards every mutation against operating on a soft-deleted record. */
  private assertNotDeleted(): void {
    if (this._deletedAt) {
      throw new Error('无法修改已删除的记录');
    }
  }
}
Code collapsed

定义领域服务

code
// health-record/domain/health-analyzer.domain-service.ts

/**
 * Domain service: health data analysis.
 *
 * Holds logic that spans several health records: trend analysis, anomaly
 * detection and report generation. The service keeps no state of its own.
 */
export class HealthAnalyzerDomainService {
  /**
   * Computes a trend per data type across the given records.
   *
   * Records are ordered by `recordedAt` on a copy, so the caller's array is
   * left untouched. Data types with fewer than two values are skipped.
   *
   * @throws Error when fewer than two records are supplied.
   * @returns the covered period, the per-type trends and an overall trend.
   */
  analyzeTrend(
    records: HealthRecord[]
  ): HealthTrendAnalysis {
    if (records.length < 2) {
      throw new Error('需要至少 2 条记录来分析趋势');
    }

    // Array.prototype.sort works in place; copy first so the argument is not reordered.
    const sortedRecords = [...records].sort((a, b) =>
      a.recordedAt.getTime() - b.recordedAt.getTime()
    );

    const trends: Record<string, TrendDirection> = {};

    // Group values by data type, in chronological order.
    const dataByType = this.groupDataByType(sortedRecords);

    for (const [type, dataPoints] of Object.entries(dataByType)) {
      if (dataPoints.length < 2) continue;

      trends[type] = this.calculateTrend(dataPoints);
    }

    return {
      period: {
        start: sortedRecords[0].recordedAt,
        end: sortedRecords[sortedRecords.length - 1].recordedAt,
      },
      trends,
      overall: this.calculateOverallTrend(trends),
    };
  }

  /**
   * Collects every data point whose abnormal level is not 'normal'.
   *
   * @returns the anomalies plus total, critical and warning counts.
   */
  detectAnomalies(
    records: HealthRecord[]
  ): AnomalyDetectionResult {
    const anomalies: DetectedAnomaly[] = [];

    for (const record of records) {
      for (const data of record.data) {
        const level = data.getAbnormalLevel();

        if (level !== 'normal') {
          anomalies.push({
            recordId: record.id,
            dataType: data.type,
            value: data.value,
            unit: data.unit,
            level,
            detectedAt: new Date(),
          });
        }
      }
    }

    return {
      total: anomalies.length,
      critical: anomalies.filter(a => a.level === 'critical').length,
      warning: anomalies.filter(a => a.level === 'warning').length,
      anomalies,
    };
  }

  /**
   * Builds a report combining trend analysis, anomaly detection, insights and
   * recommendations.
   *
   * @throws Error when fewer than two records are supplied (from analyzeTrend).
   */
  generateReport(
    userId: string,
    records: HealthRecord[],
    userBaseline?: HealthBaseline
  ): HealthReport {
    const trend = this.analyzeTrend(records);
    const anomalies = this.detectAnomalies(records);
    const insights = this.generateInsights(trend, anomalies, userBaseline);

    return {
      id: generateId(),
      userId,
      generatedAt: new Date(),
      period: trend.period,
      trend: trend.overall,
      anomalies: anomalies.total,
      criticalAlerts: anomalies.critical,
      warnings: anomalies.warning,
      insights,
      recommendations: this.generateRecommendations(insights),
    };
  }

  /** Buckets the numeric values of all records by data type, keeping input order. */
  private groupDataByType(
    records: HealthRecord[]
  ): Record<string, number[]> {
    const grouped: Record<string, number[]> = {};

    for (const record of records) {
      for (const data of record.data) {
        if (!grouped[data.type]) {
          grouped[data.type] = [];
        }
        grouped[data.type].push(data.value);
      }
    }

    return grouped;
  }

  /**
   * Fits a least-squares line through (index, value) pairs and classifies the
   * slope: above 0.1 is 'increasing', below -0.1 is 'decreasing', otherwise
   * 'stable'. Callers pass at least two values, which keeps the denominator
   * non-zero.
   */
  private calculateTrend(values: number[]): TrendDirection {
    const n = values.length;
    // Closed forms for the sums of 0..n-1 and of their squares.
    const sumX = (n * (n - 1)) / 2;
    const sumY = values.reduce((a, b) => a + b, 0);
    const sumXY = values.reduce((sum, y, x) => sum + x * y, 0);
    const sumX2 = (n * (n - 1) * (2 * n - 1)) / 6;

    const slope = (n * sumXY - sumX * sumY) / (n * sumX2 - sumX * sumX);

    if (slope > 0.1) return 'increasing';
    if (slope < -0.1) return 'decreasing';
    return 'stable';
  }

  /**
   * Reduces the per-type trends to one direction by majority: more increasing
   * than decreasing types gives 'improving', the reverse gives 'declining',
   * a tie gives 'stable'.
   *
   * NOTE(review): this equates a rising value with improvement for every data
   * type — confirm that holds for metrics where lower is better.
   */
  private calculateOverallTrend(
    trends: Record<string, TrendDirection>
  ): TrendDirection {
    const values = Object.values(trends);
    const increasing = values.filter(v => v === 'increasing').length;
    const decreasing = values.filter(v => v === 'decreasing').length;

    if (increasing > decreasing) return 'improving';
    if (decreasing > increasing) return 'declining';
    return 'stable';
  }

  /**
   * Produces one insight per trend and one per critical anomaly, sorted from
   * highest to lowest priority. `baseline` is accepted but not used here.
   */
  private generateInsights(
    trend: HealthTrendAnalysis,
    anomalies: AnomalyDetectionResult,
    baseline?: HealthBaseline
  ): Insight[] {
    const insights: Insight[] = [];

    // Trend insights.
    // NOTE(review): calculateTrend only yields increasing/decreasing/stable, so
    // the 'declining' comparison below looks unreachable for per-type trends.
    for (const [type, direction] of Object.entries(trend.trends)) {
      insights.push({
        type: 'trend',
        category: type,
        message: this.formatTrendMessage(type, direction),
        priority: direction === 'declining' ? 'high' : 'medium',
      });
    }

    // Anomaly insights: only critical anomalies are surfaced.
    for (const anomaly of anomalies.anomalies) {
      if (anomaly.level === 'critical') {
        insights.push({
          type: 'alert',
          category: anomaly.dataType,
          message: `${anomaly.dataType} 值异常: ${anomaly.value} ${anomaly.unit}`,
          priority: 'critical',
        });
      }
    }

    // Built once rather than on every comparator call.
    const priorityOrder = { critical: 3, high: 2, medium: 1, low: 0 };

    return insights.sort(
      (a, b) => priorityOrder[b.priority] - priorityOrder[a.priority]
    );
  }

  /** Emits a "consult a healthcare provider" recommendation for each alert or high-priority insight. */
  private generateRecommendations(insights: Insight[]): Recommendation[] {
    const recommendations: Recommendation[] = [];

    for (const insight of insights) {
      if (insight.type === 'alert' || insight.priority === 'high') {
        recommendations.push({
          action: 'consult_healthcare_provider',
          category: insight.category,
          priority: 'high',
          message: `建议咨询医生关于 ${insight.category} 的情况`,
        });
      }
    }

    return recommendations;
  }

  /** Renders the user-facing sentence for a data type and its trend direction. */
  private formatTrendMessage(type: string, direction: TrendDirection): string {
    const messages = {
      increasing: `${type} 呈上升趋势`,
      decreasing: `${type} 呈下降趋势`,
      stable: `${type} 保持稳定`,
      improving: `${type} 状况改善`,
      declining: `${type} 状况恶化`,
    };

    return messages[direction];
  }
}
Code collapsed

2. 服务拆分策略

绞杀者模式(Strangler Fig)

code
// migration/strangler-fig-pattern.ts

/**
 * Strangler-fig migration router.
 *
 * Requests are forwarded either to the monolith or to a microservice,
 * depending on the routes registered here, so functionality can be moved out
 * of the monolith one path at a time.
 *
 * Route patterns are either exact paths or prefixes ending in `/*`. A prefix
 * pattern matches its base path and everything below it; when several
 * patterns match, the longest one wins.
 */
export class StranglerFigMigration {
  private routes: Map<string, RouteHandler> = new Map();
  private migrationConfig: MigrationConfig;

  constructor(config: MigrationConfig) {
    this.migrationConfig = config;
  }

  /**
   * Registers (or replaces) the route for `path`.
   *
   * @param path exact path or `/prefix/*` pattern.
   * @param target which side handles the route.
   * @param microserviceUrl base URL of the microservice; required when
   *   `target` is 'microservice'.
   * @throws Error when a microservice route is registered without a URL.
   */
  registerMigrationRoute(
    path: string,
    target: 'monolith' | 'microservice',
    microserviceUrl?: string
  ): void {
    if (target === 'microservice' && !microserviceUrl) {
      throw new Error(`Route ${path} targets a microservice but no microserviceUrl was given`);
    }

    this.routes.set(path, {
      path,
      target,
      microserviceUrl,
    });
  }

  /**
   * Forwards the request to the handler of the best-matching route.
   * Paths without any matching route go to the monolith.
   */
  async routeRequest(
    req: Request
  ): Promise<Response> {
    const path = new URL(req.url).pathname;
    const route = this.findRoute(path);

    if (!route) {
      // Default: the monolith still owns everything that is not registered.
      return this.forwardToMonolith(req);
    }

    switch (route.target) {
      case 'microservice':
        if (!route.microserviceUrl) {
          return new Response('Service not available', { status: 503 });
        }
        return this.forwardToMicroservice(req, route.microserviceUrl);

      case 'monolith':
        return this.forwardToMonolith(req);

      default:
        return new Response('Service not available', { status: 503 });
    }
  }

  /**
   * Resolves a request path to a registered route: an exact match first,
   * otherwise the longest `/*` prefix pattern that covers the path.
   */
  private findRoute(path: string): RouteHandler | undefined {
    const exact = this.routes.get(path);
    if (exact) {
      return exact;
    }

    let best: RouteHandler | undefined;
    let bestLength = -1;

    for (const [pattern, route] of this.routes) {
      if (!pattern.endsWith('/*')) continue;

      // '/api/x/*' covers '/api/x' itself and anything under '/api/x/'.
      const base = pattern.slice(0, -2);
      const covers = path === base || path.startsWith(`${base}/`);

      if (covers && base.length > bestLength) {
        best = route;
        bestLength = base.length;
      }
    }

    return best;
  }

  /** Forwards the request to the given microservice base URL. */
  private async forwardToMicroservice(
    req: Request,
    microserviceUrl: string
  ): Promise<Response> {
    return this.forward(req, microserviceUrl);
  }

  /** Forwards the request to the monolith configured in the migration config. */
  private async forwardToMonolith(req: Request): Promise<Response> {
    return this.forward(req, this.migrationConfig.monolithUrl);
  }

  /**
   * Re-issues the request against `baseUrl`, keeping path, query string,
   * method and headers.
   *
   * The body is buffered instead of passed on as a stream: Node's fetch
   * rejects a stream body unless the `duplex` option is set, and GET/HEAD
   * requests must not carry a body at all.
   */
  private async forward(req: Request, baseUrl: string): Promise<Response> {
    const url = new URL(req.url);
    const targetUrl = `${baseUrl}${url.pathname}${url.search}`;
    const carriesBody = req.method !== 'GET' && req.method !== 'HEAD';

    return fetch(targetUrl, {
      method: req.method,
      headers: req.headers,
      body: carriesBody ? await req.arrayBuffer() : undefined,
    });
  }

  /**
   * Summarises migration progress over the registered routes: total count,
   * how many point at a microservice, how many remain, and the percentage.
   */
  getMigrationStatus(): MigrationStatus {
    const total = this.routes.size;
    const migrated = Array.from(this.routes.values())
      .filter(r => r.target === 'microservice').length;

    return {
      total,
      migrated,
      remaining: total - migrated,
      percentage: total > 0 ? (migrated / total) * 100 : 0,
    };
  }
}

// Example migration configuration: where the monolith lives, plus per-feature flags.
const migrationConfig = {
  monolithUrl: 'http://localhost:3000',
  featureFlags: { healthRecords: true, appointments: false, prescriptions: true },
};

const strangler = new StranglerFigMigration(migrationConfig);

// Routes already served by a microservice, as [path pattern, service base URL].
for (const [pattern, serviceUrl] of [
  ['/api/health-records/*', 'http://health-records-service:3001'],
  ['/api/prescriptions/*', 'http://prescriptions-service:3002'],
] as const) {
  strangler.registerMigrationRoute(pattern, 'microservice', serviceUrl);
}

// Appointments are still handled by the monolith.
strangler.registerMigrationRoute('/api/appointments/*', 'monolith');
Code collapsed

数据库拆分策略

code
-- =============================================================================
-- Database split: shared database -> one database per service
-- =============================================================================

-- Phase 1: tag rows in the monolith database with the service that will own them
ALTER TABLE health_records ADD COLUMN service_id VARCHAR(50);
ALTER TABLE lab_results ADD COLUMN service_id VARCHAR(50);
ALTER TABLE prescriptions ADD COLUMN service_id VARCHAR(50);

-- Backfill the owning service for rows that do not have one yet
UPDATE health_records SET service_id = 'health-records-service' WHERE service_id IS NULL;
UPDATE lab_results SET service_id = 'health-records-service' WHERE service_id IS NULL;
UPDATE prescriptions SET service_id = 'prescriptions-service' WHERE service_id IS NULL;

-- Phase 2: set up change data capture (CDC)
-- Capture changes with Debezium or a custom CDC solution

-- Phase 3: data synchronisation
-- Copy data from the monolith database into the new service databases

-- Phase 4: verify data consistency
-- Compare the data in the source and target databases

-- Phase 5: switch reads and writes
-- Point the application's reads and writes at the new databases

-- Phase 6: clean-up
-- Remove the migrated data from the monolith database
Code collapsed

3. 事件驱动架构

事件定义

code
// common/events/domain-events.ts

/**
 * Base class of all domain events.
 *
 * The constructor generates the event id and stamps the occurrence time;
 * the aggregate id and version are taken from the supplied props.
 */
export abstract class DomainEvent {
  public readonly occurredAt: Date;
  public readonly eventId: string;
  public readonly aggregateId: string;
  public readonly aggregateVersion: number;

  constructor(props: DomainEventProps) {
    // Generated values first, then the values copied from the caller.
    this.eventId = generateId();
    this.occurredAt = new Date();

    const { aggregateId, aggregateVersion } = props;
    this.aggregateId = aggregateId;
    this.aggregateVersion = aggregateVersion;
  }

  /** Name identifying the concrete event kind. */
  abstract getEventType(): string;
}

/**
 * Health-record events.
 */

/**
 * Event of type 'HealthRecordCreated': carries the owning user, the record
 * type and the record data. The aggregate version defaults to 1.
 */
export class HealthRecordCreatedEvent extends DomainEvent {
  public readonly userId: string;
  public readonly recordType: HealthRecordType;
  public readonly data: HealthData[];

  constructor(
    userId: string,
    recordType: HealthRecordType,
    data: HealthData[],
    aggregateId: string,
    aggregateVersion: number = 1
  ) {
    super({ aggregateId, aggregateVersion });
    this.userId = userId;
    this.recordType = recordType;
    this.data = data;
  }

  getEventType(): string {
    return 'HealthRecordCreated';
  }
}

/**
 * Event of type 'HealthRecordUpdated': carries the owning user and a map of
 * the changes that were applied.
 */
export class HealthRecordUpdatedEvent extends DomainEvent {
  public readonly userId: string;
  public readonly changes: Record<string, any>;

  constructor(
    userId: string,
    changes: Record<string, any>,
    aggregateId: string,
    aggregateVersion: number
  ) {
    super({ aggregateId, aggregateVersion });
    this.userId = userId;
    this.changes = changes;
  }

  getEventType(): string {
    return 'HealthRecordUpdated';
  }
}

/**
 * Event of type 'CriticalHealthValueDetected': carries the user, the kind of
 * value, the measured value, its severity and the detection time.
 */
export class CriticalHealthValueDetectedEvent extends DomainEvent {
  public readonly userId: string;
  public readonly valueType: string;
  public readonly value: number;
  public readonly severity: 'warning' | 'critical';
  public readonly detectedAt: Date;

  constructor(
    userId: string,
    valueType: string,
    value: number,
    severity: 'warning' | 'critical',
    detectedAt: Date,
    aggregateId: string,
    aggregateVersion: number
  ) {
    super({ aggregateId, aggregateVersion });
    this.userId = userId;
    this.valueType = valueType;
    this.value = value;
    this.severity = severity;
    this.detectedAt = detectedAt;
  }

  getEventType(): string {
    return 'CriticalHealthValueDetected';
  }
}

/**
 * User events.
 */

/**
 * Event of type 'UserRegistered': carries the user id, email and name.
 * The aggregate version defaults to 1.
 */
export class UserRegisteredEvent extends DomainEvent {
  public readonly userId: string;
  public readonly email: string;
  public readonly name: string;

  constructor(
    userId: string,
    email: string,
    name: string,
    aggregateId: string,
    aggregateVersion: number = 1
  ) {
    super({ aggregateId, aggregateVersion });
    this.userId = userId;
    this.email = email;
    this.name = name;
  }

  getEventType(): string {
    return 'UserRegistered';
  }
}

/**
 * Event of type 'UserPreferencesUpdated': carries the user id and the
 * preferences map.
 */
export class UserPreferencesUpdatedEvent extends DomainEvent {
  public readonly userId: string;
  public readonly preferences: Record<string, any>;

  constructor(
    userId: string,
    preferences: Record<string, any>,
    aggregateId: string,
    aggregateVersion: number
  ) {
    super({ aggregateId, aggregateVersion });
    this.userId = userId;
    this.preferences = preferences;
  }

  getEventType(): string {
    return 'UserPreferencesUpdated';
  }
}

/**
 * Alert events.
 */

/**
 * Event of type 'AlertCreated': carries the user, the alert type, severity,
 * message and free-form metadata. The aggregate version defaults to 1.
 */
export class AlertCreatedEvent extends DomainEvent {
  public readonly userId: string;
  public readonly alertType: string;
  public readonly severity: 'info' | 'warning' | 'critical';
  public readonly message: string;
  public readonly metadata: Record<string, any>;

  constructor(
    userId: string,
    alertType: string,
    severity: 'info' | 'warning' | 'critical',
    message: string,
    metadata: Record<string, any>,
    aggregateId: string,
    aggregateVersion: number = 1
  ) {
    super({ aggregateId, aggregateVersion });
    this.userId = userId;
    this.alertType = alertType;
    this.severity = severity;
    this.message = message;
    this.metadata = metadata;
  }

  getEventType(): string {
    return 'AlertCreated';
  }
}
Code collapsed

事件总线实现

code
// infrastructure/messaging/event-bus.ts

/**
 * Event bus contract: publish domain events and manage handlers per event
 * type. All operations are asynchronous.
 */
export interface IEventBus {
  /** Publishes one domain event. */
  publish(event: DomainEvent): Promise<void>;
  /** Registers `handler` for events of the given type. */
  subscribe(eventType: string, handler: EventHandler): Promise<void>;
  /** Removes a handler previously registered for the given type. */
  unsubscribe(eventType: string, handler: EventHandler): Promise<void>;
}

/** Asynchronous callback invoked with a domain event. */
type EventHandler = (event: DomainEvent) => Promise<void>;

/**
 * RabbitMQ-backed event bus.
 *
 * Exactly one consumer is started per event type. Every delivered message is
 * dispatched to all handlers currently subscribed to that type, so adding a
 * second handler no longer creates a competing consumer, and `unsubscribe`
 * really stops a handler from being called. When the last handler of a type
 * is removed, its consumer is cancelled.
 *
 * Delivery is at-least-once: if any handler fails, the message is returned to
 * the queue once and every handler runs again on redelivery.
 *
 * NOTE(review): `amqp`, `Connection` and `Channel` are presumably amqplib's —
 * confirm against the file's imports.
 */
export class RabbitMqEventBus implements IEventBus {
  private connection?: Connection;
  private channel?: Channel;
  private exchanges: Set<string> = new Set();
  private handlers: Map<string, Set<EventHandler>> = new Map();
  /** Consumer per event type; each promise resolves to the consumer tag. */
  private consumers: Map<string, Promise<string>> = new Map();

  constructor(private config: RabbitMqConfig) {}

  /**
   * Opens the connection and channel and applies the prefetch limit
   * (unacknowledged messages per consumer). Defaults to 10 when the config
   * leaves it undefined; an explicit 0 is passed through unchanged.
   */
  async connect(): Promise<void> {
    this.connection = await amqp.connect(this.config.url);
    this.channel = await this.connection.createChannel();

    await this.channel.prefetch(this.config.prefetch ?? 10);
  }

  /**
   * Publishes the event as persistent JSON to the exchange derived from its
   * type, using the event type as routing key.
   *
   * @throws Error when called before `connect()`.
   */
  async publish(event: DomainEvent): Promise<void> {
    const channel = this.requireChannel();
    const eventType = event.getEventType();
    const exchange = this.getExchangeName(eventType);

    await this.ensureExchange(exchange);

    const message = {
      eventId: event.eventId,
      eventType,
      aggregateId: event.aggregateId,
      aggregateVersion: event.aggregateVersion,
      occurredAt: event.occurredAt,
      data: event,
    };

    await channel.publish(
      exchange,
      eventType, // routing key
      Buffer.from(JSON.stringify(message)),
      {
        persistent: true,
        messageId: event.eventId,
        // AMQP timestamps are whole seconds.
        timestamp: Math.floor(event.occurredAt.getTime() / 1000),
      }
    );
  }

  /**
   * Registers `handler` for `eventType`, starting the type's consumer on the
   * first subscription. If the consumer cannot be started the handler is
   * removed again and the error is rethrown.
   *
   * @throws Error when called before `connect()`.
   */
  async subscribe(eventType: string, handler: EventHandler): Promise<void> {
    this.requireChannel();

    let handlers = this.handlers.get(eventType);
    if (!handlers) {
      handlers = new Set();
      this.handlers.set(eventType, handlers);
    }
    handlers.add(handler);

    // The promise is stored synchronously so concurrent subscribe() calls for
    // the same type share one consumer instead of racing to create two.
    let consumer = this.consumers.get(eventType);
    if (!consumer) {
      consumer = this.startConsumer(eventType);
      this.consumers.set(eventType, consumer);
    }

    try {
      await consumer;
    } catch (error) {
      handlers.delete(handler);
      if (this.consumers.get(eventType) === consumer) {
        this.consumers.delete(eventType);
      }
      throw error;
    }
  }

  /**
   * Removes `handler`; cancels the consumer once no handler is left for the
   * event type. Unknown handlers and event types are ignored.
   */
  async unsubscribe(eventType: string, handler: EventHandler): Promise<void> {
    const handlers = this.handlers.get(eventType);
    if (!handlers) return;

    handlers.delete(handler);
    if (handlers.size > 0) return;

    this.handlers.delete(eventType);

    const consumer = this.consumers.get(eventType);
    if (!consumer) return;
    this.consumers.delete(eventType);

    let consumerTag: string;
    try {
      consumerTag = await consumer;
    } catch {
      // The consumer never started, so there is nothing to cancel.
      return;
    }
    await this.channel?.cancel(consumerTag);
  }

  /** Closes channel and connection (when open) and forgets all local state. */
  async disconnect(): Promise<void> {
    const { channel, connection } = this;

    this.channel = undefined;
    this.connection = undefined;
    this.exchanges.clear();
    this.consumers.clear();
    this.handlers.clear();

    try {
      await channel?.close();
    } finally {
      await connection?.close();
    }
  }

  /** Declares exchange and queue, binds them, and starts the single consumer. */
  private async startConsumer(eventType: string): Promise<string> {
    const channel = this.requireChannel();
    const exchange = this.getExchangeName(eventType);
    const queueName = this.getQueueName(eventType);

    await this.ensureExchange(exchange);
    await this.ensureQueue(queueName);
    await channel.bindQueue(queueName, exchange, eventType);

    const { consumerTag } = await channel.consume(
      queueName,
      (msg) => this.dispatch(eventType, msg)
    );
    return consumerTag;
  }

  /**
   * Handles one delivery: parses it, runs every subscribed handler in turn,
   * then acknowledges. Unparseable messages are rejected without requeue
   * because they can never succeed; handler failures are requeued once.
   */
  private async dispatch(eventType: string, msg: any): Promise<void> {
    // A null message means the broker cancelled the consumer.
    if (!msg) return;

    const channel = this.channel;
    if (!channel) return;

    let event: DomainEvent;
    try {
      event = this.deserializeEvent(JSON.parse(msg.content.toString()));
    } catch (error) {
      console.error(`处理事件 ${eventType} 时出错:`, error);
      channel.nack(msg, false, false);
      return;
    }

    // Snapshot so handlers may subscribe/unsubscribe while we iterate.
    const handlers = [...(this.handlers.get(eventType) ?? [])];
    if (handlers.length === 0) {
      // Nobody is listening any more; hand the message back untouched.
      channel.nack(msg, false, true);
      return;
    }

    try {
      for (const handler of handlers) {
        await handler(event);
      }
      channel.ack(msg);
    } catch (error) {
      console.error(`处理事件 ${eventType} 时出错:`, error);

      // Requeue only the first failure; a message that already failed once is
      // dropped (or dead-lettered, if the queue has a DLX) to avoid a hot loop.
      channel.nack(msg, false, !msg.fields?.redelivered);
    }
  }

  /** Returns the open channel or fails with a clear error. */
  private requireChannel(): Channel {
    if (!this.channel) {
      throw new Error('RabbitMqEventBus is not connected; call connect() first');
    }
    return this.channel;
  }

  /** Declares the durable topic exchange once per bus instance. */
  private async ensureExchange(exchange: string): Promise<void> {
    if (this.exchanges.has(exchange)) return;

    await this.requireChannel().assertExchange(exchange, 'topic', {
      durable: true,
    });

    this.exchanges.add(exchange);
  }

  /** Declares the durable queue with a 24-hour message TTL and a 10000-message cap. */
  private async ensureQueue(queue: string): Promise<void> {
    await this.requireChannel().assertQueue(queue, {
      durable: true,
      arguments: {
        'x-message-ttl': 86400000, // 24 hours
        'x-max-length': 10000,     // maximum number of messages
      },
    });
  }

  /**
   * Derives the exchange from the part of the event type before the first '.'.
   *
   * NOTE(review): an event type without a dot yields one exchange per event
   * type rather than one per business domain — confirm which is intended.
   */
  private getExchangeName(eventType: string): string {
    const domain = eventType.split('.')[0];
    return `${domain}.events`;
  }

  private getQueueName(eventType: string): string {
    return `${eventType}.queue`;
  }

  /**
   * Casts the parsed payload to DomainEvent without rebuilding a class
   * instance; a real implementation needs an event registry. Consumers
   * therefore receive plain objects: no methods, dates as strings.
   */
  private deserializeEvent(content: any): DomainEvent {
    return content as DomainEvent;
  }
}
Code collapsed

事件处理器

code
// notification-service/handlers/health-event-handlers.ts

/**
 * Handlers for health-record events: they create alerts and send
 * notifications through the injected services.
 *
 * NOTE(review): handleHealthRecordCreated calls `getAbnormalLevel()` on each
 * data item. If events arrive as JSON-decoded plain objects, that method will
 * be missing — confirm that deserialisation restores HealthData instances.
 */
export class HealthEventHandlers {
  constructor(
    private alertService: AlertService,
    private notificationService: NotificationService
  ) {}

  /**
   * For every data item of the new record that is not 'normal', creates an
   * alert; items at 'critical' level additionally trigger an immediate
   * notification.
   */
  async handleHealthRecordCreated(event: HealthRecordCreatedEvent): Promise<void> {
    for (const reading of event.data) {
      const level = reading.getAbnormalLevel();
      if (level === 'normal') {
        continue;
      }

      await this.alertService.createAlert({
        userId: event.userId,
        alertType: reading.type,
        severity: level,
        message: this.formatAbnormalMessage(reading, level),
        sourceRecordId: event.aggregateId,
      });

      // Only critical readings warrant an immediate notification.
      if (level !== 'critical') {
        continue;
      }

      await this.notificationService.sendImmediateNotification({
        userId: event.userId,
        type: 'health_alert',
        severity: 'critical',
        message: this.formatAbnormalMessage(reading, level),
      });
    }
  }

  /**
   * Reacts to a critical-value event in three sequential steps: create the
   * alert, notify the user immediately, then notify emergency contacts.
   */
  async handleCriticalHealthValue(event: CriticalHealthValueDetectedEvent): Promise<void> {
    const alert = {
      userId: event.userId,
      alertType: event.valueType,
      severity: event.severity,
      message: `检测到临界 ${event.valueType} 值: ${event.value}`,
      sourceEventId: event.eventId,
    };
    await this.alertService.createAlert(alert);

    const urgentNotification = {
      userId: event.userId,
      type: 'critical_health_alert',
      severity: 'critical',
      message: `您的 ${event.valueType} 检测到临界值,请立即关注`,
    };
    await this.notificationService.sendImmediateNotification(urgentNotification);

    const contactNotice = {
      userId: event.userId,
      alertType: event.valueType,
      severity: event.severity,
    };
    await this.notificationService.notifyEmergencyContacts(contactNotice);
  }

  /** Builds the user-facing message; 'critical' gets the stronger wording. */
  private formatAbnormalMessage(data: HealthData, level: string): string {
    return level === 'critical'
      ? `严重异常:${data.type} 值为 ${data.value} ${data.unit},超出正常范围`
      : `注意:${data.type} 值为 ${data.value} ${data.unit},略偏离正常范围`;
  }
}

// Register the event handlers.
const eventBus = new RabbitMqEventBus(rabbitMqConfig);
const handlers = new HealthEventHandlers(alertService, notificationService);

// The bus creates its channel in connect(); subscribing before that would
// operate on an unopened channel.
await eventBus.connect();

await eventBus.subscribe('HealthRecordCreated', handlers.handleHealthRecordCreated.bind(handlers));
await eventBus.subscribe('CriticalHealthValueDetected', handlers.handleCriticalHealthValue.bind(handlers));
Code collapsed

4. 数据一致性

Saga 模式实现

code
// infrastructure/sagas/create-health-record-saga.ts

/**
 * Saga: create a health record.
 *
 * Coordinates several services in sequence. Creating the record and updating
 * analytics are required steps: if either fails, the saga stops, compensates
 * the steps that already completed (in reverse order) and reports 'failed'.
 * Notification and audit logging are best-effort: their failure is logged and
 * recorded as a failed step, but the saga still completes.
 */
export class CreateHealthRecordSaga {
  constructor(
    private healthRecordService: HealthRecordService,
    private analyticsService: AnalyticsService,
    private notificationService: NotificationService,
    private auditLogService: AuditLogService
  ) {}

  /**
   * Runs the saga.
   *
   * @returns status 'completed' with the created record as `result`, or
   *   status 'failed' with the error message. `steps` lists every attempted
   *   step in execution order in both cases. This method does not reject on
   *   step failure.
   */
  async execute(command: CreateHealthRecordCommand): Promise<SagaResult> {
    const sagaId = generateId();
    const steps: SagaStep[] = [];

    try {
      // Step 1: create the health record (compensation: delete it again).
      const record = await this.executeStep(
        steps,
        'create_health_record',
        () => this.healthRecordService.createRecord(command),
        async (created) => {
          await this.healthRecordService.deleteRecord(created.recordId);
        }
      );

      // Step 2: update analytics (compensation: roll the analytics data back).
      await this.executeStep(
        steps,
        'update_analytics',
        () => this.analyticsService.updateHealthData(command.userId, record),
        async (updated) => {
          await this.analyticsService.rollbackHealthData(command.userId, updated.data);
        }
      );

      // Step 3: send a notification when requested (best-effort).
      if (command.shouldNotify) {
        await this.executeOptionalStep(
          sagaId,
          steps,
          'send_notification',
          () => this.notificationService.sendHealthRecordNotification(record)
        );
      }

      // Step 4: write the audit log entry (best-effort).
      await this.executeOptionalStep(
        sagaId,
        steps,
        'log_audit',
        () => this.auditLogService.logAction({
          action: 'CREATE_HEALTH_RECORD',
          userId: command.userId,
          resourceId: record.recordId,
          details: command,
        })
      );

      return {
        sagaId,
        status: 'completed',
        steps,
        result: record,
      };

    } catch (error) {
      // Compensate on a reversed copy so the reported step order stays chronological.
      await this.compensate([...steps].reverse());

      return {
        sagaId,
        status: 'failed',
        steps,
        error: error instanceof Error ? error.message : String(error),
      };
    }
  }

  /**
   * Runs a required step and records it in `steps`.
   *
   * @returns the action's result.
   * @throws whatever the action throws, after recording the step as failed,
   *   so that `execute` can stop and compensate.
   */
  private async executeStep<T>(
    steps: SagaStep[],
    stepName: string,
    action: () => Promise<T>,
    compensate: (result: T) => Promise<void>
  ): Promise<T> {
    try {
      const result = await action();

      steps.push({
        name: stepName,
        status: 'completed',
        result,
        compensate: () => compensate(result),
      });
      return result;
    } catch (error) {
      steps.push({
        name: stepName,
        status: 'failed',
        error: error instanceof Error ? error.message : String(error),
        // A step that never completed has nothing to undo.
        compensate: () => Promise.resolve(),
      });
      throw error;
    }
  }

  /**
   * Runs a best-effort step: a failure is logged and recorded but does not
   * abort the saga. Such steps have no compensation.
   */
  private async executeOptionalStep<T>(
    sagaId: string,
    steps: SagaStep[],
    stepName: string,
    action: () => Promise<T>
  ): Promise<T | undefined> {
    try {
      return await this.executeStep(steps, stepName, action, async () => {});
    } catch (error) {
      console.error(`Saga ${sagaId} 的可选步骤 ${stepName} 失败:`, error);
      return undefined;
    }
  }

  /**
   * Runs the compensation of every completed step, in the order given.
   * A failing compensation is logged and does not stop the remaining ones.
   */
  private async compensate(steps: SagaStep[]): Promise<void> {
    for (const step of steps) {
      if (step.status === 'completed') {
        try {
          await step.compensate();
        } catch (error) {
          console.error(`补偿步骤 ${step.name} 失败:`, error);
        }
      }
    }
  }
}
Code collapsed

分布式事务

code
// infrastructure/transactions/distributed-transaction.ts

/**
 * Distributed transaction coordinator implementing two-phase commit over an
 * in-memory table of transactions.
 */
export class DistributedTransactionCoordinator {
  private transactions: Map<string, DistributedTransaction> = new Map();

  /**
   * Starts a transaction over the given participants.
   *
   * Each participant is wrapped in a tracking entry that carries its status.
   * The operations are delegated explicitly to the original object, because
   * an object spread copies only own properties and would drop methods
   * defined on a class prototype.
   *
   * @returns the generated transaction id.
   */
  beginTransaction(
    participants: TransactionParticipant[]
  ): string {
    const transactionId = generateId();

    const transaction: DistributedTransaction = {
      id: transactionId,
      status: 'pending',
      participants: participants.map(p => ({
        ...p,
        name: p.name,
        service: p.service,
        prepare: () => p.prepare(),
        commit: () => p.commit(),
        rollback: () => p.rollback(),
        status: 'pending',
      })),
      startedAt: new Date(),
    };

    this.transactions.set(transactionId, transaction);

    return transactionId;
  }

  /**
   * Two-phase commit, phase 1: asks every participant to prepare, in order.
   * The first participant that votes "no" or throws aborts the phase: it is
   * marked 'failed' and the participants prepared so far are rolled back.
   *
   * @returns true when all participants prepared, false after an abort.
   * @throws Error when the transaction id is unknown.
   */
  async prepare(transactionId: string): Promise<boolean> {
    const transaction = this.transactions.get(transactionId);

    if (!transaction) {
      throw new Error('事务不存在');
    }

    transaction.status = 'preparing';

    for (const participant of transaction.participants) {
      try {
        const prepared = await participant.prepare();

        if (prepared) {
          participant.status = 'prepared';
        } else {
          // A "no" vote is recorded instead of leaving the participant 'pending'.
          participant.status = 'failed';
          await this.rollback(transactionId);
          return false;
        }
      } catch (error) {
        participant.status = 'failed';
        participant.error = error instanceof Error ? error.message : String(error);
        await this.rollback(transactionId);
        return false;
      }
    }

    transaction.status = 'prepared';
    return true;
  }

  /**
   * Two-phase commit, phase 2: commits every participant. A failing
   * participant is logged and marked 'failed' while the remaining ones are
   * still committed; the transaction then ends as 'partially_committed'.
   *
   * @throws Error when the transaction is unknown or not in 'prepared' state.
   */
  async commit(transactionId: string): Promise<void> {
    const transaction = this.transactions.get(transactionId);

    if (!transaction || transaction.status !== 'prepared') {
      throw new Error('事务未准备好');
    }

    transaction.status = 'committing';

    for (const participant of transaction.participants) {
      try {
        await participant.commit();
        participant.status = 'committed';
      } catch (error) {
        participant.status = 'failed';
        participant.error = error instanceof Error ? error.message : String(error);
        // Keep going: the other participants must still be committed.
        console.error(`提交参与者 ${participant.name} 失败:`, error);
      }
    }

    const allCommitted = transaction.participants.every(
      p => p.status === 'committed'
    );

    transaction.status = allCommitted ? 'committed' : 'partially_committed';
    transaction.completedAt = new Date();
  }

  /**
   * Rolls back every participant currently in 'prepared' state. A failing
   * rollback marks that participant 'rollback_failed' and does not stop the
   * others.
   *
   * NOTE(review): the transaction ends as 'rolled_back' even when a
   * participant rollback failed — confirm whether callers need to tell the
   * two outcomes apart.
   *
   * @throws Error when the transaction id is unknown.
   */
  async rollback(transactionId: string): Promise<void> {
    const transaction = this.transactions.get(transactionId);

    if (!transaction) {
      throw new Error('事务不存在');
    }

    transaction.status = 'rolling_back';

    for (const participant of transaction.participants) {
      if (participant.status === 'prepared') {
        try {
          await participant.rollback();
          participant.status = 'rolled_back';
        } catch (error) {
          participant.status = 'rollback_failed';
          participant.error = error instanceof Error ? error.message : String(error);
        }
      }
    }

    transaction.status = 'rolled_back';
    transaction.completedAt = new Date();
  }
}

/**
 * Contract a service implements to take part in a coordinated transaction.
 */
export interface TransactionParticipant {
  /** Participant name; the coordinator uses it in log messages. */
  name: string;
  /** Name of the service behind this participant. */
  service: string;
  /** Phase 1: resolves to true when the participant is able to commit. */
  prepare(): Promise<boolean>;
  /** Phase 2: makes the prepared work permanent. */
  commit(): Promise<void>;
  /** Undoes prepared work when the transaction is aborted. */
  rollback(): Promise<void>;
}
Code collapsed

5. API 网关

网关路由配置

code
// api-gateway/routes/gateway-routes.ts

// Time units used to spell out the durations below.
const SECOND_MS = 1000;
const MINUTE_MS = 60 * SECOND_MS;

/**
 * Route table for the API gateway: one entry per downstream service, each with
 * its path pattern, target service name, timeout, rate limit and required
 * auth scopes. Only the analytics route carries a `cache` section.
 */
export const gatewayRoutes = [
  // Health records service
  {
    path: '/api/health-records/*',
    service: 'health-records-service',
    timeout: 30 * SECOND_MS,
    rateLimit: { windowMs: MINUTE_MS, maxRequests: 100 },
    auth: { required: true, scopes: ['health:read', 'health:write'] },
  },

  // Prescriptions service
  {
    path: '/api/prescriptions/*',
    service: 'prescriptions-service',
    timeout: 30 * SECOND_MS,
    rateLimit: { windowMs: MINUTE_MS, maxRequests: 50 },
    auth: { required: true, scopes: ['prescription:read', 'prescription:write'] },
  },

  // Alert service
  {
    path: '/api/alerts/*',
    service: 'alert-service',
    timeout: 10 * SECOND_MS,
    rateLimit: { windowMs: MINUTE_MS, maxRequests: 200 },
    auth: { required: true, scopes: ['alert:read'] },
  },

  // Analytics service
  {
    path: '/api/analytics/*',
    service: 'analytics-service',
    timeout: 60 * SECOND_MS,
    rateLimit: { windowMs: MINUTE_MS, maxRequests: 20 },
    auth: { required: true, scopes: ['analytics:read'] },
    cache: {
      enabled: true,
      ttl: 5 * 60, // 5 minutes
    },
  },
];
Code collapsed

网关中间件

code
// api-gateway/middleware/compose-middleware.ts

import { Request, Response, NextFunction } from 'express';

/**
 * Compose middleware.
 *
 * Chains the given middlewares into one Express-style handler. Each middleware
 * runs only after the previous one calls its `next()`; once the last one has
 * done so, the outer `next()` is invoked.
 *
 * Errors are forwarded to the outer `next(err)` in three cases: a middleware
 * passes a truthy value to its `next`, a middleware throws synchronously, or a
 * middleware calls its `next` more than once.
 *
 * @param middlewares - Handlers to run, in order.
 * @returns A single handler that runs the whole chain.
 */
export function composeMiddleware(
  ...middlewares: Array<(req: Request, res: Response, next: NextFunction) => void>
) {
  return (req: Request, res: Response, next: NextFunction) => {
    // Highest position dispatched so far. It must start below 0: the guard in
    // dispatch() uses `<=`, so starting at 0 would reject the very first
    // dispatch(0) as a duplicate call and no middleware would ever run.
    let lastDispatched = -1;

    function dispatch(i: number): void {
      // A position at or below the high-water mark means some middleware
      // invoked its `next` a second time.
      if (i <= lastDispatched) {
        return next(new Error('next() called multiple times'));
      }

      lastDispatched = i;

      // Past the end of the chain: hand control back to the caller.
      if (i === middlewares.length) {
        return next();
      }

      const middleware = middlewares[i];

      try {
        middleware(req, res, (err?: unknown) => {
          if (err) return next(err);
          dispatch(i + 1);
        });
      } catch (err) {
        // Synchronous throw inside a middleware: surface it as an error.
        next(err);
      }
    }

    dispatch(0);
  };
}

/**
 * Authentication middleware.
 *
 * Reads a Bearer token from the `Authorization` header, verifies it with
 * `verifyToken` (defined outside this block), and checks that the decoded
 * token carries every scope in `requiredScopes`.
 *
 * Responses:
 * - 401 `{ error: 'Unauthorized' }` when the header is missing or is not a
 *   well-formed `Bearer <token>` credential.
 * - 401 `{ error: 'Invalid token' }` when anything inside the guarded section
 *   throws (e.g. verification fails).
 * - 403 `{ error: 'Forbidden' }` when a required scope is missing.
 *
 * On success the decoded token is stored on `req.user` and `next()` is called.
 *
 * @param requiredScopes - Scopes that must all be present; defaults to none.
 * @returns An async Express-style handler.
 */
export function authMiddleware(requiredScopes: string[] = []) {
  // Anchored and case-insensitive: the auth scheme name is case-insensitive,
  // and only a credential that really uses the Bearer scheme should be handed
  // to the verifier. The previous unanchored `replace('Bearer ', '')` passed
  // other schemes through and left `bearer <token>` unstripped.
  const bearerPattern = /^Bearer\s+(\S+)\s*$/i;

  return async (req: Request, res: Response, next: NextFunction) => {
    try {
      const token = bearerPattern.exec(req.headers.authorization ?? '')?.[1];

      if (!token) {
        return res.status(401).json({ error: 'Unauthorized' });
      }

      // NOTE(review): assumes verifyToken resolves to the decoded payload and
      // that `scopes`, when present, is an array of strings — confirm.
      const decoded = await verifyToken(token);
      const userScopes = decoded.scopes || [];

      const hasRequiredScopes = requiredScopes.every(scope =>
        userScopes.includes(scope)
      );

      if (!hasRequiredScopes) {
        return res.status(403).json({ error: 'Forbidden' });
      }

      req.user = decoded;
      next();
    } catch (error) {
      res.status(401).json({ error: 'Invalid token' });
    }
  };
}

/**
 * Rate limiting middleware.
 *
 * Keeps, per client key, the timestamps of requests accepted within the last
 * `windowMs` milliseconds, in an in-memory map private to the returned
 * handler. The key is `req.ip`, falling back to the socket's remote address
 * and then to `'unknown'`.
 *
 * A request that would exceed `maxRequests` in the window is answered with
 * 429, a `Retry-After` header, and a JSON body whose `retryAfter` is the
 * number of seconds until the oldest counted request leaves the window.
 * Accepted requests get `X-RateLimit-Limit`, `X-RateLimit-Remaining` and
 * `X-RateLimit-Reset` headers before `next()` is called.
 *
 * @param config - `windowMs`: window length in milliseconds;
 *   `maxRequests`: requests allowed per key within one window.
 * @returns An Express-style handler.
 */
export function rateLimitMiddleware(config: { windowMs: number; maxRequests: number }) {
  const requests = new Map<string, number[]>();
  let lastSweepAt = 0;

  // Drops keys whose newest timestamp has left the window, so clients that
  // stop sending requests do not stay in the map forever. Runs at most once
  // per window to keep the per-request cost low.
  const evictIdleClients = (now: number): void => {
    if (now - lastSweepAt < config.windowMs) {
      return;
    }
    lastSweepAt = now;

    const cutoff = now - config.windowMs;
    for (const [client, stamps] of requests) {
      const newest = stamps[stamps.length - 1];
      if (newest === undefined || newest <= cutoff) {
        requests.delete(client);
      }
    }
  };

  return (req: Request, res: Response, next: NextFunction) => {
    const key = req.ip || req.socket.remoteAddress || 'unknown';
    const now = Date.now();

    evictIdleClients(now);

    // Discard this client's expired timestamps.
    const windowStart = now - config.windowMs;
    const userRequests = (requests.get(key) ?? []).filter(time => time > windowStart);

    // Enforce the limit.
    if (userRequests.length >= config.maxRequests) {
      requests.set(key, userRequests);

      // A slot frees up when the oldest counted request leaves the window,
      // which is usually sooner than a full window from now.
      const oldest = userRequests[0] ?? now;
      const retryAfter = Math.ceil((oldest + config.windowMs - now) / 1000);

      res.setHeader('Retry-After', retryAfter);
      return res.status(429).json({
        error: 'Too many requests',
        retryAfter,
      });
    }

    // Count this request.
    userRequests.push(now);
    requests.set(key, userRequests);

    // Expose the limit state to the client.
    res.setHeader('X-RateLimit-Limit', config.maxRequests);
    res.setHeader('X-RateLimit-Remaining', config.maxRequests - userRequests.length);
    res.setHeader('X-RateLimit-Reset', new Date(now + config.windowMs).toISOString());

    next();
  };
}

/**
 * Timeout middleware.
 *
 * Starts a timer when the request enters and calls `next()` immediately. If
 * the timer fires, `next` is called again with `Error('Request timeout')`.
 * The timer is cancelled as soon as the response emits `finish` or `close`.
 *
 * @param ms - Timeout in milliseconds.
 * @returns An Express-style handler.
 */
export function timeoutMiddleware(ms: number) {
  return (req: Request, res: Response, next: NextFunction) => {
    const timeout = setTimeout(() => {
      next(new Error('Request timeout'));
    }, ms);

    const cancelTimer = () => clearTimeout(timeout);

    res.on('finish', cancelTimer);
    // `finish` is not emitted when the connection is torn down before the
    // response completes; without this listener the timer would stay armed
    // and later report a timeout for a request that no longer exists.
    res.on('close', cancelTimer);

    next();
  };
}
Code collapsed

迁移清单

准备阶段

  • 绘制当前系统架构图
  • 识别业务边界上下文
  • 定义服务拆分策略
  • 评估技术栈需求
  • 制定迁移计划

实施阶段

  • 建立 API 网关
  • 设置事件总线
  • 创建新服务数据库
  • 实施绞杀者模式
  • 迁移数据和功能

验证阶段

  • 功能测试
  • 性能测试
  • 数据一致性验证
  • 故障恢复测试
  • 安全审计

优化阶段

  • 监控和告警
  • 日志聚合
  • 链路追踪
  • 容量规划
  • 成本优化

参考资料


免责声明:微服务架构增加了系统复杂度。在重构前请仔细评估业务需求、团队能力和技术成熟度。

#

文章标签

microservices
ddd
refactoring
architecture
nodejs
backend

觉得这篇文章有帮助?

立即体验康心伴,开始您的健康管理之旅