文档章节

Netty里面的Boss和Worker【Server篇】

Bieber
 Bieber
发布于 2015/04/27 14:03
字数 2397
阅读 7186
收藏 18

#Netty里面的Boss和Worker【Server篇】 最近在总结Dubbo关于Netty通信方面的实现,于是也就借此机会深入体会了一下Netty。一般启动Netty的Server端时都会设置两个ExecutorService对象,我们都习惯用boss,worker两个变量来引用这两个对象,于是从我一开始接触Netty就有了boss和worker的概念。这篇博客将对boss和worker进行介绍,但并不是涉及Netty其他部分介绍。

在Netty的里面有一个Boss,他开了一家公司(开启一个服务端口)对外提供业务服务,它手下有一群做事情的workers。Boss一直对外宣传自己公司提供的业务,并且接受(accept)有需要的客户(client),当一位客户找到Boss说需要他公司提供的业务,Boss便会为这位客户安排一个worker,这个worker全程为这位客户服务(read/write)。如果公司业务繁忙,一个worker可能会为多个客户进行服务。这就是Netty里面Boss和worker之间的关系。下面看看Netty是如何让Boss和Worker进行协助的。

<!--lang:java-->
protected void doOpen() throws Throwable {
    NettyHelper.setNettyLoggerFactory();
    ExecutorService boss = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerBoss", true));
    ExecutorService worker = Executors.newCachedThreadPool(new NamedThreadFactory("NettyServerWorker", true));
    ChannelFactory channelFactory = new NioServerSocketChannelFactory(boss, worker, getUrl().getPositiveParameter(Constants.IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS));
    bootstrap = new ServerBootstrap(channelFactory);
    
    final NettyHandler nettyHandler = new NettyHandler(getUrl(), this);
    channels = nettyHandler.getChannels();
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {
        public ChannelPipeline getPipeline() {
            NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec() ,getUrl(), NettyServer.this);
            ChannelPipeline pipeline = Channels.pipeline();
            pipeline.addLast("decoder", adapter.getDecoder());
            pipeline.addLast("encoder", adapter.getEncoder());
            pipeline.addLast("handler", nettyHandler);
            return pipeline;
        }
    });
    // bind
    channel = bootstrap.bind(getBindAddress());
}

上面这段代码是Dubbo用来开启服务的,也是大部分使用Netty进行服务端开发常用的方式启动服务端。首先是设置boss和worker的线程池,以能够让它们在各自的线程池里面异步执行。当调用bootstrap.bind(getBindAddress())的时候最终受理绑定操作的是NioServerSocketPipelineSinkeventSunk方法,看类名和方法签名就应该知道是处理IO事件的。方法eventSunk实现如下:

<!--lang:java-->
public void eventSunk(
        ChannelPipeline pipeline, ChannelEvent e) throws Exception {
    Channel channel = e.getChannel();
    if (channel instanceof NioServerSocketChannel) {
        handleServerSocket(e);
    } else if (channel instanceof NioSocketChannel) {
        handleAcceptedSocket(e);
    }
}

由于这个时候Server还处于bind阶段,所以channel肯定不是NioSocketChannel,于是就到了方法handleServerSocket里面,最后将会调用bind方法来绑定某个端口启动服务。下面是bind方法实现:

<!--lang:java-->
private void bind(
        NioServerSocketChannel channel, ChannelFuture future,
        SocketAddress localAddress) {

    boolean bound = false;
    boolean bossStarted = false;
    try {
        channel.socket.socket().bind(localAddress, channel.getConfig().getBacklog());
        bound = true;

        future.setSuccess();
        fireChannelBound(channel, channel.getLocalAddress());

        Executor bossExecutor =
            ((NioServerSocketChannelFactory) channel.getFactory()).bossExecutor;
        DeadLockProofWorker.start(
                bossExecutor,
                new ThreadRenamingRunnable(
                        new Boss(channel),
                        "New I/O server boss #" + id + " (" + channel + ')'));
        bossStarted = true;
    } catch (Throwable t) {
        future.setFailure(t);
        fireExceptionCaught(channel, t);
    } finally {
        if (!bossStarted && bound) {
            close(channel, future);
        }
    }
}

可以看到socket的绑定以及设置异步的future成功,已通知服务启动成功,同时将绑定成功事件通知出去。接下来我看的重点来了,就是bossExecutor,可以看到它是通过NioServerSocketChannelFactory里面去获取的,NioServerSocketChannelFactory里面的boss就是之前我们设置进去的,可以确定我们之前设置boss的异步线程池是在这里被使用了。紧接下来的是启动我们的异步线程池,到这里进入了Boss该做的事情,Boss其实是实现了Runnable接口,从而可以交给boss的线程池运行,接下来的关注点就是Boss的run方法,这里才是Boss做事情的地方。再此之前先看看Boss初始化做了什么事情:

<!--lang:java-->
Boss(NioServerSocketChannel channel) throws IOException {
        this.channel = channel;

        selector = Selector.open();

        boolean registered = false;
        try {
            channel.socket.register(selector, SelectionKey.OP_ACCEPT);
            registered = true;
        } finally {
            if (!registered) {
                closeSelector();
            }
        }

        channel.selector = selector;
    }

Boss初始化过程中其实就是将serversocket注册到一个selector里面,从而可以实现NIO的异步IO处理。

<!--lang:java-->
public void run() {
        final Thread currentThread = Thread.currentThread();

        channel.shutdownLock.lock();
        try {
            for (;;) {
                try {
                    if (selector.select(1000) > 0) {
                        selector.selectedKeys().clear();
                    }

                    SocketChannel acceptedSocket = channel.socket.accept();
                    if (acceptedSocket != null) {
                        registerAcceptedChannel(acceptedSocket, currentThread);
                    }
                } catch (SocketTimeoutException e) {
                    // Thrown every second to get ClosedChannelException
                    // raised.
                } catch (CancelledKeyException e) {
                    // Raised by accept() when the server socket was closed.
                } catch (ClosedSelectorException e) {
                    // Raised by accept() when the server socket was closed.
                } catch (ClosedChannelException e) {
                    // Closed as requested.
                    break;
                } catch (Throwable e) {
                    logger.warn(
                            "Failed to accept a connection.", e);
                    try {
                        Thread.sleep(1000);
                    } catch (InterruptedException e1) {
                        // Ignore
                    }
                }
            }
        } finally {
            channel.shutdownLock.unlock();
            closeSelector();
        }
    }

run方法里面是一个死循环,里面在不间断的等待客户端的连接,如果有客户端的连接,那么将会调用方法registerAcceptedChannel进行后续的处理。

<!--lang:java-->
 private void registerAcceptedChannel(SocketChannel acceptedSocket, Thread currentThread) {
        try {
            ChannelPipeline pipeline =
                channel.getConfig().getPipelineFactory().getPipeline();
            NioWorker worker = nextWorker();
            worker.register(new NioAcceptedSocketChannel(
                    channel.getFactory(), pipeline, channel,
                    NioServerSocketPipelineSink.this, acceptedSocket,
                    worker, currentThread), null);
        } catch (Exception e) {
            logger.warn(
                    "Failed to initialize an accepted socket.", e);
            try {
                acceptedSocket.close();
            } catch (IOException e2) {
                logger.warn(
                        "Failed to close a partially accepted socket.",
                        e2);
            }
        }
    }

方法registerAcceptedChannel就是将客户端的channle分配给一个worker,而这个worker是通过方法nextWorker获取 <!--lang:java--> NioWorker nextWorker() { return workers[Math.abs( workerIndex.getAndIncrement() % workers.length)]; }

可以看到方法nextWorker是一个让worker里面的客户端channel保持平衡的作用,可能你会疑问这个workers是哪里来的,其实是在上面初始化NioServerSocketChannelFactory的时候,NioServerSocketChannelFactory再去初始化NioServerSocketPipelineSink时候构造出来的,默认情况下workers的数量是我们初始化NioServerSocketChannelFactory设置进去的。可以看到是调用worker的register方法将客户端的channel注册到worker里面的。

<!--lang:java-->
void register(NioSocketChannel channel, ChannelFuture future) {

    boolean server = !(channel instanceof NioClientSocketChannel);
    Runnable registerTask = new RegisterTask(channel, future, server);
    Selector selector;

    synchronized (startStopLock) {
        if (!started) {
            .....
                this.selector = selector = Selector.open();
           .....
                DeadLockProofWorker.start(
                        executor, new ThreadRenamingRunnable(this, threadName));
                success = true;
          .....
        } else {
            selector = this.selector;
        }

        assert selector != null && selector.isOpen();

        started = true;
        boolean offered = registerTaskQueue.offer(registerTask);
        assert offered;
    }

    if (wakenUp.compareAndSet(false, true)) {
        selector.wakeup();
    }
}

上面对worker有一个started状态的检测,如果没启动,则启动worker,这个额一般都是将第一个客户端的channel注册到worker里面才进行的。由于worker也是实现了Rannable接口,所以启动的主要工作就是让worker在某个线程里面跑起来,并且为这个worker分配一个selector,用来进行监控IO事件。下面便是这个过程实现:

<!--lang:java-->
  DeadLockProofWorker.start(
                        executor, new ThreadRenamingRunnable(this, threadName));
                success = true;

其中的executor便是我们一开始设置的workerExecutor。 worker启动成功之后,接下来要做的便是让worker管理器客户端的channel

<!--lang:java-->
 Runnable registerTask = new RegisterTask(channel, future, server);
	.......
 boolean offered = registerTaskQueue.offer(registerTask);
        assert offered;

worker是将客户端包装成一个RegisterTask,然后放入队列,可见RegisterTask也实现了Runnable接口。那放入队列以后谁去取这个队列里面的数据呢?当然,肯定是worker去取。上面介绍启动worker的时候是让worker在某个线程里面跑起来,并且worker是实现了Rannable方法,于是运行worker的线程肯定是调用worker的run方法。

<!--lang:java-->
 public void run() {
    thread = Thread.currentThread();
    boolean shutdown = false;
    Selector selector = this.selector;
    for (;;) {
         .....
        try {
            SelectorUtil.select(selector);
			.....

            cancelledKeys = 0;
            processRegisterTaskQueue();
            processWriteTaskQueue();
            processSelectedKeys(selector.selectedKeys());
             .....
        } catch (Throwable t) {
             
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
              
            }
        }
    }
}

可以看到run方法里面也是一个死循环,在不断的轮询调用selector的select IO的事件。接下来会调用三个方法processRegisterTaskQueue,processWriteTaskQueueprocessSelectedKeys。通过方法签名就应该知道这个三个方法具体是做什么事情的,第一个是处理上面registerTaskQueue的,并且queue里面对象的run方法,而第二个processWriteTaskQueue是处理写任务的,而processSelectedKeys是处理selector匹配的IO事件。我们先看看registerTaskQueue是做了什么?

<!--lang:java-->
 private void processRegisterTaskQueue() throws IOException {
    for (;;) {
        final Runnable task = registerTaskQueue.poll();
        if (task == null) {
            break;
        }

        task.run();
        cleanUpCancelledKeys();
    }
}

上面介绍过registerTaskQueue里面的元素是RegisterTask。所以需要去看看RegisterTask的run方法实现,其中RegisterTaskNioWorker里面的内部类,所以RegisterTask是可以访问NioWorker的元素信息。

<!--lang:java-->
 public void run() {
        SocketAddress localAddress = channel.getLocalAddress();
        SocketAddress remoteAddress = channel.getRemoteAddress();
        if (localAddress == null || remoteAddress == null) {
            if (future != null) {
                future.setFailure(new ClosedChannelException());
            }
            close(channel, succeededFuture(channel));
            return;
        }

        try {
            if (server) {
                channel.socket.configureBlocking(false);
            }

            synchronized (channel.interestOpsLock) {
                channel.socket.register(
                        selector, channel.getRawInterestOps(), channel);
            }
            if (future != null) {
                channel.setConnected();
                future.setSuccess();
            }
        } catch (IOException e) {
            if (future != null) {
                future.setFailure(e);
            }
            close(channel, succeededFuture(channel));
           ....
        }

        if (!server) {
            if (!((NioClientSocketChannel) channel).boundManually) {
                fireChannelBound(channel, localAddress);
            }
            fireChannelConnected(channel, remoteAddress);
        }
    }

可以看到这里面主要做的事情是将Boss分配给worker的客户端channel和worker的selector关联上,从而worker可以处理该客户端channel的IO事件。

到这里就完成了由Boss接收到一个客户端连接,到分配给某个worker,以及worker是怎么去和客户端的channel关联的,其中由于worker有可能为多个客户端channel服务,所以worker并不会直接和某个channel产生引用,而是将客户端的channel注册在该worker的selector上面,worker的run方法里面通过不断对selector的select轮询,以达到对channel进行处理。接下来看看worker怎么处理selector的io事件的

<!--java:lang-->
private void processSelectedKeys(Set<SelectionKey> selectedKeys) throws IOException {
    for (Iterator<SelectionKey> i = selectedKeys.iterator(); i.hasNext();) {
        SelectionKey k = i.next();
        i.remove();
        try {
            int readyOps = k.readyOps();
            if ((readyOps & SelectionKey.OP_READ) != 0 || readyOps == 0) {
                if (!read(k)) {
                    // Connection already closed - no need to handle write.
                    continue;
                }
            }
            if ((readyOps & SelectionKey.OP_WRITE) != 0) {
                writeFromSelectorLoop(k);
            }
        } catch (CancelledKeyException e) {
            close(k);
        }

        if (cleanUpCancelledKeys()) {
            break; // break the loop to avoid ConcurrentModificationException
        }
    }
}

上面的方法完成的是处理selector产生的io事件,其中如果当前IO时间是读,那么将SelectionKey中的channel流进行读出,并且向上交给Netty的Handler。如果是当前某个channel的写满足条件,则触发writeFromSelectorLoop查看是否有待写出的内容。

对于写数据Netty在worker提供了三种入口

<!--lang:java-->
void writeFromUserCode(final NioSocketChannel channel) {
    if (!channel.isConnected()) {
        cleanUpWriteBuffer(channel);
        return;
    }

    if (scheduleWriteIfNecessary(channel)) {
        return;
    }

    if (channel.writeSuspended) {
        return;
    }

    if (channel.inWriteNowLoop) {
        return;
    }

    write0(channel);
}

void writeFromTaskLoop(final NioSocketChannel ch) {
    if (!ch.writeSuspended) {
        write0(ch);
    }
}

void writeFromSelectorLoop(final SelectionKey k) {
    NioSocketChannel ch = (NioSocketChannel) k.attachment();
    ch.writeSuspended = false;
    write0(ch);
}

其中writeFromUserCode是提供外部直接写出的,writeFromTaskLoop是在worker的run方法调用processWriteTaskQueue时候会触发。

© 著作权归作者所有

共有 人打赏支持
上一篇: Memcached那些事
下一篇: 论软件工程
Bieber
粉丝 208
博文 36
码字总数 83312
作品 1
杭州
高级程序员
私信 提问
加载中

评论(1)

最爱xiaoyu77
最爱xiaoyu77
boss 中 register worker 的 channel那一段,醍醐灌顶,👍
Netty那点事(四)Netty与Reactor模式

![Reactors][1] 一:Netty、NIO、多线程? 时隔很久终于又更新了!之前一直迟迟未动也是因为积累不够,后面比较难下手。过年期间@李林锋hw发布了一个Netty5.0架构剖析和源码解读 http://vdi...

黄亿华
2014/02/08
0
10
Netty 框架

Netty是Reactor模式事件驱动网络框架,Netty不仅用法简单,而且性能和可靠性都很好,所以被很多的中间件作为网络层使用,像dubbo, RocketMQ底层都采用了Netty。 Netty采用的是Reactor模式,由...

AaronSheng
2016/10/14
63
0
Netty与Reactor模式

引言 1.1 Netty、NIO、多线程 关于Netty与NIO、多线程之间的关系,可以参考@李林锋发布的一个Netty5.0架构剖析和源码解读的文章,在这篇文章中详细的介绍了Java I/O的演进过程和Linux I/O网络...

无寄语
2016/09/11
94
0
一个http请求在play框架中的前世今生(下)

上一篇提到了play底层的网络通信基于netty实现,于是粗略地研究了一下netty,总结如下。(netty版本是3.2.5,不同版本的实现可能差异较大) 一、netty的组件 channelBuffer: 传输Buffer和抽象...

2k10
2015/03/20
0
0
高性能网络通信框架Netty-基础概念篇

一、前言 Netty是一种可以轻松快速的开发协议服务器和客户端网络应用程序的NIO框架,它大大简化了TCP或者UDP服务器的网络编程,但是你仍然可以访问和使用底层的API,Netty只是对其进行了高层的...

阿里加多
06/03
0
0

没有更多内容

加载失败,请刷新页面

加载更多

RestClientUtil和ConfigRestClientUtil区别说明

RestClientUtil directly executes the DSL defined in the code. ConfigRestClientUtil gets the DSL defined in the configuration file by the DSL name and executes it. RestClientUtil......

bboss
40分钟前
6
0

中国龙-扬科
昨天
2
0
Linux系统设置全局的默认网络代理

更改全局配置文件/etc/profile all_proxy="all_proxy=socks://rahowviahva.ml:80/"ftp_proxy="ftp_proxy=http://rahowviahva.ml:80/"http_proxy="http_proxy=http://rahowviahva.ml:80/"......

临江仙卜算子
昨天
8
0
java框架学习日志-6(bean作用域和自动装配)

本章补充bean的作用域和自动装配 bean作用域 之前提到可以用scope来设置单例模式 <bean id="type" class="cn.dota2.tpye.Type" scope="singleton"></bean> 除此之外还有几种用法 singleton:......

白话
昨天
8
0
在PC上测试移动端网站和模拟手机浏览器的5大方法

总结很全面,保存下来以备不时之需。原文地址:https://www.cnblogs.com/coolfeng/p/4708942.html

kitty1116
昨天
7
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部