# RabbitMQ 配置指南 ## 1. RabbitMQ 模块配置 ### 1.1 RabbitMQ 模块设置 ```typescript // src/rabbitmq/rabbitmq.module.ts import { Module } from '@nestjs/common'; import { ClientsModule, Transport } from '@nestjs/microservices'; import { ConfigService } from '@nestjs/config'; @Module({ imports: [ ClientsModule.registerAsync([ { name: 'RABBITMQ_SERVICE', useFactory: (configService: ConfigService) => ({ transport: Transport.RMQ, options: { urls: [configService.get('RABBITMQ_URL')], queue: 'main_queue', queueOptions: { durable: true, }, socketOptions: { heartbeatIntervalInSeconds: 60, reconnectTimeInSeconds: 5, }, }, }), inject: [ConfigService], }, ]), ], exports: [ClientsModule], }) export class RabbitMQModule {} ``` ### 1.2 消息队列配置 ```typescript // src/config/queue.config.ts export const QueueConfig = { // 交换机配置 exchanges: { USER: 'user.exchange', PAYMENT: 'payment.exchange', PUSH: 'push.exchange', PLATFORM: 'platform.exchange', }, // 队列配置 queues: { // 用户相关队列 USER_REGISTRATION: 'user.registration.queue', USER_LOGIN: 'user.login.queue', USER_UPDATE: 'user.update.queue', // AI生成相关队列 AI_GENERATION_TASK: 'ai.generation.task.queue', AI_GENERATION_COMPLETED: 'ai.generation.completed.queue', AI_GENERATION_FAILED: 'ai.generation.failed.queue', // 积分系统队列 CREDIT_REWARD: 'credit.reward.queue', CREDIT_CONSUME: 'credit.consume.queue', // 广告系统队列 AD_WATCH_COMPLETED: 'ad.watch.completed.queue', AD_REWARD_GRANTED: 'ad.reward.granted.queue', // 平台同步队列 PLATFORM_SYNC_USER: 'platform.sync.user.queue', PLATFORM_SYNC_DATA: 'platform.sync.data.queue', // 扩展服务队列 (预留) EXTENSION_SERVICE: 'extension.service.queue', CUSTOM_EVENT: 'custom.event.queue', }, // 路由键配置 routingKeys: { USER_REGISTERED: 'user.registered', USER_LOGIN_SUCCESS: 'user.login.success', USER_UPDATED: 'user.updated', // AI生成相关路由键 AI_GENERATION_TASK: 'ai.generation.task', AI_GENERATION_COMPLETED: 'ai.generation.completed', AI_GENERATION_FAILED: 'ai.generation.failed', // 积分系统路由键 CREDIT_REWARD: 'credit.reward', CREDIT_CONSUME: 'credit.consume', // 广告系统路由键 AD_WATCH_COMPLETED: 'ad.watch.completed', AD_REWARD_GRANTED: 'ad.reward.granted', PLATFORM_SYNC: 'platform.sync', EXTENSION_EVENT: 'extension.event', }, }; ``` ## 2. 消息生产者服务 ```typescript // src/services/message-producer.service.ts import { Injectable, Inject } from '@nestjs/common'; import { ClientProxy } from '@nestjs/microservices'; import { QueueConfig } from '../config/queue.config'; export interface UserRegistrationMessage { userId: string; platform: string; userInfo: any; timestamp: Date; } export interface ExtensionMessage { userId: string; platform: string; dataType: string; action: string; // 'create', 'update', 'delete' data: Record; timestamp: Date; } export interface GenerationTaskMessage { taskId: string; userId: string; platform: string; type: 'image' | 'video'; templateCode?: string; inputParameters: Record; timestamp: Date; } export interface GenerationCompletedMessage { taskId: string; userId: string; platform: string; outputUrl: string; thumbnailUrl?: string; timestamp: Date; } export interface GenerationFailedMessage { taskId: string; userId: string; platform: string; errorMessage: string; timestamp: Date; } @Injectable() export class MessageProducerService { constructor( @Inject('RABBITMQ_SERVICE') private readonly client: ClientProxy, ) {} // 发送用户注册消息 async sendUserRegistration(message: UserRegistrationMessage) { return this.client.emit(QueueConfig.routingKeys.USER_REGISTERED, message); } // 发送用户更新消息 async sendUserUpdate(message: UserRegistrationMessage) { return this.client.emit(QueueConfig.routingKeys.USER_UPDATED, message); } // 发送平台同步消息 async sendPlatformSync(data: any) { return this.client.emit(QueueConfig.routingKeys.PLATFORM_SYNC, data); } // 发送扩展服务消息 async sendExtensionEvent(message: ExtensionMessage) { return this.client.emit(QueueConfig.routingKeys.EXTENSION_EVENT, message); } // AI生成相关消息 async sendGenerationTask(message: GenerationTaskMessage) { return this.client.emit(QueueConfig.routingKeys.AI_GENERATION_TASK, message); } async sendGenerationCompleted(message: GenerationCompletedMessage) { return this.client.emit(QueueConfig.routingKeys.AI_GENERATION_COMPLETED, message); } async sendGenerationFailed(message: GenerationFailedMessage) { return this.client.emit(QueueConfig.routingKeys.AI_GENERATION_FAILED, message); } } ``` ## 3. 消息消费者服务 ```typescript // src/services/message-consumer.service.ts import { Injectable } from '@nestjs/common'; import { EventPattern, Payload } from '@nestjs/microservices'; import { QueueConfig } from '../config/queue.config'; import { UserService } from './user.service'; import { PaymentService } from './payment.service'; import { PushService } from './push.service'; import { PlatformService } from './platform.service'; @Injectable() export class MessageConsumerService { constructor( private readonly userService: UserService, private readonly platformService: PlatformService, private readonly extensionService: ExtensionService, private readonly generationService: GenerationService, private readonly messageProducer: MessageProducerService, ) {} // 处理用户注册消息 @EventPattern(QueueConfig.routingKeys.USER_REGISTERED) async handleUserRegistration(@Payload() data: UserRegistrationMessage) { try { console.log('Processing user registration:', data); // 同步到其他平台 await this.platformService.syncUserRegistration(data); // 触发扩展服务处理 await this.extensionService.handleUserRegistration(data); console.log('User registration processed successfully'); } catch (error) { console.error('Error processing user registration:', error); // 这里可以实现重试机制或死信队列 } } // 处理用户更新消息 @EventPattern(QueueConfig.routingKeys.USER_UPDATED) async handleUserUpdate(@Payload() data: UserRegistrationMessage) { try { console.log('Processing user update:', data); // 同步用户数据到各平台 await this.platformService.syncUserUpdate(data); // 触发扩展服务处理 await this.extensionService.handleUserUpdate(data); console.log('User update processed successfully'); } catch (error) { console.error('Error processing user update:', error); } } // 处理AI生成任务消息 @EventPattern(QueueConfig.routingKeys.AI_GENERATION_TASK) async handleGenerationTask(@Payload() data: GenerationTaskMessage) { try { console.log('Processing AI generation task:', data); // 调用AI生成服务处理任务 await this.generationService.processGenerationTask(data.taskId); console.log('AI generation task processed successfully'); } catch (error) { console.error('Error processing AI generation task:', error); // 发送失败消息 await this.messageProducer.sendGenerationFailed({ taskId: data.taskId, userId: data.userId, platform: data.platform, errorMessage: error.message, timestamp: new Date(), }); } } // 处理AI生成完成消息 @EventPattern(QueueConfig.routingKeys.AI_GENERATION_COMPLETED) async handleGenerationCompleted(@Payload() data: GenerationCompletedMessage) { try { console.log('Processing AI generation completed:', data); // 可以在这里添加后续处理逻辑,如推送通知等 // await this.pushService.sendGenerationCompleteNotification(data); console.log('AI generation completed processed successfully'); } catch (error) { console.error('Error processing AI generation completed:', error); } } // 处理AI生成失败消息 @EventPattern(QueueConfig.routingKeys.AI_GENERATION_FAILED) async handleGenerationFailed(@Payload() data: GenerationFailedMessage) { try { console.log('Processing AI generation failed:', data); // 可以在这里添加失败处理逻辑,如推送错误通知等 // await this.pushService.sendGenerationFailedNotification(data); console.log('AI generation failed processed successfully'); } catch (error) { console.error('Error processing AI generation failed:', error); } } // 处理扩展服务消息 @EventPattern(QueueConfig.routingKeys.EXTENSION_EVENT) async handleExtensionEvent(@Payload() data: ExtensionMessage) { try { console.log('Processing extension event:', data); // 根据数据类型和操作类型处理 await this.extensionService.handleExtensionEvent(data); // 同步到相关平台 await this.platformService.syncExtensionData(data); console.log('Extension event processed successfully'); } catch (error) { console.error('Error processing extension event:', error); } } // 处理平台同步消息 @EventPattern(QueueConfig.routingKeys.PLATFORM_SYNC) async handlePlatformSync(@Payload() data: any) { try { console.log('Processing platform sync:', data); await this.platformService.syncData(data); console.log('Platform sync processed successfully'); } catch (error) { console.error('Error processing platform sync:', error); } } } ``` ## 4. 在业务服务中使用消息队列 ```typescript // src/services/user.service.ts import { Injectable } from '@nestjs/common'; import { MessageProducerService } from './message-producer.service'; @Injectable() export class UserService { constructor( private readonly messageProducer: MessageProducerService, ) {} async registerUser(userData: any) { // 创建用户 const user = await this.createUser(userData); // 发送用户注册消息到队列 await this.messageProducer.sendUserRegistration({ userId: user.id, platform: userData.platform, userInfo: userData, timestamp: new Date(), }); return user; } private async createUser(userData: any) { // 实际的用户创建逻辑 return { id: 'user-id', ...userData }; } } ``` ## 5. 主应用模块配置 ```typescript // src/app.module.ts import { Module } from '@nestjs/common'; import { ConfigModule } from '@nestjs/config'; import { RabbitMQModule } from './rabbitmq/rabbitmq.module'; import { MessageProducerService } from './services/message-producer.service'; import { MessageConsumerService } from './services/message-consumer.service'; @Module({ imports: [ ConfigModule.forRoot({ isGlobal: true, }), RabbitMQModule, ], providers: [ MessageProducerService, MessageConsumerService, ], exports: [MessageProducerService], }) export class AppModule {} ``` ## 6. 错误处理和重试机制 ```typescript // src/decorators/retry.decorator.ts export function Retry(maxRetries: number = 3, delay: number = 1000) { return function (target: any, propertyName: string, descriptor: PropertyDescriptor) { const method = descriptor.value; descriptor.value = async function (...args: any[]) { let lastError: any; for (let i = 0; i <= maxRetries; i++) { try { return await method.apply(this, args); } catch (error) { lastError = error; if (i < maxRetries) { console.log(`Retry ${i + 1}/${maxRetries} for ${propertyName}:`, error.message); await new Promise(resolve => setTimeout(resolve, delay * Math.pow(2, i))); } } } throw lastError; }; }; } ``` ## 7. 队列监控和管理 ```typescript // src/services/queue-monitor.service.ts import { Injectable } from '@nestjs/common'; import * as amqp from 'amqplib'; @Injectable() export class QueueMonitorService { private connection: amqp.Connection; async getQueueInfo(queueName: string) { const channel = await this.connection.createChannel(); const queueInfo = await channel.checkQueue(queueName); await channel.close(); return { queue: queueName, messageCount: queueInfo.messageCount, consumerCount: queueInfo.consumerCount, }; } async purgeQueue(queueName: string) { const channel = await this.connection.createChannel(); await channel.purgeQueue(queueName); await channel.close(); } } ``` ## 8. Docker Compose 配置 ```yaml # docker-compose.yml version: '3.8' services: rabbitmq: image: rabbitmq:3-management-alpine container_name: rabbitmq environment: RABBITMQ_DEFAULT_USER: admin RABBITMQ_DEFAULT_PASS: password RABBITMQ_DEFAULT_VHOST: / ports: - "5672:5672" # AMQP端口 - "15672:15672" # 管理界面端口 volumes: - rabbitmq_data:/var/lib/rabbitmq healthcheck: test: ["CMD", "rabbitmq-diagnostics", "ping"] interval: 30s timeout: 10s retries: 5 volumes: rabbitmq_data: ``` ## 9. 环境变量配置 ```env # RabbitMQ配置 RABBITMQ_URL=amqp://admin:password@localhost:5672 RABBITMQ_USER=admin RABBITMQ_PASSWORD=password RABBITMQ_VHOST=/ RABBITMQ_HEARTBEAT=60 RABBITMQ_RECONNECT_TIME=5 ``` 这个配置提供了完整的RabbitMQ集成方案,包括消息生产者、消费者、错误处理和监控功能。