文档章节

聊聊rocketmq producer的batch

go4it
 go4it
发布于 2019/12/13 23:43
字数 380
阅读 11
收藏 0

本文主要研究一下rocketmq producer的batch

batch

rocketmq-client-4.6.0-sources.jar!/org/apache/rocketmq/client/producer/DefaultMQProducer.java

public class DefaultMQProducer extends ClientConfig implements MQProducer {

	//......

    private MessageBatch batch(Collection<Message> msgs) throws MQClientException {
        MessageBatch msgBatch;
        try {
            msgBatch = MessageBatch.generateFromList(msgs);
            for (Message message : msgBatch) {
                Validators.checkMessage(message, this);
                MessageClientIDSetter.setUniqID(message);
                message.setTopic(withNamespace(message.getTopic()));
            }
            msgBatch.setBody(msgBatch.encode());
        } catch (Exception e) {
            throw new MQClientException("Failed to initiate the MessageBatch", e);
        }
        msgBatch.setTopic(withNamespace(msgBatch.getTopic()));
        return msgBatch;
    }

    //......
}
  • DefaultMQProducer的batch方法接收Message集合,它会使用MessageBatch.generateFromList创建MessageBatch,之后遍历MessageBatch校验message,设置唯一id,更新topic以及body;最后返回MessageBatch

MessageBatch

rocketmq-common-4.6.0-sources.jar!/org/apache/rocketmq/common/message/MessageBatch.java

public class MessageBatch extends Message implements Iterable<Message> {

    private static final long serialVersionUID = 621335151046335557L;
    private final List<Message> messages;

    private MessageBatch(List<Message> messages) {
        this.messages = messages;
    }

    public byte[] encode() {
        return MessageDecoder.encodeMessages(messages);
    }

    public Iterator<Message> iterator() {
        return messages.iterator();
    }

    public static MessageBatch generateFromList(Collection<Message> messages) {
        assert messages != null;
        assert messages.size() > 0;
        List<Message> messageList = new ArrayList<Message>(messages.size());
        Message first = null;
        for (Message message : messages) {
            if (message.getDelayTimeLevel() > 0) {
                throw new UnsupportedOperationException("TimeDelayLevel is not supported for batching");
            }
            if (message.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                throw new UnsupportedOperationException("Retry Group is not supported for batching");
            }
            if (first == null) {
                first = message;
            } else {
                if (!first.getTopic().equals(message.getTopic())) {
                    throw new UnsupportedOperationException("The topic of the messages in one batch should be the same");
                }
                if (first.isWaitStoreMsgOK() != message.isWaitStoreMsgOK()) {
                    throw new UnsupportedOperationException("The waitStoreMsgOK of the messages in one batch should the same");
                }
            }
            messageList.add(message);
        }
        MessageBatch messageBatch = new MessageBatch(messageList);

        messageBatch.setTopic(first.getTopic());
        messageBatch.setWaitStoreMsgOK(first.isWaitStoreMsgOK());
        return messageBatch;
    }

}
  • MessageBatch继承了Message,实现了Iterable<Message>接口;其generateFromList方法会遍历messages,最后组装成messageBatch

小结

DefaultMQProducer的batch方法接收Message集合,它会使用MessageBatch.generateFromList创建MessageBatch,之后遍历MessageBatch校验message,设置唯一id,更新topic以及body;最后返回MessageBatch

doc

© 著作权归作者所有

go4it
粉丝 91
博文 1223
码字总数 1140450
作品 0
深圳
私信 提问
rocketmq4.x快速入门指南

以下采用的是版本 相关文档如下 快速体验: http://blog.seoui.com/2018/07/24/rocketmqinstall/ rocketmq简单消息发送: http://blog.seoui.com/2018/07/24/rocketmq_simple_message/ rock......

peachyy
2018/08/02
0
0
聊聊rocketmq的ProducerImpl

序 本文主要研究一下rocketmq的ProducerImpl ProducerImpl io/openmessaging/rocketmq/producer/ProducerImpl.java 发送消息的方法主要是代理给rocketmqProducer 另外调用OMSUtil.msgConver......

go4it
2018/07/28
38
0
聊聊rocketmq的SequenceProducerImpl

序 本文主要研究一下rocketmq的SequenceProducerImpl SequenceProducerImpl io/openmessaging/rocketmq/producer/SequenceProducerImpl.java 采用的是LinkedBlockingQueue,send方法实际调用......

go4it
2018/07/30
17
0
消息中间件 RocketMQ 源码解析 —— 调试环境搭建

摘要: 原创出处 www.iocoder.cn/RocketMQ/bu… 「芋道源码」欢迎转载,保留摘要,谢谢! 0. 友情提示 1. 依赖工具 2. 源码拉取 3. 启动 RocketMQ Namesrv 4. 启动 RocketMQ Broker 5. 启动 ...

芋道源码_以德服人_不服就干
2019/01/31
0
0
RocketMQ:一个纯java的开源消息中间件--开发测试环境搭建

一、简介 RocketMQ的前身是Metaq,当Metaq3.0发布时,产品名称改为RocketMQ MetaQ2.x版本由于依赖了alibaba公司内部其他系统,对于公司外部用户使用不够友好,推荐使用3.0版本。 项目地址:h...

cloud-coder
2014/02/18
1.3W
15

没有更多内容

加载失败,请刷新页面

加载更多

Kettle自定义jar包供javascript使用

我们都知道 Kettle 是用 Java 语言开发,并且可以在 JavaScript 里面直接调用 java 类方法。所以有些时候,我们可以自定义一些方法,来供 JavaScript 使用。 本篇文章有参考自:https://www...

CREATE_17
昨天
82
0
处理CSV文件中的逗号

我正在寻找有关如何处理正在创建的csv文件的建议,然后由我们的客户上传,并且该值可能带有逗号(例如公司名称)。 我们正在研究的一些想法是:带引号的标识符(值“,”值“,”等)或使用|...

javail
昨天
79
0
如何克隆一个Date对象?

将Date变量分配给另一个变量会将引用复制到同一实例。 这意味着更改一个将更改另一个。 如何实际克隆或复制Date实例? #1楼 简化版: Date.prototype.clone = function () { return new ...

技术盛宴
昨天
73
0
计算一个数的数位之和

计算一个数的数位之和 例如:128 :1+2+8 = 11 public int numSum(int num) { int sum = 0; do { sum += num % 10; } while ((num = num / 10) > 0); return sum;......

SongAlone
昨天
124
0
为什么图片反复压缩后普遍会变绿,而不是其他颜色?

作者:Lion Yang 链接:https://www.zhihu.com/question/29355920/answer/119088684 来源:知乎 著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。 业余版概要:安卓的...

shzwork
昨天
71
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部