方式一:直接使用原生Node的方式
请参考上一节《nodejs使用RabbitMQ》
方式二:使用Nestjs内置微服务
https://docs.nestjs.com/microservices/rabbitmq
方式三:
官方建议使用,amqplib 和 amqp-connection-manager,不过也可以采取用其它库,比如:@golevelup/nestjs-rabbitmq,我觉得这个库封装的更加的简单易用,而且大家的使用量也很大。
安装
npm install @golevelup/nestjs-rabbitmq
创建模块和service
nest g module rabbitmq
nest g service rabbitmq
也可以只创建 service,然后将其注入到任意需要的模块中。
在模块导入和进行初始化配置
这里是在 rabbitmq/rabbitmq.module.ts 文件中编写的,如果您在其它模块导入,也是可以的。
import { Module } from '@nestjs/common';
import { RabbitmqService } from './rabbitmq.service';
import { RabbitMQModule } from "@golevelup/nestjs-rabbitmq";
import { RabbitmqController } from './rabbitmq.controller';
@Module({
imports: [
RabbitMQModule.forRoot(RabbitMQModule, {
exchanges: [
{ name: 'nestExchange1', type: 'direct' },
],
// amqp://账号:密码@Host:端口
uri: "amqp://laoli:123456@192.168.10.220:5672",
connectionInitOptions: { wait: false },
// 消息的序列化和反序列化
deserializer: (message: Buffer, msg: any) => {
return JSON.parse(message.toString());
},
serializer: (msg: any) => {
const encodedMessage = JSON.stringify(msg)
return Buffer.from(encodedMessage);
},
}),
],
providers: [RabbitmqService],
controllers: [RabbitmqController]
})
export class RabbitmqModule { }
编写订阅者和发布处理方法
// 在 rabbitmq\rabbitmq.service.ts 中
import { Injectable } from '@nestjs/common';
import { RabbitSubscribe, AmqpConnection } from '@golevelup/nestjs-rabbitmq';
@Injectable()
export class RabbitmqService {
constructor(private readonly amqpConnection: AmqpConnection) { }
// 标记为订阅者,当符合条件的消息到达 RabbitMQ 时
@RabbitSubscribe({
exchange: "nestExchange1",
routingKey: "nestTestKey",
queue: 'nestTestQueueKey'
})
public async pubSubHandler(message: any) {
console.log(message);
}
// 发布消息(供其它提供者或者控制器使用)
async sendMsg(msg: any) {
return await this.amqpConnection.publish('nestExchange1', 'nestTestKey', msg)
}
}
然后我们可以在控制器或者其它服务中使用发布消息的方法
// 在 rabbitmq\rabbitmq.controller.ts 中
import { Controller, Get } from '@nestjs/common';
import { RabbitmqService } from './rabbitmq.service'
@Controller('rabbitmq')
export class RabbitmqController {
constructor(private readonly rabbitmqService: RabbitmqService) { }
@Get()
async xxx() {
return await this.rabbitmqService.sendMsg({name: '如花', age: 18})
}
}
当我们发送一个消息后,我们编写的订阅者方法就会自动处理。
附1:异步注入RabbitMQModule
前面写的是同步注册RabbitMQModule,有时候我们需要从环境变量中读取配置进行配置。我们可以使用异步注册:
// 消息队列模块
RabbitMQModule.forRootAsync(RabbitMQModule, {
imports: [ConfigModule],
inject: [ConfigService],
useFactory: (config: ConfigService) => ({
exchanges: [
{ name: config.get('RABBITMQ_EXCHANGE_NAME'), type: config.get('RABBITMQ_EXCHANGE_TYPE') },
],
uri: config.get('RABBITMQ_URI'),
connectionInitOptions: { wait: false },
// 消息的序列化和反序列化
deserializer: (message: Buffer, msg: any) => {
return JSON.parse(message.toString());
},
serializer: (msg: any) => {
const encodedMessage = JSON.stringify(msg)
return Buffer.from(encodedMessage);
},
}),
}),
附2:@RabbitRPC
除了前面说的 @RabbitSubscribe 装饰器之外,该包还提供了另一个@RabbitRPC 装饰器。比如,在某些情况下,应用程序可能想要否定确认(或 Nack)消息。为了支持这一点,该库公开了 Nack 对象,当从处理程序返回时,该对象允许开发人员控制消息处理行为。只需返回一个 Nack 实例即可否定确认该消息。
默认情况下,Nacked 的消息不会重新排队。但是,如果您想重新排队消息以便另一个处理程序有机会处理它,可以传递true 给它。
import { Injectable } from '@nestjs/common';
import { Nack, RabbitRPC } from '@golevelup/nestjs-rabbitmq';
@Injectable()
export class RabbitmqService {
@RabbitRPC({
exchange: "nestExchange1",
routingKey: "nestTestKey",
queue: 'nestTestQueueKey'
})
public async pubSubHandler(message: any) {
const num = Math.random();
// 模拟失败
if (num > 0.5) {
console.log(message, '成功');
return new Nack();
} else {
console.log('失败了,重新排队');
return new Nack(true);
}
}
}
附3:可能遇到的问题
注意:如果您的nest程序中,使用了限流器,比如我使用了 @nestjs/throttler,可能会引起一些错误,比如:res.header is not a function 这个错误。这是因为消息队列不是HTTP请求,但是也被节流器处理了。我这里解决的一个临时方式是扩展节流器自带的ThrottlerGuard 令牌,让其只针对有请求头的HTTP协议进行处理。然后使用我们编写的这个令牌进行注册。
import { Injectable, ExecutionContext } from '@nestjs/common';
import { ThrottlerGuard as BaseThrottlerGuard } from '@nestjs/throttler';
@Injectable()
export class ThrottlerGuard extends BaseThrottlerGuard {
async handleRequest(context: ExecutionContext, limit: number, ttl: number, throttler: any, getTracker: any, generateKey: any) {
const request = context.switchToHttp().getRequest();
if (request.headers) {
// 如果有请求头,则按HTTP处理
// 仅在 HTTP 请求时调用父类的 handleRequest
return await super.handleRequest(context, limit, ttl, throttler, getTracker, generateKey);
}
// 对于非 HTTP 请求,直接返回 true
return true;
}
}
或者这样(推荐):
import { Injectable, ExecutionContext } from '@nestjs/common';
import { ThrottlerGuard as BaseThrottlerGuard } from '@nestjs/throttler';
@Injectable()
export class ThrottlerGuard extends BaseThrottlerGuard {
async handleRequest(context: ExecutionContext, limit: number, ttl: number, throttler: any, getTracker: any, generateKey: any) {
const contextType = context.getType()
// 对于 RabbitMQ Event 来说,它的contextType是rmq
if (contextType == 'http') {
return await super.handleRequest(context, limit, ttl, throttler, getTracker, generateKey);
}
return true;
}
}
或者这样(推荐):
import { Injectable, ExecutionContext } from '@nestjs/common';
import { ThrottlerGuard as BaseThrottlerGuard } from '@nestjs/throttler';
import { isRabbitContext } from '@golevelup/nestjs-rabbitmq';
@Injectable()
export class ThrottlerGuard extends BaseThrottlerGuard {
async handleRequest(context: ExecutionContext, limit: number, ttl: number, throttler: any, getTracker: any, generateKey: any) {
// 检验是不是RabbitMQ 上下文
if (!isRabbitContext(context)) {
return await super.handleRequest(context, limit, ttl, throttler, getTracker, generateKey);
}
return true;
}
}
其它很多错误的原因,都有可能是和这个类似。
💬 评论 0
还没有评论,快来抢沙发吧~