14 KiB
14 KiB
RabbitMQ 配置指南
1. RabbitMQ 模块配置
1.1 RabbitMQ 模块设置
// src/rabbitmq/rabbitmq.module.ts
import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { ConfigService } from '@nestjs/config';
/**
 * Registers the RabbitMQ client proxy under the `RABBITMQ_SERVICE` injection
 * token and re-exports `ClientsModule` so importing modules can inject it.
 *
 * Configuration keys read through `ConfigService`:
 * - `RABBITMQ_URL` (required) — broker URL; bootstrap fails with an explicit
 *   error when it is missing or empty.
 * - `RABBITMQ_HEARTBEAT` (optional) — heartbeat interval in seconds, default 60.
 * - `RABBITMQ_RECONNECT_TIME` (optional) — reconnect delay in seconds, default 5.
 *
 * All messages are sent to the durable queue `main_queue`.
 */
@Module({
  imports: [
    ClientsModule.registerAsync([
      {
        name: 'RABBITMQ_SERVICE',
        useFactory: (configService: ConfigService) => {
          const url = configService.get<string>('RABBITMQ_URL');
          if (!url) {
            // Fail fast instead of handing `undefined` to the AMQP client.
            throw new Error('RABBITMQ_URL is not configured');
          }

          // Reads a positive number of seconds; anything missing or invalid
          // falls back to the previously hard-coded value.
          const readSeconds = (key: string, fallback: number): number => {
            const parsed = Number(configService.get<string>(key));
            return Number.isFinite(parsed) && parsed > 0 ? parsed : fallback;
          };

          return {
            transport: Transport.RMQ,
            options: {
              urls: [url],
              queue: 'main_queue',
              queueOptions: {
                durable: true,
              },
              socketOptions: {
                heartbeatIntervalInSeconds: readSeconds('RABBITMQ_HEARTBEAT', 60),
                reconnectTimeInSeconds: readSeconds('RABBITMQ_RECONNECT_TIME', 5),
              },
            },
          };
        },
        inject: [ConfigService],
      },
    ]),
  ],
  exports: [ClientsModule],
})
export class RabbitMQModule {}
1.2 消息队列配置
// src/config/queue.config.ts
/**
 * Central table of exchange names, queue names and routing keys.
 *
 * Declared `as const` so every entry keeps its literal type and the table
 * cannot be reassigned by accident at compile time.
 */
export const QueueConfig = {
  // Exchange names
  exchanges: {
    USER: 'user.exchange',
    PAYMENT: 'payment.exchange',
    PUSH: 'push.exchange',
    PLATFORM: 'platform.exchange',
  },
  // Queue names
  queues: {
    // User-related queues
    USER_REGISTRATION: 'user.registration.queue',
    USER_LOGIN: 'user.login.queue',
    USER_UPDATE: 'user.update.queue',
    // AI generation queues
    AI_GENERATION_TASK: 'ai.generation.task.queue',
    AI_GENERATION_COMPLETED: 'ai.generation.completed.queue',
    AI_GENERATION_FAILED: 'ai.generation.failed.queue',
    // Credit system queues
    CREDIT_REWARD: 'credit.reward.queue',
    CREDIT_CONSUME: 'credit.consume.queue',
    // Advertising system queues
    AD_WATCH_COMPLETED: 'ad.watch.completed.queue',
    AD_REWARD_GRANTED: 'ad.reward.granted.queue',
    // Platform synchronisation queues
    PLATFORM_SYNC_USER: 'platform.sync.user.queue',
    PLATFORM_SYNC_DATA: 'platform.sync.data.queue',
    // Extension service queues (reserved)
    EXTENSION_SERVICE: 'extension.service.queue',
    CUSTOM_EVENT: 'custom.event.queue',
  },
  // Routing keys (also used as event patterns by the producer and consumer)
  routingKeys: {
    USER_REGISTERED: 'user.registered',
    USER_LOGIN_SUCCESS: 'user.login.success',
    USER_UPDATED: 'user.updated',
    // AI generation routing keys
    AI_GENERATION_TASK: 'ai.generation.task',
    AI_GENERATION_COMPLETED: 'ai.generation.completed',
    AI_GENERATION_FAILED: 'ai.generation.failed',
    // Credit system routing keys
    CREDIT_REWARD: 'credit.reward',
    CREDIT_CONSUME: 'credit.consume',
    // Advertising system routing keys
    AD_WATCH_COMPLETED: 'ad.watch.completed',
    AD_REWARD_GRANTED: 'ad.reward.granted',
    PLATFORM_SYNC: 'platform.sync',
    EXTENSION_EVENT: 'extension.event',
  },
} as const;
2. 消息生产者服务
// src/services/message-producer.service.ts
import { Injectable, Inject } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
import { QueueConfig } from '../config/queue.config';
/**
 * Payload accepted by `MessageProducerService.sendUserRegistration` and
 * `sendUserUpdate`, and received by the matching consumer handlers.
 */
export interface UserRegistrationMessage {
  /** Identifier of the user; `UserService.registerUser` passes the created user's `id`. */
  userId: string;
  /** Platform string; `UserService.registerUser` forwards `userData.platform`. */
  platform: string;
  /** Untyped user data; `UserService.registerUser` forwards its raw input here. */
  userInfo: any;
  /**
   * Timestamp set by the producer (`new Date()` in `UserService.registerUser`).
   * NOTE(review): if the transport serializes to JSON, consumers will see a
   * string rather than a `Date` — confirm.
   */
  timestamp: Date;
}
/**
 * Payload accepted by `MessageProducerService.sendExtensionEvent` and received
 * by the extension-event consumer handler.
 */
export interface ExtensionMessage {
  userId: string;
  platform: string;
  /** Free-form discriminator for the kind of data in `data`. */
  dataType: string;
  action: string; // expected values: 'create', 'update', 'delete' (not enforced by the type)
  /** Arbitrary key/value payload; its schema is not defined here. */
  data: Record<string, any>;
  timestamp: Date;
}
/**
 * Payload accepted by `MessageProducerService.sendGenerationTask`.
 * The consumer shown in this guide reads `taskId` to process the task, and
 * `userId` / `platform` when it reports a failure.
 */
export interface GenerationTaskMessage {
  taskId: string;
  userId: string;
  platform: string;
  /** Kind of asset to generate. */
  type: 'image' | 'video';
  /** Optional template code. */
  templateCode?: string;
  /** Arbitrary generation inputs; their schema is not defined here. */
  inputParameters: Record<string, any>;
  timestamp: Date;
}
/** Payload accepted by `MessageProducerService.sendGenerationCompleted`. */
export interface GenerationCompletedMessage {
  taskId: string;
  userId: string;
  platform: string;
  /** URL of the generated output. */
  outputUrl: string;
  /** Optional thumbnail URL. */
  thumbnailUrl?: string;
  timestamp: Date;
}
/**
 * Payload accepted by `MessageProducerService.sendGenerationFailed`.
 * The consumer emits it when processing a generation task throws.
 */
export interface GenerationFailedMessage {
  taskId: string;
  userId: string;
  platform: string;
  /** Failure description; the consumer fills it from the caught error's message. */
  errorMessage: string;
  timestamp: Date;
}
/**
 * Thin publishing facade over the `RABBITMQ_SERVICE` client proxy.
 *
 * Every public method emits its payload under one pattern taken from
 * `QueueConfig.routingKeys` and returns whatever `ClientProxy.emit` returns.
 *
 * NOTE(review): `ClientProxy.emit` returns an Observable, so `await`-ing these
 * methods resolves to that Observable rather than waiting for the broker to
 * accept the message — confirm callers do not rely on delivery confirmation.
 */
@Injectable()
export class MessageProducerService {
  constructor(@Inject('RABBITMQ_SERVICE') private readonly client: ClientProxy) {}

  /** Emits a user-registration event. */
  async sendUserRegistration(message: UserRegistrationMessage) {
    return this.publish(QueueConfig.routingKeys.USER_REGISTERED, message);
  }

  /** Emits a user-update event. */
  async sendUserUpdate(message: UserRegistrationMessage) {
    return this.publish(QueueConfig.routingKeys.USER_UPDATED, message);
  }

  /** Emits a platform-sync event with an untyped payload. */
  async sendPlatformSync(data: any) {
    return this.publish(QueueConfig.routingKeys.PLATFORM_SYNC, data);
  }

  /** Emits an extension-service event. */
  async sendExtensionEvent(message: ExtensionMessage) {
    return this.publish(QueueConfig.routingKeys.EXTENSION_EVENT, message);
  }

  /** Emits an AI generation task. */
  async sendGenerationTask(message: GenerationTaskMessage) {
    return this.publish(QueueConfig.routingKeys.AI_GENERATION_TASK, message);
  }

  /** Emits an AI generation-completed event. */
  async sendGenerationCompleted(message: GenerationCompletedMessage) {
    return this.publish(QueueConfig.routingKeys.AI_GENERATION_COMPLETED, message);
  }

  /** Emits an AI generation-failed event. */
  async sendGenerationFailed(message: GenerationFailedMessage) {
    return this.publish(QueueConfig.routingKeys.AI_GENERATION_FAILED, message);
  }

  /** Single point through which every event reaches the client proxy. */
  private publish(pattern: string, payload: any) {
    return this.client.emit(pattern, payload);
  }
}
3. 消息消费者服务
// src/services/message-consumer.service.ts
import { Injectable } from '@nestjs/common';
import { EventPattern, Payload } from '@nestjs/microservices';
import { QueueConfig } from '../config/queue.config';
import { UserService } from './user.service';
import { PaymentService } from './payment.service';
import { PushService } from './push.service';
import { PlatformService } from './platform.service';
/**
 * Event handlers for the patterns declared in `QueueConfig.routingKeys`.
 *
 * Each handler logs the payload, delegates to the injected services and logs
 * any failure instead of rethrowing, so a failing handler never propagates its
 * error to the caller.
 *
 * NOTE(review): `ExtensionService`, `GenerationService`, `MessageProducerService`
 * and the message interfaces are referenced here but not imported in the
 * snippet shown in this guide — confirm the imports exist in the real file.
 * NOTE(review): NestJS documents that pattern handlers are registered only on
 * controller classes; this class is `@Injectable()` and listed under
 * `providers`, so confirm these handlers are actually bound.
 */
@Injectable()
export class MessageConsumerService {
  constructor(
    private readonly userService: UserService,
    private readonly platformService: PlatformService,
    private readonly extensionService: ExtensionService,
    private readonly generationService: GenerationService,
    private readonly messageProducer: MessageProducerService,
  ) {}

  /** Handles a user-registration event: platform sync, then extension hook. */
  @EventPattern(QueueConfig.routingKeys.USER_REGISTERED)
  async handleUserRegistration(@Payload() data: UserRegistrationMessage) {
    try {
      console.log('Processing user registration:', data);
      // Sync to the other platforms
      await this.platformService.syncUserRegistration(data);
      // Trigger extension-service processing
      await this.extensionService.handleUserRegistration(data);
      console.log('User registration processed successfully');
    } catch (error: unknown) {
      console.error('Error processing user registration:', error);
      // A retry mechanism or dead-letter queue could be implemented here
    }
  }

  /** Handles a user-update event: platform sync, then extension hook. */
  @EventPattern(QueueConfig.routingKeys.USER_UPDATED)
  async handleUserUpdate(@Payload() data: UserRegistrationMessage) {
    try {
      console.log('Processing user update:', data);
      // Sync user data to every platform
      await this.platformService.syncUserUpdate(data);
      // Trigger extension-service processing
      await this.extensionService.handleUserUpdate(data);
      console.log('User update processed successfully');
    } catch (error: unknown) {
      console.error('Error processing user update:', error);
    }
  }

  /**
   * Handles an AI generation task. On failure a `GenerationFailedMessage` is
   * emitted for the same task.
   */
  @EventPattern(QueueConfig.routingKeys.AI_GENERATION_TASK)
  async handleGenerationTask(@Payload() data: GenerationTaskMessage) {
    try {
      console.log('Processing AI generation task:', data);
      // Delegate the task to the AI generation service
      await this.generationService.processGenerationTask(data.taskId);
      console.log('AI generation task processed successfully');
    } catch (error: unknown) {
      console.error('Error processing AI generation task:', error);
      // Publish the failure. Anything can be thrown, so narrow before reading
      // `.message` to guarantee `errorMessage` is always a string.
      await this.messageProducer.sendGenerationFailed({
        taskId: data.taskId,
        userId: data.userId,
        platform: data.platform,
        errorMessage: error instanceof Error ? error.message : String(error),
        timestamp: new Date(),
      });
    }
  }

  /** Handles a generation-completed event (currently log-only). */
  @EventPattern(QueueConfig.routingKeys.AI_GENERATION_COMPLETED)
  async handleGenerationCompleted(@Payload() data: GenerationCompletedMessage) {
    try {
      console.log('Processing AI generation completed:', data);
      // Follow-up logic such as push notifications can be added here
      // await this.pushService.sendGenerationCompleteNotification(data);
      console.log('AI generation completed processed successfully');
    } catch (error: unknown) {
      console.error('Error processing AI generation completed:', error);
    }
  }

  /** Handles a generation-failed event (currently log-only). */
  @EventPattern(QueueConfig.routingKeys.AI_GENERATION_FAILED)
  async handleGenerationFailed(@Payload() data: GenerationFailedMessage) {
    try {
      console.log('Processing AI generation failed:', data);
      // Failure handling such as pushing an error notification can be added here
      // await this.pushService.sendGenerationFailedNotification(data);
      console.log('AI generation failed processed successfully');
    } catch (error: unknown) {
      console.error('Error processing AI generation failed:', error);
    }
  }

  /** Handles an extension event: extension hook, then platform sync. */
  @EventPattern(QueueConfig.routingKeys.EXTENSION_EVENT)
  async handleExtensionEvent(@Payload() data: ExtensionMessage) {
    try {
      console.log('Processing extension event:', data);
      // Process according to data type and action
      await this.extensionService.handleExtensionEvent(data);
      // Sync to the related platforms
      await this.platformService.syncExtensionData(data);
      console.log('Extension event processed successfully');
    } catch (error: unknown) {
      console.error('Error processing extension event:', error);
    }
  }

  /** Handles a platform-sync event with an untyped payload. */
  @EventPattern(QueueConfig.routingKeys.PLATFORM_SYNC)
  async handlePlatformSync(@Payload() data: any) {
    try {
      console.log('Processing platform sync:', data);
      await this.platformService.syncData(data);
      console.log('Platform sync processed successfully');
    } catch (error: unknown) {
      console.error('Error processing platform sync:', error);
    }
  }
}
4. 在业务服务中使用消息队列
// src/services/user.service.ts
import { Injectable } from '@nestjs/common';
import { MessageProducerService } from './message-producer.service';
/**
 * Example business service showing how to publish an event after a write.
 */
@Injectable()
export class UserService {
  constructor(private readonly messageProducer: MessageProducerService) {}

  /**
   * Creates the user, publishes a user-registration event and returns the
   * created user.
   *
   * @param userData Raw registration input; `userData.platform` is forwarded
   *   in the event and the whole object is sent as `userInfo`.
   * @returns The object produced by `createUser`.
   */
  async registerUser(userData: any) {
    const created = await this.createUser(userData);

    // Announce the registration on the queue before handing the user back.
    const registrationEvent = {
      userId: created.id,
      platform: userData.platform,
      userInfo: userData,
      timestamp: new Date(),
    };
    await this.messageProducer.sendUserRegistration(registrationEvent);

    return created;
  }

  /**
   * Placeholder for the real persistence logic: returns the input with a
   * fixed `id` of `'user-id'` (an `id` already present in `userData` wins).
   */
  private async createUser(userData: any) {
    return { id: 'user-id', ...userData };
  }
}
5. 主应用模块配置
// src/app.module.ts
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { RabbitMQModule } from './rabbitmq/rabbitmq.module';
import { MessageProducerService } from './services/message-producer.service';
import { MessageConsumerService } from './services/message-consumer.service';
/**
 * Root module: loads configuration globally, wires in the RabbitMQ client
 * module and registers the producer and consumer services. Only the producer
 * is exported.
 *
 * NOTE(review): the `MessageConsumerService` constructor shown in this guide
 * takes `UserService`, `PlatformService`, `ExtensionService` and
 * `GenerationService`, none of which are listed as providers here — confirm
 * they are supplied by another module.
 */
@Module({
  imports: [ConfigModule.forRoot({ isGlobal: true }), RabbitMQModule],
  providers: [MessageProducerService, MessageConsumerService],
  exports: [MessageProducerService],
})
export class AppModule {}
6. 错误处理和重试机制
// src/decorators/retry.decorator.ts
/**
 * Method decorator factory that re-invokes a failing async method with
 * exponential backoff.
 *
 * The wrapped method is always attempted at least once. After a failed
 * attempt `k` (0-based) the wrapper waits `delay * 2^k` milliseconds before
 * the next attempt. Once all retries are used up, the last error is rethrown
 * unchanged.
 *
 * @param maxRetries Number of retries after the first attempt. Negative or NaN
 *   values are treated as 0 and fractional values are rounded down.
 * @param delay Base delay in milliseconds before the first retry.
 * @returns A legacy-style method decorator that replaces `descriptor.value`.
 * @throws Whatever the wrapped method threw on its final attempt.
 */
export function Retry(maxRetries: number = 3, delay: number = 1000) {
  // Normalise once so an invalid limit can never skip the first attempt
  // (which previously ended in `throw undefined`).
  const retryLimit = Number.isNaN(maxRetries) ? 0 : Math.max(0, Math.floor(maxRetries));

  return function (target: any, propertyName: string, descriptor: PropertyDescriptor) {
    const original = descriptor.value;

    descriptor.value = async function (this: unknown, ...args: any[]) {
      let lastError: unknown;

      for (let attempt = 0; attempt <= retryLimit; attempt++) {
        try {
          // Preserve the receiver so instance methods keep working.
          return await original.apply(this, args);
        } catch (error: unknown) {
          lastError = error;
          if (attempt < retryLimit) {
            // Anything can be thrown; only Error instances carry `.message`.
            const reason = error instanceof Error ? error.message : error;
            console.log(`Retry ${attempt + 1}/${retryLimit} for ${propertyName}:`, reason);
            await new Promise(resolve => setTimeout(resolve, delay * Math.pow(2, attempt)));
          }
        }
      }

      throw lastError;
    };
  };
}
7. 队列监控和管理
// src/services/queue-monitor.service.ts
import { Injectable } from '@nestjs/common';
import * as amqp from 'amqplib';
/**
 * Inspection and maintenance helpers for individual queues, built directly on
 * amqplib.
 *
 * The AMQP connection is opened lazily on first use from the `RABBITMQ_URL`
 * environment variable and reused afterwards. Every operation runs on its own
 * short-lived channel, which is closed whether or not the operation succeeds.
 */
@Injectable()
export class QueueMonitorService {
  // Typed from `amqp.connect` so the field matches whatever the installed
  // amqplib typings return.
  private connection?: Awaited<ReturnType<typeof amqp.connect>>;

  /**
   * Returns the message and consumer counts of a queue.
   *
   * @param queueName Name of the queue to inspect.
   * @returns `{ queue, messageCount, consumerCount }`.
   * @throws If `RABBITMQ_URL` is unset, the connection fails, or the queue check fails.
   */
  async getQueueInfo(queueName: string) {
    const queueInfo = await this.withChannel((channel) => channel.checkQueue(queueName));
    return {
      queue: queueName,
      messageCount: queueInfo.messageCount,
      consumerCount: queueInfo.consumerCount,
    };
  }

  /**
   * Removes all messages from a queue.
   *
   * @param queueName Name of the queue to purge.
   * @throws If `RABBITMQ_URL` is unset, the connection fails, or the purge fails.
   */
  async purgeQueue(queueName: string) {
    await this.withChannel((channel) => channel.purgeQueue(queueName));
  }

  /** Opens the shared connection on first use and returns it. */
  private async getConnection() {
    if (!this.connection) {
      const url = process.env.RABBITMQ_URL;
      if (!url) {
        throw new Error('RABBITMQ_URL is not configured');
      }
      const connection = await amqp.connect(url);
      // Forget a dead connection so the next call reconnects. The 'error'
      // listener also keeps an emitted error from crashing the process.
      const reset = () => {
        if (this.connection === connection) {
          this.connection = undefined;
        }
      };
      connection.on('close', reset);
      connection.on('error', reset);
      this.connection = connection;
    }
    return this.connection;
  }

  /** Runs `work` on a fresh channel and always closes that channel afterwards. */
  private async withChannel<T>(work: (channel: amqp.Channel) => PromiseLike<T>): Promise<T> {
    const connection = await this.getConnection();
    const channel = await connection.createChannel();

    let result: T;
    try {
      result = await work(channel);
    } catch (error: unknown) {
      // The broker may already have closed the channel (e.g. unknown queue).
      // Never let a close failure mask the real error.
      try {
        await channel.close();
      } catch {
        // channel already closed
      }
      throw error;
    }

    await channel.close();
    return result;
  }
}
8. Docker Compose 配置
# docker-compose.yml
version: '3.8'

services:
  rabbitmq:
    image: rabbitmq:3-management-alpine
    container_name: rabbitmq
    environment:
      # Sample credentials for local development only; replace before deploying.
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: password
      RABBITMQ_DEFAULT_VHOST: /
    ports:
      - "5672:5672"    # AMQP port
      - "15672:15672"  # management UI port
    volumes:
      # Named volume mounted at the broker's data directory
      - rabbitmq_data:/var/lib/rabbitmq
    healthcheck:
      test: ["CMD", "rabbitmq-diagnostics", "ping"]
      interval: 30s
      timeout: 10s
      retries: 5

volumes:
  rabbitmq_data:
9. 环境变量配置
# RabbitMQ configuration
# RABBITMQ_URL is the key the RabbitMQ client factory reads through ConfigService.
# The credentials match the docker-compose defaults (admin / password).
RABBITMQ_URL=amqp://admin:password@localhost:5672
# NOTE(review): user, password and vhost are not read separately by the code shown
# in this guide (the URL above already carries the credentials) — confirm they are needed.
RABBITMQ_USER=admin
RABBITMQ_PASSWORD=password
RABBITMQ_VHOST=/
# Presumably seconds, matching the 60 / 5 values used in the client's socketOptions — confirm.
RABBITMQ_HEARTBEAT=60
RABBITMQ_RECONNECT_TIME=5
这个配置提供了完整的RabbitMQ集成方案,包括消息生产者、消费者、错误处理和监控功能。