Dubbo线程池耗尽原理分析Thread pool is EXHAUSTED

原创
2017/06/21 15:27
阅读数 1.7W

对最近遇到的业务应用dubbo线程池爆满(异常:RejectedExecutionException:Thread pool is EXHAUSTED)问题进行了分析。

一、问题回顾:

业务应用dubbo配置如下:

<dubbo:protocol name="dubbo" port="${dubbo.port}" /}

在dubbo的spring配置中,业务应用并没有配置threadpool, threads等参数,查看dubbo.io用户文档,可知:默认配置为:

<dubbo:protocol name="dubbo" dispatcher="all" threadpool="fixed" threads="100" />

查看dubbo代码(版本为2.5.3)实现:FixedThreadPool.java

/*
 * Copyright 1999-2011 Alibaba Group.
 *  
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *  
 *      http://www.apache.org/licenses/LICENSE-2.0
 *  
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.alibaba.dubbo.common.threadpool.support.fixed;

import java.util.concurrent.Executor;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

import com.alibaba.dubbo.common.Constants;
import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.threadpool.ThreadPool;
import com.alibaba.dubbo.common.threadpool.support.AbortPolicyWithReport;
import com.alibaba.dubbo.common.utils.NamedThreadFactory;

/**
 * 此线程池启动时即创建固定大小的线程数,不做任何伸缩,来源于:<code>Executors.newFixedThreadPool()</code>
 * 
 * @see java.util.concurrent.Executors#newFixedThreadPool(int)
 * @author william.liangf
 */
public class FixedThreadPool implements ThreadPool {

    public Executor getExecutor(URL url) {
        String name = url.getParameter(Constants.THREAD_NAME_KEY, Constants.DEFAULT_THREAD_NAME);
        int threads = url.getParameter(Constants.THREADS_KEY, Constants.DEFAULT_THREADS);
        int queues = url.getParameter(Constants.QUEUES_KEY, Constants.DEFAULT_QUEUES);
        return new ThreadPoolExecutor(threads, threads, 0, TimeUnit.MILLISECONDS, 
        		queues == 0 ? new SynchronousQueue<Runnable>() : 
        			(queues < 0 ? new LinkedBlockingQueue<Runnable>() 
        					: new LinkedBlockingQueue<Runnable>(queues)),
        		new NamedThreadFactory(name, true), new AbortPolicyWithReport(name, url));
    }

}

其中:

public class Constants {

    // the other constant
    // ......

    public static final String  DEFAULT_THREAD_NAME = "Dubbo";
    public static final int     DEFAULT_THREADS = 200;
    public static final int     DEFAULT_QUEUES = 0;
}

由代码可知:dubbo的线程池采用jdk的ThreadPoolExecutor,默认threads数为200,默认队列长度为0,此时默认采用了SynchronousQueue队列,而如果用户配置的队列长度大于0时,则会采用LinkedBlockingQueue队列。【注意:由此可见,dubbo.io用户文档中,默认threads=100是错误的,实际为200】

SynchronousQueue:是一个缓存为1的阻塞队列,某次添加元素后必须等待其他线程取走后才能继续添加。

LinkedBlockingQueue:线程安全的基于链表的阻塞队列,实现了入队出队的分离(分别采用putLock和takeLock锁,因此可以同时入队和出队操作)位于java.util.concurrent包下,是一个无界队列。

ThreadPoolExecutor的完整构造方法的签名是:ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, ThreadFactory threadFactory, RejectedExecutionHandler handler) .

corePoolSize - 池中所保存的线程数,包括空闲线程。
maximumPoolSize-池中允许的最大线程数。
keepAliveTime - 当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
unit - keepAliveTime 参数的时间单位。
workQueue - 执行前用于保持任务的队列。此队列仅保持由 execute方法提交的 Runnable任务。
threadFactory - 执行程序创建新线程时使用的工厂。
handler - 由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。
ThreadPoolExecutor是Executors类的底层实现。

dubbo默认创建固定大小的线程池(200), 每次提交一个任务就创建一个线程,直到线程数达到线程池大小200,线程池的大小一旦达到最大值就保持不变。如果某个线程因为执行异常而结束,那么线程池会补充一个新的线程。

由于dubbo默认采用了直接提交的SynchronousQueue工作队列,所以,所有的task会直接提交给线程池中的某一worker线程,如果没有可用线程,那么会拒绝任务的处理,而抛出异常RejectedExecutionException:Thread pool is EXHAUSTED.

/*
 * Copyright 1999-2011 Alibaba Group.
 *  
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *  
 *      http://www.apache.org/licenses/LICENSE-2.0
 *  
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package com.alibaba.dubbo.common.threadpool.support;

import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;

import com.alibaba.dubbo.common.URL;
import com.alibaba.dubbo.common.logger.Logger;
import com.alibaba.dubbo.common.logger.LoggerFactory;

/**
 * Abort Policy.
 * Log warn info when abort.
 * 
 * @author ding.lid
 */
public class AbortPolicyWithReport extends ThreadPoolExecutor.AbortPolicy {
    
    protected static final Logger logger = LoggerFactory.getLogger(AbortPolicyWithReport.class);
    
    private final String threadName;
    
    private final URL url;
    
    public AbortPolicyWithReport(String threadName, URL url) {
        this.threadName = threadName;
        this.url = url;
    }
    
    @Override
    public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {
        String msg = String.format("Thread pool is EXHAUSTED!" +
                " Thread Name: %s, Pool Size: %d (active: %d, core: %d, max: %d, largest: %d), Task: %d (completed: %d)," +
                " Executor status:(isShutdown:%s, isTerminated:%s, isTerminating:%s), in %s://%s:%d!" ,
                threadName, e.getPoolSize(), e.getActiveCount(), e.getCorePoolSize(), e.getMaximumPoolSize(), e.getLargestPoolSize(),
                e.getTaskCount(), e.getCompletedTaskCount(), e.isShutdown(), e.isTerminated(), e.isTerminating(),
                url.getProtocol(), url.getIp(), url.getPort());
        logger.warn(msg);
        throw new RejectedExecutionException(msg);
    }

}

因此,如果系统抛出异常RejectedExecutionException:Thread pool is EXHAUSTED.可以通过调大dubbo线程池解决该问题,或者降低对TPS的预期。

<dubbo:protocol name="dubbo" dispatcher="all" threadpool="fixed" threads="500" />

此处修改了dubbo线程池为500,以处理更多的任务。

展开阅读全文
打赏
1
1 收藏
分享
加载中
多台提供者中的一台报这个错误,请求会跳转到其他服务吗?
2019/06/12 15:29
回复
举报
更多评论
打赏
1 评论
1 收藏
1
分享
返回顶部
顶部