文档章节

Netty里面的Boss和Worker【Server篇】

Bieber
 Bieber
发布于 2015/04/27 14:03
字数 2397
阅读 6449
收藏 18
点赞 4
评论 1

#Netty里面的Boss和Worker【Server篇】 最近在总结Dubbo关于Netty通信方面的实现,于是也就借此机会深入体会了一下Netty。一般启动Netty的Server端时都会设置两个ExecutorService对象,我们都习惯用boss,worker两个变量来引用这两个对象,于是从我一开始接触Netty就有了boss和worker的概念。这篇博客将对boss和worker进行介绍,但并不是涉及Netty其他部分介绍。

在Netty的里面有一个Boss,他开了一家公司(开启一个服务端口)对外提供业务服务,它手下有一群做事情的workers。Boss一直对外宣传自己公司提供的业务,并且接受(accept)有需要的客户(client),当一位客户找到Boss说需要他公司提供的业务,Boss便会为这位客户安排一个worker,这个worker全程为这位客户服务(read/write)。如果公司业务繁忙,一个worker可能会为多个客户进行服务。这就是Netty里面Boss和worker之间的关系。下面看看Netty是如何让Boss和Worker进行协助的。

<!--lang:java-->
protected void doOpen() throws Throwable {
    NettyHelper.setNettyLoggerFactory();
    ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
    ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
    ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
    bootstrap = new ServerBootstrap(channelFactory);
    
    final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
    channels = nettyHandler.getChannels();
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        public ChannelPipeline getPipeline() {
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);
            ChannelPipeline pipeline = Channels.pipeline();
            pipeline.addLast("decoder", adapter.getDecoder());
            pipeline.addLast("encoder", adapter.getEncoder());
            pipeline.addLast("handler", nettyHandler);
            return pipeline;
        }
    });
    // bind
    channel = bootstrap.bind(getBindAddress());
}

上面这段代码是Dubbo用来开启服务的,也是大部分使用Netty进行服务端开发常用的方式启动服务端。首先是设置boss和worker的线程池,以能够让它们在各自的线程池里面异步执行。当调用bootstrap.bind(getBindAddress())的时候最终受理绑定操作的是NioServerSocketPipelineSinkeventSunk方法,看类名和方法签名就应该知道是处理IO事件的。方法eventSunk实现如下:

<!--lang:java-->
public void eventSunk(
        ChannelPipeline pipeline, ChannelEvent e) throws Exception {
    Channel channel = e.getChannel();
    if (channel instanceof NioServerSocketChannel) {
        handleServerSocket(e);
    } else if (channel instanceof NioSocketChannel) {
        handleAcceptedSocket(e);
    }
}

由于这个时候Server还处于bind阶段,所以channel肯定不是NioSocketChannel,于是就到了方法handleServerSocket里面,最后将会调用bind方法来绑定某个端口启动服务。下面是bind方法实现:

<!--lang:java-->
private void bind(
        NioServerSocketChannel channel, ChannelFuture future,
        SocketAddress localAddress) {

    boolean bound = false;
    boolean bossStarted = false;
    try {
        channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
        bound = true;

        future.setSuccess();
        fireChannelBound(channel, channel.getLocalAddress());

        Executor bossExecutor =
            ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
        DeadLockProofWorker.start(
                bossExecutor,
                new ThreadRenamingRunnable(
                        new Boss(channel),
                        "New I/O server boss #" + id + " (" + channel + ')'));
        bossStarted = true;
    } catch (Throwable t) {
        future.setFailure(t);
        fireExceptionCaught(channel, t);
    } finally {
        if (!bossStarted && bound) {
            close(channel, future);
        }
    }
}

可以看到socket的绑定以及设置异步的future成功,已通知服务启动成功,同时将绑定成功事件通知出去。接下来我看的重点来了,就是bossExecutor,可以看到它是通过NioServerSocketChannelFactory里面去获取的,NioServerSocketChannelFactory里面的boss就是之前我们设置进去的,可以确定我们之前设置boss的异步线程池是在这里被使用了。紧接下来的是启动我们的异步线程池,到这里进入了Boss该做的事情,Boss其实是实现了Runnable接口,从而可以交给boss的线程池运行,接下来的关注点就是Boss的run方法,这里才是Boss做事情的地方。再此之前先看看Boss初始化做了什么事情:

<!--lang:java-->
Boss(NioServerSocketChannel channel) throws IOException {
        this.channel = channel;

        selector = Selector.open();

        boolean registered = false;
        try {
            channel.socket.register(selector, SelectionKey.OP_ACCEPT);
            registered = true;
        } finally {
            if (!registered) {
                closeSelector();
            }
        }

        channel.selector = selector;
    }

Boss初始化过程中其实就是将serversocket注册到一个selector里面,从而可以实现NIO的异步IO处理。

<!--lang:java-->
public void run() {
        final Thread currentThread = Thread.currentThread();

        channel.shutdownLock.lock();
        try {
            for (;;) {
                try {
                    if (selector.select(1000) > 0) {
                        selector.selectedKeys().clear();
                    }

                    SocketChannel acceptedSocket = channel.socket.accept();
                    if (acceptedSocket != null) {
                        registerAcceptedChannel(acceptedSocket, currentThread);
                    }
                } catch (SocketTimeoutException e) {
                    // Thrown every second to get ClosedChannelException
                    // raised.
                } catch (CancelledKeyException e) {
                    // Raised by accept() when the server socket was closed.
                } catch (ClosedSelectorException e) {
                    // Raised by accept() when the server socket was closed.
                } catch (ClosedChannelException e) {
                    // Closed as requested.
                    break;
                } catch (Throwable e) {
                    logger.warn(
                            "Failed to accept a connection.", e);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e1) {
                        // Ignore
                    }
                }
            }
        } finally {
            channel.shutdownLock.unlock();
            closeSelector();
        }
    }

run方法里面是一个死循环,里面在不间断的等待客户端的连接,如果有客户端的连接,那么将会调用方法registerAcceptedChannel进行后续的处理。

<!--lang:java-->
 private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) {
        try {
            ChannelPipeline pipeline =
                channel.getConfig().getPipelineFactory().getPipeline();
            NioWorker worker = nextWorker();
            worker.register(new NioAcceptedSocketChannel(
                    channel.getFactory(), pipeline, channel,
                    NioServerSocketPipelineSink.this, acceptedSocket,
                    worker, currentThread), null);
        } catch (Exception e) {
            logger.warn(
                    "Failed to initialize an accepted socket.", e);
            try {
                acceptedSocket.close();
            } catch (IOException e2) {
                logger.warn(
                        "Failed to close a partially accepted socket.",
                        e2);
            }
        }
    }

方法registerAcceptedChannel就是将客户端的channle分配给一个worker,而这个worker是通过方法nextWorker获取 <!--lang:java--> NioWorker nextWorker() { return workers[Math.abs( workerIndex.getAndIncrement() % workers.length)]; }

可以看到方法nextWorker是一个让worker里面的客户端channel保持平衡的作用,可能你会疑问这个workers是哪里来的,其实是在上面初始化NioServerSocketChannelFactory的时候,NioServerSocketChannelFactory再去初始化NioServerSocketPipelineSink时候构造出来的,默认情况下workers的数量是我们初始化NioServerSocketChannelFactory设置进去的。可以看到是调用worker的register方法将客户端的channel注册到worker里面的。

<!--lang:java-->
void register(NioSocketChannel channel, ChannelFuture future) {

    boolean server = !(channel instanceof NioClientSocketChannel);
    Runnable registerTask = new RegisterTask(channel, future, server);
    Selector selector;

    synchronized (startStopLock) {
        if (!started) {
            .....
                this.selector = selector = Selector.open();
           .....
                DeadLockProofWorker.start(
                        executor, new ThreadRenamingRunnable(this, threadName));
                success = true;
          .....
        } else {
            selector = this.selector;
        }

        assert selector != null && selector.isOpen();

        started = true;
        boolean offered = registerTaskQueue.offer(registerTask);
        assert offered;
    }

    if (wakenUp.compareAndSet(false, true)) {
        selector.wakeup();
    }
}

上面对worker有一个started状态的检测,如果没启动,则启动worker,这个额一般都是将第一个客户端的channel注册到worker里面才进行的。由于worker也是实现了Rannable接口,所以启动的主要工作就是让worker在某个线程里面跑起来,并且为这个worker分配一个selector,用来进行监控IO事件。下面便是这个过程实现:

<!--lang:java-->
  DeadLockProofWorker.start(
                        executor, new ThreadRenamingRunnable(this, threadName));
                success = true;

其中的executor便是我们一开始设置的workerExecutor。 worker启动成功之后,接下来要做的便是让worker管理器客户端的channel

<!--lang:java-->
 Runnable registerTask = new RegisterTask(channel, future, server);
	.......
 boolean offered = registerTaskQueue.offer(registerTask);
        assert offered;

worker是将客户端包装成一个RegisterTask,然后放入队列,可见RegisterTask也实现了Runnable接口。那放入队列以后谁去取这个队列里面的数据呢?当然,肯定是worker去取。上面介绍启动worker的时候是让worker在某个线程里面跑起来,并且worker是实现了Rannable方法,于是运行worker的线程肯定是调用worker的run方法。

<!--lang:java-->
 public void run() {
    thread = Thread.currentThread();
    boolean shutdown = false;
    Selector selector = this.selector;
    for (;;) {
         .....
        try {
            SelectorUtil.select(selector);
			.....

            cancelledKeys = 0;
            processRegisterTaskQueue();
            processWriteTaskQueue();
            processSelectedKeys(selector.selectedKeys());
             .....
        } catch (Throwable t) {
             
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
              
            }
        }
    }
}

可以看到run方法里面也是一个死循环,在不断的轮询调用selector的select IO的事件。接下来会调用三个方法processRegisterTaskQueue,processWriteTaskQueueprocessSelectedKeys。通过方法签名就应该知道这个三个方法具体是做什么事情的,第一个是处理上面registerTaskQueue的,并且queue里面对象的run方法,而第二个processWriteTaskQueue是处理写任务的,而processSelectedKeys是处理selector匹配的IO事件。我们先看看registerTaskQueue是做了什么?

<!--lang:java-->
 private void processRegisterTaskQueue() throws IOException {
    for (;;) {
        final Runnable task = registerTaskQueue.poll();
        if (task == null) {
            break;
        }

        task.run();
        cleanUpCancelledKeys();
    }
}

上面介绍过registerTaskQueue里面的元素是RegisterTask。所以需要去看看RegisterTask的run方法实现,其中RegisterTaskNioWorker里面的内部类,所以RegisterTask是可以访问NioWorker的元素信息。

<!--lang:java-->
 public void run() {
        SocketAddress localAddress = channel.getLocalAddress();
        SocketAddress remoteAddress = channel.getRemoteAddress();
        if (localAddress == null || remoteAddress == null) {
            if (future != null) {
                future.setFailure(new ClosedChannelException());
            }
            close(channel, succeededFuture(channel));
            return;
        }

        try {
            if (server) {
                channel.socket.configureBlocking(false);
            }

            synchronized (channel.interestOpsLock) {
                channel.socket.register(
                        selector, channel.getRawInterestOps(), channel);
            }
            if (future != null) {
                channel.setConnected();
                future.setSuccess();
            }
        } catch (IOException e) {
            if (future != null) {
                future.setFailure(e);
            }
            close(channel, succeededFuture(channel));
           ....
        }

        if (!server) {
            if (!((NioClientSocketChannel) channel).boundManually) {
                fireChannelBound(channel, localAddress);
            }
            fireChannelConnected(channel, remoteAddress);
        }
    }

可以看到这里面主要做的事情是将Boss分配给worker的客户端channel和worker的selector关联上,从而worker可以处理该客户端channel的IO事件。

到这里就完成了由Boss接收到一个客户端连接,到分配给某个worker,以及worker是怎么去和客户端的channel关联的,其中由于worker有可能为多个客户端channel服务,所以worker并不会直接和某个channel产生引用,而是将客户端的channel注册在该worker的selector上面,worker的run方法里面通过不断对selector的select轮询,以达到对channel进行处理。接下来看看worker怎么处理selector的io事件的

<!--java:lang-->
private void processSelectedKeys(Set<SelectionKey> selectedKeys) throws IOException {
    for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
        SelectionKey k = i.next();
        i.remove();
        try {
            int readyOps = k.readyOps();
            if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) {
                if (!read(k)) {
                    // Connection already closed - no need to handle write.
                    continue;
                }
            }
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                writeFromSelectorLoop(k);
            }
        } catch (CancelledKeyException e) {
            close(k);
        }

        if (cleanUpCancelledKeys()) {
            break; // break the loop to avoid ConcurrentModificationException
        }
    }
}

上面的方法完成的是处理selector产生的io事件,其中如果当前IO时间是读,那么将SelectionKey中的channel流进行读出,并且向上交给Netty的Handler。如果是当前某个channel的写满足条件,则触发writeFromSelectorLoop查看是否有待写出的内容。

对于写数据Netty在worker提供了三种入口

<!--lang:java-->
void writeFromUserCode(final NioSocketChannel channel) {
    if (!channel.isConnected()) {
        cleanUpWriteBuffer(channel);
        return;
    }

    if (scheduleWriteIfNecessary(channel)) {
        return;
    }

    if (channel.writeSuspended) {
        return;
    }

    if (channel.inWriteNowLoop) {
        return;
    }

    write0(channel);
}

void writeFromTaskLoop(final NioSocketChannel ch) {
    if (!ch.writeSuspended) {
        write0(ch);
    }
}

void writeFromSelectorLoop(final SelectionKey k) {
    NioSocketChannel ch = (NioSocketChannel) k.attachment();
    ch.writeSuspended = false;
    write0(ch);
}

其中writeFromUserCode是提供外部直接写出的,writeFromTaskLoop是在worker的run方法调用processWriteTaskQueue时候会触发。

© 著作权归作者所有

共有 人打赏支持
Bieber
粉丝 204
博文 36
码字总数 83312
作品 1
杭州
高级程序员
加载中

评论(1)

最爱xiaoyu77
最爱xiaoyu77
boss 中 register worker 的 channel那一段,醍醐灌顶,:thumbsup:
一个http请求在play框架中的前世今生(下)

上一篇提到了play底层的网络通信基于netty实现,于是粗略地研究了一下netty,总结如下。(netty版本是3.2.5,不同版本的实现可能差异较大) 一、netty的组件 channelBuffer: 传输Buffer和抽象...

2k10 ⋅ 2015/03/20 ⋅ 0

Netty 框架

Netty是Reactor模式事件驱动网络框架,Netty不仅用法简单,而且性能和可靠性都很好,所以被很多的中间件作为网络层使用,像dubbo, RocketMQ底层都采用了Netty。 Netty采用的是Reactor模式,由...

AaronSheng ⋅ 2016/10/14 ⋅ 0

Netty与Reactor模式

引言 1.1 Netty、NIO、多线程 关于Netty与NIO、多线程之间的关系,可以参考@李林锋发布的一个Netty5.0架构剖析和源码解读的文章,在这篇文章中详细的介绍了Java I/O的演进过程和Linux I/O网络...

无寄语 ⋅ 2016/09/11 ⋅ 0

netty 关于 http 的问题

netty http协议的使用过程中发现这样的问题。启动Server端,端口设为801,接着跑Client端。跑 批量的Client端,第一次跑多条(假如3000条)是没有问题的。但是跑完第一次,接着跑第二次,发现...

C.X.Q ⋅ 2011/09/22 ⋅ 1

Java NIO框架--Netty4的简单示例

简介 相比Netty3, Netty4有很多显著的变化: NioEventLoopGroup 是一个处理I/O操作的多线程事件环。即为Netty4里的线程池,在3.x里,一个Channel是由ChannelFactory创建的,同时新创建的Cha...

cloud-coder ⋅ 2015/01/05 ⋅ 0

Netty关于Channel的使用的官方解释

NioEventLoopGroup is a multithreaded event loop that handles I/O operation. Netty provides various EventLoopGroup implementations for different kind of transports. We are implem......

Finley.Hamilton ⋅ 2014/11/11 ⋅ 0

高性能网络通信框架Netty-基础概念篇

一、前言 Netty是一种可以轻松快速的开发协议服务器和客户端网络应用程序的NIO框架,它大大简化了TCP或者UDP服务器的网络编程,但是你仍然可以访问和使用底层的API,Netty只是对其进行了高层的...

阿里加多 ⋅ 06/03 ⋅ 0

Netty那点事(四)Netty与Reactor模式

![Reactors][1] 一:Netty、NIO、多线程? 时隔很久终于又更新了!之前一直迟迟未动也是因为积累不够,后面比较难下手。过年期间@李林锋hw发布了一个Netty5.0架构剖析和源码解读 http://vdi...

黄亿华 ⋅ 2014/02/08 ⋅ 10

Netty服务器线程模型概览

一切从ServerBootstrap开始 ServerBootstrap 负责初始话netty服务器,并且开始监听端口的socket请求。 Java代码 bootstrap bootstrap = new ServerBootstrap( ); bootstrap.setPipelineFacto......

gongweixin ⋅ 2013/08/26 ⋅ 8

ES 源代码阅读(三)

elasticsearch 1.7.5 中 的maven依赖 io.netty netty 3.10.5.Final NettyTransport.java 首先使用 ServerSocketChannelFactory 来创建 ServerBootstrap 实例, 然后设置 piplineFactory.Serve......

parker ⋅ 2016/10/17 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

6.1 压缩打包介绍 6.2 gzip压缩工具 6.3 bzip2压缩工具 6.4 xz压缩工具

压缩打包介绍 使用压缩工具的好处: 使用压缩文件,不仅可以节省磁盘空间,而且在传输时还能节省网络宽带。 我们通常讲的家用宽带和机房宽带100M是有区别的: 机房宽带的上行和下行都是100M,...

Linux_老吴 ⋅ 16分钟前 ⋅ 0

SpringBoot热部署加持

概述 进行SpringBoot的Web开发过程中,我们很多时候经常需要重启Web服务器才能保证修改的 源代码文件、或者一些诸如xml的配置文件、以及一些静态文件生效,这样耗时又低效。所谓的热部署指的...

CodeSheep ⋅ 22分钟前 ⋅ 0

OSChina 周六乱弹 —— 假如你被熊困到树上

Osc乱弹歌单(2018)请戳(这里) 【今日歌曲】 @小小编辑:推荐歌曲《如果写不出好的和弦就该在洒满阳光的钢琴前一起吃布丁》 《如果写不出好的和弦就该在洒满阳光的钢琴前一起吃布丁》- 谢...

小小编辑 ⋅ 53分钟前 ⋅ 5

vbs 取文件大小 字节

dim namedim fs, s'name = Inputbox("姓名")'msgbox(name)set fs = wscript.createobject("scripting.filesystemobject") 'fs为FSO实例if (fs.folderexists("c:\temp"))......

vga ⋅ 今天 ⋅ 1

高并发之Nginx的限流

首先Nginx的版本号有要求,最低为1.11.5 如果低于这个版本,在Nginx的配置中 upstream web_app { server 到达Ip1:端口 max_conns=10; server 到达Ip2:端口 max_conns=10; } server { listen ...

算法之名 ⋅ 今天 ⋅ 0

Spring | IOC AOP 注解 简单使用

写在前面的话 很久没更新笔记了,有人会抱怨:小冯啊,你是不是在偷懒啊,没有学习了。老哥,真的冤枉:我觉得我自己很菜,还在努力学习呢,正在学习Vue.js做管理系统呢。即便这样,我还是不...

Wenyi_Feng ⋅ 今天 ⋅ 0

博客迁移到 https://www.jianshu.com/u/aa501451a235

博客迁移到 https://www.jianshu.com/u/aa501451a235 本博客不再更新

为为02 ⋅ 今天 ⋅ 0

win10怎么彻底关闭自动更新

win10自带的更新每天都很多,每一次下载都要占用大量网络,而且安装要等得时间也蛮久的。 工具/原料 Win10 方法/步骤 单击左下角开始菜单点击设置图标进入设置界面 在设置窗口中输入“服务”...

阿K1225 ⋅ 今天 ⋅ 0

Elasticsearch 6.3.0 SQL功能使用案例分享

The best elasticsearch highlevel java rest api-----bboss Elasticsearch 6.3.0 官方新推出的SQL检索插件非常不错,本文一个实际案例来介绍其使用方法。 1.代码中的sql检索 @Testpu...

bboss ⋅ 今天 ⋅ 0

informix数据库在linux中的安装以及用java/c/c++访问

一、安装前准备 安装JDK(略) 到IBM官网上下载informix软件:iif.12.10.FC9DE.linux-x86_64.tar放在某个大家都可以访问的目录比如:/mypkg,并解压到该目录下。 我也放到了百度云和天翼云上...

wangxuwei ⋅ 今天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部