CQRS 模式实践:构建可扩展的实时分析系统
概述
CQRS(Command Query Responsibility Segregation,命令查询职责分离)是一种将系统的读操作与写操作分离的架构模式。对于需要处理大量数据分析的健康平台,CQRS 可以显著提升性能和可扩展性。
为什么使用 CQRS?
| 传统架构 | CQRS 架构 |
|---|---|
| 读写使用相同模型 | 读写使用不同模型 |
| 单一数据存储 | 读写分离存储 |
| 查询影响写性能 | 读写独立优化 |
| 难以应对复杂查询 | 专用读模型 |
| 扩展受限 | 独立扩展读写 |
适用场景
- 高读写比:读取远多于写入
- 复杂查询:需要聚合、分组、分析
- 实时分析:需要即时统计和报告
- 多数据源:整合来自不同服务的数据
- 性能要求高:读写性能要求不同
1. CQRS 基础架构
核心组件
code
// cqrs/core/command.ts
/**
 * Command: expresses the intent to perform a write operation.
 *
 * The type parameter `T` names the result type associated with the command.
 * No member of this interface references it, so it serves only as a marker
 * that concrete command interfaces fill in.
 */
export interface ICommand<T extends ICommandResult> {
// Identifier of this command.
id: string;
// Identifier of the aggregate the command targets.
aggregateId: string;
// NOTE(review): presumably the time the command was issued — confirm with callers.
timestamp: Date;
// NOTE(review): looks like an optimistic-concurrency guard; verify where (or whether) it is enforced.
expectedVersion?: number;
}
/**
 * Outcome of handling a command.
 */
export interface ICommandResult {
// Whether the command was handled successfully.
success: boolean;
// Domain events produced while handling the command.
events: DomainEvent[];
// Optional error messages describing why the command failed.
errors?: string[];
}
/**
 * Command handler contract: handles one command of type `T` and resolves
 * with an `ICommandResult`.
 */
export interface ICommandHandler<T extends ICommand<any>> {
handle(command: T): Promise<ICommandResult>;
}
/**
 * Command bus: routes each command to the handler registered under its name.
 */
export class CommandBus {
  private handlers: Map<string, ICommandHandler<any>> = new Map();

  /**
   * Registers `handler` under `commandName`, replacing any handler that was
   * previously registered under the same name.
   */
  register<T extends ICommand<any>>(
    commandName: string,
    handler: ICommandHandler<T>
  ): void {
    this.handlers.set(commandName, handler);
  }

  /**
   * Dispatches a single command to its handler.
   *
   * @param command - The command to handle.
   * @param commandName - Name the handler was registered under. When omitted,
   *   the name of the command's constructor is used. Commands that are plain
   *   object literals have the constructor name "Object", so they must be
   *   dispatched with an explicit name (mirrors `QueryBus.execute`).
   * @returns The result produced by the handler.
   * @throws Error when no handler is registered under the resolved name.
   */
  async dispatch<T extends ICommand<any>>(
    command: T,
    commandName?: string
  ): Promise<ICommandResult> {
    const resolvedName = commandName ?? command.constructor?.name ?? '';
    const handler = this.handlers.get(resolvedName);
    if (!handler) {
      throw new Error(`未找到命令处理器: ${resolvedName}`);
    }
    return await handler.handle(command);
  }

  /**
   * Dispatches several commands concurrently and resolves with their results
   * in input order. Rejects as soon as any dispatch rejects.
   *
   * @param commands - Commands to dispatch.
   * @param commandName - Optional handler name applied to every command in
   *   the batch; when omitted each command's constructor name is used.
   */
  async dispatchBatch<T extends ICommand<any>>(
    commands: T[],
    commandName?: string
  ): Promise<ICommandResult[]> {
    return await Promise.all(
      commands.map(cmd => this.dispatch(cmd, commandName))
    );
  }
}
// cqrs/core/query.ts
/**
 * Query: represents a request for a read operation.
 *
 * The type parameter `T` names the result type the query is associated with;
 * no member of this interface references it.
 */
export interface IQuery<T> {
// Query parameters; concrete query interfaces narrow this to a specific shape.
params: any;
}
/**
 * Query handler contract: handles a query of type `T` and resolves with a
 * result of type `R`.
 */
export interface IQueryHandler<T extends IQuery<any>, R> {
handle(query: T): Promise<R>;
}
/**
 * Query bus: looks up the handler registered for a query name and runs it.
 */
export class QueryBus {
  private handlers: Map<string, IQueryHandler<any, any>> = new Map();

  /**
   * Associates `handler` with `queryName`; a later registration under the
   * same name replaces the earlier one.
   */
  register<T extends IQuery<any>, R>(
    queryName: string,
    handler: IQueryHandler<T, R>
  ): void {
    this.handlers.set(queryName, handler);
  }

  /**
   * Runs the handler registered under `queryName` against `query`.
   *
   * @throws Error when no handler is registered under `queryName`.
   */
  async execute<T extends IQuery<any>, R>(
    queryName: string,
    query: T
  ): Promise<R> {
    const registered = this.handlers.get(queryName);
    if (registered) {
      return await registered.handle(query);
    }
    throw new Error(`未找到查询处理器: ${queryName}`);
  }

  /**
   * Starts every query in order, then waits for all of them; results keep
   * the order of the input array.
   */
  async executeBatch<T extends IQuery<any>, R>(
    queries: Array<{ name: string; query: T }>
  ): Promise<R[]> {
    const pending: Array<Promise<R>> = [];
    for (const entry of queries) {
      pending.push(this.execute<T, R>(entry.name, entry.query));
    }
    return await Promise.all(pending);
  }
}
Code collapsed
2. 命令端实现
命令定义
code
// health-records/commands/create-health-record.command.ts
/**
 * Command requesting the creation of a health record for a user.
 */
export interface CreateHealthRecordCommand extends ICommand<CreateHealthRecordResult> {
id: string;
aggregateId: string; // userId
// Kind of record being created.
recordType: HealthRecordType;
// Readings that make up the record.
data: HealthDataInput[];
// Origin of the data.
source: DataSource;
// Optional time the record was taken.
recordedAt?: Date;
}
/**
 * Result of a create-health-record command; adds the id of the record to
 * the common command result fields.
 */
export interface CreateHealthRecordResult extends ICommandResult {
recordId: string;
}
// health-records/commands/update-health-data.command.ts
/**
 * Command requesting that the readings of an existing health record be updated.
 */
export interface UpdateHealthDataCommand extends ICommand<UpdateHealthDataResult> {
id: string;
aggregateId: string; // recordId
// Replacement readings for the record.
data: HealthDataInput[];
// Optional time of the update.
updatedAt?: Date;
}
/**
 * Result of an update-health-data command: the affected record id plus the
 * readings before and after the update.
 */
export interface UpdateHealthDataResult extends ICommandResult {
recordId: string;
previousData: HealthData[];
newData: HealthData[];
}
// health-records/commands/delete-health-record.command.ts
/**
 * Command requesting the deletion of a health record.
 */
export interface DeleteHealthRecordCommand extends ICommand<DeleteHealthRecordResult> {
id: string;
aggregateId: string; // recordId
// Why the record is being deleted.
reason: string;
// NOTE(review): presumably the id of the acting user — confirm with callers.
deletedBy: string;
}
/**
 * Result of a delete-health-record command: the affected record id and the
 * deletion time.
 */
export interface DeleteHealthRecordResult extends ICommandResult {
recordId: string;
deletedAt: Date;
}
Code collapsed
命令处理器
code
// health-records/command-handlers/create-health-record.handler.ts
/**
 * Handles `CreateHealthRecordCommand`: loads the user's health aggregate,
 * creates the record, persists the resulting events and publishes them.
 */
export class CreateHealthRecordCommandHandler
  implements ICommandHandler<CreateHealthRecordCommand>
{
  /**
   * @param eventStore - Store the produced events are appended to.
   * @param repository - Loads the user health aggregate.
   * @param eventBus - Bus the persisted events are published on. The handler
   *   always used `this.eventBus`, but it was never injected, so every call
   *   failed after the events had already been persisted.
   */
  constructor(
    private eventStore: IEventStore,
    private repository: HealthRecordRepository,
    private eventBus: { publish(event: DomainEvent): Promise<void> }
  ) {}

  /**
   * Executes the command.
   *
   * @returns On success, the produced events and the new record id. On any
   *   thrown error, `success: false` with the error message; `recordId` is
   *   then an empty string so the value still matches the declared type.
   */
  async handle(command: CreateHealthRecordCommand): Promise<CreateHealthRecordResult> {
    try {
      // 1. Load the aggregate (the user's health record aggregate).
      const aggregate = await this.repository.loadUserHealthAggregate(command.aggregateId);

      // 2. Run the business logic.
      const result = aggregate.createRecord({
        recordType: command.recordType,
        data: command.data.map(d => new HealthData(d.type, d.value, d.unit)),
        source: command.source,
        recordedAt: command.recordedAt ?? new Date(),
      });

      // 3. Persist the events.
      await this.eventStore.appendEvents(command.aggregateId, result.events);

      // 4. Publish the events, one at a time and in order.
      for (const event of result.events) {
        await this.eventBus.publish(event);
      }

      return {
        success: true,
        events: result.events,
        recordId: result.recordId,
      };
    } catch (error) {
      // Anything can be thrown in JavaScript; only Error carries `.message`.
      const message = error instanceof Error ? error.message : String(error);
      return {
        success: false,
        events: [],
        errors: [message],
        recordId: '',
      };
    }
  }
}
// health-records/command-handlers/update-health-data.handler.ts
/**
 * Handles `UpdateHealthDataCommand`: validates the new readings, applies
 * them to the record aggregate, persists the resulting events and publishes
 * them.
 */
export class UpdateHealthDataCommandHandler
  implements ICommandHandler<UpdateHealthDataCommand>
{
  /**
   * @param eventStore - Store the produced events are appended to.
   * @param repository - Loads the record aggregate.
   * @param validator - Validates the incoming readings.
   * @param eventBus - Bus the persisted events are published on. The handler
   *   always used `this.eventBus`, but it was never injected, so every call
   *   failed after the events had already been persisted.
   */
  constructor(
    private eventStore: IEventStore,
    private repository: HealthRecordRepository,
    private validator: HealthDataValidator,
    private eventBus: { publish(event: DomainEvent): Promise<void> }
  ) {}

  /**
   * Executes the command.
   *
   * @returns On success, the produced events together with the previous and
   *   new readings. On validation failure or any thrown error,
   *   `success: false` with the error messages and empty data arrays.
   */
  async handle(command: UpdateHealthDataCommand): Promise<UpdateHealthDataResult> {
    try {
      // 1. Validate the data.
      const validationResult = await this.validator.validate(command.data);
      if (!validationResult.isValid) {
        return this.failure(command, validationResult.errors);
      }

      // 2. Load the aggregate.
      const aggregate = await this.repository.load(command.aggregateId);

      // 3. Apply the update.
      const result = aggregate.updateData({
        data: command.data.map(d => new HealthData(d.type, d.value, d.unit)),
        updatedAt: command.updatedAt ?? new Date(),
      });

      // 4. Persist the events.
      await this.eventStore.appendEvents(command.aggregateId, result.events);

      // 5. Publish the events, one at a time and in order.
      for (const event of result.events) {
        await this.eventBus.publish(event);
      }

      return {
        success: true,
        events: result.events,
        recordId: command.aggregateId,
        previousData: result.previousData,
        newData: result.newData,
      };
    } catch (error) {
      // Anything can be thrown in JavaScript; only Error carries `.message`.
      const message = error instanceof Error ? error.message : String(error);
      return this.failure(command, [message]);
    }
  }

  /** Builds a failed result that still satisfies `UpdateHealthDataResult`. */
  private failure(command: UpdateHealthDataCommand, errors: string[]): UpdateHealthDataResult {
    return {
      success: false,
      events: [],
      errors,
      recordId: command.aggregateId,
      previousData: [],
      newData: [],
    };
  }
}
Code collapsed
3. 查询端实现
读模型设计
code
-- =============================================================================
-- CQRS 读模型:物化视图
-- =============================================================================
-- Per-user health data summary: record counts overall and per record type,
-- the time of the most recent record, and the readings of that record.
-- Soft-deleted records (deleted_at IS NOT NULL) are excluded everywhere,
-- including the latest-readings lookup, which previously ignored deleted_at
-- and could surface readings from a deleted record.
CREATE MATERIALIZED VIEW user_health_summary_mv AS
SELECT
    u.id AS user_id,
    u.tenant_id,
    COUNT(DISTINCT hr.id) AS total_records,
    MAX(hr.recorded_at) AS last_record_at,
    COUNT(DISTINCT CASE WHEN hr.record_type = 'blood_pressure' THEN hr.id END) AS blood_pressure_count,
    COUNT(DISTINCT CASE WHEN hr.record_type = 'heart_rate' THEN hr.id END) AS heart_rate_count,
    COUNT(DISTINCT CASE WHEN hr.record_type = 'blood_glucose' THEN hr.id END) AS blood_glucose_count,
    COUNT(DISTINCT CASE WHEN hr.record_type = 'weight' THEN hr.id END) AS weight_count,
    -- Latest readings.
    -- NOTE(review): `data` must resolve to a column of health_record_data;
    -- other queries only reference data_type / data_value — confirm the schema.
    (SELECT json_agg(data)
     FROM health_record_data hrd
     WHERE hrd.record_id = (
         SELECT hr2.id
         FROM health_records hr2
         WHERE hr2.user_id = u.id
           AND hr2.deleted_at IS NULL
         ORDER BY hr2.recorded_at DESC
         LIMIT 1
     )
    ) AS latest_readings
FROM users u
LEFT JOIN health_records hr ON hr.user_id = u.id AND hr.deleted_at IS NULL
GROUP BY u.id, u.tenant_id;
-- Health trend analysis: per user, record type and data type, daily
-- aggregates (average, min, max, standard deviation, count) over all
-- non-deleted records.
CREATE MATERIALIZED VIEW health_trends_mv AS
SELECT
    user_id,
    tenant_id,
    record_type,
    data_type,
    DATE_TRUNC('day', recorded_at) AS date,
    AVG(data_value::numeric) AS avg_value,
    MIN(data_value::numeric) AS min_value,
    MAX(data_value::numeric) AS max_value,
    STDDEV(data_value::numeric) AS stddev_value,
    COUNT(*) AS record_count
FROM (
    SELECT
        hr.user_id,
        hr.tenant_id,
        hr.record_type,
        hrd.data_type,
        hrd.data_value,
        hr.recorded_at
    FROM health_records hr
    JOIN health_record_data hrd ON hrd.record_id = hr.id
    WHERE hr.deleted_at IS NULL
) raw_data
-- The original had a stray quote after recorded_at, which made the statement
-- a syntax error (unterminated string literal).
GROUP BY user_id, tenant_id, record_type, data_type, DATE_TRUNC('day', recorded_at);
-- Health alert statistics: for every user and day, the number of unresolved
-- alerts per severity plus the distinct alert types seen that day.
CREATE MATERIALIZED VIEW health_alerts_summary_mv AS
WITH open_alerts AS (
    -- Only alerts that have not been resolved yet, bucketed by creation day.
    SELECT
        user_id,
        tenant_id,
        DATE_TRUNC('day', created_at) AS date,
        severity,
        alert_type
    FROM health_alerts
    WHERE resolved_at IS NULL
)
SELECT
    user_id,
    tenant_id,
    date,
    COUNT(*) FILTER (WHERE severity = 'critical') AS critical_count,
    COUNT(*) FILTER (WHERE severity = 'warning') AS warning_count,
    COUNT(*) FILTER (WHERE severity = 'info') AS info_count,
    array_agg(DISTINCT alert_type) AS alert_types
FROM open_alerts
GROUP BY user_id, tenant_id, date;
-- Indexes on the read models.
-- REFRESH MATERIALIZED VIEW CONCURRENTLY only works when the view has at
-- least one UNIQUE index that uses only column names and covers all rows, so
-- each view gets a unique index on its grouping key. The remaining indexes
-- speed up the lookups issued by the query handlers.
CREATE UNIQUE INDEX idx_user_health_summary_user ON user_health_summary_mv(user_id);
CREATE INDEX idx_user_health_summary_tenant ON user_health_summary_mv(tenant_id);
CREATE UNIQUE INDEX idx_health_trends_unique ON health_trends_mv(user_id, tenant_id, record_type, data_type, date);
CREATE INDEX idx_health_trends_user_date ON health_trends_mv(user_id, date DESC);
CREATE UNIQUE INDEX idx_health_alerts_summary_unique ON health_alerts_summary_mv(user_id, tenant_id, date);
CREATE INDEX idx_health_alerts_summary_user_date ON health_alerts_summary_mv(user_id, date DESC);
-- Periodic refresh of the materialized views: refreshes the three read
-- models one after another, each with CONCURRENTLY.
CREATE OR REPLACE FUNCTION refresh_read_models()
RETURNS VOID AS $$
DECLARE
    mv_name TEXT;
BEGIN
    FOREACH mv_name IN ARRAY ARRAY[
        'user_health_summary_mv',
        'health_trends_mv',
        'health_alerts_summary_mv'
    ]
    LOOP
        EXECUTE format('REFRESH MATERIALIZED VIEW CONCURRENTLY %I', mv_name);
    END LOOP;
END;
$$ LANGUAGE plpgsql;
-- 设置定时任务(使用 pg_cron 或外部调度器)
-- SELECT cron.schedule('refresh-read-models', '*/5 * * * *', 'SELECT refresh_read_models()');
Code collapsed
查询处理器
code
// analytics/query-handlers/get-user-health-summary.handler.ts
/**
 * Query for a single user's health summary.
 */
export interface GetUserHealthSummaryQuery extends IQuery<UserHealthSummary> {
params: {
// User whose summary is requested.
userId: string;
// When truthy, trend data is included in the result.
includeTrends?: boolean;
// When truthy, alert data is included in the result.
includeAlerts?: boolean;
};
}
/**
 * Read-side summary of a user's health data.
 */
export interface UserHealthSummary {
userId: string;
tenantId: string;
// Total number of records for the user.
totalRecords: number;
// Time of the most recent record.
lastRecordAt: Date;
// Record counts keyed by record kind.
recordCounts: Record<string, number>;
// Readings of the most recent record.
latestReadings: HealthDataReading[];
// Present only when trends were requested.
trends?: HealthTrend[];
// Present only when alerts were requested.
alerts?: HealthAlertSummary;
}
/**
 * Answers `GetUserHealthSummaryQuery` from the read models
 * (`user_health_summary_mv`, `health_trends_mv`, `health_alerts_summary_mv`).
 */
export class GetUserHealthSummaryQueryHandler
  implements IQueryHandler<GetUserHealthSummaryQuery, UserHealthSummary>
{
  constructor(
    private readDb: Database
  ) {}

  /**
   * Builds the summary for the requested user.
   *
   * @returns The summary; `trends` is set only when `includeTrends` is
   *   truthy, `alerts` only when `includeAlerts` is truthy and the user has
   *   an alert summary row.
   * @throws Error when the user has no row in `user_health_summary_mv`.
   */
  async handle(query: GetUserHealthSummaryQuery): Promise<UserHealthSummary> {
    const { userId, includeTrends, includeAlerts } = query.params;

    // Basic summary from the materialized view.
    const summary = await this.readDb.oneOrNone(
      `SELECT * FROM user_health_summary_mv WHERE user_id = $1`,
      [userId]
    );
    if (!summary) {
      throw new Error('用户健康摘要不存在');
    }

    // COUNT(...) is a bigint in PostgreSQL and drivers commonly return it as
    // a string; Number() keeps the declared `number` type honest either way
    // (the analytics handler converts its counts for the same reason).
    const result: UserHealthSummary = {
      userId: summary.user_id,
      tenantId: summary.tenant_id,
      totalRecords: Number(summary.total_records),
      lastRecordAt: summary.last_record_at,
      recordCounts: {
        bloodPressure: Number(summary.blood_pressure_count),
        heartRate: Number(summary.heart_rate_count),
        bloodGlucose: Number(summary.blood_glucose_count),
        weight: Number(summary.weight_count),
      },
      latestReadings: summary.latest_readings ?? [],
    };

    // Optional: trend data.
    if (includeTrends) {
      result.trends = await this.getTrends(userId);
    }

    // Optional: alert data. A user without unresolved alerts has no row in
    // the alert summary, in which case `alerts` is simply left unset.
    if (includeAlerts) {
      const alerts = await this.getAlerts(userId);
      if (alerts) {
        result.alerts = alerts;
      }
    }

    return result;
  }

  /** Returns up to 30 trend rows for the user, newest date first. */
  private async getTrends(userId: string): Promise<HealthTrend[]> {
    return await this.readDb.any(
      `SELECT
         data_type,
         date,
         avg_value,
         min_value,
         max_value
       FROM health_trends_mv
       WHERE user_id = $1
       ORDER BY date DESC
       LIMIT 30`,
      [userId]
    );
  }

  /**
   * Returns the most recent alert summary row for the user, or `null` when
   * there is none. Uses `oneOrNone` because `one` rejects on an empty result,
   * which turned "no open alerts" into a failed query.
   */
  private async getAlerts(userId: string): Promise<HealthAlertSummary | null> {
    return await this.readDb.oneOrNone(
      `SELECT
         critical_count,
         warning_count,
         info_count,
         alert_types
       FROM health_alerts_summary_mv
       WHERE user_id = $1
       ORDER BY date DESC
       LIMIT 1`,
      [userId]
    );
  }
}
// analytics/query-handlers/get-health-analytics.handler.ts
/**
 * Query for per-metric analytics of one user over a named period.
 */
export interface GetHealthAnalyticsQuery extends IQuery<HealthAnalytics> {
params: {
// User whose data is analysed.
userId: string;
// Named look-back window ending today.
period: 'week' | 'month' | 'quarter' | 'year';
// Metrics to compute statistics for.
metrics: HealthMetricType[];
};
}
/**
 * Analytics result for one user: the analysed period, per-metric statistics,
 * derived insights and the generation time.
 */
export interface HealthAnalytics {
userId: string;
period: AnalyticsPeriod;
metrics: MetricAnalytics[];
insights: AnalyticsInsight[];
generatedAt: Date;
}
/**
 * Answers `GetHealthAnalyticsQuery`: computes per-metric statistics for one
 * user over a named period and asks the analyzer for insights.
 */
export class GetHealthAnalyticsQueryHandler
  implements IQueryHandler<GetHealthAnalyticsQuery, HealthAnalytics>
{
  constructor(
    private readDb: Database,
    private analyzer: HealthAnalyzer
  ) {}

  /**
   * Runs one statistics query per requested metric (concurrently), then
   * generates insights from the combined results.
   */
  async handle(query: GetHealthAnalyticsQuery): Promise<HealthAnalytics> {
    const { userId, period, metrics } = query.params;
    const { startDate, endDate } = this.getPeriodDates(period);

    // Fetch all metrics in parallel.
    const metricsData = await Promise.all(
      metrics.map(metric => this.getMetricData(userId, metric, startDate, endDate))
    );

    // Generate insights.
    const insights = await this.analyzer.generateInsights(metricsData);

    return {
      userId,
      period: { type: period, startDate, endDate },
      metrics: metricsData,
      insights,
      generatedAt: new Date(),
    };
  }

  /**
   * Computes count, average, min, max, standard deviation and median of one
   * metric for the user within [startDate, endDate].
   *
   * When no record matches, the query returns no row; the result then has
   * `recordCount: 0` and `NaN` statistics instead of dereferencing `null`.
   * SQL NULLs (e.g. STDDEV over a single row) also map to `NaN`.
   */
  private async getMetricData(
    userId: string,
    metric: HealthMetricType,
    startDate: Date,
    endDate: Date
  ): Promise<MetricAnalytics> {
    const row = await this.readDb.oneOrNone(
      `SELECT
         data_type,
         COUNT(*) as record_count,
         AVG(data_value::numeric) as avg_value,
         MIN(data_value::numeric) as min_value,
         MAX(data_value::numeric) as max_value,
         STDDEV(data_value::numeric) as stddev_value,
         PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY data_value::numeric) as median_value
       FROM (
         SELECT hr.recorded_at, hrd.data_type, hrd.data_value
         FROM health_records hr
         JOIN health_record_data hrd ON hrd.record_id = hr.id
         WHERE hr.user_id = $1
           AND hrd.data_type = $2
           AND hr.recorded_at BETWEEN $3 AND $4
           AND hr.deleted_at IS NULL
       ) data
       GROUP BY data_type`,
      [userId, metric, startDate, endDate]
    );

    const toNumber = (value: unknown): number =>
      value === null || value === undefined ? NaN : parseFloat(String(value));

    return {
      metric,
      recordCount: row ? parseInt(String(row.record_count), 10) : 0,
      statistics: {
        average: toNumber(row?.avg_value),
        min: toNumber(row?.min_value),
        max: toNumber(row?.max_value),
        stdDev: toNumber(row?.stddev_value),
        median: toNumber(row?.median_value),
      },
    };
  }

  /**
   * Translates a named period into a concrete date range ending today.
   *
   * @param period - 'week' | 'month' | 'quarter' | 'year'; any other value
   *   falls back to the last 30 days.
   * @param now - Reference instant (defaults to the current time).
   * @returns `startDate` at local 00:00:00.000 and `endDate` at local
   *   23:59:59.999 of the reference day.
   */
  private getPeriodDates(
    period: string,
    now: Date = new Date()
  ): { startDate: Date; endDate: Date } {
    // Include the final milliseconds of the day so records stamped in the
    // last second are not excluded by BETWEEN.
    const endDate = new Date(now.getFullYear(), now.getMonth(), now.getDate(), 23, 59, 59, 999);

    let startDate: Date;
    switch (period) {
      case 'week':
        startDate = new Date(now);
        startDate.setDate(now.getDate() - 7);
        break;
      case 'month':
        startDate = this.shiftMonths(now, -1);
        break;
      case 'quarter':
        startDate = this.shiftMonths(now, -3);
        break;
      case 'year':
        startDate = this.shiftMonths(now, -12);
        break;
      default:
        startDate = new Date(now);
        startDate.setDate(now.getDate() - 30);
    }
    startDate.setHours(0, 0, 0, 0);

    return { startDate, endDate };
  }

  /**
   * Moves a date by whole months, clamping the day to the target month's
   * length. Plain `setMonth` overflows instead (Mar 31 minus one month would
   * land in early March because February has no 31st).
   */
  private shiftMonths(from: Date, months: number): Date {
    const shifted = new Date(from);
    const originalDay = shifted.getDate();
    shifted.setDate(1);
    shifted.setMonth(shifted.getMonth() + months);
    const lastDayOfMonth = new Date(shifted.getFullYear(), shifted.getMonth() + 1, 0).getDate();
    shifted.setDate(Math.min(originalDay, lastDayOfMonth));
    return shifted;
  }
}
Code collapsed
复杂分析查询
code
// analytics/query-handlers/get-comparative-analytics.handler.ts
/**
 * Query comparing several users across a set of metrics.
 */
export interface GetComparativeAnalyticsQuery extends IQuery<ComparativeAnalytics> {
params: {
// Users to compare.
userIds: string[];
// Metrics to compare them on.
metricTypes: HealthMetricType[];
// Period the comparison covers.
period: AnalyticsPeriod;
// Optional user attribute to break the aggregates down by.
groupBy?: 'age' | 'gender' | 'region';
};
}
/**
 * Result of a comparative analytics query: per-user analytics, aggregates
 * (averages and distributions keyed by string) and per-user comparisons.
 */
export interface ComparativeAnalytics {
period: AnalyticsPeriod;
users: UserAnalytics[];
aggregates: {
averages: Record<string, number>;
distributions: Record<string, DistributionData>;
};
comparisons: ComparisonResult[];
generatedAt: Date;
}
/**
 * Answers `GetComparativeAnalyticsQuery`: per-user metrics, group aggregates
 * and user-vs-overall comparisons, with the combined result cached for five
 * minutes.
 */
export class GetComparativeAnalyticsQueryHandler
  implements IQueryHandler<GetComparativeAnalyticsQuery, ComparativeAnalytics>
{
  /** Columns of `users` that may be interpolated into the GROUP BY clause. */
  private static readonly GROUP_BY_COLUMNS: ReadonlyArray<string> = ['age', 'gender', 'region'];

  constructor(
    private readDb: Database,
    private cache: ICacheService
  ) {}

  /**
   * Returns the cached result when present; otherwise runs the three
   * analytics queries concurrently, caches the combined result and returns it.
   *
   * @throws Error when `groupBy` is not one of the supported columns.
   */
  async handle(query: GetComparativeAnalyticsQuery): Promise<ComparativeAnalytics> {
    const { userIds, metricTypes, period, groupBy } = query.params;

    // Check the cache first.
    const cacheKey = this.generateCacheKey(query.params);
    const cached = await this.cache.get<ComparativeAnalytics>(cacheKey);
    if (cached) {
      return cached;
    }

    // The period used to be passed straight into a helper that ignored it
    // (always "last 30 days") and was then nested inside `period.type`.
    const resolved = this.resolvePeriod(period);
    const { startDate, endDate } = resolved;

    // Run the queries in parallel.
    const [userAnalytics, aggregates, comparisons] = await Promise.all([
      this.getUserAnalytics(userIds, metricTypes, startDate, endDate),
      this.getAggregateAnalytics(userIds, metricTypes, startDate, endDate, groupBy),
      this.getComparisons(userIds, metricTypes, startDate, endDate),
    ]);

    const result: ComparativeAnalytics = {
      period: { type: resolved.type, startDate, endDate } as AnalyticsPeriod,
      users: userAnalytics,
      aggregates,
      comparisons,
      generatedAt: new Date(),
    };

    // Cache the result for 5 minutes.
    await this.cache.set(cacheKey, result, 300);

    return result;
  }

  /** Per-user average and record count for every requested metric. */
  private async getUserAnalytics(
    userIds: string[],
    metricTypes: HealthMetricType[],
    startDate: Date,
    endDate: Date
  ): Promise<UserAnalytics[]> {
    const query = `
      WITH user_metrics AS (
        SELECT
          u.id as user_id,
          u.age,
          u.gender,
          u.region,
          hrd.data_type,
          AVG(hrd.data_value::numeric) as avg_value,
          COUNT(*) as record_count
        FROM users u
        JOIN health_records hr ON hr.user_id = u.id
        JOIN health_record_data hrd ON hrd.record_id = hr.id
        WHERE u.id = ANY($1)
          AND hrd.data_type = ANY($2)
          AND hr.recorded_at BETWEEN $3 AND $4
          AND hr.deleted_at IS NULL
        GROUP BY u.id, u.age, u.gender, u.region, hrd.data_type
      )
      SELECT
        user_id,
        age,
        gender,
        region,
        json_object_agg(data_type, json_build_object(
          'average', avg_value,
          'recordCount', record_count
        )) as metrics
      FROM user_metrics
      GROUP BY user_id, age, gender, region
    `;
    return await this.readDb.any(query, [
      userIds,
      metricTypes,
      startDate,
      endDate,
    ]);
  }

  /**
   * Averages and distributions per metric, optionally broken down by a user
   * attribute. Result keys are `<data_type>` or `<data_type>_<group value>`.
   *
   * @throws Error when `groupBy` is not a whitelisted column. The column name
   *   is interpolated into the SQL text, so it must never come from
   *   unchecked input.
   */
  private async getAggregateAnalytics(
    userIds: string[],
    metricTypes: HealthMetricType[],
    startDate: Date,
    endDate: Date,
    groupBy?: string
  ): Promise<ComparativeAnalytics['aggregates']> {
    if (
      groupBy !== undefined &&
      groupBy !== null &&
      !GetComparativeAnalyticsQueryHandler.GROUP_BY_COLUMNS.includes(groupBy)
    ) {
      throw new Error(`Unsupported groupBy column: ${String(groupBy)}`);
    }
    const groupColumn = groupBy ? `, u.${groupBy}` : '';

    const query = `
      SELECT
        hrd.data_type${groupColumn},
        AVG(hrd.data_value::numeric) as avg_value,
        MIN(hrd.data_value::numeric) as min_value,
        PERCENTILE_CONT(0.25) WITHIN GROUP (ORDER BY hrd.data_value::numeric) as p25,
        PERCENTILE_CONT(0.5) WITHIN GROUP (ORDER BY hrd.data_value::numeric) as p50,
        PERCENTILE_CONT(0.75) WITHIN GROUP (ORDER BY hrd.data_value::numeric) as p75,
        MAX(hrd.data_value::numeric) as max_value,
        STDDEV(hrd.data_value::numeric) as stddev_value
      FROM users u
      JOIN health_records hr ON hr.user_id = u.id
      JOIN health_record_data hrd ON hrd.record_id = hr.id
      WHERE u.id = ANY($1)
        AND hrd.data_type = ANY($2)
        AND hr.recorded_at BETWEEN $3 AND $4
        AND hr.deleted_at IS NULL
      GROUP BY hrd.data_type${groupColumn}
    `;
    const results = await this.readDb.any(query, [
      userIds,
      metricTypes,
      startDate,
      endDate,
    ]);

    // Shape the rows into keyed lookups.
    const averages: Record<string, number> = {};
    const distributions: Record<string, DistributionData> = {};
    for (const row of results) {
      const key = groupBy ? `${row.data_type}_${row[groupBy]}` : row.data_type;
      averages[key] = parseFloat(row.avg_value);
      distributions[key] = {
        // min/max used to be filled with p25/p75, which misreported the range.
        min: parseFloat(row.min_value),
        q1: parseFloat(row.p25),
        median: parseFloat(row.p50),
        q3: parseFloat(row.p75),
        max: parseFloat(row.max_value),
        stdDev: parseFloat(row.stddev_value),
      };
    }
    return { averages, distributions };
  }

  /**
   * For every user and metric: the user's average, the average of all user
   * averages, and the percentage difference between the two.
   */
  private async getComparisons(
    userIds: string[],
    metricTypes: HealthMetricType[],
    startDate: Date,
    endDate: Date
  ): Promise<ComparisonResult[]> {
    // PostgreSQL accepts an output-column alias in ORDER BY only when it
    // stands alone, so the percentage expression is repeated inside ABS().
    const query = `
      WITH user_averages AS (
        SELECT
          u.id as user_id,
          hrd.data_type,
          AVG(hrd.data_value::numeric) as user_avg
        FROM users u
        JOIN health_records hr ON hr.user_id = u.id
        JOIN health_record_data hrd ON hrd.record_id = hr.id
        WHERE u.id = ANY($1)
          AND hrd.data_type = ANY($2)
          AND hr.recorded_at BETWEEN $3 AND $4
          AND hr.deleted_at IS NULL
        GROUP BY u.id, hrd.data_type
      ),
      overall_averages AS (
        SELECT
          data_type,
          AVG(user_avg) as overall_avg
        FROM user_averages
        GROUP BY data_type
      )
      SELECT
        ua.user_id,
        ua.data_type,
        ua.user_avg,
        oa.overall_avg,
        ((ua.user_avg - oa.overall_avg) / NULLIF(oa.overall_avg, 0)) * 100 as percent_diff
      FROM user_averages ua
      JOIN overall_averages oa ON oa.data_type = ua.data_type
      ORDER BY ua.data_type,
               ABS(((ua.user_avg - oa.overall_avg) / NULLIF(oa.overall_avg, 0)) * 100) DESC
    `;
    return await this.readDb.any(query, [
      userIds,
      metricTypes,
      startDate,
      endDate,
    ]);
  }

  /** Cache key derived from a hash of the full parameter object. */
  private generateCacheKey(params: any): string {
    return `comparative-analytics:${hashObject(params)}`;
  }

  /**
   * Normalises the requested period into a type plus concrete dates.
   * Accepts either a period name ('week', 'month', ...) or an object carrying
   * `type` and, optionally, explicit `startDate` / `endDate`; explicit dates
   * win when both are valid, otherwise the range is derived from the type.
   */
  private resolvePeriod(
    period: unknown,
    now: Date = new Date()
  ): { type: string; startDate: Date; endDate: Date } {
    if (typeof period === 'string') {
      return { type: period, ...this.getPeriodDates(period, now) };
    }
    const candidate = (period ?? {}) as { type?: unknown; startDate?: unknown; endDate?: unknown };
    const type = typeof candidate.type === 'string' ? candidate.type : 'custom';
    const explicitStart = this.toValidDate(candidate.startDate);
    const explicitEnd = this.toValidDate(candidate.endDate);
    if (explicitStart && explicitEnd) {
      return { type, startDate: explicitStart, endDate: explicitEnd };
    }
    return { type, ...this.getPeriodDates(type, now) };
  }

  /** Converts a Date, ISO string or epoch number to a Date; undefined when invalid. */
  private toValidDate(value: unknown): Date | undefined {
    if (!(value instanceof Date) && typeof value !== 'string' && typeof value !== 'number') {
      return undefined;
    }
    const parsed = value instanceof Date ? value : new Date(value);
    return Number.isNaN(parsed.getTime()) ? undefined : parsed;
  }

  /**
   * Translates a period name into a date range ending at the end of the
   * reference day. Unknown names fall back to the last 30 days.
   */
  private getPeriodDates(
    period: string,
    now: Date = new Date()
  ): { startDate: Date; endDate: Date } {
    const endDate = new Date(now.getFullYear(), now.getMonth(), now.getDate(), 23, 59, 59, 999);
    let startDate: Date;
    switch (period) {
      case 'week':
        startDate = new Date(now);
        startDate.setDate(now.getDate() - 7);
        break;
      case 'month':
        startDate = this.shiftMonths(now, -1);
        break;
      case 'quarter':
        startDate = this.shiftMonths(now, -3);
        break;
      case 'year':
        startDate = this.shiftMonths(now, -12);
        break;
      default:
        startDate = new Date(now);
        startDate.setDate(now.getDate() - 30);
    }
    startDate.setHours(0, 0, 0, 0);
    return { startDate, endDate };
  }

  /** Moves a date by whole months, clamping the day to the target month's length. */
  private shiftMonths(from: Date, months: number): Date {
    const shifted = new Date(from);
    const originalDay = shifted.getDate();
    shifted.setDate(1);
    shifted.setMonth(shifted.getMonth() + months);
    const lastDayOfMonth = new Date(shifted.getFullYear(), shifted.getMonth() + 1, 0).getDate();
    shifted.setDate(Math.min(originalDay, lastDayOfMonth));
    return shifted;
  }
}
Code collapsed
4. 事件溯源集成
事件存储
code
// infrastructure/event-sourcing/event-store.ts
/**
 * Event store contract used by the command side and by projectors.
 */
export interface IEventStore {
// Appends the given events for one aggregate.
appendEvents(aggregateId: string, events: DomainEvent[]): Promise<void>;
// Returns all events of one aggregate.
getEvents(aggregateId: string): Promise<DomainEvent[]>;
// Returns the events of one aggregate starting at the given version.
getEventsFromVersion(aggregateId: string, version: number): Promise<DomainEvent[]>;
// Streams events across all aggregates.
getAllEvents(): AsyncIterable<DomainEvent>;
}
/**
 * PostgreSQL-backed event store writing to and reading from the
 * `event_store` table.
 */
export class PostgresEventStore implements IEventStore {
  constructor(private db: Database) {}

  /**
   * Appends all events for one aggregate inside a single transaction; either
   * every event is stored or none is.
   *
   * @throws The error raised by the failing statement. A failure of the
   *   ROLLBACK itself is suppressed so it cannot mask that original error.
   */
  async appendEvents(aggregateId: string, events: DomainEvent[]): Promise<void> {
    // Nothing to store: do not take a connection or open a transaction.
    if (events.length === 0) {
      return;
    }

    const client = await this.db.connect();
    try {
      await client.query('BEGIN');
      for (const event of events) {
        await client.query(
          `INSERT INTO event_store (event_id, aggregate_id, event_type, event_data, version, occurred_at)
           VALUES ($1, $2, $3, $4, $5, $6)`,
          [
            event.eventId,
            aggregateId,
            event.getEventType(),
            JSON.stringify(event),
            event.aggregateVersion,
            event.occurredAt,
          ]
        );
      }
      await client.query('COMMIT');
    } catch (error) {
      try {
        await client.query('ROLLBACK');
      } catch {
        // Keep the original failure; the connection is released below.
      }
      throw error;
    } finally {
      client.release();
    }
  }

  /** Returns every event of the aggregate, ordered by ascending version. */
  async getEvents(aggregateId: string): Promise<DomainEvent[]> {
    const rows = await this.db.any(
      `SELECT event_id, event_type, event_data, version, occurred_at
       FROM event_store
       WHERE aggregate_id = $1
       ORDER BY version ASC`,
      [aggregateId]
    );
    return rows.map((row: any) => this.deserializeEvent(row));
  }

  /**
   * Returns the aggregate's events whose version is greater than or equal
   * to `version`, ordered by ascending version.
   */
  async getEventsFromVersion(
    aggregateId: string,
    version: number
  ): Promise<DomainEvent[]> {
    const rows = await this.db.any(
      `SELECT event_id, event_type, event_data, version, occurred_at
       FROM event_store
       WHERE aggregate_id = $1 AND version >= $2
       ORDER BY version ASC`,
      [aggregateId, version]
    );
    return rows.map((row: any) => this.deserializeEvent(row));
  }

  /**
   * Streams all events in pages of 1000, ordered by `event_id`. When no new
   * rows exist it waits one second and polls again, so the iteration never
   * completes on its own; the consumer must break out of it.
   *
   * NOTE(review): paging by `event_id > lastId` only sees every event if ids
   * increase with insertion order; with random ids (e.g. UUIDv4) events
   * inserted later with a smaller id are skipped. Confirm the id scheme or
   * page by a monotonic sequence column.
   */
  async *getAllEvents(): AsyncIterable<DomainEvent> {
    let lastId = '';
    while (true) {
      const rows = await this.db.any(
        `SELECT event_id, event_type, event_data, version, occurred_at
         FROM event_store
         WHERE event_id > $1
         ORDER BY event_id
         LIMIT 1000`,
        [lastId]
      );
      if (rows.length === 0) {
        // Wait for new events.
        await new Promise(resolve => setTimeout(resolve, 1000));
        continue;
      }
      for (const row of rows) {
        yield this.deserializeEvent(row);
        lastId = row.event_id;
      }
    }
  }

  /**
   * Rebuilds an event object from a stored row.
   *
   * `JSON.stringify` drops methods and turns dates into strings, so the
   * stored payload alone lacks `getEventType()` (which consumers call) and a
   * real `occurredAt` Date. Both are restored from the row's own columns.
   * `event_data` is accepted either as a JSON string or as an already parsed
   * value (drivers parse json/jsonb columns automatically).
   */
  private deserializeEvent(row: any): DomainEvent {
    const payload =
      typeof row.event_data === 'string' ? JSON.parse(row.event_data) : row.event_data;
    const eventType: string = row.event_type;
    const occurredAt =
      row.occurred_at instanceof Date ? row.occurred_at : new Date(row.occurred_at);
    return {
      ...payload,
      occurredAt,
      getEventType: () => eventType,
    } as DomainEvent;
  }
}
Code collapsed
投影器(Projector)
code
// analytics/projectors/health-summary-projector.ts
/**
 * Projector: turns domain events into rows of the read-model tables
 * (`user_health_summary`, `health_trends`, `health_alerts_summary`).
 */
export class HealthSummaryProjector {
  constructor(
    private eventStore: IEventStore,
    private readDb: Database
  ) {}

  /**
   * Consumes the event stream and applies every event to the read model.
   * A failing event is logged and skipped so that one bad event does not
   * stop the projection.
   */
  async processEvents(): Promise<void> {
    for await (const event of this.eventStore.getAllEvents()) {
      try {
        await this.processEvent(event);
      } catch (error) {
        console.error(`处理事件 ${event.eventId} 失败:`, error);
      }
    }
  }

  /**
   * Dispatches a single event to its handler by event type. Event types
   * without a handler are ignored.
   */
  private async processEvent(event: DomainEvent): Promise<void> {
    switch (event.getEventType()) {
      case 'HealthRecordCreated':
        await this.handleHealthRecordCreated(event as HealthRecordCreatedEvent);
        break;
      case 'HealthRecordUpdated':
        await this.handleHealthRecordUpdated(event as HealthRecordUpdatedEvent);
        break;
      case 'CriticalHealthValueDetected':
        await this.handleCriticalHealthValue(event as CriticalHealthValueDetectedEvent);
        break;
      default:
        // Other event types do not affect these read models.
        break;
    }
  }

  /**
   * Applies a record-created event: bumps the user's record count and last
   * record time, then folds each reading into that day's trend row.
   */
  private async handleHealthRecordCreated(
    event: HealthRecordCreatedEvent
  ): Promise<void> {
    // Update the user's health summary.
    await this.readDb.none(
      `INSERT INTO user_health_summary (user_id, tenant_id, total_records, last_record_at)
       VALUES ($1, $2, 1, $3)
       ON CONFLICT (user_id)
       DO UPDATE SET
         total_records = user_health_summary.total_records + 1,
         last_record_at = GREATEST(user_health_summary.last_record_at, $3)`,
      [event.userId, event.tenantId, event.recordedAt]
    );

    // Update the trend data. The conflict key contains the day, so the
    // timestamp is truncated to the day explicitly; a raw timestamp would
    // never hit an existing row unless the column silently dropped the time.
    for (const data of event.data) {
      await this.readDb.none(
        `INSERT INTO health_trends (user_id, tenant_id, record_type, data_type, date, avg_value, min_value, max_value, record_count)
         VALUES ($1, $2, $3, $4, DATE_TRUNC('day', $5::timestamptz), $6, $6, $6, 1)
         ON CONFLICT (user_id, record_type, data_type, date)
         DO UPDATE SET
           avg_value = (health_trends.avg_value * health_trends.record_count + $6) / (health_trends.record_count + 1),
           min_value = LEAST(health_trends.min_value, $6),
           max_value = GREATEST(health_trends.max_value, $6),
           record_count = health_trends.record_count + 1`,
        [
          event.userId,
          event.tenantId,
          event.recordType,
          data.type,
          event.recordedAt,
          data.value,
        ]
      );
    }
  }

  /**
   * Applies a record-updated event.
   */
  private async handleHealthRecordUpdated(
    event: HealthRecordUpdatedEvent
  ): Promise<void> {
    // Implementation omitted.
  }

  /**
   * Applies a critical-value event to that day's alert summary: increments
   * the counter matching the event's severity and records the alert type
   * once (no duplicates).
   */
  private async handleCriticalHealthValue(
    event: CriticalHealthValueDetectedEvent
  ): Promise<void> {
    // Update the alert summary. Parameters $4/$6/$7 are the increments for
    // critical/warning/info; previously only critical was ever counted.
    await this.readDb.none(
      `INSERT INTO health_alerts_summary (user_id, tenant_id, date, critical_count, warning_count, info_count, alert_types)
       VALUES ($1, $2, DATE_TRUNC('day', $3::timestamptz), $4, $6, $7, ARRAY[$5::text])
       ON CONFLICT (user_id, date)
       DO UPDATE SET
         critical_count = health_alerts_summary.critical_count + $4,
         warning_count = health_alerts_summary.warning_count + $6,
         info_count = health_alerts_summary.info_count + $7,
         alert_types = CASE
           WHEN $5::text = ANY(health_alerts_summary.alert_types) THEN health_alerts_summary.alert_types
           ELSE array_append(health_alerts_summary.alert_types, $5::text)
         END`,
      [
        event.userId,
        event.tenantId,
        event.detectedAt,
        event.severity === 'critical' ? 1 : 0,
        event.valueType,
        event.severity === 'warning' ? 1 : 0,
        event.severity === 'info' ? 1 : 0,
      ]
    );
  }
}
Code collapsed
5. 性能优化
缓存策略
code
// analytics/cache/strategies.ts
/**
 * Two-level cache: a short-lived in-process map (L1) in front of Redis (L2).
 */
export class MultiLevelCache {
  private memoryCache: Map<string, { value: any; expiresAt: number }>;
  private redis: Redis;
  private memoryTTL: number = 5000; // 5 seconds (milliseconds)
  private redisTTL: number = 300; // 5 minutes (seconds)

  constructor() {
    this.memoryCache = new Map();
    this.redis = new Redis(process.env.REDIS_URL);
  }

  /**
   * Looks the key up in memory first, then in Redis. A Redis hit is copied
   * back into memory. Resolves with `null` on a miss.
   */
  async get<T>(key: string): Promise<T | null> {
    // L1: in-memory cache.
    const memCached = this.memoryCache.get(key);
    if (memCached) {
      if (memCached.expiresAt > Date.now()) {
        return memCached.value as T;
      }
      // Drop the expired entry so the map does not grow without bound.
      this.memoryCache.delete(key);
    }

    // L2: Redis cache.
    const redisCached = await this.redis.get(key);
    if (redisCached) {
      const value = JSON.parse(redisCached) as T;
      // Back-fill the in-memory cache.
      this.memoryCache.set(key, {
        value,
        expiresAt: Date.now() + this.memoryTTL,
      });
      return value;
    }
    return null;
  }

  /**
   * Stores a value in both levels.
   *
   * @param ttl - Redis time-to-live in seconds; a missing or non-positive
   *   value falls back to the default of 300 seconds. The in-memory copy
   *   never outlives the Redis entry.
   *
   * Values that JSON cannot represent (`undefined`, functions) are not
   * stored, since they could never be read back from Redis.
   */
  async set<T>(key: string, value: T, ttl?: number): Promise<void> {
    const effectiveTTL = ttl !== undefined && ttl > 0 ? ttl : this.redisTTL;
    const serialized = JSON.stringify(value);
    if (serialized === undefined) {
      return;
    }

    // Write to the in-memory cache.
    this.memoryCache.set(key, {
      value,
      expiresAt: Date.now() + Math.min(this.memoryTTL, effectiveTTL * 1000),
    });

    // Write to Redis.
    await this.redis.setex(key, effectiveTTL, serialized);
  }

  /**
   * Removes every key matching a Redis glob pattern (`*`, `?`, `[...]`,
   * backslash escapes) from both levels.
   *
   * The in-memory side used to interpret the same pattern as a regular
   * expression, so the two levels disagreed about which keys matched. Redis
   * keys are now enumerated incrementally with SCAN instead of KEYS, which
   * blocks the server while walking the entire keyspace.
   */
  async invalidate(pattern: string): Promise<void> {
    // Clear the in-memory cache (deleting during Map iteration is safe).
    const matcher = this.globToRegExp(pattern);
    for (const key of this.memoryCache.keys()) {
      if (matcher.test(key)) {
        this.memoryCache.delete(key);
      }
    }

    // Clear the Redis cache.
    let cursor = '0';
    do {
      const [nextCursor, keys] = await this.redis.scan(cursor, 'MATCH', pattern, 'COUNT', 100);
      cursor = String(nextCursor);
      if (keys.length > 0) {
        await this.redis.del(...keys);
      }
    } while (cursor !== '0');
  }

  /** Translates a Redis glob pattern into an anchored regular expression. */
  private globToRegExp(glob: string): RegExp {
    const escapeLiteral = (ch: string): string => ch.replace(/[.*+?^${}()|[\]\\]/g, '\\$&');
    let source = '';
    for (let i = 0; i < glob.length; i++) {
      const ch = glob[i];
      if (ch === '*') {
        source += '.*';
      } else if (ch === '?') {
        source += '.';
      } else if (ch === '\\' && i + 1 < glob.length) {
        i += 1;
        source += escapeLiteral(glob[i]);
      } else if (ch === '[') {
        const close = glob.indexOf(']', i + 1);
        if (close === -1) {
          source += '\\[';
        } else {
          // Character classes share their syntax with regular expressions;
          // backslashes are taken literally.
          source += `[${glob.slice(i + 1, close).replace(/\\/g, '\\\\')}]`;
          i = close;
        }
      } else {
        source += escapeLiteral(ch);
      }
    }
    return new RegExp(`^${source}$`, 's');
  }
}
/**
 * Cache instance shared by every `@Cacheable` method. Created on first use
 * so that decorating a method neither opens a Redis connection at module
 * load time nor creates one connection per decorated method.
 */
let sharedCacheableStore:
  | {
      get<T>(key: string): Promise<T | null>;
      set<T>(key: string, value: T, ttl?: number): Promise<void>;
    }
  | undefined;

/**
 * Method decorator that caches the resolved result of an async method.
 *
 * The cache key is `<ClassName>.<methodName>:<hash of arguments>`. The class
 * name is part of the key because many classes expose identically named
 * methods (e.g. `handle`), which would otherwise read each other's entries.
 *
 * @param ttl - Optional time-to-live in seconds, forwarded to the cache.
 */
export function Cacheable(ttl?: number) {
  return function (
    target: any,
    propertyKey: string,
    descriptor: PropertyDescriptor
  ) {
    const originalMethod = descriptor.value;
    // `target` is the prototype for instance methods and the constructor
    // itself for static methods.
    const ownerName: string =
      (typeof target === 'function' ? target.name : target?.constructor?.name) || 'anonymous';

    descriptor.value = async function (this: unknown, ...args: any[]) {
      if (!sharedCacheableStore) {
        sharedCacheableStore = new MultiLevelCache();
      }
      const cache = sharedCacheableStore;
      const cacheKey = `${ownerName}.${propertyKey}:${hashArgs(args)}`;

      // Try the cache first.
      const cached = await cache.get(cacheKey);
      if (cached !== null) {
        return cached;
      }

      // Run the original method.
      const result = await originalMethod.apply(this, args);

      // Store the result.
      await cache.set(cacheKey, result, ttl);
      return result;
    };
    return descriptor;
  };
}
Code collapsed
批处理和预聚合
code
// analytics/batch/batch-processor.ts
/**
 * Batch processor: collects items per topic and processes them in batches,
 * either when a topic reaches `batchSize` or on a periodic flush.
 */
export class BatchProcessor {
  private queue: Map<string, any[]> = new Map();
  private processing: boolean = false;
  private flushTimer: ReturnType<typeof setInterval> | undefined;

  /**
   * @param batchSize - Number of queued items per topic that triggers processing.
   * @param batchTimeout - Interval of the periodic flush in milliseconds.
   * @param db - Database used to persist health events. The class always
   *   used `this.db`, but it was never declared or injected; it is optional
   *   here so existing two-argument construction keeps working.
   */
  constructor(
    private batchSize: number = 100,
    private batchTimeout: number = 5000,
    private db?: { none(query: string, values?: unknown[]): Promise<unknown> }
  ) {}

  /**
   * Adds an item to a topic's queue and processes the topic as soon as the
   * queue reaches the batch size.
   */
  async add(topic: string, item: any): Promise<void> {
    if (!this.queue.has(topic)) {
      this.queue.set(topic, []);
    }
    this.queue.get(topic)!.push(item);

    // Process once the batch size is reached.
    if (this.queue.get(topic)!.length >= this.batchSize) {
      await this.processBatch(topic);
    }
  }

  /**
   * Starts flushing all non-empty topics every `batchTimeout` milliseconds.
   * Calling it again while a timer is active has no effect. Flush failures
   * are logged instead of surfacing as unhandled promise rejections; the
   * failed batch stays queued for the next tick.
   */
  startPeriodicFlush(): void {
    if (this.flushTimer !== undefined) {
      return;
    }
    this.flushTimer = setInterval(async () => {
      // Snapshot the topics: the map may gain entries while awaiting.
      for (const topic of [...this.queue.keys()]) {
        if ((this.queue.get(topic)?.length ?? 0) === 0) {
          continue;
        }
        try {
          await this.processBatch(topic);
        } catch (error) {
          console.error(`Periodic flush failed for topic ${topic}:`, error);
        }
      }
    }, this.batchTimeout);
  }

  /** Stops the periodic flush started by `startPeriodicFlush`. */
  stopPeriodicFlush(): void {
    if (this.flushTimer !== undefined) {
      clearInterval(this.flushTimer);
      this.flushTimer = undefined;
    }
  }

  /**
   * Processes everything currently queued for a topic. Does nothing while
   * another batch is being processed.
   *
   * The batch is detached from the queue before any await, so items added
   * while it is in flight land in a fresh array instead of being wiped when
   * the queue is cleared afterwards. On failure the batch is put back in
   * front of newer items and the error is rethrown.
   */
  private async processBatch(topic: string): Promise<void> {
    if (this.processing) return;
    const items = this.queue.get(topic) ?? [];
    if (items.length === 0) return;

    this.processing = true;
    this.queue.set(topic, []);
    try {
      switch (topic) {
        case 'health-events':
          await this.processHealthEvents(items);
          break;
        case 'user-events':
          await this.processUserEvents(items);
          break;
        // Other topics...
      }
    } catch (error) {
      const newer = this.queue.get(topic) ?? [];
      this.queue.set(topic, [...items, ...newer]);
      throw error;
    } finally {
      this.processing = false;
    }
  }

  /**
   * Persists a batch of health events with one INSERT and then updates the
   * read models.
   *
   * @throws Error when no database was supplied to the constructor.
   */
  private async processHealthEvents(events: DomainEvent[]): Promise<void> {
    if (!this.db) {
      throw new Error('BatchProcessor requires a database to process health events');
    }

    // Bulk insert. The columns are selected by name because `SELECT *`
    // returns every column of event_store in table order, which need not
    // match the explicit INSERT column list.
    await this.db.none(
      `INSERT INTO event_store (event_id, aggregate_id, event_type, event_data, version, occurred_at)
       SELECT event_id, aggregate_id, event_type, event_data, version, occurred_at
       FROM json_populate_recordset(null::event_store, $1)`,
      [JSON.stringify(events.map(e => ({
        event_id: e.eventId,
        aggregate_id: e.aggregateId,
        event_type: e.getEventType(),
        event_data: JSON.stringify(e),
        version: e.aggregateVersion,
        occurred_at: e.occurredAt,
      })))]
    );

    // Update the read models for the batch.
    await this.updateReadModels(events);
  }

  /**
   * Processes a batch of user events.
   */
  private async processUserEvents(events: DomainEvent[]): Promise<void> {
    // Implementation omitted.
  }

  /**
   * Groups the events by user and updates each user's summary once.
   */
  private async updateReadModels(events: DomainEvent[]): Promise<void> {
    // Group by user.
    const eventsByUser = new Map<string, DomainEvent[]>();
    for (const event of events) {
      const userId = this.extractUserId(event);
      const bucket = eventsByUser.get(userId);
      if (bucket) {
        bucket.push(event);
      } else {
        eventsByUser.set(userId, [event]);
      }
    }

    // Update every user's summary. The loop variable must not reuse the
    // map's name: doing so shadowed the map inside its own initializer and
    // threw a ReferenceError (temporal dead zone) on every batch.
    for (const [userId, eventsOfUser] of eventsByUser.entries()) {
      await this.updateUserSummary(userId, eventsOfUser);
    }
  }

  private async updateUserSummary(
    userId: string,
    events: DomainEvent[]
  ): Promise<void> {
    // Implementation omitted.
  }

  /** Reads `userId` from the event; empty string when the event has none. */
  private extractUserId(event: DomainEvent): string {
    return (event as any).userId || '';
  }
}
Code collapsed
合规检查清单
数据安全
- 读写访问控制分离
- 敏感数据加密存储
- 审计日志完整记录
- 数据最小化原则
性能优化
- 物化视图定期刷新
- 多层缓存策略
- 批处理优化
- 查询索引优化
数据一致性
- 最终一致性保证
- 事件顺序保证
- 补偿事务机制
- 数据同步监控
参考资料
免责声明:CQRS 增加了系统复杂度。在采用前请评估实际需求,确保团队有能力维护这种架构。