文档章节

RocketMQ 消息订阅Subscribe—— Push & Pull 模式

秋风醉了
 秋风醉了
发布于 2017/06/14 15:35
字数 1277
阅读 1152
收藏 0

RocketMQ 消息订阅Subscribe—— Push & Pull 模式

 

RocketMQ消息订阅的两种模式

RocketMQ消息订阅有两种模式,一种是Push模式(MQPushConsumer),即MQServer主动向消费端推送;另外一种是Pull模式(MQPullConsumer),即消费端在需要时,主动到MQServer拉取。

但在具体实现时,Push和Pull模式都是采用消费端主动拉取的方式,即consumer轮询从broker拉取消息。

区别是:

Push方式里,consumer把轮询过程封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送过来的。

Pull方式里,取消息的过程需要用户自己写,首先通过打算消费的Topic拿到MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,一次取完后,记录该队列下一次要取的开始offset,直到取完了,再换另一个MessageQueue。

 

Push && Pull 的使用场景

慢消费

慢消费无疑是Push模式最大的致命伤,如果消费者的速度比发送者的速度慢很多,势必造成消息在broker的堆积。假设这些消息都是有用的无法丢弃的,消息就要一直在broker端保存。当然这还不是最致命的,最致命的是broker给consumer推送一堆consumer无法处理的消息,consumer不是reject就是error,然后来回踢皮球。

反观Pull模式,consumer可以按需消费,不用担心自己处理不了的消息来骚扰自己,而broker堆积消息也会相对简单,无需记录每一个要发送消息的状态,只需要维护所有消息的队列和偏移量就可以了。所以对于慢消费,消息量有限且到来的速度不均匀的情况,pull模式比较合适。

 

消息延迟与忙等

这是Pull模式最大的短板。由于主动权在消费方,消费方无法准确地决定何时去拉取最新的消息。如果一次Pull取到消息了还可以继续去Pull,如果没有Pull取到则需要等待一段时间重新Pull。但等待多久就很难判定了。你可能会说,我可以有xx动态Pull拉取时间调整算法,但问题的本质在于,有没有消息到来这件事情决定权不在消费方。也许1分钟内连续来了1000条消息,然后半个小时没有新消息产生,可能你的算法算出下次最有可能到来的时间点是31分钟之后,或者60分钟之后,结果下条消息10分钟后到了,是不是很让人沮丧?

当然也不是说延迟就没有解决方案了,业界较成熟的做法是从短时间开始(不会对broker有太大负担),然后指数级增长等待。比如开始等5ms,然后10ms,然后20ms,然后40ms……直到有消息到来,然后再回到5ms。

即使这样,依然存在延迟问题:假设40ms到80ms之间的50ms消息到来,消息就延迟了30ms,而且对于半个小时来一次的消息,这些开销就是白白浪费的。

在RocketMQ里,有一种优化的做法——长轮询 Pull ,来平衡推拉模型各自的缺点。基本思路是:消费者如果尝试拉取失败,不是直接return,而是把连接挂在那里wait,服务端如果有新的消息到来,把连接notify起来,这也是不错的思路。但海量的长连接block对系统的开销还是不容小觑的,还是要合理的评估时间间隔,给wait加一个时间上限比较好。

长轮询 Pull 的原理类似于 :http://www.ibm.com/developerworks/cn/web/wa-lo-comet/

 

MQPullConsumer 的使用

PullConsumer.java

package com.rocketmq.demo.pull;

import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Created by xinxingegeya on 2017/6/14.
 */
public class PullConsumer {

    private static final Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>();

    public static void main(String[] args) throws MQClientException {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_consumer_group_name_5");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.start();
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("PullConsumerTopicTest");
        for (MessageQueue mq : mqs) {
            System.err.println("Consume from the queue: " + mq);
            SINGLE_MQ:
            while (true) {
                try {
                    PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    System.out.println(pullResult);
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());

                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            List<MessageExt> messageExtList = pullResult.getMsgFoundList();
                            for (MessageExt m : messageExtList) {
                                System.out.println(new String(m.getBody()));
                            }
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        case NO_NEW_MSG:
                            break SINGLE_MQ;
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        consumer.shutdown();
    }

    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        offsetTable.put(mq, offset);
    }

    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = offsetTable.get(mq);
        if (offset != null)
            return offset;
        return 0;
    }
}

Producer.java

package com.rocketmq.demo.pull;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
 * Created by xinxingegeya on 2017/6/14.
 */
public class Producer {

    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_producer_group_name_5");

        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        for (int i = 0; i < 1000; i++) {
            try {
                // Create a message instance, specifying topic, tag and message body.
                Message msg = new Message(
                        "PullConsumerTopicTest" /* Topic */,
                        "TagA" /* Tag */,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                //Call send message to deliver message to one of brokers.
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        // Shut down once the producer instance is not longer in use.
        producer.shutdown();
    }
}

参考:http://www.jianshu.com/p/453c6e7ff81c

https://yq.aliyun.com/articles/55627

http://www.cnblogs.com/maypattis/p/5689187.html

============END============

© 著作权归作者所有

共有 人打赏支持
秋风醉了
粉丝 239
博文 573
码字总数 417084
作品 0
朝阳
程序员
私信 提问
[RocketMQ]消息中间件—RocketMQ消息消费(一)

文章摘要:在发送消息给RocketMQ后,消费者需要消费。消息的消费比发送要复杂一些,那么RocketMQ是如何来做的呢? 在RocketMQ系列文章的前面几篇幅中已经对其“RPC通信部分”和“普通消息发送...

morpheusWB
09/29
0
0
消息中间件—RocketMQ消息消费(一)

文章摘要:在发送消息给RocketMQ后,消费者需要消费。消息的消费比发送要复杂一些,那么RocketMQ是如何来做的呢? 在RocketMQ系列文章的前面几篇幅中已经对其“RPC通信部分”和“普通消息发送...

癫狂侠
08/12
0
0
[RocketMQ]消息中间件—RocketMQ消息消费(二)(push模式实现)

摘要:在RocketMQ中,消息消费都是基于Pull消息方式,那么Push模式中又是如何实现Consumer端准实时消费的呢? 在上一篇—“消息中间件—RocketMQ消息消费(一)”中,已经简要地介绍了下Roc...

morpheusWB
09/29
0
0
消息中间件—RocketMQ消息消费(二)(push模式实现)

摘要:在RocketMQ中,消息消费都是基于Pull消息方式,那么Push模式中又是如何实现Consumer端准实时消费的呢? 在上一篇—“消息中间件—RocketMQ消息消费(一)”中,已经简要地介绍了下Roc...

癫狂侠
08/16
0
0
说说MQ之RocketMQ(二)

原文出处:Valleylord RocketMQ 的 Java API RocketMQ 是用 Java 语言开发的,因此,其 Java API 相对是比较丰富的,当然也有部分原因是 RocketMQ 本身提供的功能就比较多。RocketMQ API 提供...

Valleylord
10/12
0
0

没有更多内容

加载失败,请刷新页面

加载更多

利用ibeetl 实现selectpicker 的三级联动

1. js 直接写在html页面上面,ibeetl 就可以动态地利用后台传上来的model List ,不需要每次点击都要ajax请求后台 2. 使用selectpicker 的时候,除了对selecct option的动态处理后,还需要 $("#...

donald121
26分钟前
1
0
Android SELinux avc dennied权限问题解决方法

1. 概述 SELinux是Google从android 5.0开始,强制引入的一套非常严格的权限管理机制,主要用于增强系统的安全性。 然而,在开发中,我们经常会遇到由于SELinux造成的各种权限不足,即使拥有“...

TreasureWe
36分钟前
2
0
阿里云ACP认证详细笔记(一)

ECS--------------------------1.云服务器Elastic Compute Service(ECS)2.Terraform:您可以使用开源工具Terraform来预配和管理ECS资源。Terraform提供一种简单机制,能够将配置文件部署...

啃不动地大坚果
41分钟前
1
0
如何实现MetaMask签名授权后DAPP一键登录功能?

1 摘要 网站太多,各种用户名/密码实在记不住。所以我们逐渐接受了BAT账号的授权登录功能。在以太坊DAPP应用中,也可以使用MetaMask实现授权后一键登录功能。MetaMask是去中心化钱包,授权信...

HiBlock
43分钟前
3
0
raspberrypi的相关网址

一、NOOBS安装 NOOBS使用说明书 http://www.shumeipai.net/thread-20009-1-1.html NOOBS自定义多系统启动 https://www.jianshu.com/p/afbcd17b785d NOOBS安装自定义系统 https://blog.csdn.......

mbzhong
43分钟前
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部