随笔

消息队列

使用场景

消息队列是为了确保可靠的消息传递,不至于出现接口调用失败导致无法处理的场景,同样的,可以解耦合,尤其是目前大数据场景,实时处理是不现实的,必须要单独去消费消息去计算以作对应的动作。所以,如果别人对你说:我需要知道xxx变更的事情,我写个接口你调下吧!对此,你可以义正言辞的告诉他:我提供队列,你从队列读就好了。

特性

  1. 支持订阅模式
  2. 可以持久化消息
  3. 队列消息
  4. 队列调度分配
  5. 支持消息回传

使用

市面上流行的: Rabbitmq, Kafka。此处先了解 RabbitmqKafka 支持自定义起始游标,有需要的可以自行了解下

Rabbitmq

队列模式

alt
默认为单次消费队列, 即读取后将标记为已读取不再下发, 多个消费者时调度分配消息
以下代码使用了:回传确认信息,自动调度分配任务(默认顺序调度)特性

// send.js
var amqp = require('amqplib/callback_api');
var readline = require('readline');

amqp.connect('amqp://localhost:32769', function (err, conn) {
    if (!err) {
        conn.createChannel(function (err, ch) {
            var q = 'hello';
            ch.assertQueue(q, { durable: false });
            ch.prefetch(1);
            sendInput(q, ch);
        });
    } else {
        console.log(err);
    }
});

function sendInput(q, ch) {
    let rl = readline.createInterface({
        input: process.stdin,
        output: process.stdout
    });
    rl.question("你想发送的信息:", function (msg) {
        rl.close();

        ch.sendToQueue(q, new Buffer(msg));
        console.log('已发送:', msg);
        sendInput(q, ch);
    });
}
var amqp = require('amqplib/callback_api');

amqp.connect('amqp://localhost:32769', function (err, conn) {
    if (!err) {
        conn.createChannel(function (err, ch) {
            var q = 'hello';
            ch.assertQueue(q, { durable: false });
            console.log('准备接收信息中:')
            ch.consume(q, function (msg) {
                console.log("收到信息:", msg.content.toString());

                setTimeout(function () {
                    console.log('确认收到');
                    ch.ack(msg);
                }, msg.content.toString().length * 1000);
            }, { noAck: false });
        });
    } else {
        console.log(err);
    }
});

发布/订阅模式

alt
这个是为多消费者使用专门准备的
这个模式会借用交换机(Exchange)来使用
声明交换机:ch.assertExchange('logs', 'fanout', {durable: false})
发布交换机:ch.publish('logs', '', new Buffer('Hello World!'));
交换机的信息附加到队列:ch.assertQueue('', {exclusive: true}); ch.bindQueue(queue_name, 'logs', '');, 空的队列名可以保证新加入的订阅者只会接收到新的消息,而不需要把老的消息再重新消费一遍

参考

https://www.rabbitmq.com/tutorials/tutorial-one-javascript.html
https://www.cnblogs.com/frankyou/p/5283539.html
https://www.cnblogs.com/likehua/p/3999538.html

本文链接:https://note.lilonghe.net/post/message-queue.html

-- EOF --