单体到微服务重构指南:领域驱动设计(DDD)实战
概述
将大型单体应用拆分为微服务是一个复杂的架构决策。领域驱动设计(DDD)提供了科学的方法来识别服务边界,确保每个微服务都有清晰的职责和独立的业务价值。
为什么选择 DDD 进行拆分?
| 传统拆分方式 | DDD 拆分方式 |
|---|---|
| 按技术层次(UI、业务、数据) | 按业务领域 |
| 按数据库表 | 按聚合根和实体 |
| 随意划分 | 明确的限界上下文(Bounded Context) |
| 导致服务间紧耦合 | 保持服务松耦合 |
拆分收益
- 独立部署:每个服务可独立发布
- 技术异构:不同服务可按需选用不同的技术栈
- 弹性扩展:按需扩展特定服务
- 团队自治:小团队负责特定领域
- 故障隔离:单个服务故障不影响全局
1. 领域建模
识别限界上下文(Bounded Context)
code
/**
 * Shape shared by every context entry below: the entities the context owns
 * and the name of its bounded context.
 */
type BoundedContextSpec<
  TEntities extends readonly string[],
  TName extends string
> = {
  entities: TEntities;
  boundedContext: TName;
};

/**
 * Business domains of the health platform, one entry per bounded context.
 */
interface DomainContexts {
  /** User management context. */
  userContext: BoundedContextSpec<
    ['User', 'Profile', 'Preferences'],
    'Identity & Access'
  >;

  /** Health record context. */
  healthRecordContext: BoundedContextSpec<
    ['HealthRecord', 'LabResult', 'Prescription'],
    'Health Records Management'
  >;

  /** Device management context. */
  deviceContext: BoundedContextSpec<
    ['Device', 'DeviceReading', 'DeviceCalibration'],
    'IoT Device Management'
  >;

  /** Alert and notification context. */
  notificationContext: BoundedContextSpec<
    ['Alert', 'Notification', 'NotificationTemplate'],
    'Alert & Notification'
  >;

  /** Reporting and analytics context. */
  analyticsContext: BoundedContextSpec<
    ['HealthReport', 'TrendAnalysis', 'Insight'],
    'Health Analytics'
  >;
}
Code collapsed
定义聚合和聚合根
code
// health-record/domain/health-record-aggregate.ts
/**
 * Aggregate: health records.
 * Aggregate root: HealthRecord.
 * Keeps the records it has created in an in-memory map keyed by record id and
 * returns an event object describing each successful command.
 */
export class HealthRecordAggregate {
  private readonly records: Map<string, HealthRecord> = new Map();

  /**
   * Creates a new health record and stores it under its id.
   * When the command carries no `recordedAt`, the current time is used.
   */
  createRecord(command: CreateHealthRecordCommand): HealthRecordCreatedEvent {
    const recordedAt = command.recordedAt || new Date();
    const created = HealthRecord.create({
      userId: command.userId,
      recordType: command.recordType,
      data: command.data,
      recordedAt,
      source: command.source,
    });

    this.records.set(created.id, created);

    return {
      eventType: 'HealthRecordCreated',
      aggregateId: created.id,
      userId: created.userId,
      recordType: created.recordType,
      data: created.data,
      timestamp: new Date(),
    };
  }

  /**
   * Replaces the data of an existing record.
   * @throws Error when the record is unknown or is owned by another user.
   */
  updateRecord(command: UpdateHealthRecordCommand): HealthRecordUpdatedEvent {
    const target = this.findExisting(command.recordId);

    const isOwner = target.userId === command.userId;
    if (!isOwner) {
      throw new Error('无权修改此记录');
    }

    target.update({ data: command.data, updatedAt: new Date() });

    return {
      eventType: 'HealthRecordUpdated',
      aggregateId: target.id,
      userId: target.userId,
      data: target.data,
      timestamp: new Date(),
    };
  }

  /**
   * Soft-deletes an existing record.
   * @throws Error when the record is unknown.
   */
  deleteRecord(command: DeleteHealthRecordCommand): HealthRecordDeletedEvent {
    const target = this.findExisting(command.recordId);
    const { reason, deletedBy } = command;

    target.softDelete(reason, deletedBy);

    return {
      eventType: 'HealthRecordDeleted',
      aggregateId: target.id,
      userId: target.userId,
      reason,
      timestamp: new Date(),
    };
  }

  /**
   * Attaches a file description to an existing record.
   * @throws Error when the record is unknown.
   */
  addAttachment(command: AddAttachmentCommand): AttachmentAddedEvent {
    const target = this.findExisting(command.recordId);

    const added = target.addAttachment({
      fileType: command.fileType,
      fileName: command.fileName,
      fileUrl: command.fileUrl,
      fileSize: command.fileSize,
    });

    return {
      eventType: 'AttachmentAdded',
      aggregateId: target.id,
      userId: target.userId,
      attachmentId: added.id,
      timestamp: new Date(),
    };
  }

  /** Looks a record up by id, throwing when it is not in the map. */
  private findExisting(recordId: string): HealthRecord {
    const found = this.records.get(recordId);
    if (!found) {
      throw new Error('记录不存在');
    }
    return found;
  }
}
/**
 * Value object: a single health measurement with its unit and an optional
 * reference range.
 */
export class HealthData {
  constructor(
    public readonly type: HealthDataType,
    public readonly value: number,
    public readonly unit: string,
    public readonly referenceRange?: ReferenceRange
  ) {}

  /**
   * Whether the value lies inside `[min, max]` of the reference range.
   * Without a reference range the value counts as normal.
   */
  isNormal(): boolean {
    const range = this.referenceRange;
    if (!range) {
      return true;
    }
    const reading = this.value;
    return reading >= range.min && reading <= range.max;
  }

  /**
   * Classifies the value: outside the critical bounds is `critical`, outside
   * `[min, max]` is `warning`, anything else (or no range) is `normal`.
   */
  getAbnormalLevel(): 'normal' | 'warning' | 'critical' {
    const range = this.referenceRange;
    if (!range) {
      return 'normal';
    }

    const reading = this.value;

    const beyondCritical =
      reading < range.criticalMin || reading > range.criticalMax;
    if (beyondCritical) {
      return 'critical';
    }

    const outsideNormal = reading < range.min || reading > range.max;
    return outsideNormal ? 'warning' : 'normal';
  }

  /** Structural equality on type, value and unit (the range is not compared). */
  equals(other: HealthData): boolean {
    if (this.type !== other.type) {
      return false;
    }
    if (this.value !== other.value) {
      return false;
    }
    return this.unit === other.unit;
  }
}
/**
 * Entity: a single health record owned by one user.
 *
 * Identity (`id`), owner, record type and recording time are fixed at
 * construction; the data points, the attachment list and the soft-delete
 * state can change afterwards.
 */
export class HealthRecord {
  private readonly _id: string;
  private readonly _userId: string;
  private readonly _recordType: HealthRecordType;
  private _data: HealthData[];
  private readonly _recordedAt: Date;
  private _updatedAt: Date;
  private _deletedAt: Date | null;
  private _deletionReason: string | null = null;
  private _deletedBy: string | null = null;
  private readonly _attachments: Attachment[];

  private constructor(props: HealthRecordProps) {
    this._id = props.id;
    this._userId = props.userId;
    this._recordType = props.recordType;
    // Copy the incoming arrays so later mutation by the caller cannot change
    // this entity behind its back (the `data` getter already copies on read).
    this._data = [...(props.data ?? [])];
    this._recordedAt = props.recordedAt;
    this._updatedAt = props.updatedAt;
    this._deletedAt = props.deletedAt;
    this._attachments = [...(props.attachments ?? [])];
  }

  /**
   * Builds a new, non-deleted record with a freshly generated id and
   * `updatedAt` set to the current time.
   */
  static create(props: CreateHealthRecordProps): HealthRecord {
    return new HealthRecord({
      id: generateId(),
      userId: props.userId,
      recordType: props.recordType,
      data: props.data,
      recordedAt: props.recordedAt,
      updatedAt: new Date(),
      deletedAt: null,
    });
  }

  get id(): string {
    return this._id;
  }

  get userId(): string {
    return this._userId;
  }

  /** Record type supplied at creation. */
  get recordType(): HealthRecordType {
    return this._recordType;
  }

  /** Copy of the time the measurement was recorded. */
  get recordedAt(): Date {
    return new Date(this._recordedAt.getTime());
  }

  /** Copy of the time of the last modification. */
  get updatedAt(): Date {
    return new Date(this._updatedAt.getTime());
  }

  /** Copy of the data points; mutating the returned array has no effect. */
  get data(): HealthData[] {
    return [...this._data];
  }

  /** Copy of the attachment list. */
  get attachments(): Attachment[] {
    return [...this._attachments];
  }

  /** Reason passed to the first `softDelete` call, or `null` if not deleted. */
  get deletionReason(): string | null {
    return this._deletionReason;
  }

  /** Actor passed to the first `softDelete` call, or `null` if not deleted. */
  get deletedBy(): string | null {
    return this._deletedBy;
  }

  /**
   * Replaces the data points (when provided) and bumps `updatedAt` to now.
   * @throws Error when the record has been soft-deleted.
   */
  update(props: Partial<UpdateProps>): void {
    if (this._deletedAt) {
      throw new Error('无法修改已删除的记录');
    }
    if (props.data) {
      this._data = [...props.data];
    }
    this._updatedAt = new Date();
  }

  /**
   * Marks the record as deleted and remembers why and by whom.
   * Calling it again on an already deleted record is a no-op, so the
   * original deletion time, reason and actor are kept.
   */
  softDelete(reason: string, deletedBy: string): void {
    if (this._deletedAt !== null) {
      return;
    }
    this._deletedAt = new Date();
    this._deletionReason = reason;
    this._deletedBy = deletedBy;
  }

  /** Creates an attachment bound to this record's id and appends it. */
  addAttachment(props: AttachmentProps): Attachment {
    const attachment = Attachment.create({
      healthRecordId: this.id,
      ...props,
    });
    this._attachments.push(attachment);
    return attachment;
  }

  isDeleted(): boolean {
    return this._deletedAt !== null;
  }
}
Code collapsed
定义领域服务
code
// health-record/domain/health-analyzer.domain-service.ts
/**
 * Domain service: health data analysis.
 * Holds logic that works across several health records: trend analysis,
 * anomaly detection and report generation.
 */
export class HealthAnalyzerDomainService {
  /**
   * Analyses per-data-type trends over the given records.
   *
   * The input array is not modified; a sorted copy (ascending `recordedAt`)
   * is used internally.
   *
   * @throws Error when fewer than two records are supplied.
   */
  analyzeTrend(
    records: HealthRecord[]
  ): HealthTrendAnalysis {
    if (records.length < 2) {
      throw new Error('需要至少 2 条记录来分析趋势');
    }

    // Sort a copy: Array.prototype.sort works in place and would otherwise
    // reorder the caller's array.
    const sortedRecords = [...records].sort((a, b) =>
      a.recordedAt.getTime() - b.recordedAt.getTime()
    );

    const trends: Record<string, TrendDirection> = {};

    // Group values by data type, in chronological order.
    const dataByType = this.groupDataByType(sortedRecords);
    for (const [type, dataPoints] of Object.entries(dataByType)) {
      // A single point has no slope.
      if (dataPoints.length < 2) continue;
      trends[type] = this.calculateTrend(dataPoints);
    }

    return {
      period: {
        start: sortedRecords[0].recordedAt,
        end: sortedRecords[sortedRecords.length - 1].recordedAt,
      },
      trends,
      overall: this.calculateOverallTrend(trends),
    };
  }

  /**
   * Collects every data point whose `getAbnormalLevel()` is not `'normal'`
   * and returns them together with counts per level.
   */
  detectAnomalies(
    records: HealthRecord[]
  ): AnomalyDetectionResult {
    const anomalies: DetectedAnomaly[] = [];

    for (const record of records) {
      for (const data of record.data) {
        const level = data.getAbnormalLevel();
        if (level !== 'normal') {
          anomalies.push({
            recordId: record.id,
            dataType: data.type,
            value: data.value,
            unit: data.unit,
            level,
            detectedAt: new Date(),
          });
        }
      }
    }

    return {
      total: anomalies.length,
      critical: anomalies.filter(a => a.level === 'critical').length,
      warning: anomalies.filter(a => a.level === 'warning').length,
      anomalies,
    };
  }

  /**
   * Builds a health report from trend analysis, anomaly detection and the
   * insights / recommendations derived from them.
   *
   * @throws Error when fewer than two records are supplied (from `analyzeTrend`).
   */
  generateReport(
    userId: string,
    records: HealthRecord[],
    userBaseline?: HealthBaseline
  ): HealthReport {
    const trend = this.analyzeTrend(records);
    const anomalies = this.detectAnomalies(records);
    const insights = this.generateInsights(trend, anomalies, userBaseline);

    return {
      id: generateId(),
      userId,
      generatedAt: new Date(),
      period: trend.period,
      trend: trend.overall,
      anomalies: anomalies.total,
      criticalAlerts: anomalies.critical,
      warnings: anomalies.warning,
      insights,
      recommendations: this.generateRecommendations(insights),
    };
  }

  /** Groups the numeric values of all data points by their data type. */
  private groupDataByType(
    records: HealthRecord[]
  ): Record<string, number[]> {
    // Prototype-less object: a data type named e.g. "constructor" must not
    // collide with members inherited from Object.prototype.
    const grouped: Record<string, number[]> = Object.create(null);

    for (const record of records) {
      for (const data of record.data) {
        if (!grouped[data.type]) {
          grouped[data.type] = [];
        }
        grouped[data.type].push(data.value);
      }
    }

    return grouped;
  }

  /**
   * Least-squares slope of the values against their index (0..n-1).
   * A slope above 0.1 is `increasing`, below -0.1 `decreasing`, else `stable`.
   * Callers must pass at least two values; with one value the denominator is 0.
   */
  private calculateTrend(values: number[]): TrendDirection {
    const n = values.length;
    const sumX = (n * (n - 1)) / 2; // 0 + 1 + ... + (n-1)
    const sumY = values.reduce((a, b) => a + b, 0);
    const sumXY = values.reduce((sum, y, x) => sum + x * y, 0);
    const sumX2 = (n * (n - 1) * (2 * n - 1)) / 6; // 0² + 1² + ... + (n-1)²

    const slope = (n * sumXY - sumX * sumY) / (n * sumX2 - sumX * sumX);

    if (slope > 0.1) return 'increasing';
    if (slope < -0.1) return 'decreasing';
    return 'stable';
  }

  /**
   * Majority vote over the per-type trends: more increasing than decreasing
   * types yields `improving`, the opposite `declining`, a tie `stable`.
   */
  private calculateOverallTrend(
    trends: Record<string, TrendDirection>
  ): TrendDirection {
    const values = Object.values(trends);
    const increasing = values.filter(v => v === 'increasing').length;
    const decreasing = values.filter(v => v === 'decreasing').length;

    if (increasing > decreasing) return 'improving';
    if (decreasing > increasing) return 'declining';
    return 'stable';
  }

  /**
   * Turns trends and critical anomalies into insights, highest priority first.
   *
   * NOTE(review): `baseline` is accepted but not used yet.
   * NOTE(review): per-type trends come from `calculateTrend`, which only
   * returns increasing / decreasing / stable, so the `'declining'` check
   * below never matches and every trend insight gets `medium` priority.
   * Which direction is harmful depends on the metric, so this is left as is —
   * confirm the intended rule.
   */
  private generateInsights(
    trend: HealthTrendAnalysis,
    anomalies: AnomalyDetectionResult,
    baseline?: HealthBaseline
  ): Insight[] {
    const insights: Insight[] = [];

    // Trend insights.
    for (const [type, direction] of Object.entries(trend.trends)) {
      insights.push({
        type: 'trend',
        category: type,
        message: this.formatTrendMessage(type, direction),
        priority: direction === 'declining' ? 'high' : 'medium',
      });
    }

    // Anomaly insights: only critical anomalies are surfaced.
    for (const anomaly of anomalies.anomalies) {
      if (anomaly.level === 'critical') {
        insights.push({
          type: 'alert',
          category: anomaly.dataType,
          message: `${anomaly.dataType} 值异常: ${anomaly.value} ${anomaly.unit}`,
          priority: 'critical',
        });
      }
    }

    const priorityOrder = { critical: 3, high: 2, medium: 1, low: 0 };
    return insights.sort(
      (a, b) => priorityOrder[b.priority] - priorityOrder[a.priority]
    );
  }

  /** One "consult a healthcare provider" recommendation per alert or high-priority insight. */
  private generateRecommendations(insights: Insight[]): Recommendation[] {
    const recommendations: Recommendation[] = [];

    for (const insight of insights) {
      if (insight.type === 'alert' || insight.priority === 'high') {
        recommendations.push({
          action: 'consult_healthcare_provider',
          category: insight.category,
          priority: 'high',
          message: `建议咨询医生关于 ${insight.category} 的情况`,
        });
      }
    }

    return recommendations;
  }

  /** Human-readable message for a data type and its trend direction. */
  private formatTrendMessage(type: string, direction: TrendDirection): string {
    const messages = {
      increasing: `${type} 呈上升趋势`,
      decreasing: `${type} 呈下降趋势`,
      stable: `${type} 保持稳定`,
      improving: `${type} 状况改善`,
      declining: `${type} 状况恶化`,
    };
    return messages[direction];
  }
}
Code collapsed
2. 服务拆分策略
绞杀者模式(Strangler Fig)
code
// migration/strangler-fig-pattern.ts
/**
 * Strangler Fig pattern: a routing facade that moves traffic from the
 * monolith to microservices one route at a time.
 *
 * Routes are registered either as an exact path (`/api/ping`) or as a prefix
 * pattern ending in `/*` (`/api/health-records/*`). Requests that match no
 * registered route are forwarded to the monolith.
 */
export class StranglerFigMigration {
  private routes: Map<string, RouteHandler> = new Map();
  private migrationConfig: MigrationConfig;

  constructor(config: MigrationConfig) {
    this.migrationConfig = config;
  }

  /**
   * Registers (or replaces) the route for `path`.
   *
   * @param path exact path, or a prefix pattern ending in `/*`
   * @param target which side currently owns the route
   * @param microserviceUrl base URL of the microservice; needed when
   *   `target` is `'microservice'`
   */
  registerMigrationRoute(
    path: string,
    target: 'monolith' | 'microservice',
    microserviceUrl?: string
  ): void {
    this.routes.set(path, {
      path,
      target,
      microserviceUrl,
    });
  }

  /**
   * Forwards the request to the owner of its path.
   *
   * Responds with 503 when the matching route targets a microservice but no
   * base URL was registered for it.
   */
  async routeRequest(
    req: Request
  ): Promise<Response> {
    const path = new URL(req.url).pathname;
    const route = this.findRoute(path);

    if (!route) {
      // Unregistered paths still belong to the monolith.
      return this.forwardToMonolith(req);
    }

    switch (route.target) {
      case 'microservice':
        if (!route.microserviceUrl) {
          return new Response('Service not available', { status: 503 });
        }
        return this.forwardToMicroservice(req, route.microserviceUrl);
      case 'monolith':
        return this.forwardToMonolith(req);
      default:
        return new Response('Service not available', { status: 503 });
    }
  }

  /**
   * Resolves the route for a request path: an exact match wins, otherwise
   * the `/*` pattern with the longest matching prefix.
   */
  private findRoute(path: string): RouteHandler | undefined {
    const exact = this.routes.get(path);
    if (exact) {
      return exact;
    }

    let best: RouteHandler | undefined;
    let bestPrefixLength = -1;
    for (const [pattern, route] of this.routes) {
      if (!pattern.endsWith('/*')) continue;
      const prefix = pattern.slice(0, -2);
      // Match the prefix itself or anything below it, but not siblings that
      // merely share leading characters (/api/foo must not match /api/foobar).
      const matches = path === prefix || path.startsWith(`${prefix}/`);
      if (matches && prefix.length > bestPrefixLength) {
        best = route;
        bestPrefixLength = prefix.length;
      }
    }
    return best;
  }

  /** Forwards the request to the given microservice base URL. */
  private async forwardToMicroservice(
    req: Request,
    microserviceUrl: string
  ): Promise<Response> {
    return this.forward(req, microserviceUrl);
  }

  /** Forwards the request to the monolith configured in `migrationConfig`. */
  private async forwardToMonolith(req: Request): Promise<Response> {
    return this.forward(req, this.migrationConfig.monolithUrl);
  }

  /**
   * Re-issues the request against `baseUrl`, keeping method, headers, path,
   * query string and body.
   */
  private forward(req: Request, baseUrl: string): Promise<Response> {
    const url = new URL(req.url);
    const targetUrl = `${baseUrl}${url.pathname}${url.search}`;

    const init: RequestInit & { duplex?: 'half' } = {
      method: req.method,
      headers: req.headers,
    };
    if (req.body !== null) {
      init.body = req.body;
      // Node's fetch rejects a streaming request body unless half-duplex
      // mode is requested explicitly.
      init.duplex = 'half';
    }

    return fetch(targetUrl, init);
  }

  /** Counts registered routes and how many of them already target a microservice. */
  getMigrationStatus(): MigrationStatus {
    const total = this.routes.size;
    const migrated = Array.from(this.routes.values())
      .filter(r => r.target === 'microservice').length;

    return {
      total,
      migrated,
      remaining: total - migrated,
      percentage: total > 0 ? (migrated / total) * 100 : 0,
    };
  }
}
// Example migration configuration.
const migrationConfig = {
  monolithUrl: 'http://localhost:3000',
  featureFlags: {
    healthRecords: true,
    appointments: false,
    prescriptions: true,
  },
};

const strangler = new StranglerFigMigration(migrationConfig);

// Routes that are already served by a microservice: [path, service base URL].
const migratedRoutes: Array<[string, string]> = [
  ['/api/health-records/*', 'http://health-records-service:3001'],
  ['/api/prescriptions/*', 'http://prescriptions-service:3002'],
];
for (const [path, serviceUrl] of migratedRoutes) {
  strangler.registerMigrationRoute(path, 'microservice', serviceUrl);
}

// Still handled by the monolith.
strangler.registerMigrationRoute('/api/appointments/*', 'monolith');
Code collapsed
数据库拆分策略
code
-- =============================================================================
-- Database split: shared database -> one database per service
-- =============================================================================

-- Phase 1: add a service marker column to the tables in the monolith database.
ALTER TABLE health_records
    ADD COLUMN service_id VARCHAR(50);
ALTER TABLE lab_results
    ADD COLUMN service_id VARCHAR(50);
ALTER TABLE prescriptions
    ADD COLUMN service_id VARCHAR(50);

-- Tag the existing rows with the service that will own them.
UPDATE health_records
   SET service_id = 'health-records-service'
 WHERE service_id IS NULL;
UPDATE lab_results
   SET service_id = 'health-records-service'
 WHERE service_id IS NULL;
UPDATE prescriptions
   SET service_id = 'prescriptions-service'
 WHERE service_id IS NULL;

-- Phase 2: set up change data capture (CDC).
-- Capture changes with Debezium or a custom CDC solution.

-- Phase 3: data synchronisation.
-- Sync data from the monolith database into the new service databases.

-- Phase 4: verify data consistency.
-- Compare the data in the source and target databases.

-- Phase 5: switch reads and writes.
-- Point the application's read and write operations at the new databases.

-- Phase 6: clean up.
-- Remove the migrated data from the monolith database.
Code collapsed
3. 事件驱动架构
事件定义
code
// common/events/domain-events.ts
/**
 * Base class of all domain events.
 *
 * Every event gets a generated `eventId` and an `occurredAt` timestamp taken
 * at construction, plus the id and version of the aggregate it belongs to.
 */
export abstract class DomainEvent {
  public readonly occurredAt: Date;
  public readonly eventId: string;
  public readonly aggregateId: string;
  public readonly aggregateVersion: number;

  constructor(props: DomainEventProps) {
    this.eventId = generateId();
    this.occurredAt = new Date();

    const { aggregateId, aggregateVersion } = props;
    this.aggregateId = aggregateId;
    this.aggregateVersion = aggregateVersion;
  }

  /** Name of the concrete event type. */
  abstract getEventType(): string;
}
/**
 * Health record event: a record was created.
 * `aggregateVersion` defaults to 1.
 */
export class HealthRecordCreatedEvent extends DomainEvent {
  public readonly userId: string;
  public readonly recordType: HealthRecordType;
  public readonly data: HealthData[];

  constructor(
    userId: string,
    recordType: HealthRecordType,
    data: HealthData[],
    aggregateId: string,
    aggregateVersion: number = 1
  ) {
    super({ aggregateId, aggregateVersion });
    this.userId = userId;
    this.recordType = recordType;
    this.data = data;
  }

  getEventType(): string {
    return 'HealthRecordCreated';
  }
}
/**
 * Health record event: a record was updated; `changes` carries the changed
 * values.
 */
export class HealthRecordUpdatedEvent extends DomainEvent {
  public readonly userId: string;
  public readonly changes: Record<string, any>;

  constructor(
    userId: string,
    changes: Record<string, any>,
    aggregateId: string,
    aggregateVersion: number
  ) {
    super({ aggregateId, aggregateVersion });
    this.userId = userId;
    this.changes = changes;
  }

  getEventType(): string {
    return 'HealthRecordUpdated';
  }
}
/**
 * Health record event: an out-of-range value was detected, with severity
 * `warning` or `critical`.
 */
export class CriticalHealthValueDetectedEvent extends DomainEvent {
  public readonly userId: string;
  public readonly valueType: string;
  public readonly value: number;
  public readonly severity: 'warning' | 'critical';
  public readonly detectedAt: Date;

  constructor(
    userId: string,
    valueType: string,
    value: number,
    severity: 'warning' | 'critical',
    detectedAt: Date,
    aggregateId: string,
    aggregateVersion: number
  ) {
    super({ aggregateId, aggregateVersion });
    this.userId = userId;
    this.valueType = valueType;
    this.value = value;
    this.severity = severity;
    this.detectedAt = detectedAt;
  }

  getEventType(): string {
    return 'CriticalHealthValueDetected';
  }
}
/**
 * User event: a user registered.
 * `aggregateVersion` defaults to 1.
 */
export class UserRegisteredEvent extends DomainEvent {
  public readonly userId: string;
  public readonly email: string;
  public readonly name: string;

  constructor(
    userId: string,
    email: string,
    name: string,
    aggregateId: string,
    aggregateVersion: number = 1
  ) {
    super({ aggregateId, aggregateVersion });
    this.userId = userId;
    this.email = email;
    this.name = name;
  }

  getEventType(): string {
    return 'UserRegistered';
  }
}
/**
 * User event: a user's preferences were updated.
 */
export class UserPreferencesUpdatedEvent extends DomainEvent {
  public readonly userId: string;
  public readonly preferences: Record<string, any>;

  constructor(
    userId: string,
    preferences: Record<string, any>,
    aggregateId: string,
    aggregateVersion: number
  ) {
    super({ aggregateId, aggregateVersion });
    this.userId = userId;
    this.preferences = preferences;
  }

  getEventType(): string {
    return 'UserPreferencesUpdated';
  }
}
/**
 * Alert event: an alert was created.
 * `aggregateVersion` defaults to 1.
 */
export class AlertCreatedEvent extends DomainEvent {
  public readonly userId: string;
  public readonly alertType: string;
  public readonly severity: 'info' | 'warning' | 'critical';
  public readonly message: string;
  public readonly metadata: Record<string, any>;

  constructor(
    userId: string,
    alertType: string,
    severity: 'info' | 'warning' | 'critical',
    message: string,
    metadata: Record<string, any>,
    aggregateId: string,
    aggregateVersion: number = 1
  ) {
    super({ aggregateId, aggregateVersion });
    this.userId = userId;
    this.alertType = alertType;
    this.severity = severity;
    this.message = message;
    this.metadata = metadata;
  }

  getEventType(): string {
    return 'AlertCreated';
  }
}
Code collapsed
事件总线实现
code
// infrastructure/messaging/event-bus.ts
/** Asynchronous callback invoked with each delivered domain event. */
type EventHandler = (event: DomainEvent) => Promise<void>;

/**
 * Event bus contract: publish domain events and manage per-event-type
 * handler subscriptions.
 */
export interface IEventBus {
  /** Publishes one domain event. */
  publish(event: DomainEvent): Promise<void>;

  /** Registers `handler` for events of type `eventType`. */
  subscribe(eventType: string, handler: EventHandler): Promise<void>;

  /** Removes a previously registered `handler` for `eventType`. */
  unsubscribe(eventType: string, handler: EventHandler): Promise<void>;
}
/**
 * RabbitMQ-backed event bus.
 *
 * Each event type gets one durable queue (`<eventType>.queue`) bound to a
 * durable topic exchange, and exactly one consumer on that queue. The
 * consumer hands every message to all handlers currently registered for the
 * event type, so `unsubscribe` takes effect immediately and several handlers
 * no longer compete for messages of the same queue.
 *
 * Delivery is at-least-once: when a handler fails, the message is requeued
 * once and every handler sees it again, so handlers should be idempotent.
 */
export class RabbitMqEventBus implements IEventBus {
  private connection?: Connection;
  private channel?: Channel;
  private exchanges: Set<string> = new Set();
  private handlers: Map<string, Set<EventHandler>> = new Map();
  /** Consumer tag per event type, needed to cancel the consumer again. */
  private consumerTags: Map<string, string> = new Map();

  constructor(private config: RabbitMqConfig) {}

  /**
   * Opens the connection and a channel, and applies the prefetch limit
   * (`config.prefetch`, default 10 when not configured).
   */
  async connect(): Promise<void> {
    const connection = await amqp.connect(this.config.url);
    try {
      const channel = await connection.createChannel();
      // `??` rather than `||`: an explicitly configured 0 must not be
      // replaced by the default.
      await channel.prefetch(this.config.prefetch ?? 10);
      this.connection = connection;
      this.channel = channel;
    } catch (error) {
      // Do not leak the connection when channel setup fails.
      await connection.close();
      throw error;
    }
  }

  /**
   * Publishes the event as a persistent JSON message. The envelope repeats
   * the event's identifying fields and carries the event itself under `data`.
   *
   * @throws Error when `connect()` has not been called.
   */
  async publish(event: DomainEvent): Promise<void> {
    const channel = this.requireChannel();
    const eventType = event.getEventType();
    const exchange = this.getExchangeName(eventType);

    await this.ensureExchange(exchange);

    const message = {
      eventId: event.eventId,
      eventType,
      aggregateId: event.aggregateId,
      aggregateVersion: event.aggregateVersion,
      occurredAt: event.occurredAt,
      data: event,
    };

    channel.publish(
      exchange,
      eventType, // routing key
      Buffer.from(JSON.stringify(message)),
      {
        persistent: true,
        messageId: event.eventId,
        timestamp: Math.floor(event.occurredAt.getTime() / 1000),
      }
    );
  }

  /**
   * Registers `handler` for `eventType`. The first handler of an event type
   * declares exchange and queue, binds them and starts the consumer; further
   * handlers are only added to the dispatch set.
   *
   * @throws Error when `connect()` has not been called.
   */
  async subscribe(eventType: string, handler: EventHandler): Promise<void> {
    const channel = this.requireChannel();

    const existing = this.handlers.get(eventType);
    if (existing) {
      existing.add(handler);
      return;
    }

    this.handlers.set(eventType, new Set([handler]));
    try {
      const exchange = this.getExchangeName(eventType);
      const queueName = this.getQueueName(eventType);

      await this.ensureExchange(exchange);
      await this.ensureQueue(queueName);
      await channel.bindQueue(queueName, exchange, eventType);

      const reply = await channel.consume(queueName, (msg) =>
        this.dispatch(eventType, msg)
      );
      this.consumerTags.set(eventType, reply.consumerTag);
    } catch (error) {
      // Setup failed: leave no handler set behind that has no consumer.
      this.handlers.delete(eventType);
      throw error;
    }
  }

  /**
   * Removes `handler` from `eventType`. When it was the last handler, the
   * consumer for that event type is cancelled.
   */
  async unsubscribe(eventType: string, handler: EventHandler): Promise<void> {
    const registered = this.handlers.get(eventType);
    if (!registered) {
      return;
    }

    registered.delete(handler);
    if (registered.size > 0) {
      return;
    }

    this.handlers.delete(eventType);
    const consumerTag = this.consumerTags.get(eventType);
    if (consumerTag !== undefined && this.channel) {
      this.consumerTags.delete(eventType);
      await this.channel.cancel(consumerTag);
    }
  }

  /** Closes channel and connection (if open) and forgets all local state. */
  async disconnect(): Promise<void> {
    const channel = this.channel;
    const connection = this.connection;
    this.channel = undefined;
    this.connection = undefined;
    this.exchanges.clear();
    this.handlers.clear();
    this.consumerTags.clear();

    if (channel) {
      await channel.close();
    }
    if (connection) {
      await connection.close();
    }
  }

  /**
   * Consumer callback: decodes one message and runs every registered handler.
   * - Malformed messages are rejected without requeue; they can never succeed.
   * - When a handler throws, the message is requeued on its first delivery
   *   only, so a permanently failing message cannot loop forever.
   */
  private async dispatch(eventType: string, msg: any): Promise<void> {
    // A null message means the broker cancelled the consumer.
    if (!msg) return;
    const channel = this.channel;
    if (!channel) return;

    let event: DomainEvent;
    try {
      event = this.deserializeEvent(JSON.parse(msg.content.toString()));
    } catch (error) {
      console.error(`处理事件 ${eventType} 时出错:`, error);
      channel.nack(msg, false, false);
      return;
    }

    try {
      // Iterate over a snapshot so handlers may (un)subscribe while running.
      const targets = Array.from(this.handlers.get(eventType) ?? []);
      for (const target of targets) {
        await target(event);
      }
      channel.ack(msg);
    } catch (error) {
      console.error(`处理事件 ${eventType} 时出错:`, error);
      const alreadyRedelivered = Boolean(msg.fields?.redelivered);
      channel.nack(msg, false, !alreadyRedelivered);
    }
  }

  /** Returns the open channel or fails with a clear error. */
  private requireChannel(): Channel {
    if (!this.channel) {
      throw new Error('事件总线尚未连接,请先调用 connect()');
    }
    return this.channel;
  }

  /** Declares the durable topic exchange once per bus instance. */
  private async ensureExchange(exchange: string): Promise<void> {
    if (this.exchanges.has(exchange)) return;

    await this.requireChannel().assertExchange(exchange, 'topic', {
      durable: true,
    });

    this.exchanges.add(exchange);
  }

  /** Declares the durable queue with a 24 h message TTL and a 10000-message cap. */
  private async ensureQueue(queue: string): Promise<void> {
    await this.requireChannel().assertQueue(queue, {
      durable: true,
      arguments: {
        'x-message-ttl': 86400000, // 24 hours
        'x-max-length': 10000, // maximum number of messages
      },
    });
  }

  /**
   * Exchange name: the part of the event type before the first dot, plus
   * `.events`. Event types without a dot get an exchange of their own.
   */
  private getExchangeName(eventType: string): string {
    const domain = eventType.split('.')[0];
    return `${domain}.events`;
  }

  private getQueueName(eventType: string): string {
    return `${eventType}.queue`;
  }

  /**
   * Rebuilds the event from the envelope written by `publish`: the payload
   * under `data`, with `occurredAt` revived as a Date and `getEventType()`
   * restored.
   *
   * Nested objects stay plain JSON values; prototype methods of domain
   * classes are not restored (that would need an event registry).
   *
   * @throws Error when the envelope has no object payload under `data`.
   */
  private deserializeEvent(content: any): DomainEvent {
    const payload = content?.data;
    if (payload === null || typeof payload !== 'object') {
      throw new Error('事件消息格式无效:缺少 data 字段');
    }

    const eventType: string = content.eventType;
    return {
      ...payload,
      occurredAt: new Date(payload.occurredAt ?? content.occurredAt),
      getEventType: () => eventType,
    } as DomainEvent;
  }
}
Code collapsed
事件处理器
code
// notification-service/handlers/health-event-handlers.ts
/**
 * Handlers for health record events: turn abnormal values into alerts and
 * notifications.
 */
export class HealthEventHandlers {
  constructor(
    private alertService: AlertService,
    private notificationService: NotificationService
  ) {}

  /**
   * Handles a "health record created" event: creates an alert for every
   * abnormal data point and additionally sends an immediate notification for
   * critical ones.
   */
  async handleHealthRecordCreated(event: HealthRecordCreatedEvent): Promise<void> {
    for (const data of event.data) {
      const abnormalLevel = this.resolveAbnormalLevel(data);
      if (abnormalLevel === 'normal') {
        continue;
      }

      const message = this.formatAbnormalMessage(data, abnormalLevel);

      // Create the alert.
      await this.alertService.createAlert({
        userId: event.userId,
        alertType: data.type,
        severity: abnormalLevel,
        message,
        sourceRecordId: event.aggregateId,
      });

      // Only critical values trigger an immediate notification.
      if (abnormalLevel === 'critical') {
        await this.notificationService.sendImmediateNotification({
          userId: event.userId,
          type: 'health_alert',
          severity: 'critical',
          message,
        });
      }
    }
  }

  /**
   * Handles a "critical health value detected" event: creates an alert,
   * sends an immediate notification to the user and notifies the emergency
   * contacts.
   */
  async handleCriticalHealthValue(event: CriticalHealthValueDetectedEvent): Promise<void> {
    // Create the alert.
    await this.alertService.createAlert({
      userId: event.userId,
      alertType: event.valueType,
      severity: event.severity,
      message: `检测到临界 ${event.valueType} 值: ${event.value}`,
      sourceEventId: event.eventId,
    });

    // Send the urgent notification.
    await this.notificationService.sendImmediateNotification({
      userId: event.userId,
      type: 'critical_health_alert',
      severity: 'critical',
      message: `您的 ${event.valueType} 检测到临界值,请立即关注`,
    });

    // Notify the emergency contacts.
    await this.notificationService.notifyEmergencyContacts({
      userId: event.userId,
      alertType: event.valueType,
      severity: event.severity,
    });
  }

  /**
   * Classifies one data point.
   *
   * Events that arrive over the message bus are JSON, so their data points
   * are plain objects without a `getAbnormalLevel()` method. In that case the
   * level is derived from `referenceRange` with the same rules: outside the
   * critical bounds is `critical`, outside `[min, max]` is `warning`,
   * otherwise (or without a range) `normal`.
   */
  private resolveAbnormalLevel(data: HealthData): 'normal' | 'warning' | 'critical' {
    const hasMethod =
      typeof (data as { getAbnormalLevel?: unknown }).getAbnormalLevel === 'function';
    if (hasMethod) {
      return data.getAbnormalLevel();
    }

    const range = data.referenceRange;
    if (!range) {
      return 'normal';
    }
    if (data.value < range.criticalMin || data.value > range.criticalMax) {
      return 'critical';
    }
    if (data.value < range.min || data.value > range.max) {
      return 'warning';
    }
    return 'normal';
  }

  /** Message text for an abnormal data point; wording depends on the level. */
  private formatAbnormalMessage(data: HealthData, level: string): string {
    if (level === 'critical') {
      return `严重异常:${data.type} 值为 ${data.value} ${data.unit},超出正常范围`;
    }
    return `注意:${data.type} 值为 ${data.value} ${data.unit},略偏离正常范围`;
  }
}
// Register the event handlers.
const eventBus = new RabbitMqEventBus(rabbitMqConfig);
const handlers = new HealthEventHandlers(alertService, notificationService);

// The bus creates its channel in connect(); subscribe() uses that channel,
// so the connection has to be established first.
await eventBus.connect();

await eventBus.subscribe('HealthRecordCreated', handlers.handleHealthRecordCreated.bind(handlers));
await eventBus.subscribe('CriticalHealthValueDetected', handlers.handleCriticalHealthValue.bind(handlers));
Code collapsed
4. 数据一致性
Saga 模式实现
code
// infrastructure/sagas/create-health-record-saga.ts
/**
 * Saga: create a health record.
 *
 * Runs the steps in order and undoes completed steps (newest first) when a
 * critical step fails:
 *   1. create_health_record — critical
 *   2. update_analytics     — critical
 *   3. send_notification    — optional, only when `command.shouldNotify`
 *   4. log_audit            — optional
 * A failing optional step is recorded in `steps` but does not fail the saga.
 */
export class CreateHealthRecordSaga {
  constructor(
    private healthRecordService: HealthRecordService,
    private analyticsService: AnalyticsService,
    private notificationService: NotificationService,
    private auditLogService: AuditLogService
  ) {}

  /**
   * Executes the saga.
   *
   * @returns `status: 'completed'` with the created record as `result`, or
   *   `status: 'failed'` with the error message after compensation has run.
   *   `steps` lists the executed steps in execution order in both cases.
   */
  async execute(command: CreateHealthRecordCommand): Promise<SagaResult> {
    const sagaId = generateId();
    const steps: SagaStep[] = [];

    try {
      // Step 1: create the health record.
      const recordStep = await this.executeStep(
        sagaId,
        'create_health_record',
        () => this.healthRecordService.createRecord(command),
        async (result) => {
          // Compensation: delete the record again.
          await this.healthRecordService.deleteRecord(result.recordId);
        }
      );
      steps.push(recordStep);
      this.assertCompleted(recordStep);
      // executeStep stores the action's return value under `result`.
      const record = recordStep.result;

      // Step 2: update the analytics data.
      const analyticsStep = await this.executeStep(
        sagaId,
        'update_analytics',
        () => this.analyticsService.updateHealthData(command.userId, record),
        async (result) => {
          // Compensation: roll the analytics data back.
          await this.analyticsService.rollbackHealthData(command.userId, result.data);
        }
      );
      steps.push(analyticsStep);
      this.assertCompleted(analyticsStep);

      // Step 3: send a notification (optional; failure does not abort).
      if (command.shouldNotify) {
        const notificationStep = await this.executeStep(
          sagaId,
          'send_notification',
          () => this.notificationService.sendHealthRecordNotification(record),
          async () => {
            // Nothing to compensate.
          }
        );
        steps.push(notificationStep);
      }

      // Step 4: write the audit log (optional; failure does not abort).
      const auditStep = await this.executeStep(
        sagaId,
        'log_audit',
        () =>
          this.auditLogService.logAction({
            action: 'CREATE_HEALTH_RECORD',
            userId: command.userId,
            resourceId: record.recordId,
            details: command,
          }),
        async () => {
          // Nothing to compensate.
        }
      );
      steps.push(auditStep);

      return {
        sagaId,
        status: 'completed',
        steps,
        result: record,
      };
    } catch (error) {
      // Compensate newest-first on a reversed copy, so the `steps` array
      // returned below keeps its execution order.
      await this.compensate([...steps].reverse());

      return {
        sagaId,
        status: 'failed',
        steps,
        error: CreateHealthRecordSaga.describeError(error),
      };
    }
  }

  /**
   * Runs one step. Never throws: a failing action yields a step with
   * `status: 'failed'` and a no-op compensation; a successful one yields
   * `status: 'completed'`, the action's `result`, and a compensation bound
   * to that result.
   */
  private async executeStep<T>(
    sagaId: string,
    stepName: string,
    action: () => Promise<T>,
    compensate: (result: T) => Promise<void>
  ): Promise<SagaStep> {
    try {
      const result = await action();
      return {
        name: stepName,
        status: 'completed',
        result,
        compensate: () => compensate(result),
      };
    } catch (error) {
      return {
        name: stepName,
        status: 'failed',
        error: CreateHealthRecordSaga.describeError(error),
        compensate: () => Promise.resolve(),
      };
    }
  }

  /** Aborts the saga (by throwing) when a critical step did not complete. */
  private assertCompleted(step: SagaStep): void {
    if (step.status !== 'completed') {
      throw new Error(step.error ?? `Saga 步骤 ${step.name} 失败`);
    }
  }

  /**
   * Runs the compensation of every completed step in the given order.
   * A failing compensation is logged and the remaining ones still run.
   */
  private async compensate(steps: SagaStep[]): Promise<void> {
    for (const step of steps) {
      if (step.status === 'completed') {
        try {
          await step.compensate();
        } catch (error) {
          console.error(`补偿步骤 ${step.name} 失败:`, error);
        }
      }
    }
  }

  /** Message of an Error, or the string form of any other thrown value. */
  private static describeError(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }
}
Code collapsed
分布式事务
code
// infrastructure/transactions/distributed-transaction.ts
/**
 * Distributed transaction coordinator (two-phase commit).
 * Transactions are tracked in memory by id.
 */
export class DistributedTransactionCoordinator {
  private transactions: Map<string, DistributedTransaction> = new Map();

  /**
   * Starts a new transaction with every participant in state `pending`.
   *
   * @returns the generated transaction id
   */
  beginTransaction(
    participants: TransactionParticipant[]
  ): string {
    const transactionId = generateId();

    const transaction: DistributedTransaction = {
      id: transactionId,
      status: 'pending',
      participants: participants.map(p => ({
        ...p,
        name: p.name,
        service: p.service,
        // Object spread copies own enumerable properties only. Participants
        // implemented as classes keep their methods on the prototype, so the
        // operations are delegated explicitly to the original object.
        prepare: () => p.prepare(),
        commit: () => p.commit(),
        rollback: () => p.rollback(),
        status: 'pending',
      })),
      startedAt: new Date(),
    };

    this.transactions.set(transactionId, transaction);

    return transactionId;
  }

  /**
   * Phase 1: asks every participant to prepare, in order.
   *
   * Stops at the first participant that declines or throws, rolls back the
   * participants prepared so far and returns `false`.
   *
   * @throws Error when the transaction id is unknown.
   */
  async prepare(transactionId: string): Promise<boolean> {
    const transaction = this.transactions.get(transactionId);
    if (!transaction) {
      throw new Error('事务不存在');
    }

    transaction.status = 'preparing';

    for (const participant of transaction.participants) {
      try {
        const prepared = await participant.prepare();
        if (prepared) {
          participant.status = 'prepared';
        } else {
          // Declined: undo the participants that are already prepared.
          await this.rollback(transactionId);
          return false;
        }
      } catch (error) {
        participant.status = 'failed';
        participant.error = DistributedTransactionCoordinator.describeError(error);
        await this.rollback(transactionId);
        return false;
      }
    }

    transaction.status = 'prepared';
    return true;
  }

  /**
   * Phase 2: commits every participant.
   *
   * A failing participant is logged and marked `failed`; the remaining ones
   * are still committed. The transaction ends as `committed` or
   * `partially_committed`.
   *
   * @throws Error when the transaction is unknown or not in state `prepared`.
   */
  async commit(transactionId: string): Promise<void> {
    const transaction = this.transactions.get(transactionId);
    if (!transaction || transaction.status !== 'prepared') {
      throw new Error('事务未准备好');
    }

    transaction.status = 'committing';

    for (const participant of transaction.participants) {
      try {
        await participant.commit();
        participant.status = 'committed';
      } catch (error) {
        participant.status = 'failed';
        participant.error = DistributedTransactionCoordinator.describeError(error);
        // Log the failure but keep committing the other participants.
        console.error(`提交参与者 ${participant.name} 失败:`, error);
      }
    }

    const allCommitted = transaction.participants.every(
      p => p.status === 'committed'
    );

    transaction.status = allCommitted ? 'committed' : 'partially_committed';
    transaction.completedAt = new Date();
  }

  /**
   * Rolls back every participant that is in state `prepared`.
   * A failing rollback is logged and marks the participant `rollback_failed`.
   *
   * @throws Error when the transaction id is unknown.
   */
  async rollback(transactionId: string): Promise<void> {
    const transaction = this.transactions.get(transactionId);
    if (!transaction) {
      throw new Error('事务不存在');
    }

    transaction.status = 'rolling_back';

    for (const participant of transaction.participants) {
      if (participant.status === 'prepared') {
        try {
          await participant.rollback();
          participant.status = 'rolled_back';
        } catch (error) {
          participant.status = 'rollback_failed';
          participant.error = DistributedTransactionCoordinator.describeError(error);
          // A participant stuck in the prepared state must be visible.
          console.error(`回滚参与者 ${participant.name} 失败:`, error);
        }
      }
    }

    transaction.status = 'rolled_back';
    transaction.completedAt = new Date();
  }

  /** Message of an Error, or the string form of any other thrown value. */
  private static describeError(error: unknown): string {
    return error instanceof Error ? error.message : String(error);
  }
}
/**
 * Contract for a participant in a distributed transaction.
 */
export interface TransactionParticipant {
  /** Participant name. */
  name: string;

  /** Name of the service the participant belongs to. */
  service: string;

  /** Prepare phase; resolves to `true` when the participant is ready. */
  prepare(): Promise<boolean>;

  /** Commit phase. */
  commit(): Promise<void>;

  /** Undoes the participant's work. */
  rollback(): Promise<void>;
}
Code collapsed
5. API 网关
网关路由配置
code
// api-gateway/routes/gateway-routes.ts
/**
 * Gateway routing table: one entry per backend service with its timeout (ms),
 * rate limit, required auth scopes and, for analytics, response caching.
 */
export const gatewayRoutes = [
  // Health records service.
  {
    path: '/api/health-records/*',
    service: 'health-records-service',
    timeout: 30_000,
    rateLimit: { windowMs: 60_000, maxRequests: 100 },
    auth: { required: true, scopes: ['health:read', 'health:write'] },
  },

  // Prescriptions service.
  {
    path: '/api/prescriptions/*',
    service: 'prescriptions-service',
    timeout: 30_000,
    rateLimit: { windowMs: 60_000, maxRequests: 50 },
    auth: {
      required: true,
      scopes: ['prescription:read', 'prescription:write'],
    },
  },

  // Alert service.
  {
    path: '/api/alerts/*',
    service: 'alert-service',
    timeout: 10_000,
    rateLimit: { windowMs: 60_000, maxRequests: 200 },
    auth: { required: true, scopes: ['alert:read'] },
  },

  // Analytics service.
  {
    path: '/api/analytics/*',
    service: 'analytics-service',
    timeout: 60_000,
    rateLimit: { windowMs: 60_000, maxRequests: 20 },
    auth: { required: true, scopes: ['analytics:read'] },
    // Cached for 5 minutes.
    cache: { enabled: true, ttl: 300 },
  },
];
Code collapsed
网关中间件
code
// api-gateway/middleware/compose-middleware.ts
import { Request, Response, NextFunction } from 'express';
/**
 * Combines several middlewares into one.
 *
 * The middlewares run in order; each continues the chain by calling its
 * `next`. The outer `next` is called
 * - without arguments after the last middleware has called `next()`,
 * - with the error as soon as a middleware passes one to `next(err)`, throws
 *   synchronously, or returns a promise that rejects,
 * - with an error when a middleware calls its `next` more than once.
 */
export function composeMiddleware(
  ...middlewares: Array<(req: Request, res: Response, next: NextFunction) => void>
) {
  return (req: Request, res: Response, next: NextFunction) => {
    // Highest index dispatched so far. It starts at -1 so that the initial
    // dispatch(0) is not mistaken for a repeated next() call.
    let index = -1;

    function dispatch(i: number): void {
      if (i <= index) {
        return next(new Error('next() called multiple times'));
      }
      index = i;

      if (i === middlewares.length) {
        return next();
      }

      const middleware = middlewares[i];

      try {
        const outcome: unknown = middleware(req, res, (err?: any) => {
          if (err) return next(err);
          dispatch(i + 1);
        });
        // Async middlewares return a promise; route a rejection to the
        // error handler instead of leaving it unhandled.
        if (outcome instanceof Promise) {
          outcome.catch(next);
        }
      } catch (err) {
        next(err);
      }
    }

    dispatch(0);
  };
}
/**
 * Authentication middleware.
 *
 * Reads the token from the `Authorization` header, verifies it with
 * `verifyToken` and checks that the decoded token carries every scope in
 * `requiredScopes`. On success the decoded token is stored on `req.user`.
 *
 * Responses:
 * - 401 `Unauthorized`  — no token present
 * - 401 `Invalid token` — verification failed
 * - 403 `Forbidden`     — a required scope is missing
 *
 * @param requiredScopes scopes the token must contain (default: none)
 */
export function authMiddleware(requiredScopes: string[] = []) {
  return async (req: Request, res: Response, next: NextFunction) => {
    // The auth scheme name is case-insensitive, so accept "Bearer",
    // "bearer", ... and tolerate extra whitespace. A header without a scheme
    // is still treated as the raw token, as before.
    const token = req.headers.authorization?.replace(/^Bearer\s+/i, '').trim();

    if (!token) {
      return res.status(401).json({ error: 'Unauthorized' });
    }

    let decoded: Awaited<ReturnType<typeof verifyToken>>;
    try {
      decoded = await verifyToken(token);
    } catch {
      return res.status(401).json({ error: 'Invalid token' });
    }
    if (!decoded) {
      return res.status(401).json({ error: 'Invalid token' });
    }

    const userScopes = decoded.scopes || [];
    const hasRequiredScopes = requiredScopes.every(scope =>
      userScopes.includes(scope)
    );

    if (!hasRequiredScopes) {
      return res.status(403).json({ error: 'Forbidden' });
    }

    req.user = decoded;
    // Called outside the try block on purpose: an error raised further down
    // the chain is not a token problem and must not be answered with 401.
    next();
  };
}
/**
 * Sliding-window rate limiter keyed by client address, held in memory.
 *
 * Each key may make at most `maxRequests` requests within any `windowMs`
 * interval. Requests over the limit get a 429 response whose `retryAfter`
 * (and `Retry-After` header) is the number of seconds until the oldest
 * counted request leaves the window.
 *
 * @param config.windowMs    window length in milliseconds
 * @param config.maxRequests allowed requests per window and key
 */
export function rateLimitMiddleware(config: { windowMs: number; maxRequests: number }) {
  const requests = new Map<string, number[]>();
  let lastSweep = Date.now();

  return (req: Request, res: Response, next: NextFunction) => {
    const key = req.ip || req.socket.remoteAddress || 'unknown';
    const now = Date.now();
    const windowStart = now - config.windowMs;

    // At most once per window, drop keys whose newest request has expired.
    // Without this the map keeps one entry per client address forever.
    if (now - lastSweep >= config.windowMs) {
      for (const [storedKey, stamps] of requests) {
        const newest = stamps[stamps.length - 1];
        if (newest === undefined || newest <= windowStart) {
          requests.delete(storedKey);
        }
      }
      lastSweep = now;
    }

    // Timestamps are appended in arrival order, so index 0 is the oldest.
    const recent = (requests.get(key) ?? []).filter(time => time > windowStart);

    // Enforce the limit.
    if (recent.length >= config.maxRequests) {
      requests.set(key, recent);
      const oldest = recent[0] ?? now;
      const retryAfter = Math.max(
        1,
        Math.ceil((oldest + config.windowMs - now) / 1000)
      );
      res.setHeader('Retry-After', retryAfter);
      return res.status(429).json({
        error: 'Too many requests',
        retryAfter,
      });
    }

    // Count this request.
    recent.push(now);
    requests.set(key, recent);

    // Rate limit headers; the reset time is when the oldest counted request
    // leaves the window.
    res.setHeader('X-RateLimit-Limit', config.maxRequests);
    res.setHeader('X-RateLimit-Remaining', config.maxRequests - recent.length);
    res.setHeader('X-RateLimit-Reset', new Date(recent[0] + config.windowMs).toISOString());

    next();
  };
}
/**
 * Timeout middleware.
 *
 * Passes the request on immediately and starts a timer. If the response has
 * neither finished nor been closed after `ms` milliseconds, and no headers
 * have been sent yet, `next` is called with a `Request timeout` error.
 *
 * @param ms timeout in milliseconds
 */
export function timeoutMiddleware(ms: number) {
  return (req: Request, res: Response, next: NextFunction) => {
    const timer = setTimeout(() => {
      // Once headers are out, a response is already in progress; reporting a
      // timeout now would make the error handler write to it a second time.
      if (res.headersSent) {
        return;
      }
      next(new Error('Request timeout'));
    }, ms);

    const cancelTimer = () => clearTimeout(timer);
    // 'finish' fires when the response was sent completely; 'close' also
    // covers connections that end early (e.g. the client aborted).
    res.on('finish', cancelTimer);
    res.on('close', cancelTimer);

    next();
  };
}
Code collapsed
迁移清单
准备阶段
- 绘制当前系统架构图
- 识别业务边界上下文
- 定义服务拆分策略
- 评估技术栈需求
- 制定迁移计划
实施阶段
- 建立 API 网关
- 设置事件总线
- 创建新服务数据库
- 实施绞杀者模式
- 迁移数据和功能
验证阶段
- 功能测试
- 性能测试
- 数据一致性验证
- 故障恢复测试
- 安全审计
优化阶段
- 监控和告警
- 日志聚合
- 链路追踪
- 容量规划
- 成本优化
参考资料
免责声明:微服务架构增加了系统复杂度。在重构前请仔细评估业务需求、团队能力和技术成熟度。