安装依赖库
npm install amqplib
简单队列使用
该结构比较简单,主要参与者是:一个生产者,一个存储消息的队列,一个消费者
生产者代码
import amqplib from 'amqplib'
// 队列的名字
const queueName = 'testQueue'
// 创建连接
const conn = await amqplib.connect({
protocol: 'amqp',
hostname: '192.168.10.220',
port: 5672,
username: 'laoli',
password: '123456',
vhost: '/'
});
// 通过连接对象创建一个 channel(通道)
const ch1 = await conn.createChannel();
// 声明一个名为 testQueue 的队列,如果这个队列不存在则会创建它。durable: false 表示队列不是持久化的。
await ch1.assertQueue(queueName, { durable: false });
const msg = { name: '如花', age: 19 }
ch1.sendToQueue(queueName, Buffer.from(JSON.stringify(msg)));
console.log('发送完毕');
消费者代码
import amqplib from 'amqplib'
// 队列的名字
const queueName = 'testQueue'
// 创建连接
const conn = await amqplib.connect({
protocol: 'amqp',
hostname: '192.168.10.220',
port: 5672,
username: 'laoli',
password: '123456',
vhost: '/'
});
// 通过连接对象创建一个 channel(通道)
const ch1 = await conn.createChannel();
//
ch1.consume(
queueName,
(msg) => {
console.log('接收到消息:', msg.content.toString());
},
// noAck: true 表示不需要确认消息
{ noAck: true }
);
确认机制
发布确认
发布确认是一种消息确认机制,允许生产者在发送消息后等待消息被 RabbitMQ 服务器确认,从而确保消息已经成功发送并存储在服务器中。
// 生产者
import amqplib from 'amqplib'
// 队列的名字
const queueName = 'testQueue2'
// 创建连接
const conn = await amqplib.connect({
protocol: 'amqp',
hostname: '192.168.10.220',
port: 5672,
username: 'laoli',
password: '123456',
vhost: '/'
});
// 创建确认通道
const ch1 = await conn.createConfirmChannel();
// 声明一个队列
await ch1.assertQueue(queueName);
const msg = { name: '如花', age: 24 }
// 使用回调函数来确认消息的发送是否成功
ch1.sendToQueue(queueName, Buffer.from(JSON.stringify(msg)), {}, (err, ok) => {
if (err) {
console.error('发送失败', err);
} else {
console.log('发送已得到确认!');
}
});
除了使用回调函数的方式来确认消息是否成功,还可以使批量确认的形式处理:
import amqplib from 'amqplib'
// 队列的名字
const queueName = 'testQueue2'
// 创建连接
const conn = await amqplib.connect({
protocol: 'amqp',
hostname: '192.168.10.220',
port: 5672,
username: 'laoli',
password: '123456',
vhost: '/'
});
// 创建确认通道
const ch1 = await conn.createConfirmChannel();
// 声明一个队列
await ch1.assertQueue(queueName);
const msg = { name: '如花', age: 24 }
try {
ch1.sendToQueue(queueName, Buffer.from(JSON.stringify(msg)));
// waitForConfirms() 方法会等待所有未确认的消息都被 RabbitMQ 确认后才会返回。这种方式适合需要确保所有消息都已发送成功后再执行后续操作的场景。
await ch1.waitForConfirms();
console.log('发送完毕');
} catch (error) {
console.log('发送失败');
}
接收确认
当消费者从队列中获取到消息后,可以选择手动确认(acknowledge)消息处理成功。如果消费者成功处理消息并确认,RabbitMQ 将会从队列中删除该消息。如果消费者在处理消息时发生错误或崩溃,消息不会被确认,RabbitMQ 将会将消息重新放回队列,等待下一个消费者处理。
import amqplib from 'amqplib'
// 队列的名字
const queueName = 'testQueue2'
// 创建连接
const conn = await amqplib.connect({
protocol: 'amqp',
hostname: '192.168.10.220',
port: 5672,
username: 'laoli',
password: '123456',
vhost: '/'
});
// 通过连接对象创建一个 channel(通道)
const ch1 = await conn.createChannel();
// 设置每个消费者一次只处理一条消息(这个不是必须的,如果有多个消费者,最好设置一次预取出1个消息)
ch1.prefetch(1);
// 消费者通过 .consume() 方法订阅了一个特定的队列,一旦消费者成功连接到队列并订阅了,它会开始接收队列中的消息
ch1.consume(queueName,
function (msg) {
const randomNum = Math.random()
const xx = JSON.parse(msg.content.toString())
try {
if (randomNum > 0.5) {
throw new Error('模拟遇到了失败')
}
console.log('正常处理完:', xx);
// 手动确认消息处理成功
ch1.ack(msg);
} catch (error) {
console.log(xx + '遇到了错误,放回去了');
// 拒绝消息,使其重新放回队列
ch1.nack(msg);
}
},
// noAck: false 表示手动确认模式,默认就是false
{ noAck: false }
);
工作队列(Work Queues)
工作队列(Work Queues),有时也被称为任务队列(Task Queues),是一种用于分布式系统中的模式,通常用于在多个工作者(Worker)之间分发任务。工作队列的主要目的是解耦任务的生产者和消费者,并提供负载均衡和任务容错能力。
我们前面讲的“简单队列”是一个生产者,一个消费者,一个队列。如果我们稍微修改一下消费者代码(主要是设置预取出数量),然后启动多个消费者程序,就成了工作队列。
生产者代码
import amqplib from 'amqplib'
// 队列的名字
const queueName = 'testQueue2'
// 创建连接
const conn = await amqplib.connect({
protocol: 'amqp',
hostname: '192.168.10.220',
port: 5672,
username: 'laoli',
password: '123456',
vhost: '/'
});
// 创建确认通道
const ch1 = await conn.createConfirmChannel();
// 声明一个队列
await ch1.assertQueue(queueName);
setInterval(() => {
const date = parseInt(Date.now() / 1000);
const msg = { date }
// 每秒向队列中发送一个时间戳数据
ch1.sendToQueue(queueName, Buffer.from(JSON.stringify(msg)), {}, (err, ok) => {
if (err) {
console.error('发送失败', err);
} else {
console.log('发送已得到确认!', date);
}
});
}, 1000);
可以看出,基本上和之前没有什么变化。
消费者代码
import amqplib from 'amqplib'
// 队列的名字
const queueName = 'testQueue2'
// 创建连接
const conn = await amqplib.connect({
protocol: 'amqp',
hostname: '192.168.10.220',
port: 5672,
username: 'laoli',
password: '123456',
vhost: '/'
});
// 通过连接对象创建一个 channel(通道)
const ch1 = await conn.createChannel();
// 设置每个消费者一次只处理一条消息
ch1.prefetch(1);
// 消费者通过 .consume() 方法订阅了一个特定的队列,一旦消费者成功连接到队列并订阅了,它会开始接收队列中的消息
ch1.consume(queueName,
function (msg) {
// 模拟比较耗时的操作
setTimeout(() => {
const randomNum = Math.random()
const xx = JSON.parse(msg.content.toString())
try {
if (randomNum > 0.5) {
throw new Error('模拟遇到了失败')
}
console.log('正常处理完:', xx);
// 手动确认消息处理成功
ch1.ack(msg);
} catch (error) {
console.log(`${xx.date}遇到了错误,放回去了`);
// 拒绝消息,使其重新放回队列
ch1.nack(msg);
}
}, 2000);
},
// noAck: false 表示手动确认模式,默认就是false
{ noAck: false }
);
我们主要是增加了 ch1.prefetch(1) 设置消费者一次预取出一个消息。我们这里采取了消息确认模式。如果我们启动多个消费者程序,我们的结构就变成了 工作队列。
发布/订阅 模式(Publish/Subscribe)
前面我们谈到的工作队列是一个生产者、一个队列、多个消费者的模式。其实对于有多个消费者的情况,我们还可以有这样一种模式:
- 每一个消费者都有自己的队列。
- 每个队列会根据配置的规则绑定到交换机(
exChange)上。 - 生产者将消息发送给交换机(
exChange),而不是直接发送到队列中。 - 生产者的消息,经过交换机规则,被分配到不同的队列中
生产者代码
import amqplib from 'amqplib'
// 创建连接
const conn = await amqplib.connect({
protocol: 'amqp',
hostname: '192.168.10.220',
port: 5672,
username: 'laoli',
password: '123456',
vhost: '/'
});
// 创建确认通道
const ch1 = await conn.createConfirmChannel();
// 声明一个fanout类型的交换机,交换机的类型有
// 参数1是交换机的名称,参数2是交换机的类型,类型有:direct 、 topic 、 headers 和 fanout 四种。
await ch1.assertExchange('myTestExChange', 'fanout', { durable: false });
setInterval(() => {
const date = parseInt(Date.now() / 1000);
const msg = { date }
// 每秒向交换机中发送一个时间戳数据
ch1.publish('myTestExChange', '', Buffer.from(JSON.stringify(msg)), {}, (err, ok) => {
if (err) {
console.error('发送失败', err);
} else {
console.log('发送已得到确认!', date);
}
})
}, 1000);
消费者代码
import amqplib from 'amqplib'
// 创建连接
const conn = await amqplib.connect({
protocol: 'amqp',
hostname: '192.168.10.220',
port: 5672,
username: 'laoli',
password: '123456',
vhost: '/'
});
// 通过连接对象创建一个 channel(通道)
const ch1 = await conn.createChannel();
// 声明一个fanout类型的交换器
await ch1.assertExchange('myTestExChange', 'fanout', { durable: false });
// 声明一个独占队列(exclusive为true)
// RabbitMQ会为我们生成一个随机的队列名称
const q = await ch1.assertQueue('', { exclusive: true });
// 将队列绑定到交换器
ch1.bindQueue(q.queue, 'myTestExChange', '');
// 设置每个消费者一次只处理一条消息(非必须)
ch1.prefetch(1);
// 消费者通过 .consume() 方法订阅了一个特定的队列,一旦消费者成功连接到队列并订阅了,它会开始接收队列中的消息
ch1.consume(q.queue,
function (msg) {
// 模拟比较耗时的操作
setTimeout(() => {
const randomNum = Math.random()
const xx = JSON.parse(msg.content.toString())
try {
if (randomNum > 0.5) {
throw new Error('模拟遇到了失败')
}
console.log('正常处理完:', xx);
// 手动确认消息处理成功
ch1.ack(msg);
} catch (error) {
console.log(`${xx.date}遇到了错误,放回去了`);
// 拒绝消息,使其重新放回队列
ch1.nack(msg);
}
}, 1000);
},
// noAck: false 表示手动确认模式,默认就是false
{ noAck: false }
);
fanout 类型交换机的消息分发具有以下特点:
- 广播消息:
fanout类型的交换机会将消息发送给所有与之绑定的队列,不论队列的数量和绑定顺序。 - 无视路由键:
fanout类型的交换机会忽略消息的路由键,因此不需要指定路由键,消息会被发送到所有绑定的队列。 - 每个消费者都接收到相同消息:每个消费者都会独立地从交换机接收到完全相同的消息副本
路由模式(Routing)
路由模式是在“发布/订阅”模式的基础上,增加了routeing key,可以根据路由键进行精准的将消息分发到指定队列。关键点有:
- 设置交换机类型为
direct类型 - 生产者发送消息的时候,指定
routeing key - 消费者在绑定队列的时候指定
routeing key - 如果多个消费者绑定相同的
routeing key,会同时接收到发送到该路由键的所有消息的副本
整体代码除了修改交换机类型为 direct 以及绑定 routeing key之外,和发布订阅模式基本一样。
生产者代码
import amqplib from 'amqplib'
// 创建连接
const conn = await amqplib.connect({
protocol: 'amqp',
hostname: '192.168.10.220',
port: 5672,
username: 'laoli',
password: '123456',
vhost: '/'
});
// 创建确认通道
const ch1 = await conn.createConfirmChannel();
// 声明一个fanout类型的交换机,交换机的类型有
// 这里设置 direct 类型
await ch1.assertExchange('myTestExChange', 'direct', { durable: false });
setInterval(() => {
const date = parseInt(Date.now() / 1000);
const msg = {}
const random = Math.random()
// 随机设置发送消息的routing key和生成发送的数据
let routingKey = ''
if (random > 0.7) {
routingKey = 'A1';
msg.date = 'A' + date
} else if (random > 0.3) {
routingKey = 'B1'
msg.date = 'B' + date
} else {
routingKey = 'C1'
msg.date = 'C' + date
}
// 每秒向交换机中发送一个时间戳数据 指定routingKey
ch1.publish('myTestExChange', routingKey, Buffer.from(JSON.stringify(msg)), {}, (err, ok) => {
if (err) {
console.error('发送失败', err);
} else {
console.log('发送已得到确认!', date);
}
})
}, 1000);
消费者代码
import amqplib from 'amqplib'
// 创建连接
const conn = await amqplib.connect({
protocol: 'amqp',
hostname: '192.168.10.220',
port: 5672,
username: 'laoli',
password: '123456',
vhost: '/'
});
const ch1 = await conn.createChannel();
// 声明一个direct类型的交换器
await ch1.assertExchange('myTestExChange', 'direct', { durable: false });
const q = await ch1.assertQueue('', { exclusive: true });
// 将队列绑定到交换器 这里的C1 就是routing key
ch1.bindQueue(q.queue, 'myTestExChange', 'C1');
ch1.prefetch(1);
ch1.consume(q.queue,
function (msg) {
// 模拟比较耗时的操作
setTimeout(() => {
const randomNum = Math.random()
const xx = JSON.parse(msg.content.toString())
try {
if (randomNum > 0.5) {
throw new Error('模拟遇到了失败')
}
console.log('正常处理完:', xx);
// 手动确认消息处理成功
ch1.ack(msg);
} catch (error) {
console.log(`${xx.date}遇到了错误,放回去了`);
// 拒绝消息,使其重新放回队列
ch1.nack(msg);
}
}, 1000);
},
// noAck: false 表示手动确认模式,默认就是false
{ noAck: false }
);
Topics 模式
(略)
RPC 模式
(略)
参考:
https://www.rabbitmq.com/tutorials/tutorial-one-javascript
https://amqp-node.github.io/amqplib/channel_api.html
https://github.com/coolliyong/node_rabbitMQ_mqtutorial?tab=readme-ov-file
💬 评论 0
还没有评论,快来抢沙发吧~