文档章节

Jetty源码-IO-ManagedSelector

robin-yao
 robin-yao
发布于 2015/04/19 22:16
字数 1366
阅读 70
收藏 0

    本文主要介绍Jetty对NIO的封装。

    jetty对NIO的的封装主要包含几个重要的类:SelectorManager,ManagedSelector,SelectChannelEndPoint,

ExecutionStrategy,ExecuteProduceRun。下面对这几个类做详细介绍,及这几个类是如何窜连起来工作,处理IO事件的。

    在介绍这几个类之前,先介绍Jetty中的LifeCycle,实现了改LifeCycle接口的类我们可以把它当作一个组件(compent)来看待,里面的接口包含了start,stop及对组件状态(isRunning,isStarted,isTopped)的判断,同时我们可以往上面注册某些状态监听方法。

     先介绍最重要的类ManagedSelector,封装了JDK NIO中的Selector,它同时实现了Runable接口。 ManagedSelector成员变量有一个Selector和任务Queue,该任务队列放置的都是Runnable类型的任务。这Queue很重要,用来盛放通过ManagedSelector.submit方法提交上来的action,一般该方法由SelectorManager来调用。主要有三种类型的action:

    1.Acceptor,用来注册监听类型的Key OP_ACCEPT

    2.Accept,主要是把SocketChannel注册到selector上,用来读写操作;

    3.Connect,用来注册Key OP_CONNECT,用来判断连接是否就绪;

一旦有以上三个任务提交上来,该 selector上的select操作都应该被wakeup。下面贴出三个类型的action与sumit方法,这个三个类型都是ManagedSelector内部类。

class Acceptor implements Runnable
{
    private final ServerSocketChannel _channel;

    public Acceptor(ServerSocketChannel channel)
    {
        this._channel = channel;
    }

    @Override
    public void run()
    {
        try
        {
            SelectionKey key = _channel.register(_selector, SelectionKey.OP_ACCEPT, null);
            if (LOG.isDebugEnabled())
                LOG.debug("{} acceptor={}", this, key);
        }
        catch (Throwable x)
        {
            closeNoExceptions(_channel);
            LOG.warn(x);
        }
    }
}

class Accept implements Runnable
{
    private final SocketChannel channel;
    private final Object attachment;

    Accept(SocketChannel channel, Object attachment)
    {
        this.channel = channel;
        this.attachment = attachment;
    }

    @Override
    public void run()
    {
        try
        {
            final SelectionKey key = channel.register(_selector, 0, attachment);
            submit(new CreateEndPoint(channel, key));
        }
        catch (Throwable x)
        {
            closeNoExceptions(channel);
            LOG.debug(x);
        }
    }
}
class Connect implements Runnable
{
    private final AtomicBoolean failed = new AtomicBoolean();
    private final SocketChannel channel;
    private final Object attachment;
    private final Scheduler.Task timeout;

    Connect(SocketChannel channel, Object attachment)
    {
        this.channel = channel;
        this.attachment = attachment;
        this.timeout = ManagedSelector.this._selectorManager.getScheduler().schedule(new ConnectTimeout(this), ManagedSelector.this._selectorManager.getConnectTimeout(), TimeUnit.MILLISECONDS);
    }

    @Override
    public void run()
    {
        try
        {
            channel.register(_selector, SelectionKey.OP_CONNECT, this);
        }
        catch (Throwable x)
        {
            failed(x);
        }
    }

    private void failed(Throwable failure)
    {
        if (failed.compareAndSet(false, true))
        {
            timeout.cancel();
            closeNoExceptions(channel);
            ManagedSelector.this._selectorManager.connectionFailed(channel, failure, attachment);
        }
    }
}

//submit方法:
public void submit(Runnable change)
{
    if (LOG.isDebugEnabled())
        LOG.debug("Queued change {} on {}", change, this);
    try (SpinLock.Lock lock = _lock.lock())
    {    //提交action
        _actions.offer(change);
        if (_selecting)
        {
            Selector selector = _selector;
            if (selector != null)
                //唤醒阻塞的select操作
                selector.wakeup();
            // To avoid the extra select wakeup.
            _selecting = false;
        }
    }
}


还有一个重要的成员变量是执行策略类(ExecutionStrategy),该类的作用是jetty对它的注释是:ExecutionStrategy执行由Producer生产出的runnable任务,任务执行的策略根据实现的不同,或许在调用线程中直接执行或者另起一个新的线程来执行。ExecutionStrategy这里调用的生产者就是ManagedSelector内部类SelectorProducer,该内部类实现了.Producer接口。下面我们可以看看ManagedSelector.SelectorProducer的实现的一些方法:

//ManagedSelector.SelectorProducer,主要用来生产任务。
private class SelectorProducer implements ExecutionStrategy.Producer{
    private Set<SelectionKey> _keys = Collections.emptySet();
    private Iterator<SelectionKey> _cursor = Collections.emptyIterator();
    
    @Override
    public Runnable produce()
    {
        while (true)
        {
            //处理选中事件,如果有任务生成直接返回。
            //注意这里叫作task,要和action区分开来,不用弄混了,
            //这里的task都是根据selector上的相应的就绪key产生的读/写/连接task。
            Runnable task = processSelected();
            if (task != null)
                return task;
            //如果没有task产生,从actions中取任务
            //这里返回的action都是实现了Product,不会改变selector上的感兴趣事件。(待确认)
            //交由ExecutionStrategy去执行。
            Runnable action = runActions();
            if (action != null)
                return action;
            //更新相应变化的感兴趣Key
            update();
            //select,获取感兴趣的事件。
            if (!select())
                return null;
        }
    }
    private Runnable processSelected()
    {
        while (_cursor.hasNext())
        {
            SelectionKey key = _cursor.next();
            if (key.isValid())
            {
                Object attachment = key.attachment();
                try
                {
                    if (attachment instanceof SelectableEndPoint)
                    {
                        // Try to produce a task
                        //会产生读或写任务,或者读写任务。
                        SelectableEndPoint selectable = (SelectableEndPoint)attachment;
                        Runnable task = selectable.onSelected();
                        if (task != null)
                            return task;
                    }
                    else if (key.isConnectable())
                    {
                        Runnable task = processConnect(key, (Connect)attachment);
                        if (task != null)
                            return task;
                    }
                    else if (key.isAcceptable())
                    {
                        processAccept(key);
                    }
                    else
                    {
                        throw new IllegalStateException("key=" + key + ", att=" + attachment + ", iOps=" + key.interestOps() + ", rOps=" + key.readyOps());
                    }
                }
                catch (CancelledKeyException x)
                {
                    if (attachment instanceof org.eclipse.jetty.io.EndPoint)
                        closeNoExceptions((EndPoint)attachment);
                }
                catch (Throwable x)
                {
                    if (attachment instanceof org.eclipse.jetty.io.EndPoint)
                        closeNoExceptions((EndPoint)attachment);
                }
            }
            else
            {
                Object attachment = key.attachment();
                if (attachment instanceof EndPoint)
                    closeNoExceptions((EndPoint)attachment);
            }
        }
        return null;
    }

    /**
     *
     * @return
     */
    private Runnable runActions()
    {
        //循环处理任务队列中的任务,直到任务完全被处理完。
        while (true)
        {
            Runnable action;
            try (SpinLock.Lock lock = _lock.lock())
            {
                action = _actions.poll();
                if (action == null)
                {
                    // No more actions, so we need to select
                    _selecting = true;
                    return null;
                }
            }
            //***重点*** 如果是生产者生成的任务,直接返回,交给 ExecutionStrategy来执行
            if (action instanceof Product)
                return action;
            // Running the change may queue another action.
            // 执行任务,或许会产生新的任务入队。
            runChange(action);
        }
    }

    private void runChange(Runnable change)
    {
        try
        {
            change.run();
        }
        catch (Throwable x)
        {
            x.printStackTrace();
        }
    }
    private void update()
    {
        for (SelectionKey key : _keys)
            updateKey(key);
        _keys.clear();
    }
    private void updateKey(SelectionKey key)
    {
        Object attachment = key.attachment();
        if (attachment instanceof SelectableEndPoint)
            ((SelectableEndPoint)attachment).updateKey();
    }
    //重新进行选择。。。。。。
    private boolean select()
    {
        try
        {
            Selector selector = _selector;
            if (selector != null && selector.isOpen())
            {
                int selected = selector.select();
                try (SpinLock.Lock lock = _lock.lock())
                {
                    // finished selecting
                    _selecting = false;
                }
                _keys = selector.selectedKeys();
                _cursor = _keys.iterator();

                return true;
            }
        }
        catch (Throwable x)
        {
            closeNoExceptions(_selector);
        }
        return false;
    }
}

接着看ManagedSelector里的run()方法,它直接调用_strategy.execute();这里的ExecutionStrategy具体的execute实现是ExecuteProduceRun。这里不帖代码,篇幅太长了。主要是循环的调用_producer.produce()产生任务,并执行任务;

    最后介绍SelectorManager,主要是用来管理ManagedSelector。它主要是保存了对ManagedSelector数组的引用,同时也引用了一个线程池,用来执行ManagedSelector。SelectorManager实现了 LifeCycle接口,它在doStart方法中对ManagedSelector数组初始化,并在线程池里执行初始化好的ManagedSelector,具体代码如下:

protected void doStart() throws Exception
{
    super.doStart();
    for (int i = 0; i < _selectors.length; i++)
    {
        ManagedSelector selector = newSelector(i);
        _selectors[i] = selector;
        selector.start();
        execute(selector);
    }
}

   还有写地方理解不是很深入,有些地方理解的可能不正确,大家结合源码校验下。。。。。。。待续

 转发请标注来源: http://my.oschina.net/robinyao/blog/403615

END-------------------------------




© 著作权归作者所有

共有 人打赏支持
robin-yao
粉丝 161
博文 54
码字总数 61436
作品 0
杭州
私信 提问
eclipse使用maven tomcat插件部署无法关联源代码

eclipse maven 起服务debug无法关联源码 博客分类: maven eclipse 1. 安装sourcelookup插件: update site: 1) http://bjmi.github.io/update-site/ (3.8.2可用,后续描述均针对此插件) 2) ht...

赵作文
2015/10/20
246
0
Jetty9 源码初解(2)——IO之EndPoint

一、概述 EndPoint作为jetty-io的一个重要组成部分,是基于javaNIO的封装,用于底层网络的读写,一旦网络读写准备好,会调用相应的connection的handle方法。 二、类分析 EndPoint源码如下: ...

戴的天
2015/10/28
0
0
Jetty源码-IO-BufferUtil

jetty 源码 BufferUtil 工具类主要是封装了对JDK ByteBuffer的操作。通过BufferUtil可以更容易的操作ByteBuffer. 由于ByteBuffer分为fill模式即写模式、flush模式即读模式,我们经常会在读写...

robin-yao
2015/04/16
0
0
从Jetty、Tomcat和Mina中提炼NIO构架网络服务器的经典模式(一)

从Jetty、Tomcat和Mina中提炼NIO构架网络服务器的经典模式(一) 收藏 如何正确使用NIO来构架网络服务器一直是最近思考的一个问题,于是乎分析了一下Jetty、Tomcat和Mina有关NIO的源码,发现...

光石头
2011/02/20
0
0
从Jetty、Tomcat和Mina中提炼NIO构架网络服务器的经典模式(一)

如何正确使用NIO来构架网络服务器一直是最近思考的一个问题,于是乎分析了一下Jetty、Tomcat和Mina有关NIO的源码,发现大伙都基于类似的方式,我感觉这应该算是NIO构架网络服务器的经典模式,...

山哥
2012/03/19
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Navicat怎样导入Excel表格和txt文本的数据

Navicat怎样导入Excel表格和txt文本的数据 2018年07月02日 11:29:11 零碎de記憶 阅读数:2433 版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq_39135287/ar...

linjin200
7分钟前
0
0
使用MaxCompute Java SDK运行安全相关命令

使用MaxCompute Console的同学,可能都使用过MaxCompute安全相关的命令。官方文档上有详细的MaxCompute 安全指南 ,并给出了安全相关语句汇总 。 简而言之, 权限管理 、 列级别访问控制 、 ...

阿里云云栖社区
12分钟前
0
0
中小公司的Java工程师应该如何逆袭冲进BAT?

(1)80% Java工程师都有的迷茫 这篇文章,跟大家聊一聊很多很多很多人问我的一个问题:中小公司的Java工程师应该如何规划准备,才能跳槽进入BAT这类一线互联网公司? 之所以我用了三个 “很...

Java填坑路
12分钟前
1
0
你的应用够安全吗?绿标2.0隐私权限详解

近日,最新一期的《绿色应用达标率调查报告》结果显示,应用在安全方面的通过率仅为57%,相较于其他四项标准通过率最低。其中隐私权限的过度获取是主要原因之一,需要开发者尽快完成整改。 ...

安卓绿色联盟
22分钟前
0
0
使用MaxCompute Java SDK运行安全相关命令

使用MaxCompute Console的同学,可能都使用过MaxCompute安全相关的命令。官方文档上有详细的MaxCompute安全指南,并给出了安全相关语句汇总。 简而言之,权限管理、列级别访问控制、项目空间...

阿里云官方博客
27分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部