Jetty源码-Server-Connector
Jetty源码-Server-Connector
robin-yao 发表于3年前
Jetty源码-Server-Connector
  • 发表于 3年前
  • 阅读 64
  • 收藏 0
  • 点赞 0
  • 评论 0

腾讯云 新注册用户 域名抢购1元起>>>   

摘要: 主要介绍Jetty Server 用到的Connector ServerConnector AbstractConnector

    Jetty Server两个重要的概念一个是Handler,一个是Connector。 我们在内嵌Jetty启动应用时往往会通过WebAppContext来设置上下文,WebAppContext就是一个Handler,我们可以通过Server.setHandler()来赋值我们的上下文。本文主要介绍Connector。它的抽象实现类是AbstractConnector,然后ServerConnector继承了AbstractConnector,这也是重点要介绍的两个类。连接器故名思议主要是用来处理各种类型的请求包括HTTP, HTTP/2 ,WebSocket等等。Jetty 9采用NIO来处理请求,去掉了BIO模块。

    下边先贴出一段实用的Jetty内嵌启动代码,方便我们理解Handler和Connector。采用的是9.3版本的jetty jar

/** jetty pom
<dependency>
    <groupId>org.eclipse.jetty</groupId>
    <artifactId>jetty-server</artifactId>
    <version>9.3.0.M0</version>
</dependency>
<dependency>
    <groupId>org.eclipse.jetty</groupId>
    <artifactId>jetty-webapp</artifactId>
    <version>9.3.0.M0</version>
</dependency>
**/

//内嵌代码------------------

Server server = new Server();
// connector
ServerConnector connector = new ServerConnector(server);
connector.setPort(port);
server.setConnectors(new Connector[] { connector });
//context
WebAppContext context = new WebAppContext();
context.setContextPath("/");
//设定webapp,具体的路径根据自己应用来调整
context.setResourceBase("./src/main/webapp");
context.setClassLoader(Thread.currentThread().getContextClassLoader());
context.setParentLoaderPriority(true);
server.setHandler(context);
server.start();
server.join();
//-----------------------------------------

    AbstractConnector是Connector的实现类,它提供了ConnectionFactory机制来创建各种协议(HTTP, SSL)的实例。AbstractConnector封装了几个重要的属性:

         Executor:线程池,用来处理接收的连接;

         Scheduler:用来监视连接的超时;

         ByteBufferPool:ByteBuffer缓冲池,减少对ByteBuffer的创建开销,循环利用ByteBuffer;

         ConnectionFactory: 创建各种协议的工厂,包含 SslConnectionFactory,HttpConnectionFactory等等;

         Acceptors:来创建接收器的,监听端口过来的请求。AbstractConnector里面 有个 final Thread[] _acceptors  变量 ,这个变量是个线程数组,个数代表你需要创建并发的接收器,多个接收器调用同一个ServerSocketChannel实例上的accept方法,来实现高效的接收请求。AbstractConnector在doStart方法中初始化Acceptor。

protected void doStart() throws Exception
{
    _defaultConnectionFactory = getConnectionFactory(_defaultProtocol);
 
    _stopping=new CountDownLatch(_acceptors.length);
    for (int i = 0; i < _acceptors.length; i++)
    {
        Acceptor a = new Acceptor(i);
        addBean(a);
        getExecutor().execute(a);
        //初始化Acceptor就直接放到线程池去执行。        
    }
}

    下边我们分析一下Acceptor这个AbstractConnector内部类:

private class Acceptor implements Runnable
{
    private final int _id;
    private String _name;
    private Acceptor(int id)
    {
        _id = id;
    }

    @Override
    public void run()
    {    
        //刚刚我们把它放到线程池里,它会取当前执行它的线程放到AbstractConnector中的_acceptors线程数组中
        //主要是来判断acceptor是否都停掉 _stopping=new CountDownLatch(_acceptors.length)。
        //
        final Thread thread = Thread.currentThread();
        String name=thread.getName();
        _name=String.format("%s-acceptor-%d@%x-%s",name,_id,hashCode(),AbstractConnector.this.toString());
        thread.setName(_name);
        
        int priority=thread.getPriority();
        if (_acceptorPriorityDelta!=0)
            thread.setPriority(Math.max(Thread.MIN_PRIORITY,Math.min(Thread.MAX_PRIORITY,priority+_acceptorPriorityDelta)));

        synchronized (AbstractConnector.this)
        {    
            _acceptors[_id] = thread;
        }

        try
        {
            while (isAccepting())
            {
                try
                {    
                    //!!!这是个抽象方法由具体的实现类ServerConnector来实现。!!!
                    accept(_id);
                }
                catch (Throwable e)
                {
                    if (isAccepting())
                        LOG.warn(e);
                    else
                        LOG.ignore(e);
                }
            }
        }
        finally
        {
            thread.setName(name);
            if (_acceptorPriorityDelta!=0)
                thread.setPriority(priority);

            synchronized (AbstractConnector.this)
            {
                _acceptors[_id] = null;
            }
            CountDownLatch stopping=_stopping;
            if (stopping!=null)
                stopping.countDown();
        }
    }
    
    @Override
    public String toString()
    {
        String name=_name;
        if (name==null)
            return String.format("acceptor-%d@%x", _id, hashCode());
        return name;
    }
    
}

 下边我们具体介绍accept方法在ServerConnector中的具体实现,ServerConnector有两个重要的方法 open和accept

下边是具体的代码:

//主要是初始化ServerSocketChannel,绑定监听端口。剩下的接收请求交给accept方法来做。
public void open() throws IOException
{
    if (_acceptChannel == null)
    {
        ServerSocketChannel serverChannel = null;
        if (isInheritChannel())
        {
            Channel channel = System.inheritedChannel();
            if (channel instanceof ServerSocketChannel)
                serverChannel = (ServerSocketChannel)channel;
        }

        if (serverChannel == null)
        {
            serverChannel = ServerSocketChannel.open();

            InetSocketAddress bindAddress = getHost() == null ? new InetSocketAddress(getPort()) : new InetSocketAddress(getHost(), getPort());
            serverChannel.socket().setReuseAddress(getReuseAddress());
            serverChannel.socket().bind(bindAddress, getAcceptQueueSize());

            _localPort = serverChannel.socket().getLocalPort();
            if (_localPort <= 0)
                throw new IOException("Server channel not bound");

            addBean(serverChannel);
        }

        serverChannel.configureBlocking(true);
        addBean(serverChannel);

        _acceptChannel = serverChannel;
    }
}
//这个方法由AbstractConnector中的accepters调用,并发的处理接收请求。
@Override
public void accept(int acceptorID) throws IOException
{
    ServerSocketChannel serverChannel = _acceptChannel;
    if (serverChannel != null && serverChannel.isOpen())
    {
        SocketChannel channel = serverChannel.accept();
        accepted(channel);
    }
}

private void accepted(SocketChannel channel) throws IOException
{
    channel.configureBlocking(false);
    Socket socket = channel.socket();
    configure(socket);
    //交给SelectorManager来对请求过来的Channel处理
    _manager.accept(channel);
}

protected void configure(Socket socket)
{
    try
    {    
        //禁掉TCP延迟 negle算法
        socket.setTcpNoDelay(true);
        if (_lingerTime >= 0)
            socket.setSoLinger(true, _lingerTime / 1000);
        else
            socket.setSoLinger(false, 0);
    }
    catch (SocketException e)
    {
        LOG.ignore(e);
    }
}

       Connector就介绍到这里,主要是封装了ServerSocketChannel,在多线程环境下监听过来的请求,转发请求给SelectorManager。

转发标注来源:http://my.oschina.net/robinyao/blog/404687

    END------------------------------------------------------------------

共有 人打赏支持
粉丝 131
博文 53
码字总数 60598
×
robin-yao
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: