bw-mini-app-server/docs/rabbitmq-configuration.md

14 KiB
Raw Blame History

RabbitMQ 配置指南

1. RabbitMQ 模块配置

1.1 RabbitMQ 模块设置

// src/rabbitmq/rabbitmq.module.ts
import { Module } from '@nestjs/common';
import { ClientsModule, Transport } from '@nestjs/microservices';
import { ConfigService } from '@nestjs/config';

/**
 * Registers a RabbitMQ client proxy under the `RABBITMQ_SERVICE` injection token
 * and re-exports `ClientsModule` so importing modules can inject that proxy.
 *
 * The broker URL is read from the `RABBITMQ_URL` configuration key; the queue name
 * and socket options are fixed in this module.
 */
@Module({
  imports: [
    ClientsModule.registerAsync([
      {
        name: 'RABBITMQ_SERVICE',
        useFactory: (configService: ConfigService) => {
          // Fail fast with an explicit message instead of passing `undefined`
          // to the transport as the broker URL.
          const url = configService.get<string>('RABBITMQ_URL');
          if (!url) {
            throw new Error('RABBITMQ_URL is not configured');
          }

          return {
            transport: Transport.RMQ,
            options: {
              urls: [url],
              queue: 'main_queue',
              queueOptions: {
                durable: true,
              },
              socketOptions: {
                heartbeatIntervalInSeconds: 60,
                reconnectTimeInSeconds: 5,
              },
            },
          };
        },
        inject: [ConfigService],
      },
    ]),
  ],
  exports: [ClientsModule],
})
export class RabbitMQModule {}

1.2 消息队列配置

// src/config/queue.config.ts
/**
 * Central table of RabbitMQ exchange names, queue names and routing keys.
 *
 * Declared `as const` so every entry keeps its literal string type and the
 * table is read-only at the type level (a typo or accidental reassignment is
 * caught by the compiler instead of at runtime).
 */
export const QueueConfig = {
  // Exchanges
  exchanges: {
    USER: 'user.exchange',
    PAYMENT: 'payment.exchange',
    PUSH: 'push.exchange',
    PLATFORM: 'platform.exchange',
  },

  // Queues
  queues: {
    // User queues
    USER_REGISTRATION: 'user.registration.queue',
    USER_LOGIN: 'user.login.queue',
    USER_UPDATE: 'user.update.queue',

    // AI generation queues
    AI_GENERATION_TASK: 'ai.generation.task.queue',
    AI_GENERATION_COMPLETED: 'ai.generation.completed.queue',
    AI_GENERATION_FAILED: 'ai.generation.failed.queue',

    // Credit system queues
    CREDIT_REWARD: 'credit.reward.queue',
    CREDIT_CONSUME: 'credit.consume.queue',

    // Advertising system queues
    AD_WATCH_COMPLETED: 'ad.watch.completed.queue',
    AD_REWARD_GRANTED: 'ad.reward.granted.queue',

    // Platform synchronisation queues
    PLATFORM_SYNC_USER: 'platform.sync.user.queue',
    PLATFORM_SYNC_DATA: 'platform.sync.data.queue',

    // Extension service queues (reserved)
    EXTENSION_SERVICE: 'extension.service.queue',
    CUSTOM_EVENT: 'custom.event.queue',
  },

  // Routing keys
  routingKeys: {
    USER_REGISTERED: 'user.registered',
    USER_LOGIN_SUCCESS: 'user.login.success',
    USER_UPDATED: 'user.updated',

    // AI generation routing keys
    AI_GENERATION_TASK: 'ai.generation.task',
    AI_GENERATION_COMPLETED: 'ai.generation.completed',
    AI_GENERATION_FAILED: 'ai.generation.failed',

    // Credit system routing keys
    CREDIT_REWARD: 'credit.reward',
    CREDIT_CONSUME: 'credit.consume',

    // Advertising system routing keys
    AD_WATCH_COMPLETED: 'ad.watch.completed',
    AD_REWARD_GRANTED: 'ad.reward.granted',

    PLATFORM_SYNC: 'platform.sync',
    EXTENSION_EVENT: 'extension.event',
  },
} as const;

2. 消息生产者服务

// src/services/message-producer.service.ts
import { Injectable, Inject } from '@nestjs/common';
import { ClientProxy } from '@nestjs/microservices';
import { QueueConfig } from '../config/queue.config';

/**
 * Payload accepted by `MessageProducerService.sendUserRegistration` and
 * `MessageProducerService.sendUserUpdate`.
 *
 * NOTE(review): `timestamp` is typed as `Date`, but the payload crosses the broker
 * in serialized form — confirm whether consumers actually receive a `Date` or a string.
 */
export interface UserRegistrationMessage {
  userId: string;
  platform: string;
  // Untyped user data; `UserService.registerUser` passes its raw `userData` argument here.
  userInfo: any;
  timestamp: Date;
}

/**
 * Payload accepted by `MessageProducerService.sendExtensionEvent`.
 *
 * NOTE(review): `action` is typed as a plain `string`; the inline comment lists the
 * expected values but nothing here enforces them.
 */
export interface ExtensionMessage {
  userId: string;
  platform: string;
  dataType: string;
  action: string; // 'create', 'update', 'delete'
  data: Record<string, any>;
  timestamp: Date;
}

/**
 * Payload accepted by `MessageProducerService.sendGenerationTask`.
 * `type` is restricted to `'image'` or `'video'`; `templateCode` is optional.
 */
export interface GenerationTaskMessage {
  taskId: string;
  userId: string;
  platform: string;
  type: 'image' | 'video';
  templateCode?: string;
  inputParameters: Record<string, any>;
  timestamp: Date;
}

/**
 * Payload accepted by `MessageProducerService.sendGenerationCompleted`.
 * `outputUrl` is required; `thumbnailUrl` is optional.
 */
export interface GenerationCompletedMessage {
  taskId: string;
  userId: string;
  platform: string;
  outputUrl: string;
  thumbnailUrl?: string;
  timestamp: Date;
}

/**
 * Payload accepted by `MessageProducerService.sendGenerationFailed`.
 * The consumer's generation-task handler builds one of these from the failed
 * task's ids and the caught error's message.
 */
export interface GenerationFailedMessage {
  taskId: string;
  userId: string;
  platform: string;
  errorMessage: string;
  timestamp: Date;
}

/**
 * Publishes events through the client proxy registered as `RABBITMQ_SERVICE`.
 *
 * Each public method emits its payload under one routing key from
 * `QueueConfig.routingKeys` and resolves to whatever `ClientProxy.emit` returned.
 */
@Injectable()
export class MessageProducerService {
  constructor(
    @Inject('RABBITMQ_SERVICE')
    private readonly client: ClientProxy,
  ) {}

  /** Single funnel for every outgoing event: emits `payload` under `pattern`. */
  private publish(pattern: string, payload: unknown) {
    return this.client.emit(pattern, payload);
  }

  /** Emits a user-registration event. */
  async sendUserRegistration(message: UserRegistrationMessage) {
    const { USER_REGISTERED } = QueueConfig.routingKeys;
    return this.publish(USER_REGISTERED, message);
  }

  /** Emits a user-update event. */
  async sendUserUpdate(message: UserRegistrationMessage) {
    const { USER_UPDATED } = QueueConfig.routingKeys;
    return this.publish(USER_UPDATED, message);
  }

  /** Emits a platform-sync event carrying an untyped payload. */
  async sendPlatformSync(data: any) {
    const { PLATFORM_SYNC } = QueueConfig.routingKeys;
    return this.publish(PLATFORM_SYNC, data);
  }

  /** Emits an extension-service event. */
  async sendExtensionEvent(message: ExtensionMessage) {
    const { EXTENSION_EVENT } = QueueConfig.routingKeys;
    return this.publish(EXTENSION_EVENT, message);
  }

  /** Emits an AI generation task event. */
  async sendGenerationTask(message: GenerationTaskMessage) {
    const { AI_GENERATION_TASK } = QueueConfig.routingKeys;
    return this.publish(AI_GENERATION_TASK, message);
  }

  /** Emits an AI generation completed event. */
  async sendGenerationCompleted(message: GenerationCompletedMessage) {
    const { AI_GENERATION_COMPLETED } = QueueConfig.routingKeys;
    return this.publish(AI_GENERATION_COMPLETED, message);
  }

  /** Emits an AI generation failed event. */
  async sendGenerationFailed(message: GenerationFailedMessage) {
    const { AI_GENERATION_FAILED } = QueueConfig.routingKeys;
    return this.publish(AI_GENERATION_FAILED, message);
  }
}

3. 消息消费者服务

// src/services/message-consumer.service.ts
import { Injectable } from '@nestjs/common';
import { EventPattern, Payload } from '@nestjs/microservices';
import { QueueConfig } from '../config/queue.config';
import { MessageProducerService } from './message-producer.service';
import type {
  ExtensionMessage,
  GenerationCompletedMessage,
  GenerationFailedMessage,
  GenerationTaskMessage,
  UserRegistrationMessage,
} from './message-producer.service';
import { UserService } from './user.service';
import { PaymentService } from './payment.service';
import { PushService } from './push.service';
import { PlatformService } from './platform.service';
// NOTE(review): ExtensionService and GenerationService are also injected by
// MessageConsumerService, but this guide does not show where they are defined —
// import them from their actual modules.

/**
 * Event handlers for the routing keys declared in `QueueConfig.routingKeys`.
 *
 * Every handler logs the incoming payload, delegates to the injected services,
 * and catches any error so it is logged rather than rethrown. The generation-task
 * handler additionally publishes a failure event when processing throws.
 *
 * NOTE(review): the handlers use `@EventPattern` on an `@Injectable()` class.
 * NestJS documents event handlers on controller classes — verify that handlers
 * declared on a plain provider are actually registered with the transport.
 */
@Injectable()
export class MessageConsumerService {
  constructor(
    private readonly userService: UserService,
    private readonly platformService: PlatformService,
    private readonly extensionService: ExtensionService,
    private readonly generationService: GenerationService,
    private readonly messageProducer: MessageProducerService,
  ) {}

  /** Handles user-registration events: platform sync first, then the extension service. */
  @EventPattern(QueueConfig.routingKeys.USER_REGISTERED)
  async handleUserRegistration(@Payload() data: UserRegistrationMessage) {
    try {
      console.log('Processing user registration:', data);

      // Sync the new user to the other platforms
      await this.platformService.syncUserRegistration(data);

      // Let the extension service react to the registration
      await this.extensionService.handleUserRegistration(data);

      console.log('User registration processed successfully');
    } catch (error) {
      console.error('Error processing user registration:', error);
      // A retry mechanism or dead-letter queue could be implemented here
    }
  }

  /** Handles user-update events: platform sync first, then the extension service. */
  @EventPattern(QueueConfig.routingKeys.USER_UPDATED)
  async handleUserUpdate(@Payload() data: UserRegistrationMessage) {
    try {
      console.log('Processing user update:', data);

      // Sync the updated user data to each platform
      await this.platformService.syncUserUpdate(data);

      // Let the extension service react to the update
      await this.extensionService.handleUserUpdate(data);

      console.log('User update processed successfully');
    } catch (error) {
      console.error('Error processing user update:', error);
    }
  }

  /**
   * Handles AI generation task events by passing the task id to the generation
   * service. If processing throws, a generation-failed event is published with
   * the task's ids and the error text.
   */
  @EventPattern(QueueConfig.routingKeys.AI_GENERATION_TASK)
  async handleGenerationTask(@Payload() data: GenerationTaskMessage) {
    try {
      console.log('Processing AI generation task:', data);

      // Hand the task to the AI generation service
      await this.generationService.processGenerationTask(data.taskId);

      console.log('AI generation task processed successfully');
    } catch (error) {
      console.error('Error processing AI generation task:', error);

      // The caught value is not guaranteed to be an Error (and is `unknown`
      // under strict compiler settings), so narrow before reading `.message`.
      const errorMessage = error instanceof Error ? error.message : String(error);

      // Publish the failure event
      await this.messageProducer.sendGenerationFailed({
        taskId: data.taskId,
        userId: data.userId,
        platform: data.platform,
        errorMessage,
        timestamp: new Date(),
      });
    }
  }

  /** Handles AI generation completed events; currently only logs the payload. */
  @EventPattern(QueueConfig.routingKeys.AI_GENERATION_COMPLETED)
  async handleGenerationCompleted(@Payload() data: GenerationCompletedMessage) {
    try {
      console.log('Processing AI generation completed:', data);

      // Follow-up logic such as push notifications can be added here
      // await this.pushService.sendGenerationCompleteNotification(data);

      console.log('AI generation completed processed successfully');
    } catch (error) {
      console.error('Error processing AI generation completed:', error);
    }
  }

  /** Handles AI generation failed events; currently only logs the payload. */
  @EventPattern(QueueConfig.routingKeys.AI_GENERATION_FAILED)
  async handleGenerationFailed(@Payload() data: GenerationFailedMessage) {
    try {
      console.log('Processing AI generation failed:', data);

      // Failure handling such as pushing an error notification can be added here
      // await this.pushService.sendGenerationFailedNotification(data);

      console.log('AI generation failed processed successfully');
    } catch (error) {
      console.error('Error processing AI generation failed:', error);
    }
  }

  /** Handles extension-service events: extension service first, then platform sync. */
  @EventPattern(QueueConfig.routingKeys.EXTENSION_EVENT)
  async handleExtensionEvent(@Payload() data: ExtensionMessage) {
    try {
      console.log('Processing extension event:', data);

      // Dispatch according to the payload's data type and action
      await this.extensionService.handleExtensionEvent(data);

      // Sync to the relevant platforms
      await this.platformService.syncExtensionData(data);

      console.log('Extension event processed successfully');
    } catch (error) {
      console.error('Error processing extension event:', error);
    }
  }

  /** Handles platform-sync events by forwarding the untyped payload to the platform service. */
  @EventPattern(QueueConfig.routingKeys.PLATFORM_SYNC)
  async handlePlatformSync(@Payload() data: any) {
    try {
      console.log('Processing platform sync:', data);

      await this.platformService.syncData(data);

      console.log('Platform sync processed successfully');
    } catch (error) {
      console.error('Error processing platform sync:', error);
    }
  }
}

4. 在业务服务中使用消息队列

// src/services/user.service.ts
import { Injectable } from '@nestjs/common';
import { MessageProducerService } from './message-producer.service';

/**
 * Example business service showing how a registration flow publishes to the queue
 * through `MessageProducerService`.
 */
@Injectable()
export class UserService {
  constructor(
    private readonly messageProducer: MessageProducerService,
  ) {}

  /**
   * Creates the user, publishes a user-registration event for it, and returns
   * the created user.
   */
  async registerUser(userData: any) {
    const created = await this.createUser(userData);

    // Build the event after the user exists so it carries the new id.
    const registrationEvent = {
      userId: created.id,
      platform: userData.platform,
      userInfo: userData,
      timestamp: new Date(),
    };
    await this.messageProducer.sendUserRegistration(registrationEvent);

    return created;
  }

  /** Placeholder for the real creation logic: echoes the input with a fixed id. */
  private async createUser(userData: any) {
    const placeholder = { id: 'user-id', ...userData };
    return placeholder;
  }
}

5. 主应用模块配置

// src/app.module.ts
import { Module } from '@nestjs/common';
import { ConfigModule } from '@nestjs/config';
import { RabbitMQModule } from './rabbitmq/rabbitmq.module';
import { MessageProducerService } from './services/message-producer.service';
import { MessageConsumerService } from './services/message-consumer.service';

/**
 * Root module: loads configuration globally, imports RabbitMQModule, and registers
 * the message producer and consumer services. Only MessageProducerService is exported.
 *
 * NOTE(review): MessageConsumerService's constructor (as shown in this guide) injects
 * UserService, PlatformService, ExtensionService and GenerationService, none of which
 * are listed in `providers` here — confirm they are supplied by another module.
 *
 * NOTE(review): MessageConsumerService declares `@EventPattern` handlers but is
 * registered as a provider; NestJS documents such handlers on controllers — verify
 * that they are actually bound to the transport.
 */
@Module({
  imports: [
    ConfigModule.forRoot({
      isGlobal: true,
    }),
    RabbitMQModule,
  ],
  providers: [
    MessageProducerService,
    MessageConsumerService,
  ],
  exports: [MessageProducerService],
})
export class AppModule {}

6. 错误处理和重试机制

// src/decorators/retry.decorator.ts
/**
 * Method decorator factory that retries a failing async method with exponential backoff.
 *
 * The wrapped method is always invoked at least once. After each failure (except the
 * last) the wrapper logs the attempt and waits `delay * 2^attempt` milliseconds before
 * trying again. When every attempt fails, the last error is rethrown unchanged.
 *
 * @param maxRetries Number of retries after the first attempt. Negative or `NaN`
 *   values are treated as 0 so the method still runs once.
 * @param delay Base delay in milliseconds; doubled after every failed attempt.
 * @returns A decorator that replaces `descriptor.value` with the retrying wrapper.
 * @throws Whatever the wrapped method threw on its final attempt.
 */
export function Retry(maxRetries: number = 3, delay: number = 1000) {
  return function (target: unknown, propertyName: string, descriptor: PropertyDescriptor) {
    const method = descriptor.value;
    // Without this clamp a negative/NaN count skips the loop entirely and the
    // wrapper rejects with `undefined` without ever calling the method.
    const retries = maxRetries > 0 ? maxRetries : 0;

    descriptor.value = async function (this: unknown, ...args: unknown[]) {
      let lastError: unknown;

      for (let i = 0; i <= retries; i++) {
        try {
          return await method.apply(this, args);
        } catch (error) {
          lastError = error;

          if (i < retries) {
            // Thrown values are not guaranteed to be Error instances.
            const reason = error instanceof Error ? error.message : error;
            console.log(`Retry ${i + 1}/${retries} for ${propertyName}:`, reason);
            await new Promise(resolve => setTimeout(resolve, delay * Math.pow(2, i)));
          }
        }
      }

      throw lastError;
    };
  };
}

7. 队列监控和管理

// src/services/queue-monitor.service.ts
import { Injectable } from '@nestjs/common';
import * as amqp from 'amqplib';

/**
 * Inspects and purges RabbitMQ queues over a dedicated amqplib connection.
 *
 * The connection is opened lazily on first use from the `RABBITMQ_URL` environment
 * variable and then reused. Each operation runs on its own short-lived channel that
 * is closed afterwards, whether the operation succeeded or not.
 */
@Injectable()
export class QueueMonitorService {
  // Cached connection attempt; sharing the promise keeps concurrent first calls
  // from opening more than one connection.
  private connecting?: Promise<amqp.Connection>;

  /**
   * Returns message and consumer counts for `queueName`.
   *
   * @throws If `RABBITMQ_URL` is unset, the broker is unreachable, or the queue check fails.
   */
  async getQueueInfo(queueName: string) {
    const queueInfo = await this.withChannel((channel) => channel.checkQueue(queueName));

    return {
      queue: queueName,
      messageCount: queueInfo.messageCount,
      consumerCount: queueInfo.consumerCount,
    };
  }

  /**
   * Removes all messages from `queueName`.
   *
   * @throws If `RABBITMQ_URL` is unset, the broker is unreachable, or the purge fails.
   */
  async purgeQueue(queueName: string) {
    await this.withChannel((channel) => channel.purgeQueue(queueName));
  }

  /** Runs `work` on a fresh channel and always closes that channel afterwards. */
  private async withChannel<T>(work: (channel: amqp.Channel) => Promise<T>): Promise<T> {
    const connection = await this.getConnection();
    const channel = await connection.createChannel();
    // amqplib emits 'error' on the channel when the server closes it (e.g. a check
    // against a missing queue); without a listener that event is thrown as uncaught.
    // The failure itself still reaches the caller through the rejected promise.
    channel.on('error', () => undefined);

    try {
      return await work(channel);
    } finally {
      // A failed operation may already have closed the channel; ignore the close
      // error so the original failure is the one that propagates.
      await channel.close().catch(() => undefined);
    }
  }

  /** Opens the connection on first use and drops the cache when it fails or closes. */
  private getConnection(): Promise<amqp.Connection> {
    if (this.connecting === undefined) {
      const url = process.env.RABBITMQ_URL;
      if (!url) {
        return Promise.reject(new Error('RABBITMQ_URL is not configured'));
      }

      const attempt: Promise<amqp.Connection> = amqp.connect(url).then((connection) => {
        const forget = () => {
          if (this.connecting === attempt) {
            this.connecting = undefined;
          }
        };
        connection.on('error', forget);
        connection.on('close', forget);
        return connection;
      });
      // Let a later call retry after a failed attempt.
      attempt.catch(() => {
        if (this.connecting === attempt) {
          this.connecting = undefined;
        }
      });
      this.connecting = attempt;
    }

    return this.connecting;
  }
}

8. Docker Compose 配置

# docker-compose.yml
# Runs a single RabbitMQ broker with the management plugin and a named data volume.
version: '3.8'
services:
  rabbitmq:
    image: rabbitmq:3-management-alpine
    container_name: rabbitmq
    environment:
      # NOTE(review): sample credentials, presumably for local development only —
      # confirm they are overridden before any shared or production deployment.
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: password
      RABBITMQ_DEFAULT_VHOST: /
    ports:
      - "5672:5672"   # AMQP port
      - "15672:15672" # management UI port
    volumes:
      - rabbitmq_data:/var/lib/rabbitmq
    healthcheck:
      test: ["CMD", "rabbitmq-diagnostics", "ping"]
      interval: 30s
      timeout: 10s
      retries: 5

volumes:
  rabbitmq_data:

9. 环境变量配置

# RabbitMQ配置
RABBITMQ_URL=amqp://admin:password@localhost:5672
RABBITMQ_USER=admin
RABBITMQ_PASSWORD=password
RABBITMQ_VHOST=/
RABBITMQ_HEARTBEAT=60
RABBITMQ_RECONNECT_TIME=5

这个配置提供了完整的 RabbitMQ 集成方案，包括消息生产者、消费者、错误处理和监控功能。