【JDK并发包基础】线程池详解

原创
2017/08/08 13:50
阅读数 547

        为了更好的控制多线程,JDK提供了一套线程框架Executor来帮助程序员有效的进行线程控制。Java.util.concurrent 包是专为 Java并发编程而设计的包,它下有很多编写好的工具:

                  

脑图地址,感谢深入浅出 Java Concurrency ,此脑图在这篇基础上修改而来。其中有一个比较重要的线程工厂类:Executors。 Executors工厂会提供常用四类线程池的创建。

       以前当我们每次执行一个任务时用new Thread,频繁创建对象会导致系统性能差,线程缺乏统一管理,可能无限制新建线程,相互之间竞争导致系统耗尽,并且缺乏定时任务,中断等功能。线程池可以有效的提高系统资源的使用率,同时避免过多资源竞争,重用存在的线程,减少对象创建。Java通过Executors创建不同功能的线程池,若Executors无法满足需求,我们也可以创建自定义的线程池。文章分为以下部分讲解:

       1.newFixedThreadPool()方法

       2. newSingleThreadExecutor()方法

       3.newCachedThreadPool()方法

       4.newScheduledThreadPool()方法

       5.自定义线程池

在讲述之前,因为上面5条均会用到ThreadPoolExecutor这个类,所以我们先来看看ThreadPoolExecutor中线程执行任务的示意图,它的执行任务分两种情况:

           

     1).Execute()方法会创建一个线程然后执行一个任务。

     2).这个线程在执行完1之后,会反复从BlockingQueue队列中获取任务来执行。如果图中所示三个线程同时间在执行任务,还有任务进来则会放入BlockingQueue队列中暂缓起来等待线程空闲去执行。再者,这3个线程正在使用,队列也满了的话(有界队列的情况),还有任务进来,则会实行拒绝策略。(take()和poll()都是取头元素节点,区别在于前者会删除元素,后者不会)

1.newFixedThreadPool()方法

       创建一个固定数量的线程池,里面的线程数始终不变,当有一个线程提交时,若线程池中有空闲的线程,则立即执行。若没有,则会暂缓在一个阻塞队列LinkedBlockingQueue中等待有空闲的线程去执行。newFixedThreadPool()方法的源码如下(LinkedBlockingQueue的详解可以看博主的上一篇文章:【JDK并发包基础】并发容器详解):

    public static ExecutorService newFixedThreadPool(int nThreads) {
        return new ThreadPoolExecutor(nThreads, //核心线程数
                                      nThreads,//最大线程数
                                      0L, //空闲时保持线程活着的时间
                                      TimeUnit.MILLISECONDS,//上述时间的单位
                                      new LinkedBlockingQueue<Runnable>());//线程池没空闲,则新任务放在这个队列里
    }

       现在我们思考一下:假如有Thread1、Thread2、Thread3、Thread4四条线程分别统计C、D、E、F四个盘的大小,所有线程都统计完毕交给Thread5线程去做汇总,应当如何实现?

       第一种方式是用join()来做,不推荐:

                    

       推荐使用线程池的方式:

public static void main(String[] args) throws InterruptedException { 
  //用CountDownLatch实现,CountDownLatch传入4相当于一个计时器,一个await需要4次countDown才能唤醒
  final CountDownLatch countDownLatch= new CountDownLatch(4);
	        Runnable run1= new Runnable() {
	            @Override
	            public void run() {
	                try {
	                    Thread.sleep(3000);
	                    System.out.println("统计C盘");
	                    countDownLatch.countDown();//单任务,把计数器减1
	                } catch (InterruptedException e) {
	                    e.printStackTrace();
	                }
	            }
	        };
	        Runnable run2= new Runnable() {
	            @Override
	            public void run() {
	                try {
	                    Thread.sleep(3000);
	                    System.out.println("统计D盘");
	                    countDownLatch.countDown();
	                } catch (InterruptedException e) {
	                    e.printStackTrace();
	                }
	            }
	        };
	        Runnable run3= new Runnable() {
	            @Override
	            public void run() {
	                try {
	                    Thread.sleep(3000);
	                    System.out.println("统计E盘");
	                    countDownLatch.countDown();
	                } catch (InterruptedException e) {
	                    e.printStackTrace();
	                }
	            }
	        };
	        Runnable run4= new Runnable() {
	            @Override
	            public void run() {
	                try {
	                    Thread.sleep(3000);
	                    System.out.println("统计F盘");
	                    countDownLatch.countDown();
	                } catch (InterruptedException e) {
	                    e.printStackTrace();
	                }
	            }
	        };
            //创建固定线程的线程池
	        ExecutorService service= Executors.newFixedThreadPool(4);
	        service.submit(run1);
	        service.submit(run2);
	        service.submit(run3);
	        service.submit(run4);
//	        new Thread(run1).start();
//	        new Thread(run2).start();
//	        new Thread(run3).start();
//	        new Thread(run4).start();
	        countDownLatch.await();//主线程,即第5线程等待
	        System.out.println("合计C,D,E,F");
	        service.shutdown();
}

       运行结果如下,统计前四个盘大小可以没有顺序,但合计始终在最后:

                                          

2. newSingleThreadExecutor()方法

       创建只有一个线程的线程池,若线程池中有空闲的线程,则立即执行。若没有,则会暂缓在一个阻塞队列LinkedBlockingQueue中等待有空闲的线程去执行,它保证所有任务按照提交顺序执行。我们来看看newSingleThreadExecutor方法的源码:

    public static ExecutorService newSingleThreadExecutor() {
        return new FinalizableDelegatedExecutorService//先不用关注这个
            (new ThreadPoolExecutor(1, //核心线程数
                                    1,//最大线程数
                                    0L,//空闲时保持线程活着的时间
                                    TimeUnit.MILLISECONDS,//上面时间的单位
                                    new LinkedBlockingQueue<Runnable>()));//当线程池没有空闲线程,就放在这个队列里
    }

       应用场景:这个线程池会在仅有的一个线程发生异常时,重新启动一个线程来替代原来的线程执行下去。

3.newCachedThreadPool()方法

       创建一个可根据实际情况调整线程个数的线程池,不限制线程数量。若有任务,则创建线程。若无任务,则不创建线程,并且每一个空闲的线程会在60秒后自动回收。我们来看看源码:

    public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0,//核心线程数,0表示初始化不创建线程
                                      Integer.MAX_VALUE,//int的最大值,表示不限制线程池容量
                                      60L,//缓存线程60秒
                                      TimeUnit.SECONDS,//单位
                                      new SynchronousQueue<Runnable>());
    }

       源码中的SynchronousQueue这个没有容量的队列一创建,内部就使用take()方法阻塞着,当有一个线程来了直接就执行。

4.newScheduledThreadPool()方法

       创建一个大小无限的线程池,此线程池支持定时以及周期性执行任务的需求。它的源码如下:


public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
   return new ScheduledThreadPoolExecutor(corePoolSize);
}
public class ScheduledThreadPoolExecutor  extends ThreadPoolExecutor//注意这里继承了ThreadPoolExecutor
        implements ScheduledExecutorService {

   public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize,//核心线程数,传入
              Integer.MAX_VALUE,//int的最大值,表示不限制线程池容量
              0, //表示没有延迟
              TimeUnit.NANOSECONDS,//单位是纳秒
              new DelayedWorkQueue());
   }

}

       源码中的DelayedWorkQueue是带有延迟时间的一个队列,其中元素只有当指定时间到了,才能够从队列中获取元素,可以做定时的功能。

       创建一个任务,等3秒初始化后每隔1秒打印一句话:

public class ScheduledThread {
	public static void main(String args[]) throws Exception {
    	Temp command = new Temp();
    	//创建一个实现定时器的线程池
        ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
        //command表示具体的任务对象,第一个数字表示初始化的时间,第二个数字表示轮询的时间
        ScheduledFuture<?> scheduleTask = scheduler.scheduleWithFixedDelay(command, 3, 1, TimeUnit.SECONDS);
    }
}
class Temp extends Thread {
    public void run() {
        System.out.println("run");
    }
}

       这个类似于Java的Timer定时器,但项目中用Quartz,跟Spring整合的话,最好用@Scheduled注解。ref:Spring Schedule 任务调度实现

5.自定义线程池

        在上述Executors工厂类创建线程池时,它的创建线程方法内部实现均用了ThreadPoolExecutor这个类,ThreadPoolExecutor可以实现自定义线程池,它的构造方法如下:

 public ThreadPoolExecutor(int corePoolSize,//核心线程数
                           int maximumPoolSize,//最大线程数
                           long keepAliveTime,//线程保持多久
                           TimeUnit unit,//单位
                           BlockingQueue<Runnable> workQueue,//线程池功能
                           ThreadFactory threadFactory,//先不关注这个
                           RejectedExecutionHandler handler)//拒绝策略,比如超过最大线程数了,可以告诉客户服务器繁忙
                             {...}

       这个构造方法对于BlockingQueue队列是什么类型比较关键,它关乎这个自定义线程池的功能。

       1.使用有界队列ArrayBlockingQueue时,实际线程数小于corePoolSize时,则创建线程。若大于corePoolSize时,则任务会加入BlockingQueue队列中,若队列已满,则在实际线程总数不大于maximumPoolSize时,创建新线程。若还大于maximumPoolSize,则执行拒绝策略,或者自定义的其他方式。

       2.使用无界队列LinkedBlockingQueue时,缓冲队列,当实际线程超过corePoolSize核心线程数后放置等待的线程,最后等系统空闲了在这个队列里取,maximumPoolSize参数在这里就没有作用了。因为它是无界队列,所以除非系统资源耗尽,否则不会出现任务入队失败的情况。比如创建任务的速度和处理速度差异很大,无界队列会保持快速增长,直到系统内存耗尽。

       有界队列和无界队列实例如下:

public class ThreadPoolExecutorDemo implements Runnable{
	private static AtomicInteger count = new AtomicInteger(0);
	
	@Override
	public void run() {
		try {
			int temp = count.incrementAndGet();
			System.out.println("任务" + temp);
			Thread.sleep(2000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
	public static void main(String[] args) throws Exception{
		BlockingQueue<Runnable> queue = 
				new LinkedBlockingQueue<Runnable>();
				//new ArrayBlockingQueue<Runnable>(10);
		ExecutorService executor  = new ThreadPoolExecutor(
					5, 		//corePoolSize
					10, 	//使用无界队列LinkedBlockingQueue时,maximumPoolSize这个参数值不起作用
					120L, 	//2分钟
					TimeUnit.SECONDS,
					queue);
		
		for(int i = 0 ; i < 15; i++){//提交15个任务
			executor.execute(new ThreadPoolExecutorDemo());
		}
		Thread.sleep(1000);
		System.out.println("queue size:" + queue.size());
		executor.shutdown();
	}
}

       用LinkedBlockingQueue无界队列执行后结果是每过一段时间5个任务一执行:

                                   

对于拒绝策略,即当任务数量超过了系统实际承载能力时该如何处理呢?JDK提供了几种实现策略:

       AbortPolicy:直接抛出异常来阻止系统正常工作。

       CallerRunsPolicy:只要线程池未关闭,会把丢弃的任务先执行。

       DiscardOledestPolicy:丢弃最老的一个请求,尝试再次提交当前任务

       DiscardPolicy:丢弃无法处理的任务,不给于任何处理。

这四种策略个人觉得都不太好,我们可以实现一个自定义策略,在这里实现RejectedExecutionHandler接口就好了:

public class MyThreadPoolExecutor {
	public static void main(String[] args) {
		ThreadPoolExecutor pool = new ThreadPoolExecutor(
				1, 				//coreSize
				2, 				//MaxSize
				60, 			//60
				TimeUnit.SECONDS, 
				new ArrayBlockingQueue<Runnable>(3)			//指定一种队列 (有界队列)
				//new LinkedBlockingQueue<Runnable>()
				, new MyRejected()
				//, new DiscardOldestPolicy()//直接抛出异常
				);
		
		MyTask mt1 = new MyTask(1, "任务1");//第一个任务会直接执行
		MyTask mt2 = new MyTask(2, "任务2");//第二个任务会放入队列里,等第一个任务执行完以后才执行
		MyTask mt3 = new MyTask(3, "任务3");//因为队列里有三个容量,所以任务3也会放入队列里
		MyTask mt4 = new MyTask(4, "任务4");//因为队列里有三个容量,所以任务4也会放入队列里
		MyTask mt5 = new MyTask(5, "任务5");//假如有5个任务,任务1和5同时执行,任务234放在队列里
		MyTask mt6 = new MyTask(6, "任务6");//队列满了,线程池的最大线程数也超过了,则会实行拒绝策略
		
		pool.execute(mt1);
		pool.execute(mt2);
		pool.execute(mt3);
		pool.execute(mt4);
		pool.execute(mt5);
		pool.execute(mt6);
		
		pool.shutdown();
	}
}
class MyRejected implements RejectedExecutionHandler{
	@Override
	//传入当前任务对象和当前线程池对象
	public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
		//1.可以做一些处理,比如用http再创建请求给传数据的客户端,让它重新发送任务。高峰期的时候,系统已经超负荷了,不建议再发送请求
		//2.只是记录日志:id及相关重要的信息,暂缓到磁盘上,在不是高峰期的时候跑一些定时的job解析日志,把没处理的任务再处理一遍或者批处理下,一般用这个
		System.out.println("自定义处理..");
		System.out.println("当前被拒绝任务为:" + r.toString());
	}
}
class MyTask implements Runnable {
	private int taskId;
	private String taskName;
	
	public MyTask(int taskId, String taskName){this.taskId = taskId;this.taskName = taskName;}
	public int getTaskId() {return taskId;}
	public void setTaskId(int taskId) {this.taskId = taskId;}
	public String getTaskName() {return taskName;}
	public void setTaskName(String taskName) {this.taskName = taskName;}
	
	@Override
	public void run() {
		try {
			System.out.println("run taskId =" + this.taskId);
			Thread.sleep(3000);
			//System.out.println("end taskId =" + this.taskId);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}		
	}
	public String toString(){
		return Integer.toString(this.taskId);
	}
}

        运行结果如下:

                                             

        到这里已经介绍完了Java并发包下的线程池,博主是个普通的程序猿,水平有限,文章难免有错误,欢迎牺牲自己宝贵时间的读者,就本文内容直抒己见。

系列:

【JDK并发包基础】线程池详解

【JDK并发包基础】并发容器详解

【JDK并发包基础】工具类详解

【JDK并发基础】Java内存模型详解

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
打赏
0 评论
7 收藏
2
分享
返回顶部
顶部