509 lines
14 KiB
Markdown
509 lines
14 KiB
Markdown
# RabbitMQ 配置指南
|
||
|
||
## 1. RabbitMQ 模块配置
|
||
|
||
### 1.1 RabbitMQ 模块设置
|
||
```typescript
|
||
// src/rabbitmq/rabbitmq.module.ts
|
||
import { Module } from '@nestjs/common';
|
||
import { ClientsModule, Transport } from '@nestjs/microservices';
|
||
import { ConfigService } from '@nestjs/config';
|
||
|
||
@Module({
|
||
imports: [
|
||
ClientsModule.registerAsync([
|
||
{
|
||
name: 'RABBITMQ_SERVICE',
|
||
useFactory: (configService: ConfigService) => ({
|
||
transport: Transport.RMQ,
|
||
options: {
|
||
urls: [configService.get<string>('RABBITMQ_URL')],
|
||
queue: 'main_queue',
|
||
queueOptions: {
|
||
durable: true,
|
||
},
|
||
socketOptions: {
|
||
heartbeatIntervalInSeconds: 60,
|
||
reconnectTimeInSeconds: 5,
|
||
},
|
||
},
|
||
}),
|
||
inject: [ConfigService],
|
||
},
|
||
]),
|
||
],
|
||
exports: [ClientsModule],
|
||
})
|
||
export class RabbitMQModule {}
|
||
```
|
||
|
||
### 1.2 消息队列配置
|
||
```typescript
|
||
// src/config/queue.config.ts
|
||
export const QueueConfig = {
|
||
// 交换机配置
|
||
exchanges: {
|
||
USER: 'user.exchange',
|
||
PAYMENT: 'payment.exchange',
|
||
PUSH: 'push.exchange',
|
||
PLATFORM: 'platform.exchange',
|
||
},
|
||
|
||
// 队列配置
|
||
queues: {
|
||
// 用户相关队列
|
||
USER_REGISTRATION: 'user.registration.queue',
|
||
USER_LOGIN: 'user.login.queue',
|
||
USER_UPDATE: 'user.update.queue',
|
||
|
||
// AI生成相关队列
|
||
AI_GENERATION_TASK: 'ai.generation.task.queue',
|
||
AI_GENERATION_COMPLETED: 'ai.generation.completed.queue',
|
||
AI_GENERATION_FAILED: 'ai.generation.failed.queue',
|
||
|
||
// 积分系统队列
|
||
CREDIT_REWARD: 'credit.reward.queue',
|
||
CREDIT_CONSUME: 'credit.consume.queue',
|
||
|
||
// 广告系统队列
|
||
AD_WATCH_COMPLETED: 'ad.watch.completed.queue',
|
||
AD_REWARD_GRANTED: 'ad.reward.granted.queue',
|
||
|
||
// 平台同步队列
|
||
PLATFORM_SYNC_USER: 'platform.sync.user.queue',
|
||
PLATFORM_SYNC_DATA: 'platform.sync.data.queue',
|
||
|
||
// 扩展服务队列 (预留)
|
||
EXTENSION_SERVICE: 'extension.service.queue',
|
||
CUSTOM_EVENT: 'custom.event.queue',
|
||
},
|
||
|
||
// 路由键配置
|
||
routingKeys: {
|
||
USER_REGISTERED: 'user.registered',
|
||
USER_LOGIN_SUCCESS: 'user.login.success',
|
||
USER_UPDATED: 'user.updated',
|
||
|
||
// AI生成相关路由键
|
||
AI_GENERATION_TASK: 'ai.generation.task',
|
||
AI_GENERATION_COMPLETED: 'ai.generation.completed',
|
||
AI_GENERATION_FAILED: 'ai.generation.failed',
|
||
|
||
// 积分系统路由键
|
||
CREDIT_REWARD: 'credit.reward',
|
||
CREDIT_CONSUME: 'credit.consume',
|
||
|
||
// 广告系统路由键
|
||
AD_WATCH_COMPLETED: 'ad.watch.completed',
|
||
AD_REWARD_GRANTED: 'ad.reward.granted',
|
||
|
||
PLATFORM_SYNC: 'platform.sync',
|
||
EXTENSION_EVENT: 'extension.event',
|
||
},
|
||
};
|
||
```
|
||
|
||
## 2. 消息生产者服务
|
||
|
||
```typescript
|
||
// src/services/message-producer.service.ts
|
||
import { Injectable, Inject } from '@nestjs/common';
|
||
import { ClientProxy } from '@nestjs/microservices';
|
||
import { QueueConfig } from '../config/queue.config';
|
||
|
||
export interface UserRegistrationMessage {
|
||
userId: string;
|
||
platform: string;
|
||
userInfo: any;
|
||
timestamp: Date;
|
||
}
|
||
|
||
export interface ExtensionMessage {
|
||
userId: string;
|
||
platform: string;
|
||
dataType: string;
|
||
action: string; // 'create', 'update', 'delete'
|
||
data: Record<string, any>;
|
||
timestamp: Date;
|
||
}
|
||
|
||
export interface GenerationTaskMessage {
|
||
taskId: string;
|
||
userId: string;
|
||
platform: string;
|
||
type: 'image' | 'video';
|
||
templateCode?: string;
|
||
inputParameters: Record<string, any>;
|
||
timestamp: Date;
|
||
}
|
||
|
||
export interface GenerationCompletedMessage {
|
||
taskId: string;
|
||
userId: string;
|
||
platform: string;
|
||
outputUrl: string;
|
||
thumbnailUrl?: string;
|
||
timestamp: Date;
|
||
}
|
||
|
||
export interface GenerationFailedMessage {
|
||
taskId: string;
|
||
userId: string;
|
||
platform: string;
|
||
errorMessage: string;
|
||
timestamp: Date;
|
||
}
|
||
|
||
@Injectable()
|
||
export class MessageProducerService {
|
||
constructor(
|
||
@Inject('RABBITMQ_SERVICE')
|
||
private readonly client: ClientProxy,
|
||
) {}
|
||
|
||
// 发送用户注册消息
|
||
async sendUserRegistration(message: UserRegistrationMessage) {
|
||
return this.client.emit(QueueConfig.routingKeys.USER_REGISTERED, message);
|
||
}
|
||
|
||
// 发送用户更新消息
|
||
async sendUserUpdate(message: UserRegistrationMessage) {
|
||
return this.client.emit(QueueConfig.routingKeys.USER_UPDATED, message);
|
||
}
|
||
|
||
// 发送平台同步消息
|
||
async sendPlatformSync(data: any) {
|
||
return this.client.emit(QueueConfig.routingKeys.PLATFORM_SYNC, data);
|
||
}
|
||
|
||
// 发送扩展服务消息
|
||
async sendExtensionEvent(message: ExtensionMessage) {
|
||
return this.client.emit(QueueConfig.routingKeys.EXTENSION_EVENT, message);
|
||
}
|
||
|
||
// AI生成相关消息
|
||
async sendGenerationTask(message: GenerationTaskMessage) {
|
||
return this.client.emit(QueueConfig.routingKeys.AI_GENERATION_TASK, message);
|
||
}
|
||
|
||
async sendGenerationCompleted(message: GenerationCompletedMessage) {
|
||
return this.client.emit(QueueConfig.routingKeys.AI_GENERATION_COMPLETED, message);
|
||
}
|
||
|
||
async sendGenerationFailed(message: GenerationFailedMessage) {
|
||
return this.client.emit(QueueConfig.routingKeys.AI_GENERATION_FAILED, message);
|
||
}
|
||
}
|
||
```
|
||
|
||
## 3. 消息消费者服务
|
||
|
||
```typescript
|
||
// src/services/message-consumer.service.ts
|
||
import { Injectable } from '@nestjs/common';
|
||
import { EventPattern, Payload } from '@nestjs/microservices';
|
||
import { QueueConfig } from '../config/queue.config';
|
||
import { UserService } from './user.service';
|
||
import { PaymentService } from './payment.service';
|
||
import { PushService } from './push.service';
|
||
import { PlatformService } from './platform.service';
|
||
|
||
@Injectable()
|
||
export class MessageConsumerService {
|
||
constructor(
|
||
private readonly userService: UserService,
|
||
private readonly platformService: PlatformService,
|
||
private readonly extensionService: ExtensionService,
|
||
private readonly generationService: GenerationService,
|
||
private readonly messageProducer: MessageProducerService,
|
||
) {}
|
||
|
||
// 处理用户注册消息
|
||
@EventPattern(QueueConfig.routingKeys.USER_REGISTERED)
|
||
async handleUserRegistration(@Payload() data: UserRegistrationMessage) {
|
||
try {
|
||
console.log('Processing user registration:', data);
|
||
|
||
// 同步到其他平台
|
||
await this.platformService.syncUserRegistration(data);
|
||
|
||
// 触发扩展服务处理
|
||
await this.extensionService.handleUserRegistration(data);
|
||
|
||
console.log('User registration processed successfully');
|
||
} catch (error) {
|
||
console.error('Error processing user registration:', error);
|
||
// 这里可以实现重试机制或死信队列
|
||
}
|
||
}
|
||
|
||
// 处理用户更新消息
|
||
@EventPattern(QueueConfig.routingKeys.USER_UPDATED)
|
||
async handleUserUpdate(@Payload() data: UserRegistrationMessage) {
|
||
try {
|
||
console.log('Processing user update:', data);
|
||
|
||
// 同步用户数据到各平台
|
||
await this.platformService.syncUserUpdate(data);
|
||
|
||
// 触发扩展服务处理
|
||
await this.extensionService.handleUserUpdate(data);
|
||
|
||
console.log('User update processed successfully');
|
||
} catch (error) {
|
||
console.error('Error processing user update:', error);
|
||
}
|
||
}
|
||
|
||
// 处理AI生成任务消息
|
||
@EventPattern(QueueConfig.routingKeys.AI_GENERATION_TASK)
|
||
async handleGenerationTask(@Payload() data: GenerationTaskMessage) {
|
||
try {
|
||
console.log('Processing AI generation task:', data);
|
||
|
||
// 调用AI生成服务处理任务
|
||
await this.generationService.processGenerationTask(data.taskId);
|
||
|
||
console.log('AI generation task processed successfully');
|
||
} catch (error) {
|
||
console.error('Error processing AI generation task:', error);
|
||
|
||
// 发送失败消息
|
||
await this.messageProducer.sendGenerationFailed({
|
||
taskId: data.taskId,
|
||
userId: data.userId,
|
||
platform: data.platform,
|
||
errorMessage: error.message,
|
||
timestamp: new Date(),
|
||
});
|
||
}
|
||
}
|
||
|
||
// 处理AI生成完成消息
|
||
@EventPattern(QueueConfig.routingKeys.AI_GENERATION_COMPLETED)
|
||
async handleGenerationCompleted(@Payload() data: GenerationCompletedMessage) {
|
||
try {
|
||
console.log('Processing AI generation completed:', data);
|
||
|
||
// 可以在这里添加后续处理逻辑,如推送通知等
|
||
// await this.pushService.sendGenerationCompleteNotification(data);
|
||
|
||
console.log('AI generation completed processed successfully');
|
||
} catch (error) {
|
||
console.error('Error processing AI generation completed:', error);
|
||
}
|
||
}
|
||
|
||
// 处理AI生成失败消息
|
||
@EventPattern(QueueConfig.routingKeys.AI_GENERATION_FAILED)
|
||
async handleGenerationFailed(@Payload() data: GenerationFailedMessage) {
|
||
try {
|
||
console.log('Processing AI generation failed:', data);
|
||
|
||
// 可以在这里添加失败处理逻辑,如推送错误通知等
|
||
// await this.pushService.sendGenerationFailedNotification(data);
|
||
|
||
console.log('AI generation failed processed successfully');
|
||
} catch (error) {
|
||
console.error('Error processing AI generation failed:', error);
|
||
}
|
||
}
|
||
|
||
// 处理扩展服务消息
|
||
@EventPattern(QueueConfig.routingKeys.EXTENSION_EVENT)
|
||
async handleExtensionEvent(@Payload() data: ExtensionMessage) {
|
||
try {
|
||
console.log('Processing extension event:', data);
|
||
|
||
// 根据数据类型和操作类型处理
|
||
await this.extensionService.handleExtensionEvent(data);
|
||
|
||
// 同步到相关平台
|
||
await this.platformService.syncExtensionData(data);
|
||
|
||
console.log('Extension event processed successfully');
|
||
} catch (error) {
|
||
console.error('Error processing extension event:', error);
|
||
}
|
||
}
|
||
|
||
// 处理平台同步消息
|
||
@EventPattern(QueueConfig.routingKeys.PLATFORM_SYNC)
|
||
async handlePlatformSync(@Payload() data: any) {
|
||
try {
|
||
console.log('Processing platform sync:', data);
|
||
|
||
await this.platformService.syncData(data);
|
||
|
||
console.log('Platform sync processed successfully');
|
||
} catch (error) {
|
||
console.error('Error processing platform sync:', error);
|
||
}
|
||
}
|
||
}
|
||
```
|
||
|
||
## 4. 在业务服务中使用消息队列
|
||
|
||
```typescript
|
||
// src/services/user.service.ts
|
||
import { Injectable } from '@nestjs/common';
|
||
import { MessageProducerService } from './message-producer.service';
|
||
|
||
@Injectable()
|
||
export class UserService {
|
||
constructor(
|
||
private readonly messageProducer: MessageProducerService,
|
||
) {}
|
||
|
||
async registerUser(userData: any) {
|
||
// 创建用户
|
||
const user = await this.createUser(userData);
|
||
|
||
// 发送用户注册消息到队列
|
||
await this.messageProducer.sendUserRegistration({
|
||
userId: user.id,
|
||
platform: userData.platform,
|
||
userInfo: userData,
|
||
timestamp: new Date(),
|
||
});
|
||
|
||
return user;
|
||
}
|
||
|
||
private async createUser(userData: any) {
|
||
// 实际的用户创建逻辑
|
||
return { id: 'user-id', ...userData };
|
||
}
|
||
}
|
||
```
|
||
|
||
## 5. 主应用模块配置
|
||
|
||
```typescript
|
||
// src/app.module.ts
|
||
import { Module } from '@nestjs/common';
|
||
import { ConfigModule } from '@nestjs/config';
|
||
import { RabbitMQModule } from './rabbitmq/rabbitmq.module';
|
||
import { MessageProducerService } from './services/message-producer.service';
|
||
import { MessageConsumerService } from './services/message-consumer.service';
|
||
|
||
@Module({
|
||
imports: [
|
||
ConfigModule.forRoot({
|
||
isGlobal: true,
|
||
}),
|
||
RabbitMQModule,
|
||
],
|
||
providers: [
|
||
MessageProducerService,
|
||
MessageConsumerService,
|
||
],
|
||
exports: [MessageProducerService],
|
||
})
|
||
export class AppModule {}
|
||
```
|
||
|
||
## 6. 错误处理和重试机制
|
||
|
||
```typescript
|
||
// src/decorators/retry.decorator.ts
|
||
export function Retry(maxRetries: number = 3, delay: number = 1000) {
|
||
return function (target: any, propertyName: string, descriptor: PropertyDescriptor) {
|
||
const method = descriptor.value;
|
||
|
||
descriptor.value = async function (...args: any[]) {
|
||
let lastError: any;
|
||
|
||
for (let i = 0; i <= maxRetries; i++) {
|
||
try {
|
||
return await method.apply(this, args);
|
||
} catch (error) {
|
||
lastError = error;
|
||
|
||
if (i < maxRetries) {
|
||
console.log(`Retry ${i + 1}/${maxRetries} for ${propertyName}:`, error.message);
|
||
await new Promise(resolve => setTimeout(resolve, delay * Math.pow(2, i)));
|
||
}
|
||
}
|
||
}
|
||
|
||
throw lastError;
|
||
};
|
||
};
|
||
}
|
||
```
|
||
|
||
## 7. 队列监控和管理
|
||
|
||
```typescript
|
||
// src/services/queue-monitor.service.ts
|
||
import { Injectable } from '@nestjs/common';
|
||
import * as amqp from 'amqplib';
|
||
|
||
@Injectable()
|
||
export class QueueMonitorService {
|
||
private connection: amqp.Connection;
|
||
|
||
async getQueueInfo(queueName: string) {
|
||
const channel = await this.connection.createChannel();
|
||
const queueInfo = await channel.checkQueue(queueName);
|
||
await channel.close();
|
||
|
||
return {
|
||
queue: queueName,
|
||
messageCount: queueInfo.messageCount,
|
||
consumerCount: queueInfo.consumerCount,
|
||
};
|
||
}
|
||
|
||
async purgeQueue(queueName: string) {
|
||
const channel = await this.connection.createChannel();
|
||
await channel.purgeQueue(queueName);
|
||
await channel.close();
|
||
}
|
||
}
|
||
```
|
||
|
||
## 8. Docker Compose 配置
|
||
|
||
```yaml
|
||
# docker-compose.yml
|
||
version: '3.8'
|
||
services:
|
||
rabbitmq:
|
||
image: rabbitmq:3-management-alpine
|
||
container_name: rabbitmq
|
||
environment:
|
||
RABBITMQ_DEFAULT_USER: admin
|
||
RABBITMQ_DEFAULT_PASS: password
|
||
RABBITMQ_DEFAULT_VHOST: /
|
||
ports:
|
||
- "5672:5672" # AMQP端口
|
||
- "15672:15672" # 管理界面端口
|
||
volumes:
|
||
- rabbitmq_data:/var/lib/rabbitmq
|
||
healthcheck:
|
||
test: ["CMD", "rabbitmq-diagnostics", "ping"]
|
||
interval: 30s
|
||
timeout: 10s
|
||
retries: 5
|
||
|
||
volumes:
|
||
rabbitmq_data:
|
||
```
|
||
|
||
## 9. 环境变量配置
|
||
|
||
```env
|
||
# RabbitMQ配置
|
||
RABBITMQ_URL=amqp://admin:password@localhost:5672
|
||
RABBITMQ_USER=admin
|
||
RABBITMQ_PASSWORD=password
|
||
RABBITMQ_VHOST=/
|
||
RABBITMQ_HEARTBEAT=60
|
||
RABBITMQ_RECONNECT_TIME=5
|
||
```
|
||
|
||
这个配置提供了完整的RabbitMQ集成方案,包括消息生产者、消费者、错误处理和监控功能。
|