文档章节

解决dubbo优雅停机

d
 dbnaxlc
发布于 2016/11/19 12:10
字数 1335
阅读 3140
收藏 7
  • 现状

dubbo目前的不支持优雅停机,是因为服务端关闭时,客户端主动关闭长连接,导致服务端响应消息不能返回。

在服务端和客户端配置参数:-Ddubbo.service.shutdown.wait=30000,该参数为停机等待时间,但是结果也不生效,还是马上停机了。

  • 期望结果
  1. 正在读或者写的消息,需要继续处理;
  2. 线程池需等所有任务执行完成后,才关闭;
  • 服务端停机时客户端操作

当服务端停机时,客户端检测断开事件,马上关闭了与该服务端之间的连接,并无限次重连服务提供折。如果服务提供者只有一个时,则不在进行服务的远程调用。

客户端断开连接时序图:

通过zk的节点变化事件触发notify,客户端刷新服务提供者,删除该服务提供者【destroyUnusedInvokers方法】;如果只有一个服务提供者时,则销毁所有Invoker【destroyAllInvokers方法】。

  • 客户端代码修改

在dubboInvoker销毁时,直接调用了client.clise()方法,源码如下:

public void destroy() {
        if (super.isDestroyed()){
            return ;
        } else {
            destroyLock.lock();
            try{
                if (super.isDestroyed()){
                    return ;
                }
                super.destroy();
                if (invokers != null){
                    invokers.remove(this);
                }
                for (ExchangeClient client : clients) {
                    try {
                        //直接关闭
                        client.close();
                    } catch (Throwable t) {
                        logger.warn(t.getMessage(), t);
                    }
                }
                
            }finally {
                destroyLock.unlock();
            }
        }
    }

 修改client.close()为client.clise(timeout),代码修改如下:

public void destroy() {
        if (super.isDestroyed()){
            return ;
        } else {
            destroyLock.lock();
            try{
                if (super.isDestroyed()){
                    return ;
                }
                super.destroy();
                if (invokers != null){
                    invokers.remove(this);
                }
                for (ExchangeClient client : clients) {
                    try {
                        //修改关闭
                        this.close(client);
                    } catch (Throwable t) {
                        logger.warn(t.getMessage(), t);
                    }
                }
                
            }finally {
                destroyLock.unlock();
            }
        }
    }
    
    /**
     * 如果没有设置dubbo.service.shutdown.wait
     *    或者dubbo.service.shutdown.wait.seconds参数,则直接关闭
     * @param client
     * @author 夏志强
     */
    @SuppressWarnings("deprecation")
	private void close(ExchangeClient client) {
    	String timeout = ConfigUtils.getProperty(Constants.SHUTDOWN_WAIT_KEY);
    	if(timeout != null && timeout.length() > 0) {
    		try{
    			client.close(Integer.parseInt(timeout));
    		} catch(Exception e) {
    		}
    	} else {
    		timeout = ConfigUtils.getProperty(Constants.SHUTDOWN_WAIT_SECONDS_KEY);
    		if(timeout != null && timeout.length() > 0) {
        		try{
        			client.close(Integer.parseInt(timeout));
        		} catch(Exception e) {
        		}
        	} else {
        		client.close();
        	}
    	}
    }

修改完上述代码,运行后发现客户端还是马上关闭连接,调试代码发现HeaderExchangeChannel的close方法里,判断HeaderExchangeChannel.this是否在DefaultFuture中和优雅退出是否超过超时时间,如果过了超时时间,则立即关闭,源码如下:

public void close(int timeout) {
        if (closed) {
            return;
        }
        closed = true;
        if (timeout > 0) {
            long start = System.currentTimeMillis();
            //DefaultFuture中CHANNELS不包含HeaderExchangeChannel类型
            while (DefaultFuture.hasFuture(HeaderExchangeChannel.this) 
                    && System.currentTimeMillis() - start < timeout) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    logger.warn(e.getMessage(), e);
                }
            }
        }
        close();
    }

查看HeaderExchangeChannel的request方法,

DefaultFuture future = new DefaultFuture(channel, req, timeout);

实际上channel是NettyClient,所以channel不会是HeaderExchangeChannel类型。

修改代码如下:

// graceful close
    public void close(int timeout) {
        if (closed) {
            return;
        }
        closed = true;
        if (timeout > 0) {
            long start = System.currentTimeMillis();
            while (DefaultFuture.hasFuture(channel) 
                    && System.currentTimeMillis() - start < timeout) {
                try {
                    Thread.sleep(10);
                } catch (InterruptedException e) {
                    logger.warn(e.getMessage(), e);
                }
            }
        }
        close();
    }

再次运行代码,发现结果还是不对,客户端一直在等待关闭,而服务端已经关闭了。

  • 服务端停机时序图

目前我分析的是ProtocolConfig的destoryAll()方法,主要分两步,第一步删除zk服务提供者节点,关闭zk监听;第二步dubbo协议销毁,链接关闭。

dubbo协议销毁时序图:

通过时序图看到服务端的优雅关闭是在AbstractServer的close(timeout)方法,源码如下:

public void close(int timeout) {
        ExecutorUtil.gracefulShutdown(executor ,timeout);
        close();
    }

//ExecutorUtil类
public static void gracefulShutdown(Executor executor, int timeout) {
        if (!(executor instanceof ExecutorService) || isShutdown(executor)) {
            return;
        }
        final ExecutorService es = (ExecutorService) executor;
        try {
            es.shutdown(); // Disable new tasks from being submitted
        } catch (SecurityException ex2) {
            return ;
        } catch (NullPointerException ex2) {
            return ;
        }
        try {
            if(! es.awaitTermination(timeout, TimeUnit.MILLISECONDS)) {
                es.shutdownNow();
            }
        } catch (InterruptedException ex) {
            es.shutdownNow();
            Thread.currentThread().interrupt();
        }
        if (!isShutdown(es)){
            newThreadToCloseExecutor(es);
        }
    }

其中executor是dubbo的线程派发模型,只有executor不为空时,才会等待线程池任务执行完后关闭。这里有一个坑,那就是调用es.awaitTermination时,一定要先调用es.shutdown(),否则就算线程池中的任务执行完或者超时后,都不会关闭,一直阻塞【详情请查看jdk】。调试时发现executor为空,executor值是通过AbstractServer的构造函数来初始化的,源码如下:

ExecutorService executor;

    public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);
        localAddress = getUrl().toInetSocketAddress();
        String host = url.getParameter(Constants.ANYHOST_KEY, false) 
                        || NetUtils.isInvalidLocalHost(getUrl().getHost()) 
                        ? NetUtils.ANYHOST : getUrl().getHost();
        bindAddress = new InetSocketAddress(host, getUrl().getPort());
        this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
        this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
        try {
            doOpen();
            if (logger.isInfoEnabled()) {
                logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
            }
        } catch (Throwable t) {
            throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName() 
                                        + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
        }
        //设置executor
        if (handler instanceof WrappedChannelHandler ){
            executor = ((WrappedChannelHandler)handler).getExecutor();
        }
    }

我们默认dubbo的底层通信框架为netty,所以查看NettyServer代码,

public NettyServer(URL url, ChannelHandler handler) throws RemotingException{
        super(url, ChannelHandlers.wrap(handler, ExecutorUtil.setThreadName(url, SERVER_THREAD_POOL_NAME)));
    }

//以下为ChannelHandlers代码
public static ChannelHandler wrap(ChannelHandler handler, URL url){
        return ChannelHandlers.getInstance().wrapInternal(handler, url);
    }

    protected ChannelHandler wrapInternal(ChannelHandler handler, URL url) {
        return new MultiMessageHandler(new HeartbeatHandler(ExtensionLoader.getExtensionLoader(Dispatcher.class)
                                        .getAdaptiveExtension().dispatch(handler, url)));
    }

构造函数中将handler进行了包装,此时handler类型已经变成了MultiMessageHandler,而不是WrappedChannelHandler。看一下handler的继承关系:

因为dubbo的默认线程模型为AllChannelHandler(参照dubbo官方文档),AllChannelHandler父类为WrappedChannelHandler类型,所以需要通过反射来设置executor值。

修改AbstractServer的构造函数,给executor赋值:

public AbstractServer(URL url, ChannelHandler handler) throws RemotingException {
        super(url, handler);
        localAddress = getUrl().toInetSocketAddress();
        String host = url.getParameter(Constants.ANYHOST_KEY, false) 
                        || NetUtils.isInvalidLocalHost(getUrl().getHost()) 
                        ? NetUtils.ANYHOST : getUrl().getHost();
        bindAddress = new InetSocketAddress(host, getUrl().getPort());
        this.accepts = url.getParameter(Constants.ACCEPTS_KEY, Constants.DEFAULT_ACCEPTS);
        this.idleTimeout = url.getParameter(Constants.IDLE_TIMEOUT_KEY, Constants.DEFAULT_IDLE_TIMEOUT);
        try {
            doOpen();
            if (logger.isInfoEnabled()) {
                logger.info("Start " + getClass().getSimpleName() + " bind " + getBindAddress() + ", export " + getLocalAddress());
            }
        } catch (Throwable t) {
            throw new RemotingException(url.toInetSocketAddress(), null, "Failed to bind " + getClass().getSimpleName() 
                                        + " on " + getLocalAddress() + ", cause: " + t.getMessage(), t);
        }
        //修改s
        this.setExecutor(handler);
    }
    
    /**
     * 设置executor
     * @param handler
     * @author 夏志强
     */
    private void setExecutor(ChannelHandler handler) {
    	if(handler != null) {
    		if (handler instanceof WrappedChannelHandler ){
                executor = ((WrappedChannelHandler)handler).getExecutor();
            } else if (handler instanceof AbstractChannelHandlerDelegate ){
                try {
					Field field = AbstractChannelHandlerDelegate.class.getDeclaredField("handler");
					field.setAccessible(true);
					setExecutor((ChannelHandler)field.get(handler));
				} catch (Exception e) {
				} 
            }
    	}
    }

再次运行测试代码,这时返回结果正常。

© 著作权归作者所有

d
粉丝 2
博文 1
码字总数 1335
作品 0
深圳
程序员
私信 提问
dubbo如何正确关闭Spring容器

查看dubbo文档,Dubbo是通过JDK的ShutdownHook来完成优雅停机的: http://dubbo.io/User+Guide-zh.htm#UserGuide-zh-%E4%BC%98%E9%9B%85%E5%81%9C%E6%9C%BA 但能实现优雅停机的前提是,在启动...

squanchao
2015/10/15
816
0
dubbo如何正确关闭Spring容器

查看dubbo文档,Dubbo是通过JDK的ShutdownHook来完成优雅停机的: http://dubbo.io/User+Guide-zh.htm#UserGuide-zh-%E4%BC%98%E9%9B%85%E5%81%9C%E6%9C%BA 但能实现优雅停机的前提是,在启动...

天天顺利
2015/09/14
2.8K
0
跟我学习dubbo-使用Maven构建Dubbo服务的可执行jar包(4)

Dubbo服务的运行方式: 1、使用Servlet容器运行(Tomcat、Jetty等)----不可取 缺点:增加复杂性(端口、管理) 浪费资源(内存) 官方:服务容器是一个standalone的启动程序,因为后台服务不...

HI曲奇饼干
2016/01/06
4.4K
1
Dubbo 发布恢复维护后的第一个版本 2.5.4

Dubbo 发布了恢复维护后的第一个版本 2.5.4,主要是解决 issues 和依赖升级。 修复的 issue: 不能优雅停机的问题 MonitorFilter 监控统计功能阻塞 rpc 流程 动态配置:设置指定的 consumer...

fbf
2017/09/07
14.5K
75
史上最全 40 道 Dubbo 面试题及答案,看完碾压面试官!

想往高处走,怎么能不懂 Dubbo? Dubbo是国内最出名的分布式服务框架,也是 Java 程序员必备的必会的框架之一。Dubbo 更是中高级面试过程中经常会问的技术,无论你是否用过,你都必须熟悉。 ...

Java技术栈
2018/10/02
1K
0

没有更多内容

加载失败,请刷新页面

加载更多

mysql-connector-java升级到8.0后保存时间到数据库出现了时差

在一个新项目中用到了新版的mysql jdbc 驱动 <dependency>     <groupId>mysql</groupId>     <artifactId>mysql-connector-java</artifactId>     <version>8.0.18</version> ......

ValSong
今天
5
0
Spring Boot 如何部署到 Linux 中的服务

打包完成后的 Spring Boot 程序如何部署到 Linux 上的服务? 你可以参考官方的有关部署 Spring Boot 为 Linux 服务的文档。 文档链接如下: https://docs.ossez.com/spring-boot-docs/docs/r...

honeymoose
今天
6
0
Spring Boot 2 实战:使用 Spring Boot Admin 监控你的应用

1. 前言 生产上对 Web 应用 的监控是十分必要的。我们可以近乎实时来对应用的健康、性能等其他指标进行监控来及时应对一些突发情况。避免一些故障的发生。对于 Spring Boot 应用来说我们可以...

码农小胖哥
今天
10
0
ZetCode 教程翻译计划正式启动 | ApacheCN

原文:ZetCode 协议:CC BY-NC-SA 4.0 欢迎任何人参与和完善:一个人可以走的很快,但是一群人却可以走的更远。 ApacheCN 学习资源 贡献指南 本项目需要校对,欢迎大家提交 Pull Request。 ...

ApacheCN_飞龙
今天
5
0
CSS定位

CSS定位 relative相对定位 absolute绝对定位 fixed和sticky及zIndex relative相对定位 position特性:css position属性用于指定一个元素在文档中的定位方式。top、right、bottom、left属性则...

studywin
今天
9
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部