文档章节

RocketMQ 消息订阅Subscribe—— Push & Pull 模式

秋风醉了
 秋风醉了
发布于 2017/06/14 15:35
字数 1277
阅读 1381
收藏 0

RocketMQ 消息订阅Subscribe—— Push & Pull 模式

 

RocketMQ消息订阅的两种模式

RocketMQ消息订阅有两种模式,一种是Push模式(MQPushConsumer),即MQServer主动向消费端推送;另外一种是Pull模式(MQPullConsumer),即消费端在需要时,主动到MQServer拉取。

但在具体实现时,Push和Pull模式都是采用消费端主动拉取的方式,即consumer轮询从broker拉取消息。

区别是:

Push方式里,consumer把轮询过程封装了,并注册MessageListener监听器,取到消息后,唤醒MessageListener的consumeMessage()来消费,对用户而言,感觉消息是被推送过来的。

Pull方式里,取消息的过程需要用户自己写,首先通过打算消费的Topic拿到MessageQueue的集合,遍历MessageQueue集合,然后针对每个MessageQueue批量取消息,一次取完后,记录该队列下一次要取的开始offset,直到取完了,再换另一个MessageQueue。

 

Push && Pull 的使用场景

慢消费

慢消费无疑是Push模式最大的致命伤,如果消费者的速度比发送者的速度慢很多,势必造成消息在broker的堆积。假设这些消息都是有用的无法丢弃的,消息就要一直在broker端保存。当然这还不是最致命的,最致命的是broker给consumer推送一堆consumer无法处理的消息,consumer不是reject就是error,然后来回踢皮球。

反观Pull模式,consumer可以按需消费,不用担心自己处理不了的消息来骚扰自己,而broker堆积消息也会相对简单,无需记录每一个要发送消息的状态,只需要维护所有消息的队列和偏移量就可以了。所以对于慢消费,消息量有限且到来的速度不均匀的情况,pull模式比较合适。

 

消息延迟与忙等

这是Pull模式最大的短板。由于主动权在消费方,消费方无法准确地决定何时去拉取最新的消息。如果一次Pull取到消息了还可以继续去Pull,如果没有Pull取到则需要等待一段时间重新Pull。但等待多久就很难判定了。你可能会说,我可以有xx动态Pull拉取时间调整算法,但问题的本质在于,有没有消息到来这件事情决定权不在消费方。也许1分钟内连续来了1000条消息,然后半个小时没有新消息产生,可能你的算法算出下次最有可能到来的时间点是31分钟之后,或者60分钟之后,结果下条消息10分钟后到了,是不是很让人沮丧?

当然也不是说延迟就没有解决方案了,业界较成熟的做法是从短时间开始(不会对broker有太大负担),然后指数级增长等待。比如开始等5ms,然后10ms,然后20ms,然后40ms……直到有消息到来,然后再回到5ms。

即使这样,依然存在延迟问题:假设40ms到80ms之间的50ms消息到来,消息就延迟了30ms,而且对于半个小时来一次的消息,这些开销就是白白浪费的。

在RocketMQ里,有一种优化的做法——长轮询 Pull ,来平衡推拉模型各自的缺点。基本思路是:消费者如果尝试拉取失败,不是直接return,而是把连接挂在那里wait,服务端如果有新的消息到来,把连接notify起来,这也是不错的思路。但海量的长连接block对系统的开销还是不容小觑的,还是要合理的评估时间间隔,给wait加一个时间上限比较好。

长轮询 Pull 的原理类似于 :http://www.ibm.com/developerworks/cn/web/wa-lo-comet/

 

MQPullConsumer 的使用

PullConsumer.java

package com.rocketmq.demo.pull;

import org.apache.rocketmq.client.consumer.DefaultMQPullConsumer;
import org.apache.rocketmq.client.consumer.PullResult;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
 * Created by xinxingegeya on 2017/6/14.
 */
public class PullConsumer {

    private static final Map<MessageQueue, Long> offsetTable = new HashMap<MessageQueue, Long>();

    public static void main(String[] args) throws MQClientException {
        DefaultMQPullConsumer consumer = new DefaultMQPullConsumer("please_rename_unique_consumer_group_name_5");
        consumer.setNamesrvAddr("127.0.0.1:9876");
        consumer.start();
        Set<MessageQueue> mqs = consumer.fetchSubscribeMessageQueues("PullConsumerTopicTest");
        for (MessageQueue mq : mqs) {
            System.err.println("Consume from the queue: " + mq);
            SINGLE_MQ:
            while (true) {
                try {
                    PullResult pullResult = consumer.pullBlockIfNotFound(mq, null, getMessageQueueOffset(mq), 32);
                    System.out.println(pullResult);
                    putMessageQueueOffset(mq, pullResult.getNextBeginOffset());

                    switch (pullResult.getPullStatus()) {
                        case FOUND:
                            List<MessageExt> messageExtList = pullResult.getMsgFoundList();
                            for (MessageExt m : messageExtList) {
                                System.out.println(new String(m.getBody()));
                            }
                            break;
                        case NO_MATCHED_MSG:
                            break;
                        case NO_NEW_MSG:
                            break SINGLE_MQ;
                        case OFFSET_ILLEGAL:
                            break;
                        default:
                            break;
                    }

                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        }
        consumer.shutdown();
    }

    private static void putMessageQueueOffset(MessageQueue mq, long offset) {
        offsetTable.put(mq, offset);
    }

    private static long getMessageQueueOffset(MessageQueue mq) {
        Long offset = offsetTable.get(mq);
        if (offset != null)
            return offset;
        return 0;
    }
}

Producer.java

package com.rocketmq.demo.pull;

import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.remoting.common.RemotingHelper;

/**
 * Created by xinxingegeya on 2017/6/14.
 */
public class Producer {

    public static void main(String[] args) throws MQClientException, InterruptedException {
        DefaultMQProducer producer = new DefaultMQProducer("please_rename_unique_producer_group_name_5");

        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();
        for (int i = 0; i < 1000; i++) {
            try {
                // Create a message instance, specifying topic, tag and message body.
                Message msg = new Message(
                        "PullConsumerTopicTest" /* Topic */,
                        "TagA" /* Tag */,
                        ("Hello RocketMQ " + i).getBytes(RemotingHelper.DEFAULT_CHARSET) /* Message body */
                );
                //Call send message to deliver message to one of brokers.
                SendResult sendResult = producer.send(msg);
                System.out.printf("%s%n", sendResult);
            } catch (Exception e) {
                e.printStackTrace();
                Thread.sleep(1000);
            }
        }
        // Shut down once the producer instance is not longer in use.
        producer.shutdown();
    }
}

参考:http://www.jianshu.com/p/453c6e7ff81c

https://yq.aliyun.com/articles/55627

http://www.cnblogs.com/maypattis/p/5689187.html

============END============

© 著作权归作者所有

共有 人打赏支持
秋风醉了
粉丝 241
博文 566
码字总数 417505
作品 0
朝阳
程序员
私信 提问
[RocketMQ]消息中间件—RocketMQ消息消费(一)

文章摘要:在发送消息给RocketMQ后,消费者需要消费。消息的消费比发送要复杂一些,那么RocketMQ是如何来做的呢? 在RocketMQ系列文章的前面几篇幅中已经对其“RPC通信部分”和“普通消息发送...

morpheusWB
2018/09/29
0
0
消息中间件—RocketMQ消息消费(一)

文章摘要:在发送消息给RocketMQ后,消费者需要消费。消息的消费比发送要复杂一些,那么RocketMQ是如何来做的呢? 在RocketMQ系列文章的前面几篇幅中已经对其“RPC通信部分”和“普通消息发送...

癫狂侠
2018/08/12
0
0
消息中间件—RocketMQ消息消费(二)(push模式实现)

摘要:在RocketMQ中,消息消费都是基于Pull消息方式,那么Push模式中又是如何实现Consumer端准实时消费的呢? 在上一篇—“消息中间件—RocketMQ消息消费(一)”中,已经简要地介绍了下Roc...

癫狂侠
2018/08/16
0
0
[RocketMQ]消息中间件—RocketMQ消息消费(二)(push模式实现)

摘要:在RocketMQ中,消息消费都是基于Pull消息方式,那么Push模式中又是如何实现Consumer端准实时消费的呢? 在上一篇—“消息中间件—RocketMQ消息消费(一)”中,已经简要地介绍了下Roc...

morpheusWB
2018/09/29
0
0
说说MQ之RocketMQ(二)

原文出处:Valleylord RocketMQ 的 Java API RocketMQ 是用 Java 语言开发的,因此,其 Java API 相对是比较丰富的,当然也有部分原因是 RocketMQ 本身提供的功能就比较多。RocketMQ API 提供...

Valleylord
2018/10/12
0
0

没有更多内容

加载失败,请刷新页面

加载更多

MongoDB

关于MongoDb 1、MongoDB是一个开源的、基于分布式的、面向文档存储的非关系型数据库。 2、是非关系型数据库当中功能最丰富、最像关系数据库的。 3、由C++编写, MongoDB可以运行在Windows、u...

谢思华
17分钟前
1
0
Node.js 进程平滑离场剖析

本文由云+社区发表 作者:草小灰 使用 Node.js 搭建 HTTP Server 已是司空见惯的事。在生产环境中,Node 进程平滑重启直接关系到服务的可靠性,它的重要性不容我们忽视。既然是平滑重启,就涉...

腾讯云加社区
25分钟前
1
0
druid等 olap框架对比分析

简介 Druid 是一个开源的,分布式的,列存储的,适用于实时数据分析的存储系统,能够快速聚合、灵活过滤、毫秒级查询、和低延迟数据导入。 Druid在设计时充分考虑到了高可用性,各种节点挂掉...

hblt-j
25分钟前
0
0
Idea 禁用代码重复等警告

在相应方法上添加注解 @SuppressWarnings("action") @Transactional@SuppressWarnings("Duplicates")public void analyzeDpBook(DpBook dpBook) { EpubBook _epubBook = epubBookSe......

lemos
28分钟前
0
0
MaxCompute studio与权限那些事儿

背景知识 MaxCompute拥有一套强大的安全体系,来保护项目空间里的数据安全。用户在使用MaxCompute时,应理解权限的一些基本概念: 权限可分解为三要素,即主体(用户账号或角色),客体(表/...

阿里云官方博客
29分钟前
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部