文档章节

RocketMQ源码分析(二)Producer端发送数据

乒乓狂魔
 乒乓狂魔
发布于 2016/10/09 10:34
字数 2148
阅读 707
收藏 1
点赞 0
评论 1

1 系列

  • 整体架构图
  • producer端发送消息
  • broker端接收消息
  • broker端消息的存储
  • consumer消费消息
  • 分布式事务的实现
  • 定时消息的实现
  • 关于顺序消费话题
  • 关于重复消息话题
  • 关于高可用话题

2 发送消息案例

下面分别给出发送消息的3个官方案例

2.1 同步发送

同步发送

同步发送直接得到响应结果

2.2 异步发送

异步发送

在发送的时候需要指定一个SendCallback回调,用于处理发送成功和发送异常的结果

2.3 顺序发送

顺序发送

在发送的时候需要指定一个MessageQueueSelector,用于选择将该消息发送到哪一个消息队列。可以看到顺序发送内部也是采用同步来发送的

下面就来探讨下Producer要解决的问题

3 Producer的源码介绍

3.1 通信层的设计

RocketMQ采用RemotingClient来实现底层的通信

3.1.1 三种发送方式

RemotingClient定义了如下三种通信方式:

同步发送:

public RemotingCommand invokeSync(final String addr, final RemotingCommand request,
                                  final long timeoutMillis)

异步发送:

public void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis,
                        final InvokeCallback invokeCallback)

单方向发送(不需要知道响应):

public void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis)

这里发送请求的数据是RemotingCommand,会将RemotingCommand编码成字节数组,服务器端对字节数组进行解码成RemotingCommand,然后处理,得到响应结果为RemotingCommand。

上述RemotingClient接口的实现是NettyRemotingClient,采用Netty的客户端Bootstrap来实现。

需要做的就是选择EventLoopGroup、实现编解码器,以及实现Netty的ChannelHandler

代码片段如下:

this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {
        private AtomicInteger threadIndex = new AtomicInteger(0);


        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));
        }
    });

Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)//
            //
            .option(ChannelOption.TCP_NODELAY, true)
            //
            .option(ChannelOption.SO_KEEPALIVE, false)
            //
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
            //
            .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
            //
            .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
            //
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(//
                            defaultEventExecutorGroup, //
                            new NettyEncoder(), //
                            new NettyDecoder(), //
                            new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()), //
                            new NettyConnetManageHandler(), //
                            new NettyClientHandler());
                }
            });

这里选择的EventLoopGroup是NioEventLoopGroup,编解码器是NettyEncoder、NettyDecoder。这2个编解码器就是用于上述的RemotingCommand和字节数组之间的转换的。

再来看看上述Netty的ChannelHandler的实现NettyClientHandler

3.1.2 NettyClientHandler

它将读到的数据分成2类,一类是服务器端返回的响应数据,另一类是服务器端请求客户端的数据,如下所示

public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
    final RemotingCommand cmd = msg;
    if (cmd != null) {
        switch (cmd.getType()) {
            case REQUEST_COMMAND:
                processRequestCommand(ctx, cmd);
                break;
            case RESPONSE_COMMAND:
                processResponseCommand(ctx, cmd);
                break;
            default:
                break;
        }
    }
}

根据RemotingCommand的类型标识位,如果是REQUEST_COMMAND类型,表明是服务器端请求客户端的,如果是RESPONSE_COMMAND表明是服务器端返回的响应数据。

如果是REQUEST_COMMAND:客户端需要根据服务器端的请求码,找到对应的处理函数以及线程池来进行处理,处理完成之后返回响应给服务器端

NettyClientHandler中有这样的一个数据结构

HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable

存储着请求码对应的处理器NettyRequestProcessor,以及该处理器对应的线程池ExecutorService。也就是说服务器端请求客户端执行某个操作,客户端会拿出对应的处理器在对应的线程池中来执行相应的处理。

如果是RESPONSE_COMMAND:客户端根据参数RemotingCommand得到请求id,根据这个请求id找到之前的请求,这里是ResponseFuture,设置这个ResponseFuture的状态为执行结束。并通过判断这个ResponseFuture是否有回调函数,如果有则执行回调函数。

对于上述的同步发送:创建了一个ResponseFuture,并将请求id和ResponseFuture的映射关系存放在

ConcurrentHashMap<Integer /* opaque */, ResponseFuture> responseTable

NettyClientHandler的上述responseTable中,然后客户端等待响应,等待超时时间是可以指定的。一旦服务器端有响应则会走上述RESPONSE_COMMAND处理流程,此时客户端得到响应结果不再阻塞

对于上述异步发送:创建了一个ResponseFuture,这里会将异步发送的InvokeCallback放到ResponseFuture中,并将请求id和ResponseFuture的映射关系存放在responseTable中,客户端不阻塞,直接返回。一旦服务器端有响应则会走上述RESPONSE_COMMAND处理流程,会执行前面设置的InvokeCallback

对于上述单方向发送:不创建ResponseFuture。

大部分的同步、异步、单方向的通信都是采用上述类似的方式来实现的。

3.2 API的实现

上述RemotingClient负责通信实现,而MQClientAPIImpl则是利用RemotingClient来实现RocketMQ的相关功能,把具体的功能需求转化成RemotingCommand,交给RemotingClient去执行,如创建topic

public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final long timeoutMillis)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
    CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
    requestHeader.setTopic(topicConfig.getTopicName());
    requestHeader.setDefaultTopic(defaultTopic);
    requestHeader.setReadQueueNums(topicConfig.getReadQueueNums());
    requestHeader.setWriteQueueNums(topicConfig.getWriteQueueNums());
    requestHeader.setPerm(topicConfig.getPerm());
    requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name());
    requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag());
    requestHeader.setOrder(topicConfig.isOrder());

    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader);

    RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
            request, timeoutMillis);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            return;
        }
        default:
            break;
    }

    throw new MQClientException(response.getCode(), response.getRemark());
}

以及其他的功能,如发送消息(Producer端需要的,也有上述对应的3种发送方式)、拉取消息(Consumer端需要的)等等

总结一下就是MQClientAPIImpl提供了很多的功能方法,如其名API,供调用者来使用。

3.3 服务的管理

目前MQClientAPIImpl提供了很多的一些API方法供我们使用,而MQClientInstance就是来具体使用这些API的,完成具体的服务的,这些服务有:

  • 启动一些定时任务(定时更新Name Server列表、定时更新路由信息、持久化Consumer的offset等等)

  • 启动拉取消息的线程服务(Consumer端需要的)

  • 启动rebalance线程服务(Consumer端需要的)

所以不管是Producer还是Consumer,他们都会含有MQClientInstance。MQClientInstance目前就是一个大杂烩。

3.4 对用户暴漏的接口

MQProducer则是对用户暴漏的发送消息的相关接口,如上述案例中我们使用的接口方法,该接口的实现则是由上述三部分来支撑起来的。

4 同步和异步以及阻塞和非阻塞

在很多的通信编程过程中会面临2个选择,socket编程采用BIO还是NIO?对用户暴漏的接口方法采用同步还是异步?

通常socket采用的是NIO,即请求和响应要通过请求id来进行匹配

对用户暴漏的接口方法可以提供同步或者异步的方式,如果是同步的方式,即上述的ResponseFuture等待一个超时时间,则超时时间是需要的,如果是异步方式则不需要超时时间

如RocketMQ、ZooKeeper,他们都是采用NIO,在NIO的基础上对用户暴漏同步和异步接口方法。

5 Producer要关注的问题

来看下发送失败后重试问题:

5.1 同步发送

需要一个timeout时间的

我们知道同步发送的原理就是ResponseFuture等待一个timeout时间,如果超过该时间发送端认为发送失败,然后就会进行重试,目前RocketMQ的同步重试次数设置是

private int retryTimesWhenSendFailed = 2;

默认是2

有一个重要问题就是:在NIO基础上实现同步发送,发送超时重试会存在消息重复的问题

同步发送等待一个timeout时间,如果超过该时间则发送端认为发送失败(但是服务器端可能已经接收并存储了该消息),此时客户端进行重试就可能会造成消息的重复。

其实在在BIO基础上的同步发送,发送超时重试也会存在消息重复的问题,即超时重试都是可能存在重复问题的。但是上述BIO和NIO超时的区别在于:如果是BIO超时则需要关闭该连接重新建立新的连接,如果是NIO超时则不需要关闭该连接。

重试都会从新选择消息队列来进行重试

5.2 异步发送

异步发送不需要超时时间

目前RocketMQ的重试配置如下:

private int retryTimesWhenSendAsyncFailed = 2;

也默认是2次

重试都会从新选择消息队列来进行重试

5.3 顺序发送

顺序发送通过选择一个固定的消息队列来进行同步发送,目前RocketMQ的顺序发送没有重试次数。我感觉顺序发送也可以重试啊,只是不去重新选择消息队列就行了。

欢迎关注微信公众号:乒乓狂魔

微信公众号

© 著作权归作者所有

共有 人打赏支持
乒乓狂魔
粉丝 979
博文 105
码字总数 271356
作品 0
长宁
程序员
加载中

评论(1)

S
Shaliu
后续的章节呢,想看
消息中间件—RocketMQ的RPC通信(一)

文章摘要:借用小厮的一句话“消息队列的本质在于消息的发送、存储和接收”。那么,对于一款消息队列来说,如何做到消息的高效发送与接收是重点和关键 一、RocketMQ中Remoting通信模块概览 ...

癫狂侠
06/30
0
0
RocketMQ(八):消息发送

匠心零度 转载请注明原创出处,谢谢! RocketMQ网络部署图 NameServer:在系统中是做命名服务,更新和发现 broker服务。 Broker-Master:broker 消息主机服务器。 Broker-Slave: broker 消息...

匠心零度
06/19
0
0
RocketMQ(九):消息发送(续)

匠心零度 转载请注明原创出处,谢谢! RocketMQ网络部署图 NameServer:在系统中是做命名服务,更新和发现 broker服务。 Broker-Master:broker 消息主机服务器。 Broker-Slave: broker 消息...

匠心零度
06/23
0
0
RocketMQ(四)——消息重试(包含生产 消费端的写法)

Producer,发送消息 */public class Producer { } Success consumption Failure consumption,later try to consume } Consumer,订阅消息 */public class Consumer { } consumer.subscribe("......

albertfly
05/28
0
0
RocketMQ与Kafka对比

RocketMQ与Kafka对比(18项差异) 淘宝内部的交易系统使用了淘宝自主研发的Notify消息中间件,使用Mysql作为消息存储媒介,可完全水平扩容,为了进一步降低成本,我们认为存储部分可以进一步...

莫问viva
2015/05/08
0
0
RocketMQ部分数据消费不了问题排查

问题现象 今天忽然收到RocketMQ预警信息如下: 提醒有部分数据没有消费,产生堆积情况。 打开RocketMq-Console-Ng查看如下图形式: 备注:第一反应是Consumer Group内订阅了多个topic?(为什...

匠心零度
05/14
0
0
RocketMQ(二):RPC通讯

匠心零度 转载请注明原创出处,谢谢! RocketMQ网络部署图 NameServer:在系统中是做命名服务,更新和发现 broker服务。 Broker-Master:broker 消息主机服务器。 Broker-Slave: broker 消息...

匠心零度
04/04
0
0
让你rocketmq用得比预期要好的 1 种方法

匠心零度 转载请注明原创出处,谢谢! 方法 让你rocketmq用得比预期要好的 1 种方法:就是认真思考下面的几个问题: 使用rocketmq能解决那些问题?那些问题是不能解决的? 我们什么时候该添加...

匠心零度
04/19
0
0
RocketMQ:一个纯java的开源消息中间件--开发测试环境搭建

一、简介 RocketMQ的前身是Metaq,当Metaq3.0发布时,产品名称改为RocketMQ MetaQ2.x版本由于依赖了alibaba公司内部其他系统,对于公司外部用户使用不够友好,推荐使用3.0版本。 项目地址:h...

cloud-coder
2014/02/18
0
15
RocketMQ 消息发送与消费源码分析

MQ在我们日常开发过程中有着不可替代的作用,不仅可以帮助我们做到信息在系统间的传递,还能进行系统间的解耦合,也就是说消息的发送端与接收端不会有强依赖关系(例如接口调用)。市场上MQ的...

数齐
07/08
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

Android 获取各大音乐平台的真实下载地址

废话 电脑使用谷歌浏览器或者QQ浏览器的时候。。。。。。。说不清楚,还是看图吧 大概意思就是,只要网页上需要播放,只要能播放并且开始播放,这个过程就肯定会请求到相关的音乐资源,然后就...

她叫我小渝
12分钟前
0
0
shell中的函数、shell中的数组、告警系统需求分析

shell中的函数 格式: 格式: function f_name() { command } 函数必须要放在最前面 示例1(用来打印参数) 示例2(用于定义加法) 示例3(用于显示IP) shell中的数组 shell中的数组1 定义数...

Zhouliang6
今天
2
0
用 Scikit-Learn 和 Pandas 学习线性回归

      对于想深入了解线性回归的童鞋,这里给出一个完整的例子,详细学完这个例子,对用scikit-learn来运行线性回归,评估模型不会有什么问题了。 1. 获取数据,定义问题     没有...

wangxuwei
今天
1
0
MAC安装MAVEN

一:下载maven压缩包(Zip或tar可选),解压压缩包 二:打开终端输入:vim ~/.bash_profile(如果找不到该文件新建一个:touch ./bash_profile) 三:输入i 四:输入maven环境变量配置 MAVEN_HO...

WALK_MAN
今天
0
0
33.iptables备份与恢复 firewalld的9个zone以及操作 service的操作

10.19 iptables规则备份和恢复 10.20 firewalld的9个zone 10.21 firewalld关于zone的操作 10.22 firewalld关于service的操作 10.19 iptables规则备份和恢复: ~1. 保存和备份iptables规则 ~2...

王鑫linux
今天
2
0
大数据教程(2.11):keeperalived+nginx高可用集群搭建教程

上一章节博主为大家介绍了目前大型互联网项目的系统架构体系,相信大家应该注意到其中很重要的一块知识nginx技术,在本节博主将为大家分享nginx的相关技术以及配置过程。 一、nginx相关概念 ...

em_aaron
今天
1
0
Apache Directory Studio连接Weblogic内置LDAP

OBIEE默认使用Weblogic内置LDAP管理用户及组。 要整理已存在的用户及组,此前办法是导出安全数据,文本编辑器打开认证文件,使用正则表达式获取用户及组的信息。 后来想到直接用Apache Dire...

wffger
今天
2
0
HFS

FS,它是一种上传文件的软件。 专为个人用户所设计的 HTTP 档案系统 - Http File Server,如果您觉得架设 FTP Server 太麻烦,那么这个软件可以提供您更方便的档案传输系统,下载后无须安装,...

garkey
今天
1
0
Java IO类库之BufferedInputStream

一、BufferedInputStream介绍 /** * A <code>BufferedInputStream</code> adds * functionality to another input stream-namely, * the ability to buffer the input and to * sup......

老韭菜
今天
0
0
STM 32 窗口看门狗

http://bbs.elecfans.com/jishu_805708_1_1.html https://blog.csdn.net/a1985831055/article/details/77404131...

whoisliang
昨天
1
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部