文档章节

聊聊rocketmq的SequenceProducerImpl

go4it
 go4it
发布于 2018/07/30 15:37
字数 323
阅读 54
收藏 0

本文主要研究一下rocketmq的SequenceProducerImpl

SequenceProducerImpl

io/openmessaging/rocketmq/producer/SequenceProducerImpl.java

public class SequenceProducerImpl extends AbstractOMSProducer implements SequenceProducer {

    private BlockingQueue<Message> msgCacheQueue;

    public SequenceProducerImpl(final KeyValue properties) {
        super(properties);
        this.msgCacheQueue = new LinkedBlockingQueue<>();
    }

    @Override
    public KeyValue properties() {
        return properties;
    }

    @Override
    public void send(final Message message) {
        checkMessageType(message);
        org.apache.rocketmq.common.message.Message rmqMessage = OMSUtil.msgConvert((BytesMessage) message);
        try {
            Validators.checkMessage(rmqMessage, this.rocketmqProducer);
        } catch (MQClientException e) {
            throw checkProducerException(rmqMessage.getTopic(), message.headers().getString(MessageHeader.MESSAGE_ID), e);
        }
        msgCacheQueue.add(message);
    }

    @Override
    public void send(final Message message, final KeyValue properties) {
        send(message);
    }

    @Override
    public synchronized void commit() {
        List<Message> messages = new ArrayList<>();
        msgCacheQueue.drainTo(messages);

        List<org.apache.rocketmq.common.message.Message> rmqMessages = new ArrayList<>();

        for (Message message : messages) {
            rmqMessages.add(OMSUtil.msgConvert((BytesMessage) message));
        }

        if (rmqMessages.size() == 0) {
            return;
        }

        try {
            SendResult sendResult = this.rocketmqProducer.send(rmqMessages);
            String[] msgIdArray = sendResult.getMsgId().split(",");
            for (int i = 0; i < messages.size(); i++) {
                Message message = messages.get(i);
                message.headers().put(MessageHeader.MESSAGE_ID, msgIdArray[i]);
            }
        } catch (Exception e) {
            throw checkProducerException("", "", e);
        }
    }

    @Override
    public synchronized void rollback() {
        msgCacheQueue.clear();
    }
}
  • 采用的是LinkedBlockingQueue,send方法实际调用的是添加到队列
  • 另外提供了commit以及rollback方法,都加了synchronized保证对LinkedBlockingQueue操作的线程安全
  • commit的时候,将queue的数据drainTo到list,然后批量发送;rollback的时候清空整个LinkedBlockingQueue

小结

rocketmq的SequenceProducerImpl在send方法的时候不是真正方法,而是添加到队列,只有在commit的时候才批量发送,rollback的时候清空队列。这里的send方法语义不是太好,可以改为pending之类的名称。

doc

© 著作权归作者所有

go4it
粉丝 91
博文 1258
码字总数 1173514
作品 0
深圳
私信 提问
加载中

评论(0)

聊聊rocketmq的RemotingException

序 本文主要研究一下rocketmq的RemotingException RemotingException org/apache/rocketmq/remoting/exception/RemotingException.java 继承自checked exception,底下有RemotingCommandExce......

go4it
2018/08/08
445
0
聊聊rocketmq的NettyEncoder及NettyDecoder

序 本文主要研究一下rocketmq的NettyEncoder及NettyDecoder NettyEncoder org/apache/rocketmq/remoting/netty/NettyEncoder.java 这里继承MessageToByteEncoder,类型是RemotingCommand,先......

go4it
2018/08/07
36
0
聊聊rocketmq的NettyClientConfig

序 本文主要研究一下rocketmq的NettyClientConfig NettyClientConfig org/apache/rocketmq/remoting/netty/NettyClientConfig.java 这里主要有几个参数: clientWorkerThreads,默认为4 cli......

go4it
2018/08/04
225
0
聊聊rocketmq的ProducerImpl

序 本文主要研究一下rocketmq的ProducerImpl ProducerImpl io/openmessaging/rocketmq/producer/ProducerImpl.java 发送消息的方法主要是代理给rocketmqProducer 另外调用OMSUtil.msgConver......

go4it
2018/07/28
39
0
聊聊rocketmq的PullConsumerImpl

序 本文主要研究一下rocketmq的PullConsumerImpl PullConsumerImpl io/openmessaging/rocketmq/consumer/PullConsumerImpl.java 这里poll方法从LocalMessageCache里头拉取消息 LocalMessage......

go4it
2018/07/31
39
0

没有更多内容

加载失败,请刷新页面

加载更多

0228 我的潘多拉

我的潘多拉 从一个故事说起。<br />从前,有个Java程序员非常喜欢写程序,喜欢研究源码,读英文文档。但是它在一家小公司里工作,公司的技术栈很陈旧。<br /> <br />单个系统代码中含有很多的...

李福春carter
29分钟前
12
0
OSChina 周六乱弹 —— 屁会不会传染病毒

Osc乱弹歌单(2020)请戳(这里) 【今日歌曲】 @薛定谔的兄弟 :分享洛神有语创建的歌单「我喜欢的音乐」: 《ハレハレヤ(朗朗晴天)》- 猫瑾 手机党少年们想听歌,请使劲儿戳(这里) @空格...

小小编辑
42分钟前
53
1
两个值得注意的问题

对成员变量的操作只能放在方法中,方法可以对成员变量和方法体中自己定义的局部 变量进行操作.在定义类的成员变量时可以同时赋予初值,如 class A { int a=12; float b=12.56f; } 但是不可以这...

咔啡
51分钟前
27
0
第三章 分布式服务框架的选择

1.大项目工程且多人维护的弊端 (1)项目团队协同成本高,业务响应越来越慢 (2)应用复杂度已超出人的认知负载(向杂乱的电线一样) (3)错误难于隔离(一个模块出错,整个系统挂掉) (4...

zxx901221
今天
68
0
eclipse 上传jar到远程仓库

使用maven的项目中,有时需要把本地的项目打成jar包上传到mevan仓库。 操作如下: 前提:pom文件中配置好远程库的地址,否则会报错 一、将maven 中的settings文件配置好用户名和密码,如下:...

文文1
昨天
63
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部