RocketMQ系列(五)廣播與延遲消息

今天要給大家介紹RocketMQ中的兩個功能,一個是“廣播”,這個功能是比較基礎的,幾乎所有的mq產品都是支持這個功能的;另外一個是“延遲消費”,這個應該算是RocketMQ的特色功能之一了吧。接下來,我們就分別看一下這兩個功能。

廣播

廣播是把消息發送給訂閱了這個主題的所有消費者。這個定義很清楚,但是這裏邊的知識點你都掌握了嗎?咱們接着說“廣播”的機會,把消費者這端的內容好好和大家說說。

  • 首先,消費者端的概念中,最大的應該是消費者組,一個消費者組中可以有多個消費者,這些消費者必須訂閱同一個Topic。
  • 那麼什麼算是一個消費者呢?我們在寫消費端程序時,看到了setConsumeThreadMax這個方法,設置消費者的線程數,難道一個線程就是一個消費者?錯!這裏的一個消費者是一個進程,你可以理解為ip+端口。如果在同一個應用中,你實例化了兩個消費者,這兩個消費者配置了相同的消費者組名稱,那麼應用程序啟動時會報錯的,這裏不給大家演示了,感興趣的小夥伴私下里試一下吧。
  • 同一個消息,可以被不同的消費者組同時消費。假設,我有兩個消費者組cg-1和cg-2,這兩個消費者組訂閱了同一個Topic,那麼這個Topic的消息會被cg-1和cg-2同時消費。那這是不是廣播呢?錯!當然不是廣播,廣播是同一個消費者組中的多個消費者都消費這個消息。如果配置的不是廣播,像前幾個章節中的那樣,一個消息只能被一個消費者組消費一次。

好了,說了這麼多,我們實驗一下吧,先把消費者配置成廣播,如下:

@Bean(name = "broadcast", initMethod = "start",destroyMethod = "shutdown")
public DefaultMQPushConsumer broadcast() throws MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("broadcast");
    consumer.setNamesrvAddr("192.168.73.130:9876;192.168.73.131:9876;192.168.73.132:9876;");
    consumer.subscribe("cluster-topic","*");
    consumer.setMessageModel(MessageModel.BROADCASTING);
    consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
        for (MessageExt msg : msgs) {
            System.out.println(new String(msg.getBody()));
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });
    return consumer;
}

  • 其中,NameServer,訂閱的Topic都沒有變化。
  • 注意其中consumer.setMessageModel(MessageModel.BROADCASTING);這段代碼,設置消費者為廣播。咱們可以看一下,MessageModel枚舉中只有兩個值,BROADCASTINGCLUSTERING,默認為CLUSTERING

因為要測試廣播,所以我們要啟動多個消費者,還記得什麼是消費者嗎?對了,一個ip+端口算是一個消費者,在這裏我們啟動兩個應用,端口分別是8080和8081。發送端的程序不變,如下:

@Test
public void producerTest() throws Exception {

    for (int i = 0;i<5;i++) {
        MessageExt message = new MessageExt();
        message.setTopic("cluster-topic");
        message.setKeys("key-"+i);
        message.setBody(("this is simpleMQ,my NO is "+i+"---"+new Date()).getBytes());
        SendResult sendResult = defaultMQProducer.send(message);
        System.out.println("i=" + i);
        System.out.println("BrokerName:" + sendResult.getMessageQueue().getBrokerName());
    }
}

我們執行一下發送端的程序,日誌如下:

i=0
BrokerName:broker-a
i=1
BrokerName:broker-a
i=2
BrokerName:broker-b
i=3
BrokerName:broker-b
i=4
BrokerName:broker-b

再來看看8080端口的應用後台打印出來的日誌:

消費了5個消息,再看看8081的後台打印的日誌,

也消費了5個。兩個消費者同時消費了消息,這就是廣播。有的小夥伴可能會有疑問了,如果不設置廣播,會怎麼樣呢?私下里實驗一下吧,上面的程序中,只要把設置廣播的那段代碼註釋掉就可以了。運行的結果當然是只有一個消費者可以消費消息。

延遲消息

延遲消息是指消費者過了一個指定的時間后,才去消費這個消息。大家想象一個電商中場景,一個訂單超過30分鐘未支付,將自動取消。這個功能怎麼實現呢?一般情況下,都是寫一個定時任務,一分鐘掃描一下超過30分鐘未支付的訂單,如果有則被取消。這種方式由於每分鐘查詢一下訂單,一是時間不精確,二是查庫效率比較低。這個場景使用RocketMQ的延遲消息最合適不過了,我們看看怎麼發送延遲消息吧,發送端代碼如下:

@Test
public void producerTest() throws Exception {

    for (int i = 0;i<1;i++) {
        MessageExt message = new MessageExt();
        message.setTopic("cluster-topic");
        message.setKeys("key-"+i);
        message.setBody(("this is simpleMQ,my NO is "+i+"---"+new Date()).getBytes());
        message.setDelayTimeLevel(2);
        SendResult sendResult = defaultMQProducer.send(message);
        System.out.println("i=" + i);
        System.out.println("BrokerName:" + sendResult.getMessageQueue().getBrokerName());
    }
}
  • 我們只是增加了一句message.setDelayTimeLevel(2);
  • 為了方便,這次我們只發送一個消息。

setDelayTimeLevel是什麼意思,設置的是2,難道是2s后消費嗎?怎麼參數也沒有時間單位呢?如果我要自定義延遲時間怎麼辦?我相信很多小夥伴都有這樣的疑問,我也是帶着這樣的疑問查了很多資料,最後在RocketMQ的Github官網上看到了說明,

  • 在RocketMQ的源碼中,有一個MessageStoreConfig類,這個類中定義了延遲的時間,我們看一下,
// org/apache/rocketmq/store/config/MessageStoreConfig.java
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
  • 我們在程序中設置的是2,那麼這個消息將在5s以後被消費。
  • 目前RocketMQ還不支持自定義延遲時間,延遲時間只能從上面的時間中選。如果你非要定義一個時間怎麼辦呢?RocketMQ是開源的,下載代碼,把上面的時間改一下,再打包部署,就OK了。

再看看消費端的代碼,

@Bean(name = "broadcast", initMethod = "start",destroyMethod = "shutdown")
public DefaultMQPushConsumer broadcast() throws MQClientException {
    DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("broadcast");
    consumer.setNamesrvAddr("192.168.73.130:9876;192.168.73.131:9876;192.168.73.132:9876;");
    consumer.subscribe("cluster-topic","*");
    consumer.setMessageModel(MessageModel.BROADCASTING);
    consumer.registerMessageListener((MessageListenerConcurrently) (msgs, context) -> {
        for (MessageExt msg : msgs) {
            Date now = new Date();
            System.out.println("消費時間:"+now);

            Date msgTime = new Date();
            msgTime.setTime(msg.getBornTimestamp());
            System.out.println("消息生成時間:"+msgTime);

            System.out.println(new String(msg.getBody()));
        }
        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
    });
    return consumer;
}
  • 我們還是使用廣播的模式,沒有變。
  • 打印出了當前的時間,這個時間就是消費的時間。
  • 通過msg.getBornTimestamp()方法,獲得了消息的生成時間,也打印出來,看看是不是延遲5s。

啟動兩個消費者8080和8081,發送消息,再看看消費者的後台日誌,

消費時間:Thu Jun 11 14:45:53 CST 2020
消息生成時間:Thu Jun 11 14:45:48 CST 2020
this is simpleMQ,my NO is 0---Thu Jun 11 14:45:47 CST 2020

我們看到消費時間比生成時間晚5s,符合我們的預期。這個功能還是比較實用的,如果能夠自定義延遲時間就更好了。

總結

RocketMQ的這兩個知識點還是比較簡單的,大家要分清楚什麼是消費者組,什麼是消費者,什麼是消費者線程。另外就是延遲消息是不支持自定義的,大家可以在Github上看一下源碼。好了~今天就到這裏了。

本站聲明:網站內容來源於博客園,如有侵權,請聯繫我們,我們將及時處理

【其他文章推薦】

※為什麼 USB CONNECTOR 是電子產業重要的元件?

網頁設計一頭霧水該從何著手呢? 台北網頁設計公司幫您輕鬆架站!

※台北網頁設計公司全省服務真心推薦

※想知道最厲害的網頁設計公司"嚨底家"!

※推薦評價好的iphone維修中心