方式一:直接使用原生Node的方式

请参考上一节《nodejs使用RabbitMQ》

方式二:使用Nestjs内置微服务

https://docs.nestjs.com/microservices/rabbitmq

方式三:

官方建议使用,amqplibamqp-connection-manager,不过也可以采取用其它库,比如:@golevelup/nestjs-rabbitmq,我觉得这个库封装的更加的简单易用,而且大家的使用量也很大。

安装
npm install @golevelup/nestjs-rabbitmq
创建模块和service
nest g module rabbitmq 
nest g service rabbitmq

也可以只创建 service,然后将其注入到任意需要的模块中。

在模块导入和进行初始化配置

这里是在 rabbitmq/rabbitmq.module.ts 文件中编写的,如果您在其它模块导入,也是可以的。

import { Module } from '@nestjs/common';
import { RabbitmqService } from './rabbitmq.service';

import { RabbitMQModule } from "@golevelup/nestjs-rabbitmq";
import { RabbitmqController } from './rabbitmq.controller';

@Module({
  imports: [
    RabbitMQModule.forRoot(RabbitMQModule, {
      exchanges: [
        { name: 'nestExchange1', type: 'direct' },
      ],
      // amqp://账号:密码@Host:端口
      uri: "amqp://laoli:123456@192.168.10.220:5672",
      connectionInitOptions: { wait: false },
      // 消息的序列化和反序列化
      deserializer: (message: Buffer, msg: any) => {
        return JSON.parse(message.toString());
      },
      serializer: (msg: any) => {
        const encodedMessage = JSON.stringify(msg)
        return Buffer.from(encodedMessage);
      },
    }),
  ],
  providers: [RabbitmqService],
  controllers: [RabbitmqController]
})
export class RabbitmqModule { }
编写订阅者和发布处理方法
// 在 rabbitmq\rabbitmq.service.ts 中
import { Injectable } from '@nestjs/common';
import { RabbitSubscribe, AmqpConnection } from '@golevelup/nestjs-rabbitmq';

@Injectable()
export class RabbitmqService {
    constructor(private readonly amqpConnection: AmqpConnection) { }
    // 标记为订阅者,当符合条件的消息到达 RabbitMQ 时
    @RabbitSubscribe({
        exchange: "nestExchange1",
        routingKey: "nestTestKey",
        queue: 'nestTestQueueKey'
    })
    public async pubSubHandler(message: any) {
        console.log(message);
    }

    // 发布消息(供其它提供者或者控制器使用)
    async sendMsg(msg: any) {
        return await this.amqpConnection.publish('nestExchange1', 'nestTestKey', msg)
    }
}

然后我们可以在控制器或者其它服务中使用发布消息的方法

// 在 rabbitmq\rabbitmq.controller.ts 中
import { Controller, Get } from '@nestjs/common';
import { RabbitmqService } from './rabbitmq.service'

@Controller('rabbitmq')
export class RabbitmqController {
    constructor(private readonly rabbitmqService: RabbitmqService) { }
    @Get()
    async xxx() {
        return await this.rabbitmqService.sendMsg({name: '如花', age: 18})
    }
}

当我们发送一个消息后,我们编写的订阅者方法就会自动处理。

附1:异步注入RabbitMQModule

前面写的是同步注册RabbitMQModule,有时候我们需要从环境变量中读取配置进行配置。我们可以使用异步注册:

// 消息队列模块
RabbitMQModule.forRootAsync(RabbitMQModule, {
    imports: [ConfigModule],
    inject: [ConfigService],
    useFactory: (config: ConfigService) => ({
        exchanges: [
            { name: config.get('RABBITMQ_EXCHANGE_NAME'), type: config.get('RABBITMQ_EXCHANGE_TYPE') },
        ],
        uri: config.get('RABBITMQ_URI'),
        connectionInitOptions: { wait: false },
        // 消息的序列化和反序列化
        deserializer: (message: Buffer, msg: any) => {
            return JSON.parse(message.toString());
        },
        serializer: (msg: any) => {
            const encodedMessage = JSON.stringify(msg)
            return Buffer.from(encodedMessage);
        },
    }),
}),

附2:@RabbitRPC

除了前面说的 @RabbitSubscribe 装饰器之外,该包还提供了另一个@RabbitRPC 装饰器。比如,在某些情况下,应用程序可能想要否定确认(或 Nack)消息。为了支持这一点,该库公开了 Nack 对象,当从处理程序返回时,该对象允许开发人员控制消息处理行为。只需返回一个 Nack 实例即可否定确认该消息。

默认情况下,Nacked 的消息不会重新排队。但是,如果您想重新排队消息以便另一个处理程序有机会处理它,可以传递true 给它。

import { Injectable } from '@nestjs/common';
import { Nack, RabbitRPC } from '@golevelup/nestjs-rabbitmq';

@Injectable()
export class RabbitmqService {

    @RabbitRPC({
        exchange: "nestExchange1",
        routingKey: "nestTestKey",
        queue: 'nestTestQueueKey'
    })
    public async pubSubHandler(message: any) {
        const num = Math.random();
        // 模拟失败
        if (num > 0.5) {
            console.log(message, '成功');
            return new Nack();
        } else {
            console.log('失败了,重新排队');
            return new Nack(true);
        }
    }
}

附3:可能遇到的问题

注意:如果您的nest程序中,使用了限流器,比如我使用了 @nestjs/throttler,可能会引起一些错误,比如:res.header is not a function 这个错误。这是因为消息队列不是HTTP请求,但是也被节流器处理了。我这里解决的一个临时方式是扩展节流器自带的ThrottlerGuard 令牌,让其只针对有请求头的HTTP协议进行处理。然后使用我们编写的这个令牌进行注册。

import { Injectable, ExecutionContext } from '@nestjs/common';
import { ThrottlerGuard as BaseThrottlerGuard } from '@nestjs/throttler';

@Injectable()
export class ThrottlerGuard extends BaseThrottlerGuard {

    async handleRequest(context: ExecutionContext, limit: number, ttl: number, throttler: any, getTracker: any, generateKey: any) {
        const request = context.switchToHttp().getRequest();
        if (request.headers) {
            // 如果有请求头,则按HTTP处理
            // 仅在 HTTP 请求时调用父类的 handleRequest
            return await super.handleRequest(context, limit, ttl, throttler, getTracker, generateKey);
        }
        // 对于非 HTTP 请求,直接返回 true
        return true;
    }
}

或者这样(推荐):

import { Injectable, ExecutionContext } from '@nestjs/common';
import { ThrottlerGuard as BaseThrottlerGuard } from '@nestjs/throttler';

@Injectable()
export class ThrottlerGuard extends BaseThrottlerGuard {

    async handleRequest(context: ExecutionContext, limit: number, ttl: number, throttler: any, getTracker: any, generateKey: any) {

        const contextType = context.getType()
        // 对于 RabbitMQ Event 来说,它的contextType是rmq
        if (contextType == 'http') {
            return await super.handleRequest(context, limit, ttl, throttler, getTracker, generateKey);
        }
        return true;
    }
}

或者这样(推荐):

import { Injectable, ExecutionContext } from '@nestjs/common';
import { ThrottlerGuard as BaseThrottlerGuard } from '@nestjs/throttler';
import { isRabbitContext } from '@golevelup/nestjs-rabbitmq';

@Injectable()
export class ThrottlerGuard extends BaseThrottlerGuard {
    async handleRequest(context: ExecutionContext, limit: number, ttl: number, throttler: any, getTracker: any, generateKey: any) {
        // 检验是不是RabbitMQ 上下文
        if (!isRabbitContext(context)) {
            return await super.handleRequest(context, limit, ttl, throttler, getTracker, generateKey);
        }
        return true;
    }
}

其它很多错误的原因,都有可能是和这个类似。