A Look at RocketMQ's NettyClientConfig

go4it
Published 2018/08/04 22:52

This article takes a look at RocketMQ's NettyClientConfig.

NettyClientConfig

org/apache/rocketmq/remoting/netty/NettyClientConfig.java

public class NettyClientConfig {
    /**
     * Worker thread number
     */
    private int clientWorkerThreads = 4;
    private int clientCallbackExecutorThreads = Runtime.getRuntime().availableProcessors();
    private int clientOnewaySemaphoreValue = NettySystemConfig.CLIENT_ONEWAY_SEMAPHORE_VALUE;
    private int clientAsyncSemaphoreValue = NettySystemConfig.CLIENT_ASYNC_SEMAPHORE_VALUE;
    private int connectTimeoutMillis = 3000;
    private long channelNotActiveInterval = 1000 * 60;

    /**
     * IdleStateEvent will be triggered when neither read nor write was performed for
     * the specified period of this time. Specify {@code 0} to disable
     */
    private int clientChannelMaxIdleTimeSeconds = 120;

    private int clientSocketSndBufSize = NettySystemConfig.socketSndbufSize;
    private int clientSocketRcvBufSize = NettySystemConfig.socketRcvbufSize;
    private boolean clientPooledByteBufAllocatorEnable = false;
    private boolean clientCloseSocketIfTimeout = false;

    private boolean useTLS;

    //......
}

The main parameters are:

  • clientWorkerThreads, defaults to 4
  • clientCallbackExecutorThreads, defaults to the number of CPU cores
  • clientOnewaySemaphoreValue, defaults to NettySystemConfig.CLIENT_ONEWAY_SEMAPHORE_VALUE, which is 65535 unless overridden
  • clientAsyncSemaphoreValue, defaults to NettySystemConfig.CLIENT_ASYNC_SEMAPHORE_VALUE, which is 65535 unless overridden
  • connectTimeoutMillis, defaults to 3000 ms (3 seconds)
  • channelNotActiveInterval, defaults to 60 seconds (1000 * 60 ms)
  • clientChannelMaxIdleTimeSeconds, defaults to 120 seconds
  • clientSocketSndBufSize, taken from NettySystemConfig.socketSndbufSize, which is 65535 unless overridden
  • clientSocketRcvBufSize, taken from NettySystemConfig.socketRcvbufSize, which is 65535 unless overridden
  • clientPooledByteBufAllocatorEnable, defaults to false
  • clientCloseSocketIfTimeout, defaults to false
  • useTLS, defaults to false

NettySystemConfig

org/apache/rocketmq/remoting/netty/NettySystemConfig.java

public class NettySystemConfig {
    public static final String COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE =
        "com.rocketmq.remoting.nettyPooledByteBufAllocatorEnable";
    public static final String COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE =
        "com.rocketmq.remoting.socket.sndbuf.size";
    public static final String COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE =
        "com.rocketmq.remoting.socket.rcvbuf.size";
    public static final String COM_ROCKETMQ_REMOTING_CLIENT_ASYNC_SEMAPHORE_VALUE =
        "com.rocketmq.remoting.clientAsyncSemaphoreValue";
    public static final String COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE =
        "com.rocketmq.remoting.clientOnewaySemaphoreValue";

    public static final boolean NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE = //
        Boolean.parseBoolean(System.getProperty(COM_ROCKETMQ_REMOTING_NETTY_POOLED_BYTE_BUF_ALLOCATOR_ENABLE, "false"));
    public static final int CLIENT_ASYNC_SEMAPHORE_VALUE = //
        Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ASYNC_SEMAPHORE_VALUE, "65535"));
    public static final int CLIENT_ONEWAY_SEMAPHORE_VALUE =
        Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_CLIENT_ONEWAY_SEMAPHORE_VALUE, "65535"));
    public static int socketSndbufSize =
        Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_SNDBUF_SIZE, "65535"));
    public static int socketRcvbufSize =
        Integer.parseInt(System.getProperty(COM_ROCKETMQ_REMOTING_SOCKET_RCVBUF_SIZE, "65535"));
}
  • Each value is read from a JVM system property first; if the property is not set, the hard-coded default is used
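
The read-with-default pattern used by NettySystemConfig can be sketched in isolation as follows. This is a minimal illustration, not RocketMQ code: the class name SystemPropertyDefaults, the helper intProperty and the property key demo.remoting.socket.sndbuf.size are all made up for the example.

```java
final class SystemPropertyDefaults {
    // Hypothetical property key, used only for this illustration.
    static final String DEMO_SNDBUF_KEY = "demo.remoting.socket.sndbuf.size";

    /** Reads an int system property, falling back to defaultValue when the property is unset. */
    static int intProperty(String key, String defaultValue) {
        return Integer.parseInt(System.getProperty(key, defaultValue));
    }

    public static void main(String[] args) {
        // Uses the fallback unless -Ddemo.remoting.socket.sndbuf.size=... was passed to the JVM.
        System.out.println(intProperty(DEMO_SNDBUF_KEY, "65535"));

        // Once the property is set, later reads pick up the new value.
        System.setProperty(DEMO_SNDBUF_KEY, "131072");
        System.out.println(intProperty(DEMO_SNDBUF_KEY, "65535"));
    }
}
```

One difference from the sketch: the fields in NettySystemConfig are static and initialized when the class is loaded, so an override has to be in place before that happens, for example by passing -Dcom.rocketmq.remoting.socket.sndbuf.size=131072 on the JVM command line.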

NettyRemotingClient

org/apache/rocketmq/remoting/netty/NettyRemotingClient.java

public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
    private static final Logger log = LoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);

    private static final long LOCK_TIMEOUT_MILLIS = 3000;

    private final NettyClientConfig nettyClientConfig;
    private final Bootstrap bootstrap = new Bootstrap();
    private final EventLoopGroup eventLoopGroupWorker;
    private final Lock lockChannelTables = new ReentrantLock();
    private final ConcurrentMap<String /* addr */, ChannelWrapper> channelTables = new ConcurrentHashMap<String, ChannelWrapper>();

    private final Timer timer = new Timer("ClientHouseKeepingService", true);

    private final AtomicReference<List<String>> namesrvAddrList = new AtomicReference<List<String>>();
    private final AtomicReference<String> namesrvAddrChoosed = new AtomicReference<String>();
    private final AtomicInteger namesrvIndex = new AtomicInteger(initValueIndex());
    private final Lock lockNamesrvChannel = new ReentrantLock();

    private final ExecutorService publicExecutor;

    /**
     * Invoke the callback methods in this executor when process response.
     */
    private ExecutorService callbackExecutor;
    private final ChannelEventListener channelEventListener;
    private DefaultEventExecutorGroup defaultEventExecutorGroup;
    private RPCHook rpcHook;

    public NettyRemotingClient(final NettyClientConfig nettyClientConfig) {
        this(nettyClientConfig, null);
    }

    public NettyRemotingClient(final NettyClientConfig nettyClientConfig,
        final ChannelEventListener channelEventListener) {
        super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());
        this.nettyClientConfig = nettyClientConfig;
        this.channelEventListener = channelEventListener;

        int publicThreadNums = nettyClientConfig.getClientCallbackExecutorThreads();
        if (publicThreadNums <= 0) {
            publicThreadNums = 4;
        }

        this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet());
            }
        });

        this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));
            }
        });

        if (nettyClientConfig.isUseTLS()) {
            try {
                sslContext = TlsHelper.buildSslContext(true);
                log.info("SSL enabled for client");
            } catch (IOException e) {
                log.error("Failed to create SSLContext", e);
            } catch (CertificateException e) {
                log.error("Failed to create SSLContext", e);
                throw new RuntimeException("Failed to create SSLContext", e);
            }
        }
    }

    private static int initValueIndex() {
        Random r = new Random();

        return Math.abs(r.nextInt() % 999) % 999;
    }

    @Override
    public void start() {
        this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
            nettyClientConfig.getClientWorkerThreads(),
            new ThreadFactory() {

                private AtomicInteger threadIndex = new AtomicInteger(0);

                @Override
                public Thread newThread(Runnable r) {
                    return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
                }
            });

        Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
            .option(ChannelOption.TCP_NODELAY, true)
            .option(ChannelOption.SO_KEEPALIVE, false)
            .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
            .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
            .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
            .handler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ChannelPipeline pipeline = ch.pipeline();
                    if (nettyClientConfig.isUseTLS()) {
                        if (null != sslContext) {
                            pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
                            log.info("Prepend SSL handler");
                        } else {
                            log.warn("Connections are insecure as SSLContext is null!");
                        }
                    }
                    pipeline.addLast(
                        defaultEventExecutorGroup,
                        new NettyEncoder(),
                        new NettyDecoder(),
                        new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
                        new NettyConnectManageHandler(),
                        new NettyClientHandler());
                }
            });

        this.timer.scheduleAtFixedRate(new TimerTask() {
            @Override
            public void run() {
                try {
                    NettyRemotingClient.this.scanResponseTable();
                } catch (Throwable e) {
                    log.error("scanResponseTable exception", e);
                }
            }
        }, 1000 * 3, 1000);

        if (this.channelEventListener != null) {
            this.nettyEventExecutor.start();
        }
    }

    //......
}
  • NettyRemotingClient uses the settings in NettyClientConfig to initialize the Netty client
  • ChannelOption.CONNECT_TIMEOUT_MILLIS is set from nettyClientConfig.getConnectTimeoutMillis()
  • ChannelOption.SO_SNDBUF is set from nettyClientConfig.getClientSocketSndBufSize()
  • ChannelOption.SO_RCVBUF is set from nettyClientConfig.getClientSocketRcvBufSize()
  • The allIdleTimeSeconds argument of IdleStateHandler is set from nettyClientConfig.getClientChannelMaxIdleTimeSeconds()
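
The constructor shown above also consumes clientCallbackExecutorThreads: a non-positive value falls back to 4, and the pool threads get an indexed name. The sketch below reproduces just that part with the JDK alone. The class CallbackExecutorSketch, its method names and the thread-name prefix are assumptions made for illustration and are not part of RocketMQ.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.atomic.AtomicInteger;

final class CallbackExecutorSketch {
    /** Mirrors the fallback in the constructor: a non-positive thread count becomes 4. */
    static int effectiveThreads(int configured) {
        return configured <= 0 ? 4 : configured;
    }

    /** Builds a factory that names its threads prefix + 1, prefix + 2, and so on. */
    static ThreadFactory namedFactory(final String prefix) {
        final AtomicInteger index = new AtomicInteger(0);
        return r -> new Thread(r, prefix + index.incrementAndGet());
    }

    public static void main(String[] args) throws Exception {
        // 0 stands in for a misconfigured clientCallbackExecutorThreads value.
        ExecutorService pool = Executors.newFixedThreadPool(
            effectiveThreads(0), namedFactory("DemoPublicExecutor_"));
        try {
            Callable<String> task = () -> Thread.currentThread().getName();
            // Prints the name of the pool thread that ran the task.
            System.out.println(pool.submit(task).get());
        } finally {
            pool.shutdown();
        }
    }
}
```

Naming threads this way makes them easy to spot in thread dumps, which is presumably why the original code gives each pool its own prefix.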

Summary

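RocketMQ's NettyClientConfig specifies the parameters for NettyRemotingClient. Besides the Netty-related settings, it also defines two semaphore values, clientOnewaySemaphoreValue and clientAsyncSemaphoreValue, which the NettyRemotingClient constructor passes to its superclass NettyRemotingAbstract. clientOnewaySemaphoreValue controls calls made through invokeOnewayImpl, and clientAsyncSemaphoreValue controls calls made through invokeAsyncImpl. Because they are semaphore permit counts, they cap how many such requests can be in flight at the same time rather than a call rate.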
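
How a semaphore value bounds one-way or async calls can be sketched with java.util.concurrent.Semaphore. The class SemaphoreLimiterSketch and its methods are hypothetical and only illustrate the general technique; the actual logic lives in NettyRemotingAbstract, which is not shown in this article.

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

final class SemaphoreLimiterSketch {
    private final Semaphore permits;

    /** maxInFlight plays the role of clientOnewaySemaphoreValue or clientAsyncSemaphoreValue. */
    SemaphoreLimiterSketch(int maxInFlight) {
        this.permits = new Semaphore(maxInFlight, true);
    }

    /** Tries to reserve a slot for a request; returns false if none frees up within the timeout. */
    boolean tryBegin(long timeoutMillis) throws InterruptedException {
        return permits.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS);
    }

    /** Gives the slot back once the request has completed or failed. */
    void end() {
        permits.release();
    }

    /** Number of slots currently free. */
    int available() {
        return permits.availablePermits();
    }

    public static void main(String[] args) throws InterruptedException {
        SemaphoreLimiterSketch limiter = new SemaphoreLimiterSketch(2);
        System.out.println(limiter.tryBegin(10)); // first request gets a slot
        System.out.println(limiter.tryBegin(10)); // second request gets a slot
        System.out.println(limiter.tryBegin(10)); // third request is rejected after the timeout
        limiter.end();                            // one request finishes
        System.out.println(limiter.tryBegin(10)); // a slot is available again
    }
}
```

With the default of 65535 permits the limit is rarely reached; lowering the value is one way to apply back-pressure to callers when a slow peer lets requests pile up.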

© Copyright belongs to the author