文档章节

聊聊rocketmq的AccessChannel

go4it
 go4it
发布于 11/12 23:55
字数 434
阅读 23
收藏 0

【推荐】2019 Java 开发者跳槽指南.pdf(吐血整理) >>>

本文主要研究一下rocketmq的AccessChannel

AccessChannel

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/AccessChannel.java

public enum AccessChannel {
    /**
     * Means connect to private IDC cluster.
     */
    LOCAL,

    /**
     * Means connect to Cloud service.
     */
    CLOUD,
}
  • AccessChannel定义了两个枚举值,分别是LOCAL及CLOUD

TraceDispatcher

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/trace/TraceDispatcher.java

public interface TraceDispatcher {

    /**
     * Initialize asynchronous transfer data module
     */
    void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException;

    /**
     * Append the transfering data
     * @param ctx data infomation
     * @return
     */
    boolean append(Object ctx);

    /**
     * Write flush action
     *
     * @throws IOException
     */
    void flush() throws IOException;

    /**
     * Close the trace Hook
     */
    void shutdown();
}
  • TraceDispatcher的start方法会接收AccessChannel类型的参数

AsyncTraceDispatcher

rocketmq-client-4.5.2-sources.jar!/org/apache/rocketmq/client/trace/AsyncTraceDispatcher.java

public class AsyncTraceDispatcher implements TraceDispatcher {

    private final static InternalLogger log = ClientLogger.getLog();
    private final int queueSize;
    private final int batchSize;
    private final int maxMsgSize;
    private final DefaultMQProducer traceProducer;
    private final ThreadPoolExecutor traceExecutor;
    // The last discard number of log
    private AtomicLong discardCount;
    private Thread worker;
    private ArrayBlockingQueue<TraceContext> traceContextQueue;
    private ArrayBlockingQueue<Runnable> appenderQueue;
    private volatile Thread shutDownHook;
    private volatile boolean stopped = false;
    private DefaultMQProducerImpl hostProducer;
    private DefaultMQPushConsumerImpl hostConsumer;
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex();
    private String dispatcherId = UUID.randomUUID().toString();
    private String traceTopicName;
    private AtomicBoolean isStarted = new AtomicBoolean(false);
    private AccessChannel accessChannel = AccessChannel.LOCAL;

    //......

    public void start(String nameSrvAddr, AccessChannel accessChannel) throws MQClientException {
        if (isStarted.compareAndSet(false, true)) {
            traceProducer.setNamesrvAddr(nameSrvAddr);
            traceProducer.setInstanceName(TRACE_INSTANCE_NAME + "_" + nameSrvAddr);
            traceProducer.start();
        }
        this.accessChannel = accessChannel;
        this.worker = new Thread(new AsyncRunnable(), "MQ-AsyncTraceDispatcher-Thread-" + dispatcherId);
        this.worker.setDaemon(true);
        this.worker.start();
        this.registerShutDownHook();
    }

    //......

    class AsyncAppenderRequest implements Runnable {
        List<TraceContext> contextList;

        public AsyncAppenderRequest(final List<TraceContext> contextList) {
            if (contextList != null) {
                this.contextList = contextList;
            } else {
                this.contextList = new ArrayList<TraceContext>(1);
            }
        }

        private void sendTraceDataByMQ(Set<String> keySet, final String data, String dataTopic, String regionId) {
            String traceTopic = traceTopicName;
            if (AccessChannel.CLOUD == accessChannel) {
                traceTopic = TraceConstants.TRACE_TOPIC_PREFIX + regionId;
            }
            final Message message = new Message(traceTopic, data.getBytes());
            // Keyset of message trace includes msgId of or original message
            message.setKeys(keySet);

            //......
        }

        //......
    }

    //......
}
  • AsyncTraceDispatcher内部类AsyncAppenderRequest的sendTraceDataByMQ方法,针对accessChannel为AccessChannel.CLOUD类型的,会给TraceConstants.TRACE_TOPIC_PREFIX加上regionId作为traceTopic

小结

AccessChannel定义了两个枚举值,分别是LOCAL及CLOUD;TraceDispatcher的start方法会接收AccessChannel类型的参数;AsyncTraceDispatcher内部类AsyncAppenderRequest的sendTraceDataByMQ方法,针对accessChannel为AccessChannel.CLOUD类型的,会给TraceConstants.TRACE_TOPIC_PREFIX加上regionId作为traceTopic

doc

© 著作权归作者所有

go4it
粉丝 89
博文 1179
码字总数 1103596
作品 0
深圳
私信 提问
聊聊rocketmq的RemotingException

序 本文主要研究一下rocketmq的RemotingException RemotingException org/apache/rocketmq/remoting/exception/RemotingException.java 继承自checked exception,底下有RemotingCommandExce......

go4it
2018/08/08
288
0
聊聊rocketmq的NettyEncoder及NettyDecoder

序 本文主要研究一下rocketmq的NettyEncoder及NettyDecoder NettyEncoder org/apache/rocketmq/remoting/netty/NettyEncoder.java 这里继承MessageToByteEncoder,类型是RemotingCommand,先......

go4it
2018/08/07
25
0
聊聊rocketmq的ProducerImpl

序 本文主要研究一下rocketmq的ProducerImpl ProducerImpl io/openmessaging/rocketmq/producer/ProducerImpl.java 发送消息的方法主要是代理给rocketmqProducer 另外调用OMSUtil.msgConver......

go4it
2018/07/28
33
0
聊聊rocketmq的NettyClientConfig

序 本文主要研究一下rocketmq的NettyClientConfig NettyClientConfig org/apache/rocketmq/remoting/netty/NettyClientConfig.java 这里主要有几个参数: clientWorkerThreads,默认为4 cli......

go4it
2018/08/04
192
0
聊聊rocketmq的SequenceProducerImpl

序 本文主要研究一下rocketmq的SequenceProducerImpl SequenceProducerImpl io/openmessaging/rocketmq/producer/SequenceProducerImpl.java 采用的是LinkedBlockingQueue,send方法实际调用......

go4it
2018/07/30
15
0

没有更多内容

加载失败,请刷新页面

加载更多

阿里巴巴的 Kubernetes 应用管理实践经验与教训

作者 | 孙健波(天元) 阿里巴巴技术专家 导读:本文整理自孙健波在 ArchSummit 大会 2019 北京站演讲稿记录。首先介绍了阿里巴巴基于 Kubernetes 项目进行大规模应用实践过程中遇到的问题;...

阿里巴巴云原生
16分钟前
3
0
pinpoint采样原理分析

使用pinpoint进行全链路监控时,支持对请求的采样,某条请求是否被采样,取决于整个链路开始的机器。该机器使用特定的采样算法。采样的标志会一直在链路中透传。比如在http里面,会在header里...

xiaomin0322
20分钟前
3
0
在IDEA开发工具中使用lombok

1. 首先我们需要安装IntelliJ IDEA中的lombok插件,打开IntelliJ IDEA后点击菜单栏中的File-->Settings,或者使用快捷键Ctrl+Alt+S进入到设置页面 我们点击设置中的Plugins进行插件的安装,在...

欧阳飘
22分钟前
3
0
爱码仕 5G生活畅想 (五) 每个人每个家庭都有一朵私有的云

30年前,微软让每个家庭都有一台电脑的理念成为了现实;而今云计算的观念已为老百姓们所熟识。数据就是能源;数据就是财富;谁生产了数据,这数据的所有权就归谁所有。随着原生云基础设施的完...

LitStone
23分钟前
3
0
嵌入式入门:嵌入式领域的职业发展方向是什么?

嵌入式入门:嵌入式领域的职业发展方向是什么? 在如今的IT市场上看,嵌入式的发展的应用都是广受欢迎的,在嵌入式入门学习中,我们可以发现嵌入式的发展方向有很多,门槛高低不一样。下面就...

xyd118
24分钟前
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部