文档章节

聊聊dubbo的EagerThreadPool

go4it
 go4it
发布于 06/18 23:12
字数 991
阅读 14
收藏 0

本文主要研究一下dubbo的EagerThreadPool

EagerThreadPool

dubbo-2.7.2/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/eager/EagerThreadPool.java

public class EagerThreadPool implements ThreadPool {

    @Override
    public Executor getExecutor(URL url) {
        String name = url.getParameter(THREAD_NAME_KEY, DEFAULT_THREAD_NAME);
        int cores = url.getParameter(CORE_THREADS_KEY, DEFAULT_CORE_THREADS);
        int threads = url.getParameter(THREADS_KEY, Integer.MAX_VALUE);
        int queues = url.getParameter(QUEUES_KEY, DEFAULT_QUEUES);
        int alive = url.getParameter(ALIVE_KEY, DEFAULT_ALIVE);

        // init queue and executor
        TaskQueue<Runnable> taskQueue = new TaskQueue<Runnable>(queues <= 0 ? 1 : queues);
        EagerThreadPoolExecutor executor = new EagerThreadPoolExecutor(cores,
                threads,
                alive,
                TimeUnit.MILLISECONDS,
                taskQueue,
                new NamedInternalThreadFactory(name, true),
                new AbortPolicyWithReport(name, url));
        taskQueue.setExecutor(executor);
        return executor;
    }
}
  • EagerThreadPool实现了ThreadPool接口,其getExecutor创建的是EagerThreadPoolExecutor,它使用的queue为TaskQueue,使用的threadFactory为NamedInternalThreadFactory,使用的rejectedExecutionHandler为AbortPolicyWithReport

EagerThreadPoolExecutor

dubbo-2.7.2/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/eager/EagerThreadPoolExecutor.java

public class EagerThreadPoolExecutor extends ThreadPoolExecutor {

    /**
     * task count
     */
    private final AtomicInteger submittedTaskCount = new AtomicInteger(0);

    public EagerThreadPoolExecutor(int corePoolSize,
                                   int maximumPoolSize,
                                   long keepAliveTime,
                                   TimeUnit unit, TaskQueue<Runnable> workQueue,
                                   ThreadFactory threadFactory,
                                   RejectedExecutionHandler handler) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue, threadFactory, handler);
    }

    /**
     * @return current tasks which are executed
     */
    public int getSubmittedTaskCount() {
        return submittedTaskCount.get();
    }

    @Override
    protected void afterExecute(Runnable r, Throwable t) {
        submittedTaskCount.decrementAndGet();
    }

    @Override
    public void execute(Runnable command) {
        if (command == null) {
            throw new NullPointerException();
        }
        // do not increment in method beforeExecute!
        submittedTaskCount.incrementAndGet();
        try {
            super.execute(command);
        } catch (RejectedExecutionException rx) {
            // retry to offer the task into queue.
            final TaskQueue queue = (TaskQueue) super.getQueue();
            try {
                if (!queue.retryOffer(command, 0, TimeUnit.MILLISECONDS)) {
                    submittedTaskCount.decrementAndGet();
                    throw new RejectedExecutionException("Queue capacity is full.", rx);
                }
            } catch (InterruptedException x) {
                submittedTaskCount.decrementAndGet();
                throw new RejectedExecutionException(x);
            }
        } catch (Throwable t) {
            // decrease any way
            submittedTaskCount.decrementAndGet();
            throw t;
        }
    }
}
  • EagerThreadPoolExecutor继承了ThreadPoolExecutor,它维护了submittedTaskCount,在执行任务之前递增,在afterExecute的时候胡递减;其execute方法会捕获RejectedExecutionException,然后使用TaskQueue的retryOffer再重新入队,入队不成功才抛出RejectedExecutionException

TaskQueue

dubbo-2.7.2/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/eager/TaskQueue.java

public class TaskQueue<R extends Runnable> extends LinkedBlockingQueue<Runnable> {

    private static final long serialVersionUID = -2635853580887179627L;

    private EagerThreadPoolExecutor executor;

    public TaskQueue(int capacity) {
        super(capacity);
    }

    public void setExecutor(EagerThreadPoolExecutor exec) {
        executor = exec;
    }

    @Override
    public boolean offer(Runnable runnable) {
        if (executor == null) {
            throw new RejectedExecutionException("The task queue does not have executor!");
        }

        int currentPoolThreadSize = executor.getPoolSize();
        // have free worker. put task into queue to let the worker deal with task.
        if (executor.getSubmittedTaskCount() < currentPoolThreadSize) {
            return super.offer(runnable);
        }

        // return false to let executor create new worker.
        if (currentPoolThreadSize < executor.getMaximumPoolSize()) {
            return false;
        }

        // currentPoolThreadSize >= max
        return super.offer(runnable);
    }

    /**
     * retry offer task
     *
     * @param o task
     * @return offer success or not
     * @throws RejectedExecutionException if executor is terminated.
     */
    public boolean retryOffer(Runnable o, long timeout, TimeUnit unit) throws InterruptedException {
        if (executor.isShutdown()) {
            throw new RejectedExecutionException("Executor is shutdown!");
        }
        return super.offer(o, timeout, unit);
    }
}
  • TaskQueue继承了LinkedBlockingQueue,它覆盖了offer方法,该方法在submittedTaskCount小于poolSize的时候会入队,如果大于等于poolSize则再判断currentPoolThreadSize是否小于maximumPoolSize,如果小于则返回false让线程池创建新线程,最后在currentPoolThreadSize大于等于maximumPoolSize的时候入队

NamedInternalThreadFactory

dubbo-2.7.2/dubbo-common/src/main/java/org/apache/dubbo/common/threadlocal/NamedInternalThreadFactory.java

public class NamedInternalThreadFactory extends NamedThreadFactory {

    public NamedInternalThreadFactory() {
        super();
    }

    public NamedInternalThreadFactory(String prefix) {
        super(prefix, false);
    }

    public NamedInternalThreadFactory(String prefix, boolean daemon) {
        super(prefix, daemon);
    }

    @Override
    public Thread newThread(Runnable runnable) {
        String name = mPrefix + mThreadNum.getAndIncrement();
        InternalThread ret = new InternalThread(mGroup, runnable, name, 0);
        ret.setDaemon(mDaemon);
        return ret;
    }
}
  • NamedInternalThreadFactory继承了NamedThreadFactory,这里创建的是InternalThread

AbortPolicyWithReport

dubbo-2.7.2/dubbo-common/src/main/java/org/apache/dubbo/common/threadpool/support/AbortPolicyWithReport.java

public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {

    protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class);

    private final String threadName;

    private final URL url;

    private static volatile long lastPrintTime = 0;

    private static final long TEN_MINUTES_MILLS = 10 * 60 * 1000;

    private static final String OS_WIN_PREFIX = "win";

    private static final String OS_NAME_KEY = "os.name";

    private static final String WIN_DATETIME_FORMAT = "yyyy-MM-dd_HH-mm-ss";

    private static final String DEFAULT_DATETIME_FORMAT = "yyyy-MM-dd_HH:mm:ss";

    private static Semaphore guard = new Semaphore(1);

    public AbortPolicyWithReport(String threadName, URL url) {
        this.threadName = threadName;
        this.url = url;
    }

    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        String msg = String.format("Thread pool is EXHAUSTED!" +
                " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: "
                + "%d)," +
                " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!",
            threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(),
            e.getLargestPoolSize(),
            e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
            url.getProtocol(), url.getIp(), url.getPort());
        logger.warn(msg);
        dumpJStack();
        throw new RejectedExecutionException(msg);
    }

    private void dumpJStack() {
        long now = System.currentTimeMillis();

        //dump every 10 minutes
        if (now - lastPrintTime < TEN_MINUTES_MILLS) {
            return;
        }

        if (!guard.tryAcquire()) {
            return;
        }

        ExecutorService pool = Executors.newSingleThreadExecutor();
        pool.execute(() -> {
            String dumpPath = url.getParameter(DUMP_DIRECTORY, System.getProperty("user.home"));

            SimpleDateFormat sdf;

            String os = System.getProperty(OS_NAME_KEY).toLowerCase();

            // window system don't support ":" in file name
            if (os.contains(OS_WIN_PREFIX)) {
                sdf = new SimpleDateFormat(WIN_DATETIME_FORMAT);
            } else {
                sdf = new SimpleDateFormat(DEFAULT_DATETIME_FORMAT);
            }

            String dateStr = sdf.format(new Date());
            //try-with-resources
            try (FileOutputStream jStackStream = new FileOutputStream(
                new File(dumpPath, "Dubbo_JStack.log" + "." + dateStr))) {
                JVMUtil.jstack(jStackStream);
            } catch (Throwable t) {
                logger.error("dump jStack error", t);
            } finally {
                guard.release();
            }
            lastPrintTime = System.currentTimeMillis();
        });
        //must shutdown thread pool ,if not will lead to OOM
        pool.shutdown();

    }

}
  • AbortPolicyWithReport继承了ThreadPoolExecutor.AbortPolicy,其rejectedExecution方法会输出包含thread pool相关信息的msg,然后使用warn级别打印出来,然后进行dumpJStack,最后再抛出RejectedExecutionException

小结

EagerThreadPool实现了ThreadPool接口,其getExecutor创建的是EagerThreadPoolExecutor,它使用的queue为TaskQueue,使用的threadFactory为NamedInternalThreadFactory,使用的rejectedExecutionHandler为AbortPolicyWithReport

doc

© 著作权归作者所有

下一篇: 聊聊dubbo的Filter
go4it
粉丝 86
博文 1036
码字总数 990831
作品 0
深圳
私信 提问
RPC 服务框架 Dubbo 2.6.2 正式发布, 包含多项重要改进

Dubbo 2.6.2 现已正式发布。 该版本包含了一些重要的改进: Hessian-lite 序列化:为了兼容性,恢复本地序列化 #1413 Asset transfer to ASF,包括 pom, license, DISCLAIMER 等 #1491 引入新...

淡漠悠然
2018/06/07
2K
6
【南京站报名中!】微服务框架到生态,Apache Dubbo 开发者沙龙

Dubbo 诞生于 2008 年,是阿里巴巴开源的高性能分布式服务框架(A High Performance Java RPC Framework),使得应用可通过高性能的 RPC 实现服务的输出和输入功能,可以和 Spring 框架无缝集...

amber涂南
03/11
0
0
聊聊dubbo的TPSLimiter

序 本文主要研究一下dubbo的TPSLimiter TPSLimiter dubbo-2.7.2/dubbo-rpc/dubbo-rpc-api/src/main/java/org/apache/dubbo/rpc/filter/tps/TPSLimiter.java TPSLimiter定义了isAllowable方法......

go4it
06/23
0
0
聊聊dubbo的StatusChecker

序 本文主要研究一下dubbo的StatusChecker Status dubbo-2.7.2/dubbo-common/src/main/java/org/apache/dubbo/common/status/Status.java Status定义了三个属性,分别是level、message、des......

go4it
06/22
0
0
聊聊dubbo的CommandExecutor

序 本文主要研究一下dubbo的CommandExecutor CommandExecutor dubbo-2.7.2/dubbo-plugin/dubbo-qos/src/main/java/org/apache/dubbo/qos/command/CommandExecutor.java CommandExecutor定义了......

go4it
07/05
0
0

没有更多内容

加载失败,请刷新页面

加载更多

不写代码即可快速开发应用,JEPaaS助力企业数字化创新

在企业实现信息化和数字化的过程中,通常有三种方式可以满足企业的大量软件系统需求:一是可以采购如ERP和CRM等的标准化软件产品,二是企业内部自己开发,还有一种是可以通过外包的方式去进行...

JEPaaS云平台
10分钟前
0
0
微信小程序数据 java 解密版

微信小程序的数据,解密方法,官方居然没得 java版的解密demo, 木有java 木有java 木有java 。 简直反人类。 翻阅多位大神的博客,于是乎,写一篇 简要教程: 1. 加入pom.xml <dependency> ...

MrBoyce
12分钟前
0
0
35岁大龄程序员的职业生涯发展之道-大龄码农如何避免被裁员-IT人工职能IOT网联网算法各种高精尖技术情况下大龄程序员如何跟上节奏不被淘汰-程序员迷茫如何自我革新-软件设计在大陆的生命活力

这是一篇从“人”(而非技术也非管理)的角度,聚焦于自身职业发展方方面面的文章,包括职业、学习、生产力、影响力等。 1. 拥有商业心态 你所能犯的最大错误就是相信自己是在为别人工作,职业...

letwang
26分钟前
1
0
Spring Aware 到底是什么?

通过如下前序两篇文章: Spring Bean 生命周期之“我从哪里来”? Spring Bean 生命周期之“我要到哪里去”? 我们了解了 Spring Bean 的生命周期核心内容,bean 是如何被初始化变为 Ready fo...

tan日拱一兵
50分钟前
6
0
Android 调用第三方浏览器打开网址或下载文件

/** * 调用第三方浏览器打开 * @param context * @param url 要浏览的资源地址 */ public static void openBrowser(Context context,String url){ final Intent intent = new Intent(); int......

丁佳辉
55分钟前
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部