MINA,xSocket同样的性能缺陷及陷阱,Grizzly better

原创
2012/10/27 14:22
阅读数 848
本文转自博客:http://www.blogjava.net/adapterofcoms/articles/314560.html
因最近想重构下底层的服务器代码,特意查了下开源框架nio的东西 下面的不错 大家一起学校下 :evil: [url][/url]


MINA,Grizzly[grizzly-nio-framework],xSocket都是基于 java nio的 server framework.
这里的性能缺陷的焦点是指当一条channel上的SelectionKey.OP_READ ready时,1.是由select thread读完数据之后再分发给应用程序的handler,2.还是直接就分发,由handler thread来负责读数据和handle.
mina,xsocket是1. grizzly-nio-framework是2.
尽管读channel buffer中bytes是很快的,但是如果我们放大,当连接channel达到上万数量级,甚至更多,这种延迟响应的效果将会愈加明显.
MINA:
for all selectedKeys 
{
    read data then fireMessageReceived.

xSocket:
for all selectedKeys 
{
    read data ,append it to readQueue then performOnData.

其中mina在fireMessageReceived时没有使用threadpool来分发,所以需要应用程序在handler.messageReceived中再分发.而xsocket的performOnData默认是分发给threadpool[WorkerPool],WorkerPool虽然解决了线程池中的线程不能充到最大的问题[跟tomcat6的做法一样],但是它的调度机制依然缺乏灵活性.
Grizzly:
for all selectedKeys 
{
   [NIOContext---filterChain.execute--->our filter.execute]<------run In DefaultThreadPool
}
grizzly的DefaultThreadPool几乎重写了java util concurrent threadpool,并使用自己的LinkedTransferQueue,但同样缺乏灵活的池中线程的调度机制. 


下面分别是MINA,xSocket,Grizzly的源码分析:
Apache MINA (mina-2.0.0-M6源码为例):
    我们使用mina nio tcp最常用的样例如下:
        NioSocketAcceptor acceptor = new NioSocketAcceptor(/*NioProcessorPool's size*/);
        DefaultIoFilterChainBuilder chain = acceptor.getFilterChain();        
        //chain.addLast("codec", new ProtocolCodecFilter(
                //new TextLineCodecFactory()));
        ......
        // Bind
        acceptor.setHandler(/*our IoHandler*/);
        acceptor.bind(new InetSocketAddress(port));
------------------------------------------------------------------------------------
    首先从NioSocketAcceptor(extends AbstractPollingIoAcceptor)开始,
bind(SocketAddress)--->bindInternal--->startupAcceptor:启动AbstractPollingIoAcceptor.Acceptor.run使用executor[Executor]的线程,注册[interestOps:SelectionKey.OP_ACCEPT],然后wakeup selector.
一旦有连接进来就构建NioSocketSession--对应--channal,然后session.getProcessor().add(session)将当前的channal加入到NioProcessor的selector中去[interestOps:SelectionKey.OP_READ],这样每个连接中有请求过来就由相应的NioProcessor来处理.


这里有几点要说明的是:
1.一个NioSocketAcceptor对应了多个NioProcessor,比如NioSocketAcceptor就使用了SimpleIoProcessorPool DEFAULT_SIZE = Runtime.getRuntime().availableProcessors() + 1.当然这个size在new NioSocketAcceptor的时候可以设定.
2.一个NioSocketAcceptor对应一个java nio selector[OP_ACCEPT],一个NioProcessor也对应一个java nio selector[OP_READ].
3.一个NioSocketAcceptor对应一个内部的AbstractPollingIoAcceptor.Acceptor---thread.
4.一个NioProcessor也对应一个内部的AbstractPollingIoProcessor.Processor---thread.
5.在new NioSocketAcceptor的时候如果你不提供Executor(线程池)的话,那么默认使用Executors.newCachedThreadPool().
这个Executor将被NioSocketAcceptor和NioProcessor公用,也就是说上面的Acceptor---thread(一条)和Processor---thread(多条)都是源于这个Executor.
      当一个连接java nio channal--NioSession被加到ProcessorPool[i]--NioProcessor中去后就转入了AbstractPollingIoProcessor.Processor.run,
AbstractPollingIoProcessor.Processor.run方法是运行在上面的Executor中的一条线程中的,当前的NioProcessor将处理注册在它的selector上的所有连接的请求[interestOps:SelectionKey.OP_READ].


AbstractPollingIoProcessor.Processor.run的主要执行流程:
for (;;) {      
       ......
       int selected = selector(final SELECT_TIMEOUT = 1000L);
       .......
       if (selected > 0) {
          process();
       }
       ......
}


process()-->for all session-channal:OP_READ -->read(session):这个read方法是AbstractPollingIoProcessor.private void read(T session)方法.
read(session)的主要执行流程是read channal-data to buf,if readBytes>0 then IoFilterChain.fireMessageReceived(buf)/*我们的IoHandler.messageReceived将在其中被调用*/;
    到此mina Nio 处理请求的流程已经明了.
    mina处理请求的线程模型也出来了,性能问题也来了,那就是在AbstractPollingIoProcessor.Processor.run-->process-->read(per session)中,在process的时候mina是for all selected-channals 逐次read data再fireMessageReceived到我们的IoHandler.messageReceived中,而不是并发处理,这样一来很明显后来的请求将被延迟处理.
我们假设:如果NioProcessorPool's size=2 现在有200个客户端同时连接过来,假设每个NioProcessor都注册了100个连接,对于每个NioProcessor将依次顺序处理这100个请求,那么这其中的第100个请求要得到处理,那它只有等到前面的99个被处理完了.
    有人提出了改进方案,那就是在我们自己的IoHandler.messageReceived中利用线程池再进行分发dispatching,这个当然是个好主意.
    但是请求还是被延迟处理了,因为还有read data所消耗的时间,这样第100个请求它的数据要被读,就要等前面的99个都被读完才行,即便是增加ProcessorPool的尺寸也不能解决这个问题.
    此外mina的陷阱(这个词较时髦)也出来了,就是在read(session)中,在说这个陷阱之前先说明一下,我们的client端向server端发送一个消息体的时候不一定是完整的只发送一次,可能分多次发送,特别是在client端忙或要发送的消息体的长度较长的时候.而mina在这种情况下就会call我们的IoHandler.messageReceived多次,结果就是消息体被分割了若干份,等于我们在IoHandler.messageReceived中每次处理的数据都是不完整的,这会导致数据丢失,无效.
下面是read(session)的源码:
private void read(T session) {
        IoSessionConfig config = session.getConfig();
        IoBuffer buf = IoBuffer.allocate(config.getReadBufferSize());


        final boolean hasFragmentation =
            session.getTransportMetadata().hasFragmentation();


        try {
            int readBytes = 0;
            int ret;


            try {
                if (hasFragmentation/*hasFragmentation一定为ture,也许mina的开发人员也意识到了传输数据的碎片问题,但是靠下面的处理是远远不够的,因为client一旦间隔发送,ret就可能为0,退出while,不完整的readBytes将被fire*/) {
                    while ((ret = read(session, buf)) > 0) {
                        readBytes += ret;
                        if (!buf.hasRemaining()) {
                            break;
                        }
                    }
                } else {
                    ret = read(session, buf);
                    if (ret > 0) {
                        readBytes = ret;
                    }
                }
            } finally {
                buf.flip();
            }


            if (readBytes > 0) {
                IoFilterChain filterChain = session.getFilterChain(); 
                filterChain.fireMessageReceived(buf);
                buf = null;


                if (hasFragmentation) {
                    if (readBytes << 1 < config.getReadBufferSize()) {
                        session.decreaseReadBufferSize();
                    } else if (readBytes == config.getReadBufferSize()) {
                        session.increaseReadBufferSize();
                    }
                }
            }
            if (ret < 0) {
                scheduleRemove(session);
            }
        } catch (Throwable e) {
            if (e instanceof IOException) {
                scheduleRemove(session);
            }
            IoFilterChain filterChain = session.getFilterChain(); 
            filterChain.fireExceptionCaught(e);
        }
    }
这个陷阱大家可以测试一下,看会不会一个完整的消息被多次发送,你的IoHandler.messageReceived有没有被多次调用.
要保持我们应用程序消息体的完整性也很简单只需创建一个断点breakpoint,然后set it to the current IoSession,一旦消息体数据完整就dispatching it and remove it from the current session.
-------------------------------------------------------------------------------------------------- 
下面以xSocket v2_8_8源码为例:
tcp usage e.g:
IServer srv = new Server(8090, new EchoHandler());
srv.start() or run(); 
-----------------------------------------------------------------------
class EchoHandler implements IDataHandler {   
    public boolean onData(INonBlockingConnection nbc) 
             throws IOException, 
             BufferUnderflowException, 
             MaxReadSizeExceededException {
       String data = nbc.readStringByDelimiter("\r\n");
       nbc.write(data + "\r\n");
       return true;
    }
  }
------------------------------------------------------------------------
说明1.Server:Acceptor:IDataHandler ------1:1:1
Server.run-->IoAcceptor.accept()在port上阻塞,一旦有channel就从IoSocketDispatcherPool中获取一个IoSocketDispatcher,同时构建一个IoSocketHandler和NonBlockingConnection,调用Server.LifeCycleHandler.onConnectionAccepted(ioHandler)  initialize the IoSocketHandler.注意:IoSocketDispatcherPool.size默认为2,也就是说只有2条do select的线程和相应的2个IoSocketDispatcher.这个和MINA的NioProcessor数是一样的.
说明2.IoSocketDispatcher[java nio Selector]:IoSocketHandler:NonBlockingConnection------1:1:1
在IoSocketDispatcher[对应一个Selector].run中--->IoSocketDispatcher.handleReadWriteKeys:
for all selectedKeys 
{
    IoSocketHandler.onReadableEvent/onWriteableEvent.

IoSocketHandler.onReadableEvent的处理过程如下:
1.readSocket();
2.NonBlockingConnection.IoHandlerCallback.onData 
NonBlockingConnection.onData--->appendDataToReadBuffer: readQueue append data
3.NonBlockingConnection.IoHandlerCallback.onPostData
NonBlockingConnection.onPostData--->HandlerAdapter.onData[our dataHandler] performOnData in WorkerPool[threadpool]. 


因为是把channel中的数据读到readQueue中,应用程序的dataHandler.onData会被多次调用直到readQueue中的数据读完为止.所以依然存在类似mina的陷阱.解决的方法依然类似,因为这里有NonBlockingConnection.
----------------------------------------------------------------------------------------------
再下面以grizzly-nio-framework v1.9.18源码为例:
tcp usage e.g:
Controller sel = new Controller();
         sel.setProtocolChainInstanceHandler(new DefaultProtocolChainInstanceHandler(){
             public ProtocolChain poll() {
                 ProtocolChain protocolChain = protocolChains.poll();
                 if (protocolChain == null){
                     protocolChain = new DefaultProtocolChain();
                     //protocolChain.addFilter(our app's filter/*应用程序的处理从filter开始,类似mina.ioHandler,xSocket.dataHandler*/);
                     //protocolChain.addFilter(new ReadFilter());
                 }
                 return protocolChain;
             }
         });
         //如果你不增加自己的SelectorHandler,Controller就默认使用TCPSelectorHandler port:18888
         sel.addSelectorHandler(our app's selectorHandler on special port);         
  sel.start();
------------------------------------------------------------------------------------------------------------
 说明1.Controller:ProtocolChain:Filter------1:1:n,Controller:SelectorHandler------1:n,
SelectorHandler[对应一个Selector]:SelectorHandlerRunner------1:1,
Controller. start()--->for per SelectorHandler start SelectorHandlerRunner to run.
SelectorHandlerRunner.run()--->selectorHandler.select()  then handleSelectedKeys:
for all selectedKeys 
{
   NIOContext.execute:dispatching to threadpool for ProtocolChain.execute--->our filter.execute.



你会发现这里没有read data from channel的动作,因为这将由你的filter来完成.所以自然没有mina,xsocket它们的陷阱问题,分发提前了.但是你要注意SelectorHandler:Selector:SelectorHandlerRunner:Thread[SelectorHandlerRunner.run]都是1:1:1:1,也就是说只有一条线程在doSelect then handleSelectedKeys.


    相比之下虽然grizzly在并发性能上更优,但是在易用性方面却不如mina,xsocket,比如类似mina,xsocket中表示当前连接或会话的IoSession,INonBlockingConnection对象在grizzly中由NIOContext来负责,但是NIOContext并没有提供session/connection lifecycle event,以及常规的read/write操作,这些都需要你自己去扩展SelectorHandler和ProtocolFilter,从另一个方面也可以说明grizzly的可扩展性,灵活性更胜一筹.


///////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////


一般网络通讯程序是,客户端请求,然后服务端回复响应,即key.isReadable()时,服务器端接收请求,key.isWriteable()时,服务端返回响应!一般我们在处理完接收后,我们就注册写事件,然后有写事件时处理写事件,正常网上的实例都是这样写的,可是这样的话,客户端第二次以后发送时,服务器端再也没有进入到读事件中,还写事件回复的数据也很怪异,返回很多重复的数据。


再改进一点的是,写事件完事以后,又注册一个读事件,这样又可以读了,然后读了以后再注册写,这样反复重复,的确,能很好的解决问题了。可是是因为什么原因呢?当时我是这样解决的,凑合用着,但是不知道为什么!


昨天偶尔想起这个问题,然后查看mina源码时,发现其只注册过读事件,而对写事件并没有注册过,只是发现有一些key.interestOps(SelectionKey.OP_WRITE)的操作,不是很明白,后来查看帮忙文档,知道这是管道当前感兴趣的事件时突然有些明白了,后来又查看jdk源码,发现,每当重复注册时,都会检测事件是否已经注册过,如果已经注册过的事件,会把当前感兴趣的事件切换成现在感兴趣的事件,这样所有的都明白了。


即:管道注册事件,可以有四种事件,它可以注册所有的事件,但是管道每次只能对一种事件有效,即注册读事件时,此时只对读事件有效,注册写事件时,此时只对写事件有效,这样上面的笨方法来回注册刚好达到这种效果,多的只是每次注册时的一次判断是否已经注册过。当然我们也有更好的方法,那就是通过key.interestOps方法来切换当前感兴趣的事件,这样就可以避免每次注册时都需要判断了。

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部