文档章节

聊聊dubbo的ExecuteLimitFilter

go4it
 go4it
发布于 06/25 22:55
字数 700
阅读 29
收藏 1

本文主要研究一下dubbo的ExecuteLimitFilter

ExecuteLimitFilter

dubbo-2.7.2/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/ExecuteLimitFilter.java

public class ExecuteLimitFilter extends ListenableFilter {

    private static final String EXECUTELIMIT_FILTER_START_TIME = "execugtelimit_filter_start_time";

    public ExecuteLimitFilter() {
        super.listener = new ExecuteLimitListener();
    }

    @Override
    public Result invoke(Invoker<?> invoker, Invocation invocation) throws RpcException {
        URL url = invoker.getUrl();
        String methodName = invocation.getMethodName();
        int max = url.getMethodParameter(methodName, EXECUTES_KEY, 0);
        if (!RpcStatus.beginCount(url, methodName, max)) {
            throw new RpcException("Failed to invoke method " + invocation.getMethodName() + " in provider " +
                    url + ", cause: The service using threads greater than <dubbo:service executes=\"" + max +
                    "\" /> limited.");
        }

        invocation.setAttachment(EXECUTELIMIT_FILTER_START_TIME, String.valueOf(System.currentTimeMillis()));
        try {
            return invoker.invoke(invocation);
        } catch (Throwable t) {
            if (t instanceof RuntimeException) {
                throw (RuntimeException) t;
            } else {
                throw new RpcException("unexpected exception when ExecuteLimitFilter", t);
            }
        }
    }

    static class ExecuteLimitListener implements Listener {
        @Override
        public void onResponse(Result appResponse, Invoker<?> invoker, Invocation invocation) {
            RpcStatus.endCount(invoker.getUrl(), invocation.getMethodName(), getElapsed(invocation), true);
        }

        @Override
        public void onError(Throwable t, Invoker<?> invoker, Invocation invocation) {
            RpcStatus.endCount(invoker.getUrl(), invocation.getMethodName(), getElapsed(invocation), false);
        }

        private long getElapsed(Invocation invocation) {
            String beginTime = invocation.getAttachment(EXECUTELIMIT_FILTER_START_TIME);
            return StringUtils.isNotEmpty(beginTime) ? System.currentTimeMillis() - Long.parseLong(beginTime) : 0;
        }
    }
}
  • ExecuteLimitFilter继承了ListenableFilter,其构造器初始化的listener为ExecuteLimitListener
  • invoke方法先调用RpcStatus.beginCount方法来判断是否可以通过,不通过则抛出RpcException,通过则记录开始执行的时间,然后执行invoker.invoke方法,执行结束时会回调Listener的onResponse或onError方法
  • ExecuteLimitListener的onResponse及onError方法均会调用RpcStatus.endCount;而该方法会通过getElapsed方法取出execugtelimit_filter_start_time值,计算执行耗时

RpcStatus

dubbo-2.7.2/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/RpcStatus.java

public class RpcStatus {

    private static final ConcurrentMap<String, RpcStatus> SERVICE_STATISTICS = new ConcurrentHashMap<String, RpcStatus>();

    private static final ConcurrentMap<String, ConcurrentMap<String, RpcStatus>> METHOD_STATISTICS = new ConcurrentHashMap<String, ConcurrentMap<String, RpcStatus>>();
    private final ConcurrentMap<String, Object> values = new ConcurrentHashMap<String, Object>();
    private final AtomicInteger active = new AtomicInteger();
    private final AtomicLong total = new AtomicLong();
    private final AtomicInteger failed = new AtomicInteger();
    private final AtomicLong totalElapsed = new AtomicLong();
    private final AtomicLong failedElapsed = new AtomicLong();
    private final AtomicLong maxElapsed = new AtomicLong();
    private final AtomicLong failedMaxElapsed = new AtomicLong();
    private final AtomicLong succeededMaxElapsed = new AtomicLong();

    //......

    public static void beginCount(URL url, String methodName) {
        beginCount(url, methodName, Integer.MAX_VALUE);
    }

    /**
     * @param url
     */
    public static boolean beginCount(URL url, String methodName, int max) {
        max = (max <= 0) ? Integer.MAX_VALUE : max;
        RpcStatus appStatus = getStatus(url);
        RpcStatus methodStatus = getStatus(url, methodName);
        if (methodStatus.active.incrementAndGet() > max) {
            methodStatus.active.decrementAndGet();
            return false;
        } else {
            appStatus.active.incrementAndGet();
            return true;
        }
    }

    /**
     * @param url
     * @param elapsed
     * @param succeeded
     */
    public static void endCount(URL url, String methodName, long elapsed, boolean succeeded) {
        endCount(getStatus(url), elapsed, succeeded);
        endCount(getStatus(url, methodName), elapsed, succeeded);
    }

    private static void endCount(RpcStatus status, long elapsed, boolean succeeded) {
        status.active.decrementAndGet();
        status.total.incrementAndGet();
        status.totalElapsed.addAndGet(elapsed);
        if (status.maxElapsed.get() < elapsed) {
            status.maxElapsed.set(elapsed);
        }
        if (succeeded) {
            if (status.succeededMaxElapsed.get() < elapsed) {
                status.succeededMaxElapsed.set(elapsed);
            }
        } else {
            status.failed.incrementAndGet();
            status.failedElapsed.addAndGet(elapsed);
            if (status.failedMaxElapsed.get() < elapsed) {
                status.failedMaxElapsed.set(elapsed);
            }
        }
    }

    //......
}
  • RpcStatus的beginCount方法会递增methodStatus.active,然后判断是否大于max值,超出则返回false并递减methodStatus.active;小于等于则递增appStatus.active;endCount方法会递减status.active,递增status.total,然后根据成功与否更新status.succeededMaxElapsed或status.failed、status.failedElapsed、status.failedMaxElapsed

小结

  • ExecuteLimitFilter继承了ListenableFilter,其构造器初始化的listener为ExecuteLimitListener
  • ExecuteLimitFilter的invoke方法先调用RpcStatus.beginCount方法来判断是否可以通过,不通过则抛出RpcException,通过则记录开始执行的时间,然后执行invoker.invoke方法,执行结束时会回调Listener的onResponse或onError方法
  • ExecuteLimitListener的onResponse及onError方法均会调用RpcStatus.endCount;而该方法会通过getElapsed方法取出execugtelimit_filter_start_time值,计算执行耗时

doc

© 著作权归作者所有

go4it
粉丝 86
博文 1035
码字总数 989892
作品 0
深圳
私信 提问
Dubbo源码之服务端并发控制——ExecuteLimitFilter

上一篇关于《Dubbo客户端并发控制——ActiveLimitFilter》 作用,设计原理,及配置方式。 这篇是关于Dubbo服务端Filter组件扩展 ExecuteLimitFilter ,它可以限制服务端的方法级别的并发处理...

BakerZhu
2018/08/25
0
0
dubbo源码分析系列——dubbo-rpc-api模块源码分析

简化的类图 该图是经过简化后的rpc-api模块的类图,去除了一些非关键的属性和方法定义,也去除了一些非核心的类和接口,只是一个简化了的的示意图,这样大家能够去除干扰看清楚该模块的核心接...

杨武兵
2016/05/29
739
3
【南京站报名中!】微服务框架到生态,Apache Dubbo 开发者沙龙

Dubbo 诞生于 2008 年,是阿里巴巴开源的高性能分布式服务框架(A High Performance Java RPC Framework),使得应用可通过高性能的 RPC 实现服务的输出和输入功能,可以和 Spring 框架无缝集...

amber涂南
03/11
0
0
聊聊dubbo的TPSLimiter

序 本文主要研究一下dubbo的TPSLimiter TPSLimiter dubbo-2.7.2/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/tps/TPSLimiter.java TPSLimiter定义了isAllowable方法......

go4it
06/23
0
0
聊聊dubbo的StatusChecker

序 本文主要研究一下dubbo的StatusChecker Status dubbo-2.7.2/dubbo-common/src/main/java/org/apache/dubbo/common/status/Status.java Status定义了三个属性,分别是level、message、des......

go4it
06/22
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Leetcode PHP题解--D106 997. Find the Town Judge

D106 997. Find the Town Judge 题目链接 997. Find the Town Judge 题目分析 给定一个数组N代表人数,和给定一个数组,每个元素为一个只有两个值(a,b)的数组。 代表a信任b。 从中找到一个b...

skys215
32分钟前
1
0
日志相关---log4j2配置文件详解

一、关于配置文件的名称以及在项目中的存放位置 log4j 2.x版本不再支持像1.x中的.properties后缀的文件配置方式,2.x版本配置文件后缀名只能为".xml",".json"或者".jsn". 系统选择配置文件的...

spinachgit
33分钟前
0
0
redis 消息队列实现

方式一:通过list的阻塞读取命令,blpop或者brpop 消费者 public class Consumer extends DemoApplicationTests{ @Test public void consume(){ int timeout = 0;//永不超......

小海bug
55分钟前
2
0
如何把微信语音汇总成一个MP3文件?

本篇以苹果手机为例,安卓手机也可类似。 第一步,安装同步助手 同步助手是一款在电脑上安装,可以保存手机上的内容的软件。支持把微信的聊天历史内容导出。甚至支持筛选语音文件按顺序导出—...

吴伟祥
今天
4
0
用ffsend使用Firefox Send

导读 ffsend 是 Firefox Send 的命令行客户端。本文将展示 Firefox Send 和 ffsend 如何工作。还会详细介绍如何在 Fedora 中安装和使用它。 ffsend 是 Firefox Send 的命令行客户端。本文将展...

问题终结者
今天
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部