文档章节

rocketmq之源码分析netty实现源码(六)

wangshuaixin
 wangshuaixin
发布于 05/29 17:21
字数 1873
阅读 26
收藏 0

netty的服务端核心属性

public class NettyRemotingServer extends NettyRemotingAbstract implements RemotingServer {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);
    //netty的启动核心入口
    private final ServerBootstrap serverBootstrap;
    //netty的入口连接池
    private final EventLoopGroup eventLoopGroupSelector;
    //netty的handler的连接池
    private final EventLoopGroup eventLoopGroupBoss;
    private final NettyServerConfig nettyServerConfig;

    //操作处理的线程池
    private final ExecutorService publicExecutor;
    //处理监听的回调服务
    private final ChannelEventListener channelEventListener;

    private final Timer timer = new Timer("ServerHouseKeepingService", true);
    private DefaultEventExecutorGroup defaultEventExecutorGroup;

netty服务端的核心构造

public NettyRemotingServer(final NettyServerConfig nettyServerConfig,
    final ChannelEventListener channelEventListener) {
    super(nettyServerConfig.getServerOnewaySemaphoreValue(), nettyServerConfig.getServerAsyncSemaphoreValue());
    //初始化netty的核心框架
    this.serverBootstrap = new ServerBootstrap();
    this.nettyServerConfig = nettyServerConfig;
    this.channelEventListener = channelEventListener;

    int publicThreadNums = nettyServerConfig.getServerCallbackExecutorThreads();
    if (publicThreadNums <= 0) {
        publicThreadNums = 4;
    }

    //共享线程池的初始化
    this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
        private AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "NettyServerPublicExecutor_" + this.threadIndex.incrementAndGet());
        }
    });

    //获得系统平台是否支持epoll的高性能io操作
    if (useEpoll()) {
        this.eventLoopGroupBoss = new EpollEventLoopGroup(1, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyEPOLLBoss_%d", this.threadIndex.incrementAndGet()));
            }
        });

        this.eventLoopGroupSelector = new EpollEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);
            private int threadTotal = nettyServerConfig.getServerSelectorThreads();

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyServerEPOLLSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
            }
        });
    } else {
        this.eventLoopGroupBoss = new NioEventLoopGroup(1, new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyNIOBoss_%d", this.threadIndex.incrementAndGet()));
            }
        });

        this.eventLoopGroupSelector = new NioEventLoopGroup(nettyServerConfig.getServerSelectorThreads(), new ThreadFactory() {
            private AtomicInteger threadIndex = new AtomicInteger(0);
            private int threadTotal = nettyServerConfig.getServerSelectorThreads();

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, String.format("NettyServerNIOSelector_%d_%d", threadTotal, this.threadIndex.incrementAndGet()));
            }
        });
    }

    loadSslContext();
}

netty的服务端核心启动

public void start() {
    //默认的事件处理进程组初始化
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
        nettyServerConfig.getServerWorkerThreads(),
        new ThreadFactory() {

            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyServerCodecThread_" + this.threadIndex.incrementAndGet());
            }
        });

    //netty的服务端启动配置
    ServerBootstrap childHandler =
            //netty中服务端的标准设置,一个boss,多个worker
        this.serverBootstrap.group(this.eventLoopGroupBoss, this.eventLoopGroupSelector)
            //采用的nio模型
            .channel(useEpoll() ? EpollServerSocketChannel.class : NioServerSocketChannel.class)
            //socket参数,服务端接受链接的队列长度,如果队列已满则拒绝,默认值较小 win200 ,它128
            .option(ChannelOption.SO_BACKLOG, 1024)
            //socket参数,地址复用,默认false,快速启动的操作更优
            .option(ChannelOption.SO_REUSEADDR, true)
            //socket参数,连接保持,默认false,tcp会主动探测连接的有效性,可理解为心跳
            .option(ChannelOption.SO_KEEPALIVE, false)
            //tcp参数,立即发送数据,netty默认true,系统默认false,如果选择带宽的性能可以设置为false,一次发送多个数据款
            .childOption(ChannelOption.TCP_NODELAY, true)

            //socket参数,tcp数据发送缓冲区
            .childOption(ChannelOption.SO_SNDBUF, nettyServerConfig.getServerSocketSndBufSize())
            //socket参数,tcp数据接收缓冲区
            .childOption(ChannelOption.SO_RCVBUF, nettyServerConfig.getServerSocketRcvBufSize())

            //绑定当前服务器及端口
            .localAddress(new InetSocketAddress(this.nettyServerConfig.getListenPort()))
            //设置handler
            .childHandler(new ChannelInitializer<SocketChannel>() {
                @Override
                public void initChannel(SocketChannel ch) throws Exception {
                    ch.pipeline()
                        //
                        .addLast(defaultEventExecutorGroup, HANDSHAKE_HANDLER_NAME,
                            new HandshakeHandler(TlsSystemConfig.tlsMode))
                        //增加业务需要的handler
                        .addLast(defaultEventExecutorGroup,
                            //编码
                            new NettyEncoder(),
                            //解码
                            new NettyDecoder(),
                            //idle
                            new IdleStateHandler(0, 0, nettyServerConfig.getServerChannelMaxIdleTimeSeconds()),
                            //连接管理
                            new NettyConnectManageHandler(),
                            //业务核心处理
                            new NettyServerHandler()
                        );
                }
            });

    //buf的分配器设置
    if (nettyServerConfig.isServerPooledByteBufAllocatorEnable()) {
        childHandler.childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT);
    }

    //启动当前的服务
    try {
        ChannelFuture sync = this.serverBootstrap.bind().sync();
        InetSocketAddress addr = (InetSocketAddress) sync.channel().localAddress();
        this.port = addr.getPort();
    } catch (InterruptedException e1) {
        throw new RuntimeException("this.serverBootstrap.bind().sync() InterruptedException", e1);
    }

    //namesrv端的事件回调处理机制,会调用channelEventListener的事件处理
    if (this.channelEventListener != null) {
        this.nettyEventExecutor.start();
    }

    //执行响应数据的处理
    this.timer.scheduleAtFixedRate(new TimerTask() {

        @Override
        public void run() {
            try {
                NettyRemotingServer.this.scanResponseTable();
            } catch (Throwable e) {
                log.error("scanResponseTable exception", e);
            }
        }
    }, 1000 * 3, 1000);
}

netty服务端核心业务逻辑设计

class NettyServerHandler extends SimpleChannelInboundHandler<RemotingCommand> {

    //读取网络请求
    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        processMessageReceived(ctx, msg);
    }
}
public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
    final RemotingCommand cmd = msg;
    if (cmd != null) {
        switch (cmd.getType()) {
            case REQUEST_COMMAND:
                //处理请求类的操作
                processRequestCommand(ctx, cmd);
                break;
            case RESPONSE_COMMAND:
                //处理响应类的操作
                processResponseCommand(ctx, cmd);
                break;
            default:
                break;
        }
    }
}
public void processRequestCommand(final ChannelHandlerContext ctx, final RemotingCommand cmd) {
    //获得对应的事件处理对象
    final Pair<NettyRequestProcessor, ExecutorService> matched = this.processorTable.get(cmd.getCode());
    //健全的配置事件处理
    final Pair<NettyRequestProcessor, ExecutorService> pair = null == matched ? this.defaultRequestProcessor : matched;
    //消息唯一标示
    final int opaque = cmd.getOpaque();

    if (pair != null) {
        //构建异步线程操作
        Runnable run = new Runnable() {
            @Override
            public void run() {
                try {
                    //前置处理
                    doBeforeRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd);
                    //执行对应的事件处理
                    final RemotingCommand response = pair.getObject1().processRequest(ctx, cmd);
                    //后置处理
                    doAfterRpcHooks(RemotingHelper.parseChannelRemoteAddr(ctx.channel()), cmd, response);

                    //如果不是单发需要返回对应的请求唯一标识和数据
                    if (!cmd.isOnewayRPC()) {
                        if (response != null) {
                            response.setOpaque(opaque);
                            response.markResponseType();
                            try {
                                //将结果数据会写给netty的channel
                                ctx.writeAndFlush(response);
                            } catch (Throwable e) {
                                log.error("process request over, but response failed", e);
                                log.error(cmd.toString());
                                log.error(response.toString());
                            }
                        } else {

                        }
                    }
                } catch (Throwable e) {
                    log.error("process request exception", e);
                    log.error(cmd.toString());

                    if (!cmd.isOnewayRPC()) {
                        final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_ERROR,
                            RemotingHelper.exceptionSimpleDesc(e));
                        response.setOpaque(opaque);
                        ctx.writeAndFlush(response);
                    }
                }
            }
        };

        //验证请求处理类型
        if (pair.getObject1().rejectRequest()) {
            final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                "[REJECTREQUEST]system busy, start flow control for a while");
            response.setOpaque(opaque);
            ctx.writeAndFlush(response);
            return;
        }

        //封装异步处理线程,同时根据配置的事件线程池执行事件异步线程,
        try {
            final RequestTask requestTask = new RequestTask(run, ctx.channel(), cmd);
            pair.getObject2().submit(requestTask);
        } catch (RejectedExecutionException e) {
            if ((System.currentTimeMillis() % 10000) == 0) {
                log.warn(RemotingHelper.parseChannelRemoteAddr(ctx.channel())
                    + ", too many requests and system thread pool busy, RejectedExecutionException "
                    + pair.getObject2().toString()
                    + " request code: " + cmd.getCode());
            }

            if (!cmd.isOnewayRPC()) {
                final RemotingCommand response = RemotingCommand.createResponseCommand(RemotingSysResponseCode.SYSTEM_BUSY,
                    "[OVERLOAD]system busy, start flow control for a while");
                response.setOpaque(opaque);
                ctx.writeAndFlush(response);
            }
        }
    } else {
        String error = " request type " + cmd.getCode() + " not supported";
        final RemotingCommand response =
            RemotingCommand.createResponseCommand(RemotingSysResponseCode.REQUEST_CODE_NOT_SUPPORTED, error);
        response.setOpaque(opaque);
        ctx.writeAndFlush(response);
        log.error(RemotingHelper.parseChannelRemoteAddr(ctx.channel()) + error);
    }
}

===========================================================================

netty客户端的核心属性

public class NettyRemotingClient extends NettyRemotingAbstract implements RemotingClient {
    private static final InternalLogger log = InternalLoggerFactory.getLogger(RemotingHelper.ROCKETMQ_REMOTING);

    private static final long LOCK_TIMEOUT_MILLIS = 3000;

    //netty的client配置
    private final NettyClientConfig nettyClientConfig;
    //netty的启动核心入口
    private final Bootstrap bootstrap = new Bootstrap();

    //netty的启动线程池事件集合
    private final EventLoopGroup eventLoopGroupWorker;
    private final Lock lockChannelTables = new ReentrantLock();

    //channel的本地内存数据接口
    private final ConcurrentMap<String /* addr */, ChannelWrapper> channelTables = new ConcurrentHashMap<String, ChannelWrapper>();

    private final Timer timer = new Timer("ClientHouseKeepingService", true);

    private final AtomicReference<List<String>> namesrvAddrList = new AtomicReference<List<String>>();
    private final AtomicReference<String> namesrvAddrChoosed = new AtomicReference<String>();
    private final AtomicInteger namesrvIndex = new AtomicInteger(initValueIndex());
    private final Lock lockNamesrvChannel = new ReentrantLock();

    private final ExecutorService publicExecutor;

    /**
     * Invoke the callback methods in this executor when process response.
     */
    private ExecutorService callbackExecutor;
    private final ChannelEventListener channelEventListener;
    private DefaultEventExecutorGroup defaultEventExecutorGroup;

netty客户端的初始化构造

//客户端的通信实现
public NettyRemotingClient(final NettyClientConfig nettyClientConfig,
    final ChannelEventListener channelEventListener) {
    super(nettyClientConfig.getClientOnewaySemaphoreValue(), nettyClientConfig.getClientAsyncSemaphoreValue());
    this.nettyClientConfig = nettyClientConfig;
    this.channelEventListener = channelEventListener;

    int publicThreadNums = nettyClientConfig.getClientCallbackExecutorThreads();
    if (publicThreadNums <= 0) {
        publicThreadNums = 4;
    }

    //共享线程池配置
    this.publicExecutor = Executors.newFixedThreadPool(publicThreadNums, new ThreadFactory() {
        private AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, "NettyClientPublicExecutor_" + this.threadIndex.incrementAndGet());
        }
    });

    //netty的客户端处理eventgroup配置
    this.eventLoopGroupWorker = new NioEventLoopGroup(1, new ThreadFactory() {
        private AtomicInteger threadIndex = new AtomicInteger(0);

        @Override
        public Thread newThread(Runnable r) {
            return new Thread(r, String.format("NettyClientSelector_%d", this.threadIndex.incrementAndGet()));
        }
    });

    //验证是否加载ssl
    if (nettyClientConfig.isUseTLS()) {
        try {
            sslContext = TlsHelper.buildSslContext(true);
            log.info("SSL enabled for client");
        } catch (IOException e) {
            log.error("Failed to create SSLContext", e);
        } catch (CertificateException e) {
            log.error("Failed to create SSLContext", e);
            throw new RuntimeException("Failed to create SSLContext", e);
        }
    }
}

netty客户端的启动

public void start() {
    //构造个性的线程池执行分组,定制线程名称格式
    this.defaultEventExecutorGroup = new DefaultEventExecutorGroup(
        nettyClientConfig.getClientWorkerThreads(),
        new ThreadFactory() {

            private AtomicInteger threadIndex = new AtomicInteger(0);

            @Override
            public Thread newThread(Runnable r) {
                return new Thread(r, "NettyClientWorkerThread_" + this.threadIndex.incrementAndGet());
            }
        });

    //netty的客户端通信关键,可以参照netty的option配置详细
    Bootstrap handler = this.bootstrap.group(this.eventLoopGroupWorker).channel(NioSocketChannel.class)
        .option(ChannelOption.TCP_NODELAY, true)
        .option(ChannelOption.SO_KEEPALIVE, false)
        .option(ChannelOption.CONNECT_TIMEOUT_MILLIS, nettyClientConfig.getConnectTimeoutMillis())
        .option(ChannelOption.SO_SNDBUF, nettyClientConfig.getClientSocketSndBufSize())
        .option(ChannelOption.SO_RCVBUF, nettyClientConfig.getClientSocketRcvBufSize())
        .handler(new ChannelInitializer<SocketChannel>() {
            @Override
            public void initChannel(SocketChannel ch) throws Exception {
                ChannelPipeline pipeline = ch.pipeline();
                if (nettyClientConfig.isUseTLS()) {
                    if (null != sslContext) {
                        pipeline.addFirst(defaultEventExecutorGroup, "sslHandler", sslContext.newHandler(ch.alloc()));
                        log.info("Prepend SSL handler");
                    } else {
                        log.warn("Connections are insecure as SSLContext is null!");
                    }
                }
                pipeline.addLast(
                    defaultEventExecutorGroup,
                    //编码
                    new NettyEncoder(),
                    //解码
                    new NettyDecoder(),
                    //idle
                    new IdleStateHandler(0, 0, nettyClientConfig.getClientChannelMaxIdleTimeSeconds()),
                    //链接管理
                    new NettyConnectManageHandler(),
                    //核心请求处理
                    new NettyClientHandler());
            }
        });

    //执行扫描响应结果,每秒
    this.timer.scheduleAtFixedRate(new TimerTask() {
        @Override
        public void run() {
            try {
                NettyRemotingClient.this.scanResponseTable();
            } catch (Throwable e) {
                log.error("scanResponseTable exception", e);
            }
        }
    }, 1000 * 3, 1000);

    //事件处理机制
    if (this.channelEventListener != null) {
        this.nettyEventExecutor.start();
    }
}
//netty客户端的核心实现
class NettyClientHandler extends SimpleChannelInboundHandler<RemotingCommand> {

    @Override
    protected void channelRead0(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
        processMessageReceived(ctx, msg);
    }
}

服务端和客户端共享的代码设计

public void processMessageReceived(ChannelHandlerContext ctx, RemotingCommand msg) throws Exception {
    final RemotingCommand cmd = msg;
    if (cmd != null) {
        switch (cmd.getType()) {
            case REQUEST_COMMAND:
                //处理请求类的操作
                processRequestCommand(ctx, cmd);
                break;
            case RESPONSE_COMMAND:
                //处理响应类的操作
                processResponseCommand(ctx, cmd);
                break;
            default:
                break;
        }
    }
}

netty客户端处理服务端的响应数据

public void processResponseCommand(ChannelHandlerContext ctx, RemotingCommand cmd) {
    final int opaque = cmd.getOpaque();
    final ResponseFuture responseFuture = responseTable.get(opaque);
    if (responseFuture != null) {
        responseFuture.setResponseCommand(cmd);

        responseTable.remove(opaque);

        if (responseFuture.getInvokeCallback() != null) {
            executeInvokeCallback(responseFuture);
        } else {
            responseFuture.putResponse(cmd);
            responseFuture.release();
        }
    } else {
        log.warn("receive response, but not matched any request, " + RemotingHelper.parseChannelRemoteAddr(ctx.channel()));
        log.warn(cmd.toString());
    }
}

=============================================================================

这里只讲netty的通信设计,后面的章节会根据功能详细的分析实现不同功能的技术设计和技术实现

© 著作权归作者所有

wangshuaixin

wangshuaixin

粉丝 20
博文 52
码字总数 61196
作品 1
海淀
架构师
私信 提问
消息中间件—RocketMQ的RPC通信(一)

文章摘要:借用小厮的一句话“消息队列的本质在于消息的发送、存储和接收”。那么,对于一款消息队列来说,如何做到消息的高效发送与接收是重点和关键 一、RocketMQ中Remoting通信模块概览 ...

癫狂侠
2018/06/30
0
0
RocketMQ部分数据消费不了问题排查

问题现象 今天忽然收到RocketMQ预警信息如下: 提醒有部分数据没有消费,产生堆积情况。 打开RocketMq-Console-Ng查看如下图形式: 备注:第一反应是Consumer Group内订阅了多个topic?(为什...

匠心零度
2018/10/29
0
0
RocketMQ(二):RPC通讯

匠心零度 转载请注明原创出处,谢谢! RocketMQ网络部署图 NameServer:在系统中是做命名服务,更新和发现 broker服务。 Broker-Master:broker 消息主机服务器。 Broker-Slave: broker 消息...

匠心零度
2018/10/29
0
0
消息中间件 RocketMQ 源码解析 —— 调试环境搭建

摘要: 原创出处 www.iocoder.cn/RocketMQ/bu… 「芋道源码」欢迎转载,保留摘要,谢谢! 0. 友情提示 1. 依赖工具 2. 源码拉取 3. 启动 RocketMQ Namesrv 4. 启动 RocketMQ Broker 5. 启动 ...

芋道源码_以德服人_不服就干
01/31
0
0
消息中间件—RocketMQ的RPC通信(二)

文章摘要:如何设计RPC通信层模型是任何一款性能强劲的MQ所要重点考虑的问题 在(一)篇中主要介绍了RocketMQ的协议格式,消息编解码,通信方式(同步/异步/单向)、消息发送/接收以及异步回调...

癫狂侠
2018/07/01
0
0

没有更多内容

加载失败,请刷新页面

加载更多

带你了解 Java内存模型

Java内存模型的规定: 1、所有变量存储在主内存中; 2、每个线程都有自己的工作内存,且对变量的操作都是在工作内存中进行; 3、不同线程之间无法直接访问彼此工作内存中的变量,要想访问只能...

linux-tao
26分钟前
4
0
.net c# datetime转string 时间转字符串

.net c# datetime转string 时间转字符串 .net c# datetime转string 时间转字符串 刚开始接触net 时间转换字符串 一搜索出来的全是 字符串转时间,要么就是系统当前时间转字符串 就没有一个指...

青峰Jun19er
27分钟前
4
0
hbase demo

HbaseDao public class HbaseDao {@Testpublic void insertTest() throws Exception {Configuration conf = HBaseConfiguration.create();conf.set("hbase.zookeeper.qu......

Garphy
37分钟前
3
0
IT兄弟连 HTML5教程 HTML5表单 多样的输入类型2

4 range range类型用于包含一定范围内数字值的输入域,跟number一样,我们还可以对数值设置限定,range类型显示为滑动条用法如下: 上述代码使用了range类型输入框,为该类型设置了数值范围为...

老码农的一亩三分地
37分钟前
3
0
对比不同的数据库连接的异同

博主在学习和使用数据库连接时,遇到的问题, 这个几个数据库连接究竟有什么不同? 到底什么时候该使用哪个会更好一点? 带着这个问题我们先去了解常见的数据库连接 1. 常见的数据库连接有哪些?...

理性思考
39分钟前
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部