文档章节

源码分析Dubbo网络通信篇NettyServer、HeaderExchangeServer

中间件兴趣圈
 中间件兴趣圈
发布于 03/22 22:01
字数 1450
阅读 1.4K
收藏 2

本文主要分析一下NettyServer,HeaderExchangeServer实现细节。

1、NettyServer

NettyServer整个类图如下: 这里写图片描述 首先从全貌上大概看一下NettyServer对象所持有的属性:

  • AbstractPeer
    1. private final ChannelHandler handler 事件处理Handler。
    2. private volatile URL url 该协议的第一个服务提供者的URL, Server只需要用到 URL中的参数,与具体某一个服务没什么关系。
  • AbstractEndpoint
    1. private Codec2 codec 编码解码器。
    2. private int timeout 超时时间
    3. private int connectTimeout 连接超时时间
  • AbstractServer
    1. private InetSocketAddress localAddress :url host:port地址。
    2. private InetSocketAddress bindAddress:如果是多网卡,并且指定了 bind.ip、bind.port,如果为空,与localAddress相同。
    3. private int accepts : AbstractServer#accepts未使用到。
    4. private int idleTimeout = 600; AbstractServer#accepts未使用到。
  • NettyServer
    1. private Map< String, Channel> channels:< ip:port, channel> 所有通道。
    2. private ServerBootstrap bootstrap : netty 服务端启动器。
    3. private io.netty.channel.Channel channel:服务端监听通道。
    4. private EventLoopGroup bossGroup;Netty boss线程组(负责连接事件)
    5. private EventLoopGroup workerGroup : nety work线程组(负责IO事件)

1.1 NettyServer 构造方法

public NettyServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
}

直接调用父类的public AbstractServer(URL url, ChannelHandler handler)方法,从前面的文章中得知, ChannelHandlers.wrap方法会对ChannelHandler handler进行封装,主要是加入事件分发模式(Dispatch)。

1.1.1 AbstractServer构造方法
public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);       // @1
        localAddress = getUrl().toInetSocketAddress();   // @2

        String bindIp = getUrl().getParameter(Constants.BIND_IP_KEY, getUrl().getHost());
        int bindPort = getUrl().getParameter(Constants.BIND_PORT_KEY, getUrl().getPort());
        if (url.getParameter(Constants.ANYHOST_KEY, false) || NetUtils.isInvalidLocalHost(bindIp)) {
            bindIp = NetUtils.ANYHOST;
        }
        bindAddress = new InetSocketAddress(bindIp, bindPort);   // @3
        this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);  
        this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT); // @4
        try {
            doOpen();   // @5
            if (logger.isInfoEnabled()) {
                logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
            }
        } catch (Throwable t) {
            throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName()
                    + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
        }
        //fixme replace this with better method
        DataStore dataStore = ExtensionLoader.getExtensionLoader(DataStore.class).getDefaultExtension();
        executor = (ExecutorService) dataStore.get(Constants.EXECUTOR_SERVICE_COMPONENT_KEY, Integer.toString(url.getPort()));
    }

代码@1:调用父类的构造方法,主要初始化AbstractPeer(channelHandler、url)和AbstractEndpoint(codec2、timeout、idleTimeout )

代码@2:根据URL中的host与端口,创建localAddress。

代码@3:如果配置了< dubbo:parameter key = "bind.ip" value = ""/> 与 < dubbo:parameter key = "bind.port" />,则用该IP与端口创建bindAddress,通常用于多网卡,如果未配置,bindAddress与 localAddress绑定的IP与端口一样。

代码@4:初始化accepts与idleTimeout ,这两个参数未被其他地方使用。

代码@5,调用doOpen方法,正式在相应端口建立网络监听。

1.2、源码分析NettyServer#doOpen

protected void doOpen() throws Throwable {
        NettyHelper.setNettyLoggerFactory();
        bootstrap = new ServerBootstrap();       // @1
        bossGroup = new NioEventLoopGroup(1, new DefaultThreadFactory("NettyServerBoss", true));    // @2
        workerGroup = new NioEventLoopGroup(getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS),
                new DefaultThreadFactory("NettyServerWorker", true));    // @3
        final NettyServerHandler nettyServerHandler = new NettyServerHandler(getUrl(), this);   // @4
        channels = nettyServerHandler.getChannels();
        bootstrap.group(bossGroup, workerGroup)                                                                        // @5
                .channel(NioServerSocketChannel.class)
                .childOption(ChannelOption.TCP_NODELAY, Boolean.TRUE)
                .childOption(ChannelOption.SO_REUSEADDR, Boolean.TRUE)
                .childOption(ChannelOption.ALLOCATOR, PooledByteBufAllocator.DEFAULT)
                .childHandler(new ChannelInitializer<niosocketchannel>() {
                    @Override
                    protected void initChannel(NioSocketChannel ch) throws Exception {
                        NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyServer.this);
                        ch.pipeline()//.addLast("logging",new LoggingHandler(LogLevel.INFO))//for debug                       
                                .addLast("decoder", adapter.getDecoder())   
                                .addLast("encoder", adapter.getEncoder())
                                .addLast("handler", nettyServerHandler);
                    }
                });
        // bind
        ChannelFuture channelFuture = bootstrap.bind(getBindAddress());     // @6
        channelFuture.syncUninterruptibly();
        channel = channelFuture.channel();
    }

代码@1:创建Netty服务端启动帮助类ServerBootstrap.

代码@2:创建服务端Boss线程,线程名:.NettyServerBoss,主要负责客户端的连接事件,主从多Reactor线程模型中的主线程(连接事件)。

代码@3:创建服务端Work线程组,线程名:NettyServerWorker-序号,线程个数取自参数:iothreads,默认为(CPU核数+1)与32取小值,顾名思义,IO线程数,主要处理读写事件,编码、解码都在IO线程中完成。

代码@4:创建用户Handler,这里是NettyServerHandler。 代码@5:Netty启动的常规写法,关注如下内容:

 addLast("decoder", adapter.getDecoder())  : 添加解码器
 addLast("encoder", adapter.getEncoder()) :添加编码器
 addLast("handler", nettyServerHandler) :添加业务Handler。

这里简单介绍一下流程:

  1. 客户端建立与服务端连接,此时Boss线程的连接事件触发,建立TCP连接,并向IO线程注册该通道(Channel0)的读事件。
  2. 当客户端向服务端发送请求消息后,IO线程中的读事件触发,会首先调用adapter.getDecoder() 根据对应的请求协议(例如dubbo)从二进制流中解码出一个完整的请求对象,然后传入到业务handler,例如nettyServerHandler,执行相应的事件方法,例如recive方法。
  3. 当服务端向Channel写入响应结果时,首先编码器会按照协议编码成二进制流,供客户端解码。

如果对Netty想深入学习的话,请移步到作者的《源码分析Netty系列》

2、HeaderExchangeServer

根据 Dubbo 服务端初始化流程,我们可知,Dubbo 为了封装各种不同的网络实现客户端(netty、mina)等,引入了 Exchangers 层,存在 ExchangeServer,其实现 Server 并内部持有具体的 Server 实现端,例如 NettyServer。 这里写图片描述

接下来,我们重点来关注一下 HeaderExchangeServer. 核心属性如下:

  • ScheduledExecutorService scheduled:心跳线程数,线程名称前缀,dubbo-remoting-server-heartbeat-thread-序号
  • private final Server server:具体的Server实现类,例如NettyServer。
  • private ScheduledFuture< ?> heartbeatTimer:心跳调度Future,可以通过future取消心跳等动作。
  • private int heartbeat:心跳间隔时间
  • private int heartbeatTimeout:心跳超时时间,至少为heartbeat的两倍

2.1 构造函数

public HeaderExchangeServer(Server server) {
        if (server == null) {
            throw new IllegalArgumentException("server == null");
        }
        this.server = server;
        this.heartbeat = server.getUrl().getParameter(Constants.HEARTBEAT_KEY, 0);
        this.heartbeatTimeout = server.getUrl().getParameter(Constants.HEARTBEAT_TIMEOUT_KEY, heartbeat * 3);
        if (heartbeatTimeout &lt; heartbeat * 2) {
            throw new IllegalStateException("heartbeatTimeout &lt; heartbeatInterval * 2");
        }
        startHeartbeatTimer();
    }

说明,主要是通过heartbeat参数设置心跳间隔,如果不配置,则不启动心跳检测。从上面看来HeaderExchangeServer内部持有Server,并封装了心跳的功能,在这里就不细细分析了。


>作者介绍:丁威,《RocketMQ技术内幕》作者,RocketMQ 社区优秀布道师、CSDN2019博客之星TOP10,维护公众号:中间件兴趣圈目前已陆续发表源码分析Java集合、Java 并发包(JUC)、Netty、Mycat、Dubbo、RocketMQ、Mybatis等源码专栏。可以点击链接加入中间件知识星球 ,一起探讨高并发、分布式服务架构,交流源码。

在这里插入图片描述</niosocketchannel>

© 著作权归作者所有

中间件兴趣圈

中间件兴趣圈

粉丝 50
博文 47
码字总数 128914
作品 0
青浦
私信 提问
加载中

评论(1)

cainiao1234
cainiao1234
你好,看了你的文章,觉得你是一名对技术很有追求的同学,请问有兴趣加入菜鸟国际物流技术部吗?
dubbo+zk通信数据时的注意事项

config,配置层,对外配置接口,以ServiceConfig, ReferenceConfig为中心,可以直接new配置类,也可以通过spring解析配置生成配置类 proxy,服务代理层,服务接口透明代理,生成服务的客户端...

语落心生
2019/07/18
0
0
dubbo 请求调用过程分析

服务消费方发起请求 当服务的消费方引用了某远程服务,服务的应用方在spring的配置实例如下: demoService实例其实是代理工厂生产的代理对象(大家可以参考代理那部分生成的伪代码),在代码...

赵蕊
2017/06/07
229
1
Dubbo源码分析-Remoting层

Dubbo Github地址 https://github.com/alibaba/dubbo.git Dubbo Remoting 模块是dubbo底层通信模块的实现。实现对请求/应答的各种逻辑处理,包括同步,异步,心跳等逻辑,最底层的通信借助n...

robin-yao
2016/12/11
1K
5
dubbo源码分析系列(4)dubbo通信设计

1 系列目录 - dubbo源码分析系列(1)扩展机制的实现- dubbo源码分析系列(2)服务的发布- dubbo源码分析系列(3)服务的引用- dubbo源码分析系列(4)dubbo通信设计 2 NIO通信层的抽象 目前...

乒乓狂魔
2015/10/26
8.1K
5
dubbo通信消息解析过程分析(1)

由于rpc底层涉及网络编程接口,线程模型,网络数据结构,服务协议,细到字节的处理。牵涉内容较多,今天就先从一个点说起。 说说,dubbo通过netty框架做传输层,从接到数据字节流到把字节转换...

wannshan
2018/01/19
379
0

没有更多内容

加载失败,请刷新页面

加载更多

郑州哪哪里可以开工程款发票-郑州_新闻网

【电薇同步;1.3.8 - 2.7.4.1 - 5.2.9.7.】张生、诚、信、合、作,保、真、售、后、保、障、长、期、有、效。adb的全称为Android Debug Bridge,是Android手机通用...

yyqqvip
47分钟前
30
0
Nginx 反向代理访问

在Nginx 配置 server { listen 80; server_name www.xiaocx.org www.xiaocx.org www.xiaocx.org; root /Users/maison/work/xiaocx/dist; index i......

韩庚庚
51分钟前
33
0
python笔记:环境变量已设置CMD中一直报错"python"不是内部命令,也不是可运行的程序或批处理文件

这些天虽然也写了几个小工具,但是打包都是在anaconda prompt中完成的,因为CMD中一直报错"python"不是内部命令,也不是可运行的程序或批处理文件,各种查度,千篇一律的是环境变量配置的问题...

小玲_001
53分钟前
13
0
AI+BI服务模式

术语与缩写解释 缩写、术语 解 释 BI 商业智能(Business Intelligence,简称:BI),又称商业智慧或商务智能,指用现代数据仓库技术、线上分析处理技术、数据挖掘和数据展现技术进行数据分析...

zoegu228
54分钟前
22
0
leetcode1227(面试题 17.09. 第 k 个数)--C语言实现

求: 有些数的素因子只有 3,5,7,请设计一个算法找出第 k 个数。注意,不是必须有这些素因子,而是必须不包含其他的素因子。例如,前几个数按顺序应该是 1,3,5,7,9,15,21。 示例 1:...

拓拔北海
今天
27
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部