关于ThreadPoolExecutor 调用RejectedExecutionHandler的机制

原创
2012/12/21 10:40
阅读数 3W

        当我们创建线程池并且提交任务失败时,线程池会回调RejectedExecutionHandler接口的rejectedExecution(Runnable task, ThreadPoolExecutor executor)方法来处理线程池处理失败的任务,其中task 是用户提交的任务,而executor是当前执行的任务的线程池。可以通过代码的方式来验证。

1、线程池工厂:


package com.threadpool;

import java.util.List;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * 线程池工厂方法
 * @author 
 *
 */
public class ThreadPoolFactory {
	
	//线程池
	private static ThreadPoolExecutor  pool;	
	//自身对象
	private static ThreadPoolFactory factory;
	
	/**
	 * 私有构造函数
	 */
	private ThreadPoolFactory(){	}
	
	/**
	 * 获取工厂对象
	 * @param config
	 * @return
	 */
	public static ThreadPoolFactory getInstance(ThreadPoolConfig config){
		if(factory == null){
			factory = new ThreadPoolFactory();
		}
		
		if(pool == null){
			
			if(config.getHandler() == null){
				pool = new ThreadPoolExecutor(config.getCorePoolSize(),
						config.getMaximumPoolSize(),config.getKeepAliveTime(),
						config.getUnit(),config.getWorkQueue());
			}else{
				pool = new ThreadPoolExecutor(config.getCorePoolSize(),
						config.getMaximumPoolSize(),config.getKeepAliveTime(),
						config.getUnit(),config.getWorkQueue(),config.getHandler());
			}
		}		
		System.out.println("pool  create= "+pool.toString());
		return factory;
	}
	
	/**
	 * 添加线程池任务
	 * @param run
	 */
	public synchronized void addTask(Runnable run){
		pool.execute(run);
	}
	
	/**
	 * 添加线程池任务
	 * @param runs
	 */
	public synchronized void addTask(List<Runnable> runs){
		if(runs != null){
			for(Runnable r:runs){
				this.addTask(r);
			}
		}
	}
	
	/**
	 * 关闭线程池
	 */
	public void closePool(){
		pool.shutdown();
	}
	
}
2、线程池配置文件类:



package com.threadpool;

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.TimeUnit;

/**
 * 线程池配置类
 * @author 
 *
 */
public class ThreadPoolConfig {
	//池中所保存的线程数,包括空闲线程。
	private int corePoolSize;
	//池中允许的最大线程数。
	private int maximumPoolSize;
	//当线程数大于核心时,此为终止前多余的空闲线程等待新任务的最长时间。
	private long keepAliveTime; 
	//参数的时间单位。
	private TimeUnit unit;
	//执行前用于保持任务的队列。此队列仅由保持 execute 方法提交的 Runnable 任务。
	private BlockingQueue<Runnable> workQueue;
	//由于超出线程范围和队列容量而使执行被阻塞时所使用的处理程序。 
	private RejectedExecutionHandler handler;
	//配置文件自身对象
	private static ThreadPoolConfig config;
	/**
	 * 单例模式
	 */
	private ThreadPoolConfig(){
		
	}
	
	/**
	 * 获取配置文件对象
	 * @return
	 */
	public static ThreadPoolConfig getInstance(){
		if(config == null){
			config = new ThreadPoolConfig();
		}		
		return config;
	}	
	public int getCorePoolSize() {
		return corePoolSize;
	}
	public void setCorePoolSize(int corePoolSize) {
		this.corePoolSize = corePoolSize;
	}
	public int getMaximumPoolSize() {
		return maximumPoolSize;
	}
	public void setMaximumPoolSize(int maximumPoolSize) {
		this.maximumPoolSize = maximumPoolSize;
	}
	public long getKeepAliveTime() {
		return keepAliveTime;
	}
	public void setKeepAliveTime(long keepAliveTime) {
		this.keepAliveTime = keepAliveTime;
	}
	public TimeUnit getUnit() {
		return unit;
	}
	public void setUnit(TimeUnit unit) {
		this.unit = unit;
	}
	public BlockingQueue<Runnable> getWorkQueue() {
		return workQueue;
	}
	public void setWorkQueue(BlockingQueue<Runnable> workQueue) {
		this.workQueue = workQueue;
	}
	public RejectedExecutionHandler getHandler() {
		return handler;
	}
	public void setHandler(RejectedExecutionHandler handler) {
		this.handler = handler;
	}	
}
3、简单任务类:



package com.test;

/**
 * 任务线程
 * @author 
 *
 */
public class ThreadTask extends Thread {
	
	public ThreadTask(String name){
		super(name);
	}
	
	@SuppressWarnings("static-access")
	@Override
	public void run() {
		// TODO Auto-generated method stub
		System.out.println(this.getName().toString() + ", will sleep 0 s");
		try {
			this.sleep(1*10);
		} catch (InterruptedException e) {
			// TODO Auto-generated catch block
			e.printStackTrace();
		}
		System.out.println(this.getName().toString() + ", I am wakeup now ");
	}

}

4、异常处理接口实现类:

package com.threadpool;

import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;

/**
 * 线程池异常处理类
 * @author 
 *
 */
public class MyRejectedExecutionHandler implements RejectedExecutionHandler {

	@Override
	public void rejectedExecution(Runnable task, ThreadPoolExecutor executor) {
		// TODO Auto-generated method stub
		System.out.println("Begin exception handler-----------");
		//执行失败任务
		new Thread(task,"exception by pool").start();
		//打印线程池的对象
		System.out.println("The pool RejectedExecutionHandler = "+executor.toString());
	}
}
5、测试主函数:
package com.test;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.TimeUnit;

import com.threadpool.MyRejectedExecutionHandler;
import com.threadpool.ThreadPoolConfig;
import com.threadpool.ThreadPoolFactory;

/**
 * @author 
 *
 */
public class TestThreadPoolMain {

	/**
	 * @param args
	 */
	public static void main(String[] args) {

		//设置配置
		ThreadPoolConfig config = ThreadPoolConfig.getInstance();
		config.setCorePoolSize(2);
		config.setMaximumPoolSize(3);
		config.setKeepAliveTime(5);
		config.setUnit(TimeUnit.SECONDS);
		//将队列设小,会抛异常
		config.setWorkQueue(new ArrayBlockingQueue<Runnable>(10));
		config.setHandler(new MyRejectedExecutionHandler());
		//线程池工厂
		ThreadPoolFactory factory = ThreadPoolFactory.getInstance(config);
		
		for(int i = 0;i<100;i++){
			factory.addTask(new ThreadTask(i+"-i"));
		}
		System.out.println("i add is over!-------------------");
	}
}
6、测试比较:

可以看出创建的线程池对象和调用传递的线程池对象是相同的。

pool create = java.util.concurrent.ThreadPoolExecutor@de6f34
0-i, will sleep 0 s
Begin exception handler-----------
12-i, will sleep 0 s
The pool RejectedExecutionHandler = java.util.concurrent.ThreadPoolExecutor@de6f34
Begin exception handler-----------
1-i, will sleep 0 s
The pool RejectedExecutionHandler = java.util.concurrent.ThreadPoolExecutor@de6f34


展开阅读全文
打赏
3
21 收藏
分享
加载中
更多评论
打赏
0 评论
21 收藏
3
分享
返回顶部
顶部