文档章节

Jetty源码-IO-ManagedSelector

robin-yao
 robin-yao
发布于 2015/04/19 22:16
字数 1366
阅读 64
收藏 0

    本文主要介绍Jetty对NIO的封装。

    jetty对NIO的的封装主要包含几个重要的类:SelectorManager,ManagedSelector,SelectChannelEndPoint,

ExecutionStrategy,ExecuteProduceRun。下面对这几个类做详细介绍,及这几个类是如何窜连起来工作,处理IO事件的。

    在介绍这几个类之前,先介绍Jetty中的LifeCycle,实现了改LifeCycle接口的类我们可以把它当作一个组件(compent)来看待,里面的接口包含了start,stop及对组件状态(isRunning,isStarted,isTopped)的判断,同时我们可以往上面注册某些状态监听方法。

     先介绍最重要的类ManagedSelector,封装了JDK NIO中的Selector,它同时实现了Runable接口。 ManagedSelector成员变量有一个Selector和任务Queue,该任务队列放置的都是Runnable类型的任务。这Queue很重要,用来盛放通过ManagedSelector.submit方法提交上来的action,一般该方法由SelectorManager来调用。主要有三种类型的action:

    1.Acceptor,用来注册监听类型的Key OP_ACCEPT

    2.Accept,主要是把SocketChannel注册到selector上,用来读写操作;

    3.Connect,用来注册Key OP_CONNECT,用来判断连接是否就绪;

一旦有以上三个任务提交上来,该 selector上的select操作都应该被wakeup。下面贴出三个类型的action与sumit方法,这个三个类型都是ManagedSelector内部类。

class Acceptor implements Runnable
{
    private final ServerSocketChannel _channel;

    public Acceptor(ServerSocketChannel channel)
    {
        this._channel = channel;
    }

    @Override
    public void run()
    {
        try
        {
            SelectionKey key = _channel.register(_selector, SelectionKey.OP_ACCEPT, null);
            if (LOG.isDebugEnabled())
                LOG.debug("{} acceptor={}", this, key);
        }
        catch (Throwable x)
        {
            closeNoExceptions(_channel);
            LOG.warn(x);
        }
    }
}

class Accept implements Runnable
{
    private final SocketChannel channel;
    private final Object attachment;

    Accept(SocketChannel channel, Object attachment)
    {
        this.channel = channel;
        this.attachment = attachment;
    }

    @Override
    public void run()
    {
        try
        {
            final SelectionKey key = channel.register(_selector, 0, attachment);
            submit(new CreateEndPoint(channel, key));
        }
        catch (Throwable x)
        {
            closeNoExceptions(channel);
            LOG.debug(x);
        }
    }
}
class Connect implements Runnable
{
    private final AtomicBoolean failed = new AtomicBoolean();
    private final SocketChannel channel;
    private final Object attachment;
    private final Scheduler.Task timeout;

    Connect(SocketChannel channel, Object attachment)
    {
        this.channel = channel;
        this.attachment = attachment;
        this.timeout = ManagedSelector.this._selectorManager.getScheduler().schedule(new ConnectTimeout(this), ManagedSelector.this._selectorManager.getConnectTimeout(), TimeUnit.MILLISECONDS);
    }

    @Override
    public void run()
    {
        try
        {
            channel.register(_selector, SelectionKey.OP_CONNECT, this);
        }
        catch (Throwable x)
        {
            failed(x);
        }
    }

    private void failed(Throwable failure)
    {
        if (failed.compareAndSet(false, true))
        {
            timeout.cancel();
            closeNoExceptions(channel);
            ManagedSelector.this._selectorManager.connectionFailed(channel, failure, attachment);
        }
    }
}

//submit方法:
public void submit(Runnable change)
{
    if (LOG.isDebugEnabled())
        LOG.debug("Queued change {} on {}", change, this);
    try (SpinLock.Lock lock = _lock.lock())
    {    //提交action
        _actions.offer(change);
        if (_selecting)
        {
            Selector selector = _selector;
            if (selector != null)
                //唤醒阻塞的select操作
                selector.wakeup();
            // To avoid the extra select wakeup.
            _selecting = false;
        }
    }
}


还有一个重要的成员变量是执行策略类(ExecutionStrategy),该类的作用是jetty对它的注释是:ExecutionStrategy执行由Producer生产出的runnable任务,任务执行的策略根据实现的不同,或许在调用线程中直接执行或者另起一个新的线程来执行。ExecutionStrategy这里调用的生产者就是ManagedSelector内部类SelectorProducer,该内部类实现了.Producer接口。下面我们可以看看ManagedSelector.SelectorProducer的实现的一些方法:

//ManagedSelector.SelectorProducer,主要用来生产任务。
private class SelectorProducer implements ExecutionStrategy.Producer{
    private Set<SelectionKey> _keys = Collections.emptySet();
    private Iterator<SelectionKey> _cursor = Collections.emptyIterator();
    
    @Override
    public Runnable produce()
    {
        while (true)
        {
            //处理选中事件,如果有任务生成直接返回。
            //注意这里叫作task,要和action区分开来,不用弄混了,
            //这里的task都是根据selector上的相应的就绪key产生的读/写/连接task。
            Runnable task = processSelected();
            if (task != null)
                return task;
            //如果没有task产生,从actions中取任务
            //这里返回的action都是实现了Product,不会改变selector上的感兴趣事件。(待确认)
            //交由ExecutionStrategy去执行。
            Runnable action = runActions();
            if (action != null)
                return action;
            //更新相应变化的感兴趣Key
            update();
            //select,获取感兴趣的事件。
            if (!select())
                return null;
        }
    }
    private Runnable processSelected()
    {
        while (_cursor.hasNext())
        {
            SelectionKey key = _cursor.next();
            if (key.isValid())
            {
                Object attachment = key.attachment();
                try
                {
                    if (attachment instanceof SelectableEndPoint)
                    {
                        // Try to produce a task
                        //会产生读或写任务,或者读写任务。
                        SelectableEndPoint selectable = (SelectableEndPoint)attachment;
                        Runnable task = selectable.onSelected();
                        if (task != null)
                            return task;
                    }
                    else if (key.isConnectable())
                    {
                        Runnable task = processConnect(key, (Connect)attachment);
                        if (task != null)
                            return task;
                    }
                    else if (key.isAcceptable())
                    {
                        processAccept(key);
                    }
                    else
                    {
                        throw new IllegalStateException("key=" + key + ", att=" + attachment + ", iOps=" + key.interestOps() + ", rOps=" + key.readyOps());
                    }
                }
                catch (CancelledKeyException x)
                {
                    if (attachment instanceof org.eclipse.jetty.io.EndPoint)
                        closeNoExceptions((EndPoint)attachment);
                }
                catch (Throwable x)
                {
                    if (attachment instanceof org.eclipse.jetty.io.EndPoint)
                        closeNoExceptions((EndPoint)attachment);
                }
            }
            else
            {
                Object attachment = key.attachment();
                if (attachment instanceof EndPoint)
                    closeNoExceptions((EndPoint)attachment);
            }
        }
        return null;
    }

    /**
     *
     * @return
     */
    private Runnable runActions()
    {
        //循环处理任务队列中的任务,直到任务完全被处理完。
        while (true)
        {
            Runnable action;
            try (SpinLock.Lock lock = _lock.lock())
            {
                action = _actions.poll();
                if (action == null)
                {
                    // No more actions, so we need to select
                    _selecting = true;
                    return null;
                }
            }
            //***重点*** 如果是生产者生成的任务,直接返回,交给 ExecutionStrategy来执行
            if (action instanceof Product)
                return action;
            // Running the change may queue another action.
            // 执行任务,或许会产生新的任务入队。
            runChange(action);
        }
    }

    private void runChange(Runnable change)
    {
        try
        {
            change.run();
        }
        catch (Throwable x)
        {
            x.printStackTrace();
        }
    }
    private void update()
    {
        for (SelectionKey key : _keys)
            updateKey(key);
        _keys.clear();
    }
    private void updateKey(SelectionKey key)
    {
        Object attachment = key.attachment();
        if (attachment instanceof SelectableEndPoint)
            ((SelectableEndPoint)attachment).updateKey();
    }
    //重新进行选择。。。。。。
    private boolean select()
    {
        try
        {
            Selector selector = _selector;
            if (selector != null && selector.isOpen())
            {
                int selected = selector.select();
                try (SpinLock.Lock lock = _lock.lock())
                {
                    // finished selecting
                    _selecting = false;
                }
                _keys = selector.selectedKeys();
                _cursor = _keys.iterator();

                return true;
            }
        }
        catch (Throwable x)
        {
            closeNoExceptions(_selector);
        }
        return false;
    }
}

接着看ManagedSelector里的run()方法,它直接调用_strategy.execute();这里的ExecutionStrategy具体的execute实现是ExecuteProduceRun。这里不帖代码,篇幅太长了。主要是循环的调用_producer.produce()产生任务,并执行任务;

    最后介绍SelectorManager,主要是用来管理ManagedSelector。它主要是保存了对ManagedSelector数组的引用,同时也引用了一个线程池,用来执行ManagedSelector。SelectorManager实现了 LifeCycle接口,它在doStart方法中对ManagedSelector数组初始化,并在线程池里执行初始化好的ManagedSelector,具体代码如下:

protected void doStart() throws Exception
{
    super.doStart();
    for (int i = 0; i < _selectors.length; i++)
    {
        ManagedSelector selector = newSelector(i);
        _selectors[i] = selector;
        selector.start();
        execute(selector);
    }
}

   还有写地方理解不是很深入,有些地方理解的可能不正确,大家结合源码校验下。。。。。。。待续

 转发请标注来源: http://my.oschina.net/robinyao/blog/403615

END-------------------------------




© 著作权归作者所有

共有 人打赏支持
robin-yao
粉丝 151
博文 54
码字总数 61496
作品 0
杭州
eclipse使用maven tomcat插件部署无法关联源代码

eclipse maven 起服务debug无法关联源码 博客分类: maven eclipse 1. 安装sourcelookup插件: update site: 1) http://bjmi.github.io/update-site/ (3.8.2可用,后续描述均针对此插件) 2) ht...

赵作文
2015/10/20
246
0
Jetty9 源码初解(2)——IO之EndPoint

一、概述 EndPoint作为jetty-io的一个重要组成部分,是基于javaNIO的封装,用于底层网络的读写,一旦网络读写准备好,会调用相应的connection的handle方法。 二、类分析 EndPoint源码如下: ...

戴的天
2015/10/28
0
0
Jetty源码-IO-BufferUtil

jetty 源码 BufferUtil 工具类主要是封装了对JDK ByteBuffer的操作。通过BufferUtil可以更容易的操作ByteBuffer. 由于ByteBuffer分为fill模式即写模式、flush模式即读模式,我们经常会在读写...

robin-yao
2015/04/16
0
0
Spring Boot 2.0.0.M6 发布,初步支持 HTTP/2

Spring Boot 2.0.0.M6 已发布。该版本关闭了 144 个 issue 和 PR,继续向 2.0 GA 迈进。 该里程碑版改进了之前版本中的一些项目,也新增了一些特性,具体如下: 初步支持 HTTP/2:目前在 To...

淡漠悠然
2017/11/06
5.7K
23
jfinal 使用jetty启动报错 java.io.EOFException

之前都没有问题,就是前几天突然出现的,很奇怪,网上搜也没有相应的错误,望大神们指导指导 [WARN]-[Thread: main]-[org.eclipse.jetty.server.session.HashSessionManager.restoreSession(...

walala_Lee
2013/04/23
960
2

没有更多内容

加载失败,请刷新页面

加载更多

下一页

spring 容器实现对bean的管理(注解方式解析,源码阅读)

因为最近在研究学习spring boot,所以这里想详细学习回顾了一下spring 容器对bean的一些管理方式和部分源码学习。 首先初始类AnnotationConfigApplicationContext,简单源码查看,支持两个参...

小海bug
14分钟前
0
0
数据结构:二分查找 java

二分查找的前提是有序存储,利用顺序存储和元素排序 /** * 二分查找,查找成功,返回下标记 * @param values * @param begin * @param end * @param key * @param <T> * @ret...

京一
31分钟前
0
0
@SpringBootApplication 注解

@SpringBootApplication注解是一个组合注解,包含以下注解 @Target(ElementType.TYPE) 注解的作用目标 @Retention(RetentionPolicy.RUNTIME) Reteniton的作用是定义被它所注解的注解保留多久,...

java.刘
45分钟前
0
0
sentinel自定义DataSource实战

序 本文主要研究一下如何自定义sentinel的DataSource,这里以jdbc为例。 maven <dependency><groupId>org.springframework.cloud</groupId><artifactId>spring-cloud-starter-sen......

go4it
今天
1
0
xgboost/gbdt在调参时为什么树的深度很少就能达到很高的精度?

问题: 用xgboost/gbdt在在调参的时候把树的最大深度调成6就有很高的精度了。但是用DecisionTree/RandomForest的时候需要把树的深度调到15或更高。用RandomForest所需要的树的深度和Decisio...

tantexian
今天
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部