Jetty源码-Continuation
Jetty源码-Continuation
robin-yao 发表于3年前
Jetty源码-Continuation
  • 发表于 3年前
  • 阅读 31
  • 收藏 1
  • 点赞 0
  • 评论 0
摘要: jetty 异步请求 Continuation

    本文主要介绍Jetty 异步的请求。Jetty Contination可以用来处理大量的时间比较长的连接请求。

    http异步请求在Servlet3中已经实现,使用十分方便,通过Request获取异步AsyncContext,然后在上面注册自己的异步监听器AsynListener即可。Tomcat7 和Jetty8以上版本等主流的容器都支持Servlet3的异步请求。具体介绍见http://www.importnew.com/8864.html,详细的Demohttps://github.com/WangErXiao/Servlet3-Async。这里不多介绍Servlet3的异步请求。

    Jetty的异步请求的实现模块主要是Continuation。通过continuation机制,HTTP 请求可以被挂起,超时或者异步事件发生后重新开始。Continuation接口包含两个重要的方法是suspend()和resume()。

    当调用continuation.suspend()方法时,会把当前servlet的request和相关的response挂起,request的生命周期将被从Servlet.service(ServletRequest, ServletResponse)扩展到容器,response将不会被提交除非有ContinuationThrowable抛出。调用完suspend方法,注册完异步的handler,直接调用return从Servlet.service方法返回,当前的线程就可以去处理其他任务了。

    任务处理完之后,通过注册好的回调方法调用continuation.resume()。重新用原来的request和reponse发起请求,并从request中获取结果,通过response返回。

    下边是具体的Demo代码:

public class SimpleSuspendResumeServlet extends HttpServlet {
    private MyAsyncHandler myAsyncHandler;
    //初始化异步处理器
    public void init() throws ServletException {
        myAsyncHandler = new MyAsyncHandler() {
            public void register(final MyHandler myHandler) {
                new Thread(new Runnable() {
                    public void run() {
                        try {
                            Thread.sleep(10000);
                            //设置结果,调用resume方法
                            myHandler.onMyEvent("complete!");
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }

                    }
                }).start();
            }
        };

    }

    public void doGet(HttpServletRequest request, HttpServletResponse response)
            throws ServletException, IOException {
        final PrintWriter writer = response.getWriter();
        final Continuation continuation = ContinuationSupport
                .getContinuation(request);
        if (continuation.isInitial()) {
            sendMyFirstResponse(response);
            continuation.suspend(); // always suspend before registration
            myAsyncHandler.register(new MyHandler() {
                public void onMyEvent(Object result) {
                    continuation.setAttribute("results", result);
                    continuation.resume();
                }
            });
            return; // 然后该线程就可以去处理其他任务了
        }

        if (continuation.isExpired()) {
            sendMyTimeoutResponse(response);
            return;
        }
        //Send the results
        Object results = request.getAttribute("results");
        if(results==null){
            response.getWriter().write("why reach here??");
            continuation.resume();
            return;
        }
        sendMyResultResponse(response, results);
    }

    private interface MyAsyncHandler {
        public void register(MyHandler myHandler);
    }

    private interface MyHandler {
        public void onMyEvent(Object result);
    }

    private void sendMyFirstResponse(HttpServletResponse response) throws IOException {
        response.setContentType("text/html");
        response.getWriter().write("start---------");
        response.getWriter().flush();
    }

    private void sendMyResultResponse(HttpServletResponse response,
                                      Object results) throws IOException {
        response.getWriter().write("results:" + results);
        response.getWriter().flush();

    }
    private void sendMyTimeoutResponse(HttpServletResponse response)
            throws IOException {
        response.getWriter().write("timeout");
    }
}

    上面代码中的Support,是工厂类,用来生成Continuation。如果当前容器采用的是Servlet3,返回Servlet3Continuation类型的,如果不是返回FauxContinuation类型的。Servlet3Continuation顾名思义是借助Servlet3里的异步机制来实现的,底层的具体实现还是由具体的容器实现,tomcat7,jetty8以上版本都实现Servlet3异步请求功能。FauxContinuation是Continuation阻塞实现方式,主要是针对才非Servlet3的容器,通过ContinuationFilter过滤器,把FauxContinuation添加到request属性中:

public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain) throws IOException, ServletException
{
    if (_filtered)
    {
        Continuation c = (Continuation) request.getAttribute(Continuation.ATTRIBUTE);
        FilteredContinuation fc;
        if (_faux && (c==null || !(c instanceof FauxContinuation)))
        {
            fc = new FauxContinuation(request);
            request.setAttribute(Continuation.ATTRIBUTE,fc);
        }
        else
            fc=(FilteredContinuation)c;

        boolean complete=false;
        while (!complete)
        {
            try
            {
                if (fc==null || (fc).enter(response))
                    chain.doFilter(request,response);
            }
            catch (ContinuationThrowable e)
            {
                debug("faux",e);
            }
            finally
            {
                if (fc==null)
                    fc = (FilteredContinuation) request.getAttribute(Continuation.ATTRIBUTE);

                complete=fc==null || (fc).exit();
            }
        }
    }
    else
    {
        try
        {
            chain.doFilter(request,response);
        }
        catch (ContinuationThrowable e)
        {
            debug("caught",e);
        }
    }
}

END-------------------------------

转发请标注来源http://my.oschina.net/robinyao/blog/406544

共有 人打赏支持
粉丝 130
博文 53
码字总数 60598
×
robin-yao
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: