使用场景
消息队列是为了确保可靠的消息传递,不至于出现接口调用失败导致无法处理的场景,同样的,可以解耦合,尤其是目前大数据场景,实时处理是不现实的,必须要单独去消费消息去计算以作对应的动作。所以,如果别人对你说:我需要知道xxx变更的事情,我写个接口你调下吧!对此,你可以义正言辞的告诉他:我提供队列,你从队列读就好了。
特性
- 支持订阅模式
- 可以持久化消息
- 队列消息
- 队列调度分配
- 支持消息回传
使用
市面上流行的: Rabbitmq
, Kafka
。此处先了解 Rabbitmq
,Kafka
支持自定义起始游标,有需要的可以自行了解下
Rabbitmq
队列模式
默认为单次消费队列, 即读取后将标记为已读取不再下发, 多个消费者时调度分配消息
以下代码使用了:回传确认信息,自动调度分配任务(默认顺序调度)特性
// send.js
var amqp = require('amqplib/callback_api');
var readline = require('readline');
amqp.connect('amqp://localhost:32769', function (err, conn) {
if (!err) {
conn.createChannel(function (err, ch) {
var q = 'hello';
ch.assertQueue(q, { durable: false });
ch.prefetch(1);
sendInput(q, ch);
});
} else {
console.log(err);
}
});
function sendInput(q, ch) {
let rl = readline.createInterface({
input: process.stdin,
output: process.stdout
});
rl.question("你想发送的信息:", function (msg) {
rl.close();
ch.sendToQueue(q, new Buffer(msg));
console.log('已发送:', msg);
sendInput(q, ch);
});
}
var amqp = require('amqplib/callback_api');
amqp.connect('amqp://localhost:32769', function (err, conn) {
if (!err) {
conn.createChannel(function (err, ch) {
var q = 'hello';
ch.assertQueue(q, { durable: false });
console.log('准备接收信息中:')
ch.consume(q, function (msg) {
console.log("收到信息:", msg.content.toString());
setTimeout(function () {
console.log('确认收到');
ch.ack(msg);
}, msg.content.toString().length * 1000);
}, { noAck: false });
});
} else {
console.log(err);
}
});
发布/订阅模式
这个是为多消费者使用专门准备的
这个模式会借用交换机(Exchange)来使用
声明交换机:ch.assertExchange('logs', 'fanout', {durable: false})
发布交换机:ch.publish('logs', '', new Buffer('Hello World!'));
交换机的信息附加到队列:ch.assertQueue('', {exclusive: true}); ch.bindQueue(queue_name, 'logs', '');
, 空的队列名可以保证新加入的订阅者只会接收到新的消息,而不需要把老的消息再重新消费一遍
参考
https://www.rabbitmq.com/tutorials/tutorial-one-javascript.html
https://www.cnblogs.com/frankyou/p/5283539.html
https://www.cnblogs.com/likehua/p/3999538.html