文档章节

RocketMQ源码分析(二)Producer端发送数据

乒乓狂魔
 乒乓狂魔
发布于 2016/10/09 10:34
字数 2148
阅读 766
收藏 1

1 系列

  • 整体架构图
  • producer端发送消息
  • broker端接收消息
  • broker端消息的存储
  • consumer消费消息
  • 分布式事务的实现
  • 定时消息的实现
  • 关于顺序消费话题
  • 关于重复消息话题
  • 关于高可用话题

2 发送消息案例

下面分别给出发送消息的3个官方案例

2.1 同步发送

同步发送

同步发送直接得到响应结果

2.2 异步发送

异步发送

在发送的时候需要指定一个SendCallback回调,用于处理发送成功和发送异常的结果

2.3 顺序发送

顺序发送

在发送的时候需要指定一个MessageQueueSelector,用于选择将该消息发送到哪一个消息队列。可以看到顺序发送内部也是采用同步来发送的

下面就来探讨下Producer要解决的问题

3 Producer的源码介绍

3.1 通信层的设计

RocketMQ采用RemotingClient来实现底层的通信

3.1.1 三种发送方式

RemotingClient定义了如下三种通信方式:

同步发送:

public RemotingCommand invokeSync(final String addr, final RemotingCommand request,
                                  final long timeoutMillis)

异步发送:

public void invokeAsync(final String addr, final RemotingCommand request, final long timeoutMillis,
                        final InvokeCallback invokeCallback)

单方向发送(不需要知道响应):

public void invokeOneway(final String addr, final RemotingCommand request, final long timeoutMillis)

这里发送请求的数据是RemotingCommand,会将RemotingCommand编码成字节数组,服务器端对字节数组进行解码成RemotingCommand,然后处理,得到响应结果为RemotingCommand。

上述RemotingClient接口的实现是NettyRemotingClient,采用Netty的客户端Bootstrap来实现。

需要做的就是选择EventLoopGroup、实现编解码器,以及实现Netty的ChannelHandler

代码片段如下:

this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {
        private AtomicInteger threadIndex = new AtomicInteger(0);


        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));
        }
    });

Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)//
            //
            .option(ChannelOption.TCP_NODELAY, true)
            //
            .option(ChannelOption.SO_KEEPALIVE, false)
            //
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
            //
            .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
            //
            .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
            //
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline().addLast(//
                            defaultEventExecutorGroup, //
                            new NettyEncoder(), //
                            new NettyDecoder(), //
                            new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()), //
                            new NettyConnetManageHandler(), //
                            new NettyClientHandler());
                }
            });

这里选择的EventLoopGroup是NioEventLoopGroup,编解码器是NettyEncoder、NettyDecoder。这2个编解码器就是用于上述的RemotingCommand和字节数组之间的转换的。

再来看看上述Netty的ChannelHandler的实现NettyClientHandler

3.1.2 NettyClientHandler

它将读到的数据分成2类,一类是服务器端返回的响应数据,另一类是服务器端请求客户端的数据,如下所示

public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
    final RemotingCommand cmd = msg;
    if (cmd != null) {
        switch (cmd.getType()) {
            case REQUEST_COMMAND:
                processRequestCommand(ctx, cmd);
                break;
            case RESPONSE_COMMAND:
                processResponseCommand(ctx, cmd);
                break;
            default:
                break;
        }
    }
}

根据RemotingCommand的类型标识位,如果是REQUEST_COMMAND类型,表明是服务器端请求客户端的,如果是RESPONSE_COMMAND表明是服务器端返回的响应数据。

如果是REQUEST_COMMAND:客户端需要根据服务器端的请求码,找到对应的处理函数以及线程池来进行处理,处理完成之后返回响应给服务器端

NettyClientHandler中有这样的一个数据结构

HashMap<Integer/* request code */, Pair<NettyRequestProcessor, ExecutorService>> processorTable

存储着请求码对应的处理器NettyRequestProcessor,以及该处理器对应的线程池ExecutorService。也就是说服务器端请求客户端执行某个操作,客户端会拿出对应的处理器在对应的线程池中来执行相应的处理。

如果是RESPONSE_COMMAND:客户端根据参数RemotingCommand得到请求id,根据这个请求id找到之前的请求,这里是ResponseFuture,设置这个ResponseFuture的状态为执行结束。并通过判断这个ResponseFuture是否有回调函数,如果有则执行回调函数。

对于上述的同步发送:创建了一个ResponseFuture,并将请求id和ResponseFuture的映射关系存放在

ConcurrentHashMap<Integer /* opaque */, ResponseFuture> responseTable

NettyClientHandler的上述responseTable中,然后客户端等待响应,等待超时时间是可以指定的。一旦服务器端有响应则会走上述RESPONSE_COMMAND处理流程,此时客户端得到响应结果不再阻塞

对于上述异步发送:创建了一个ResponseFuture,这里会将异步发送的InvokeCallback放到ResponseFuture中,并将请求id和ResponseFuture的映射关系存放在responseTable中,客户端不阻塞,直接返回。一旦服务器端有响应则会走上述RESPONSE_COMMAND处理流程,会执行前面设置的InvokeCallback

对于上述单方向发送:不创建ResponseFuture。

大部分的同步、异步、单方向的通信都是采用上述类似的方式来实现的。

3.2 API的实现

上述RemotingClient负责通信实现,而MQClientAPIImpl则是利用RemotingClient来实现RocketMQ的相关功能,把具体的功能需求转化成RemotingCommand,交给RemotingClient去执行,如创建topic

public void createTopic(final String addr, final String defaultTopic, final TopicConfig topicConfig, final long timeoutMillis)
        throws RemotingException, MQBrokerException, InterruptedException, MQClientException {
    CreateTopicRequestHeader requestHeader = new CreateTopicRequestHeader();
    requestHeader.setTopic(topicConfig.getTopicName());
    requestHeader.setDefaultTopic(defaultTopic);
    requestHeader.setReadQueueNums(topicConfig.getReadQueueNums());
    requestHeader.setWriteQueueNums(topicConfig.getWriteQueueNums());
    requestHeader.setPerm(topicConfig.getPerm());
    requestHeader.setTopicFilterType(topicConfig.getTopicFilterType().name());
    requestHeader.setTopicSysFlag(topicConfig.getTopicSysFlag());
    requestHeader.setOrder(topicConfig.isOrder());

    RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.UPDATE_AND_CREATE_TOPIC, requestHeader);

    RemotingCommand response = this.remotingClient.invokeSync(MixAll.brokerVIPChannel(this.clientConfig.isVipChannelEnabled(), addr),
            request, timeoutMillis);
    assert response != null;
    switch (response.getCode()) {
        case ResponseCode.SUCCESS: {
            return;
        }
        default:
            break;
    }

    throw new MQClientException(response.getCode(), response.getRemark());
}

以及其他的功能,如发送消息(Producer端需要的,也有上述对应的3种发送方式)、拉取消息(Consumer端需要的)等等

总结一下就是MQClientAPIImpl提供了很多的功能方法,如其名API,供调用者来使用。

3.3 服务的管理

目前MQClientAPIImpl提供了很多的一些API方法供我们使用,而MQClientInstance就是来具体使用这些API的,完成具体的服务的,这些服务有:

  • 启动一些定时任务(定时更新Name Server列表、定时更新路由信息、持久化Consumer的offset等等)

  • 启动拉取消息的线程服务(Consumer端需要的)

  • 启动rebalance线程服务(Consumer端需要的)

所以不管是Producer还是Consumer,他们都会含有MQClientInstance。MQClientInstance目前就是一个大杂烩。

3.4 对用户暴漏的接口

MQProducer则是对用户暴漏的发送消息的相关接口,如上述案例中我们使用的接口方法,该接口的实现则是由上述三部分来支撑起来的。

4 同步和异步以及阻塞和非阻塞

在很多的通信编程过程中会面临2个选择,socket编程采用BIO还是NIO?对用户暴漏的接口方法采用同步还是异步?

通常socket采用的是NIO,即请求和响应要通过请求id来进行匹配

对用户暴漏的接口方法可以提供同步或者异步的方式,如果是同步的方式,即上述的ResponseFuture等待一个超时时间,则超时时间是需要的,如果是异步方式则不需要超时时间

如RocketMQ、ZooKeeper,他们都是采用NIO,在NIO的基础上对用户暴漏同步和异步接口方法。

5 Producer要关注的问题

来看下发送失败后重试问题:

5.1 同步发送

需要一个timeout时间的

我们知道同步发送的原理就是ResponseFuture等待一个timeout时间,如果超过该时间发送端认为发送失败,然后就会进行重试,目前RocketMQ的同步重试次数设置是

private int retryTimesWhenSendFailed = 2;

默认是2

有一个重要问题就是:在NIO基础上实现同步发送,发送超时重试会存在消息重复的问题

同步发送等待一个timeout时间,如果超过该时间则发送端认为发送失败(但是服务器端可能已经接收并存储了该消息),此时客户端进行重试就可能会造成消息的重复。

其实在在BIO基础上的同步发送,发送超时重试也会存在消息重复的问题,即超时重试都是可能存在重复问题的。但是上述BIO和NIO超时的区别在于:如果是BIO超时则需要关闭该连接重新建立新的连接,如果是NIO超时则不需要关闭该连接。

重试都会从新选择消息队列来进行重试

5.2 异步发送

异步发送不需要超时时间

目前RocketMQ的重试配置如下:

private int retryTimesWhenSendAsyncFailed = 2;

也默认是2次

重试都会从新选择消息队列来进行重试

5.3 顺序发送

顺序发送通过选择一个固定的消息队列来进行同步发送,目前RocketMQ的顺序发送没有重试次数。我感觉顺序发送也可以重试啊,只是不去重新选择消息队列就行了。

欢迎关注微信公众号:乒乓狂魔

微信公众号

© 著作权归作者所有

共有 人打赏支持
乒乓狂魔
粉丝 1002
博文 105
码字总数 271356
作品 0
长宁
程序员
加载中

评论(1)

S
Shaliu
后续的章节呢,想看
消息中间件—RocketMQ的RPC通信(一)

文章摘要:借用小厮的一句话“消息队列的本质在于消息的发送、存储和接收”。那么,对于一款消息队列来说,如何做到消息的高效发送与接收是重点和关键 一、RocketMQ中Remoting通信模块概览 ...

癫狂侠
06/30
0
0
RocketMQ(九):消息发送(续)

匠心零度 转载请注明原创出处,谢谢! RocketMQ网络部署图 NameServer:在系统中是做命名服务,更新和发现 broker服务。 Broker-Master:broker 消息主机服务器。 Broker-Slave: broker 消息...

匠心零度
06/23
0
0
RocketMQ(八):消息发送

匠心零度 转载请注明原创出处,谢谢! RocketMQ网络部署图 NameServer:在系统中是做命名服务,更新和发现 broker服务。 Broker-Master:broker 消息主机服务器。 Broker-Slave: broker 消息...

匠心零度
06/19
0
0
RocketMQ源码:有序消息分析

本文主要分析RocketMQ中如何保证消息有序的。 RocketMQ的版本为:4.2.0 release。 一.时序图 还是老规矩,先把分析过程的时序图摆出来: 1.Producer发送顺序消息 2.Consumer接收顺序消息(一)...

Jacktanger
09/16
0
0
RocketMQ(四)——消息重试(包含生产 消费端的写法)

Producer,发送消息 */public class Producer { } Success consumption Failure consumption,later try to consume } Consumer,订阅消息 */public class Consumer { } consumer.subscribe("......

albertfly
05/28
0
0

没有更多内容

加载失败,请刷新页面

加载更多

搬瓦工镜像站bwh1.net被DNS污染,国内打不开搬瓦工官网

今天下午(2018年10月17日),继搬瓦工主域名bandwagonhost.com被污染后,这个国内的镜像地址bwh1.net也被墙了。那么目前应该怎么访问搬瓦工官网呢? 消息来源:搬瓦工优惠网->搬瓦工镜像站b...

flyzy2005
今天
2
0
SpringBoot自动配置

本篇介绍下,如何通过springboot的自动配置,将公司项目内的依赖jar,不需要扫描路径,依赖jar的情况下,就能将jar内配置了@configuration注解的类,创建到IOC里面 介绍下开发环境 JDK版本1.8 spr...

贺小五
今天
3
0
命令行新建Maven多项目

参考地址 # DgroupId 可以理解为包名# DartifactId 可以理解为项目名mvn archetype:generate -DgroupId=cn.modfun -DartifactId=scaffold -DarchetypeArtifactId=maven-archetype-quickst......

阿白
今天
1
0
OSChina 周四乱弹 —— 上帝对我单身年限的惩罚越来越长了

Osc乱弹歌单(2018)请戳(这里) 【今日歌曲】 @达尔文:分享张卫健的单曲《身体健康》 《身体健康》- 张卫健 手机党少年们想听歌,请使劲儿戳(这里) 昨天是重阳节咯, 可惜小小编辑总是晚...

小小编辑
今天
12
1
django rest framework 外键序列化方法与问题总结

django rest framework 外键序列化方法与问题总结 当借口中需要出现一对多关系的时候,我们可以用rest_framwork的序列化功能来处理,代码如下. # models.pyfrom django.db import modelscl...

_Change_
昨天
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部