文档章节

RocketMQ client客户端模块源码分析一(生产者)

杨武兵
 杨武兵
发布于 2016/05/27 10:23
字数 1994
阅读 1878
收藏 9

类结构图

该图只列出了核心的接口和一些关键的实现类,简化了结构图,让我们对client模块有个整体的认识。从图中我们可以看出以下关键点。

  1. 客户端包的核心角色是MQ管理者、MQ消息生产者、MQ消息消费者。这与它的功能相互吻合。
  2. 客户端包的设计应用了门面模式。对外针对开发者有一套简单的api,对内有内部的接口和实现,这些是不会暴露给开发者的,因此开发者再使用的时候请不要面向impl包及其下面的实现类进行编程,而应该面向抽象编程,否则当客户端包升级的时候,我们就会很尴尬了。
  3. 生产者和消费者有公共的职责和实现,他们通过组合都使用了MQClientApiImpl这个类实现了与nameserver和broker的远程通讯。

生产者源码分析

生产者角色的实现相对简单一些,让我们先来看看RocketMQ的客户端中的生产者源码是什么样的。

使用案例

消息生产者使用方法如下面代码所示:

package com.sean;

import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;

public class Producer {
	public static void main(String[] args){
		DefaultMQProducer producer = new DefaultMQProducer("Producer");
		producer.setNamesrvAddr("192.168.58.163:9876"); 
		try {
			producer.start();
			
			Message msg = new Message("PushTopic", 
					"push", 
					"1", 
					"Just for test.".getBytes());
			
			SendResult result = producer.send(msg);
			System.out.println("id:" + result.getMsgId() +
					" result:" + result.getSendStatus());
			
			msg = new Message("PushTopic", 
					"push", 
					"2", 
					"Just for test.".getBytes());
			
			result = producer.send(msg);
			System.out.println("id:" + result.getMsgId() +
					" result:" + result.getSendStatus());
			
			msg = new Message("PullTopic", 
					"pull", 
					"1", 
					"Just for test.".getBytes());
			
			result = producer.send(msg);
			System.out.println("id:" + result.getMsgId() +
					" result:" + result.getSendStatus());
		} catch (Exception e) {
			e.printStackTrace();
		}finally{
			producer.shutdown();
		}
	}
}

可以看出来先构造一个生产者对象,设置必要的配置信息包括nameServer服务器地址namesrvAddr,生产者集群名称producerGroup(一般是应用的名称)即可,然后调用api发送消息到broker上去了。

核心源码分析

发送消息核心源码实现位于类DefaultMQProducer中,它的核心方法send又委托给了内部实现类DefaultMQProducerImpl去实现了,因此我进入到DefaultMQProducerImpl的sendi方法中一探究竟。

send方法最终委托给了一个内部方法sendDefaultImpl,关键是这个方法的源码实现,我们一起看看这个源码。

 private SendResult sendDefaultImpl(//
            Message msg,//
            final CommunicationMode communicationMode,//
            final SendCallback sendCallback, final long timeout//
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.makeSureStateOK(); //检查服务状态
        Validators.checkMessage(msg, this.defaultMQProducer);//检查消息合法性

        final long maxTimeout = this.defaultMQProducer.getSendMsgTimeout() + 1000; //发送超时时间。
        final long beginTimestamp = System.currentTimeMillis(); //发送开始时间。
        long endTimestamp = beginTimestamp;
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());//获得topic对应的发布信息。
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            int timesTotal = 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed(); //重试次数,重试几次避免网络引起的不可用。
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            for (; times < timesTotal && (endTimestamp - beginTimestamp) < maxTimeout; times++) {//尝试发送设置的次数,若已经超时了则不再发送。
                String lastBrokerName = null == mq ? null : mq.getBrokerName();//上次尝试发送使用的broker。
                MessageQueue tmpmq = topicPublishInfo.selectOneMessageQueue(lastBrokerName);//选择一个borker,一般会避免使用上次尝试过的broker。
                if (tmpmq != null) {
                    mq = tmpmq;
                    brokersSent[times] = mq.getBrokerName();//记录发送过的broker列表。
                    try {
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, timeout);//调用该方法真正发送。
                        endTimestamp = System.currentTimeMillis();//记录发送耗时。
                        switch (communicationMode) {//根据通讯模式来针对发送结果做处理。
                        case ASYNC:
                            return null;//异步发送立即返回。
                        case ONEWAY:
                            return null;//单向通讯也立即返回。
                        case SYNC: //同步发送则如果未成功则重试,否则直接返回发送结果。
                            if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                    continue;
                                }
                            }

                            return sendResult;
                        default:
                            break;
                        }
                    }
                    catch (RemotingException e) {
                        log.warn("sendKernelImpl exception", e);
                        log.warn(msg.toString());
                        exception = e;
                        endTimestamp = System.currentTimeMillis();
                        continue;
                    }
                    catch (MQClientException e) {
                        log.warn("sendKernelImpl exception", e);
                        log.warn(msg.toString());
                        exception = e;
                        endTimestamp = System.currentTimeMillis();
                        continue;
                    }
                    catch (MQBrokerException e) {
                        log.warn("sendKernelImpl exception", e);
                        log.warn(msg.toString());
                        exception = e;
                        endTimestamp = System.currentTimeMillis();
                        switch (e.getResponseCode()) {
                        case ResponseCode.TOPIC_NOT_EXIST:
                        case ResponseCode.SERVICE_NOT_AVAILABLE:
                        case ResponseCode.SYSTEM_ERROR:
                        case ResponseCode.NO_PERMISSION:
                        case ResponseCode.NO_BUYER_ID:
                        case ResponseCode.NOT_IN_CURRENT_UNIT:
                            continue;
                        default:
                            if (sendResult != null) {
                                return sendResult;
                            }

                            throw e;
                        }
                    }
                    catch (InterruptedException e) {
                        log.warn("sendKernelImpl exception", e);
                        log.warn(msg.toString());
                        throw e;
                    }
                }
                else {
                    break;
                }
            } // end of for

            if (sendResult != null) {
                return sendResult; //返回发送结果。
            }

            String info =
                    String.format("Send [%d] times, still failed, cost [%d]ms, Topic: %s, BrokersSent: %s", //
                        times, //
                        (System.currentTimeMillis() - beginTimestamp), //
                        msg.getTopic(),//
                        Arrays.toString(brokersSent)); //构造错误信息。

            info += FAQUrl.suggestTodo(FAQUrl.SEND_MSG_FAILED);

            throw new MQClientException(info, exception); //发送失败抛出异常,则发送应用要做好相应的业务处理。
        }

        List<String> nsList = this.getmQClientFactory().getMQClientAPIImpl().getNameServerAddressList();
        if (null == nsList || nsList.isEmpty()) {
            throw new MQClientException("No name server address, please set it."
                    + FAQUrl.suggestTodo(FAQUrl.NAME_SERVER_ADDR_NOT_EXIST_URL), null);
        }//获得队注册服务器失败异常。



        throw new MQClientException("No route info of this topic, " + msg.getTopic()
                + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO), null); //没有对应的路由信息异常。
    }

 

从代码我们可以发现发送消息有这些特点。

  1. 发送消息支持失败重试,还支持超时退出抛异常的可能性。这主要都是为了提升可用性和容错性。
  2. 发送消息支持异步、同步和单向等三种通讯模型。重试次数只对同步模式有效。
  3. 发送可能导致发送重复消息,因为发送失败有可能不是真的失败,服务器可能已经存储了消息,因此消费要支持幂等性。
  4. 发送应用要做好发送失败的异常处理,避免丢失消息。
  5. 发送消息选择broker和queueId的时候是轮询的,这个路由策略可能需要支持更多扩展,比如就近发送。

该方法又调用了方法sendKernelImpl发送消息,再进入该方法看看。

 private SendResult sendKernelImpl(final Message msg,//
            final MessageQueue mq,//
            final CommunicationMode communicationMode,//
            final SendCallback sendCallback,//
            final long timeout) throws MQClientException, RemotingException, MQBrokerException,
            InterruptedException {
        String brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName()); //获得broker服务器的地址。
        if (null == brokerAddr) {
            tryToFindTopicPublishInfo(mq.getTopic());
            brokerAddr = this.mQClientFactory.findBrokerAddressInPublish(mq.getBrokerName());//如果地址为空,则再更新一次,。
        }

        SendMessageContext context = null;
        if (brokerAddr != null) { //地址不为空,则继续发送消息。
            byte[] prevBody = msg.getBody();
            try {
                int sysFlag = 0;
                if (this.tryToCompressMessage(msg)) { //根据压缩配置执行消息压缩。
                    sysFlag |= MessageSysFlag.CompressedFlag; //压缩标记会记录在消息附件信息中,接收到消息根据标记解压
                }

                final String tranMsg = msg.getProperty(MessageConst.PROPERTY_TRANSACTION_PREPARED); //事务标志。
                if (tranMsg != null && Boolean.parseBoolean(tranMsg)) {
                    sysFlag |= MessageSysFlag.TransactionPreparedType;
                }

                if (hasCheckForbiddenHook()) {
                    CheckForbiddenContext checkForbiddenContext = new CheckForbiddenContext();
                    checkForbiddenContext.setNameSrvAddr(this.defaultMQProducer.getNamesrvAddr());
                    checkForbiddenContext.setGroup(this.defaultMQProducer.getProducerGroup());
                    checkForbiddenContext.setCommunicationMode(communicationMode);
                    checkForbiddenContext.setBrokerAddr(brokerAddr);
                    checkForbiddenContext.setMessage(msg);
                    checkForbiddenContext.setMq(mq);
                    checkForbiddenContext.setUnitMode(this.isUnitMode());
                    this.executeCheckForbiddenHook(checkForbiddenContext);
                }

                if (this.hasSendMessageHook()) {
                    context = new SendMessageContext();
                    context.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                    context.setCommunicationMode(communicationMode);
                    context.setBornHost(this.defaultMQProducer.getClientIP());
                    context.setBrokerAddr(brokerAddr);
                    context.setMessage(msg);
                    context.setMq(mq);
                    this.executeSendMessageHookBefore(context);
                }

                //组装发送消息的请求头。

                SendMessageRequestHeader requestHeader = new SendMessageRequestHeader();
                requestHeader.setProducerGroup(this.defaultMQProducer.getProducerGroup());
                requestHeader.setTopic(msg.getTopic());
                requestHeader.setDefaultTopic(this.defaultMQProducer.getCreateTopicKey());
                requestHeader.setDefaultTopicQueueNums(this.defaultMQProducer.getDefaultTopicQueueNums());
                requestHeader.setQueueId(mq.getQueueId());
                requestHeader.setSysFlag(sysFlag);
                requestHeader.setBornTimestamp(System.currentTimeMillis());
                requestHeader.setFlag(msg.getFlag());
                requestHeader.setProperties(MessageDecoder.messageProperties2String(msg.getProperties()));
                requestHeader.setReconsumeTimes(0);
                requestHeader.setUnitMode(this.isUnitMode());
                //重试消息特殊处理。
                if (requestHeader.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
                    String reconsumeTimes = MessageAccessor.getReconsumeTime(msg);
                    if (reconsumeTimes != null) {
                        requestHeader.setReconsumeTimes(new Integer(reconsumeTimes));
                        MessageAccessor.clearProperty(msg, MessageConst.PROPERTY_RECONSUME_TIME);
                    }
                }

                //调用客户端api发送消息。
                SendResult sendResult = this.mQClientFactory.getMQClientAPIImpl().sendMessage(//
                    brokerAddr,// 1
                    mq.getBrokerName(),// 2
                    msg,// 3
                    requestHeader,// 4
                    timeout,// 5
                    communicationMode,// 6
                    sendCallback// 7
                    );

                if (this.hasSendMessageHook()) {
                    context.setSendResult(sendResult);
                    this.executeSendMessageHookAfter(context);
                }

                return sendResult;
            }
            catch (RemotingException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            }
            catch (MQBrokerException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            }
            catch (InterruptedException e) {
                if (this.hasSendMessageHook()) {
                    context.setException(e);
                    this.executeSendMessageHookAfter(context);
                }
                throw e;
            }
            finally {
                msg.setBody(prevBody);
            }
        }

该方法的职责是组装发送消息的格式,更加底层的一些消息发送协议处理,它最终又调用了this.mQClientFactory.getMQClientAPIImpl().sendMessage发送消息,我们继续最终它。

    public SendResult sendMessage(//
            final String addr,// 1
            final String brokerName,// 2
            final Message msg,// 3
            final SendMessageRequestHeader requestHeader,// 4
            final long timeoutMillis,// 5
            final CommunicationMode communicationMode,// 6
            final SendCallback sendCallback// 7
    ) throws RemotingException, MQBrokerException, InterruptedException {
        if (!UtilAll.isBlank(projectGroupPrefix)) {
            msg.setTopic(VirtualEnvUtil.buildWithProjectGroup(msg.getTopic(), projectGroupPrefix));
            requestHeader.setProducerGroup(VirtualEnvUtil.buildWithProjectGroup(
                requestHeader.getProducerGroup(), projectGroupPrefix));
            requestHeader.setTopic(VirtualEnvUtil.buildWithProjectGroup(requestHeader.getTopic(),
                projectGroupPrefix));
        }
        //转化为底层的远程通讯命令对象,这是底层通讯协议。
        RemotingCommand request = null;
        if (sendSmartMsg) {
            SendMessageRequestHeaderV2 requestHeaderV2 =
                    SendMessageRequestHeaderV2.createSendMessageRequestHeaderV2(requestHeader);
            request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE_V2, requestHeaderV2);
        }
        else {
            request = RemotingCommand.createRequestCommand(RequestCode.SEND_MESSAGE, requestHeader);
        }

        request.setBody(msg.getBody());
        
        //三种通讯模型,调用对应的方法处理。
        switch (communicationMode) {
        case ONEWAY:
            this.remotingClient.invokeOneway(addr, request, timeoutMillis);
            return null;
        case ASYNC:
            this.sendMessageAsync(addr, brokerName, msg, timeoutMillis, request, sendCallback);
            return null;
        case SYNC:
            return this.sendMessageSync(addr, brokerName, msg, timeoutMillis, request);
        default:
            assert false;
            break;
        }

        return null;
    }

该方法最终又调用了rocketmq-remoting模块的远程通讯方法,通过该方法发送请求消息给broker服务器,根据地址找到缓存在本地的可用连接,这些连接缓存达到复用的目的,详细的内容参考该模块的源码分析。

© 著作权归作者所有

共有 人打赏支持
杨武兵

杨武兵

粉丝 259
博文 61
码字总数 123254
作品 1
昌平
架构师
私信 提问
消息中间件—RocketMQ的RPC通信(一)

文章摘要:借用小厮的一句话“消息队列的本质在于消息的发送、存储和接收”。那么,对于一款消息队列来说,如何做到消息的高效发送与接收是重点和关键 一、RocketMQ中Remoting通信模块概览 ...

癫狂侠
2018/06/30
0
0
消息中间件—RocketMQ消息发送

摘要:使用客户端发送一条消息很Easy,在这背后RocketMQ完成了怎么样的操作呢? 大道至简,消息队列可以简单概括为:“一发一存一收”,在这三个过程中消息发送最为简单,也比较容易入手,适...

癫狂侠
2018/08/05
0
0
RocketMQ源码分析之从官方示例窥探:RocketMQ事务消息实现基本思想

摘要: RocketMQ源码分析之从官方示例窥探RocketMQ事务消息实现基本思想。 RocketMQ4.3.0版本开始支持事务消息,后续分享将开始将剖析事务消息的实现原理。首先从官方给出的Demo实例入手,以...

阿里云云栖社区
01/08
0
0
RocketMQ(二):RPC通讯

匠心零度 转载请注明原创出处,谢谢! RocketMQ网络部署图 NameServer:在系统中是做命名服务,更新和发现 broker服务。 Broker-Master:broker 消息主机服务器。 Broker-Slave: broker 消息...

匠心零度
2018/10/29
0
0
RocketMQ(八):消息发送

匠心零度 转载请注明原创出处,谢谢! RocketMQ网络部署图 NameServer:在系统中是做命名服务,更新和发现 broker服务。 Broker-Master:broker 消息主机服务器。 Broker-Slave: broker 消息...

匠心零度
2018/10/29
0
0

没有更多内容

加载失败,请刷新页面

加载更多

JDK 12又来了,我学不动了...

写在前面 看到 JDK 12又发布了,萌新不知不觉感觉瑟瑟发抖,从 Java 1.8的函数式编程思维和范式 到 Java 1.9的模块化特性的加持 以及还没来得及深切感受一下 Java 1.11 的 ZGC强大之后,这次...

CodeSheep
11分钟前
0
0
解决在gradle构建project时,发生peer not authenticated错误的方法

问题: FAILURE: Build failed with an exception.* What went wrong:Could not resolve all dependencies for configuration ':wl01-service:compile'.> Could not resolve com.ali......

Benz001
20分钟前
1
0
Apache Ignite上的TensorFlow

任何深度学习都是从数据开始的,这是关键点。没有数据,就无法训练模型,也无法评估模型质量,更无法做出预测,因此,数据源非常重要。在做研究、构建新的神经网络架构、以及做实验时,会习惯...

李玉珏
45分钟前
6
0
Java记录日志附带请求标识

起因 系统是web接口服务,排查故障的时候经常需要记录某次请求调用链路日志。这样我们拉日志的时候只要匹配这个traceid就行了 第一版解决方案 原来我们一直用了个很low的办法,在请求开始的时...

Tree
49分钟前
2
0
使用split_size优化的ODPS SQL的场景

使用split_size优化的ODPS SQL的场景 首先有两个大背景需要说明如下: 说明1:split_size,设定一个map的最大数据输入量,单位M,默认256M。用户可以通过控制这个变量,从而达到对map端输入的...

阿里云官方博客
54分钟前
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部