文档章节

java.util.concurrent包API学习笔记

浮躁的码农
 浮躁的码农
发布于 2015/08/13 12:58
字数 2335
阅读 12
收藏 0

newFixedThreadPool

创建一个固定大小的线程池。

shutdown():用于关闭启动线程,如果不调用该语句,jvm不会关闭。

awaitTermination():用于等待子线程结束,再继续执行下面的代码。该例中我设置一直等着子线程结束。

 

 

Java代码  收藏代码

  1. public class Test {  

  2.   

  3.     public static void main(String[] args) throws IOException, InterruptedException {  

  4.         ExecutorService service = Executors.newFixedThreadPool(2);  

  5.         for (int i = 0; i < 4; i++) {  

  6.             Runnable run = new Runnable() {  

  7.                 @Override  

  8.                 public void run() {  

  9.                     System.out.println("thread start");  

  10.                 }  

  11.             };  

  12.             service.execute(run);  

  13.         }  

  14.         service.shutdown();  

  15.         service.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);  

  16.         System.out.println("all thread complete");  

  17.     }  

  18. }  

 

 

 

 输出:

thread start
thread start
thread start
thread start
all thread complete

newScheduledThreadPool

这个先不说,我喜欢用spring quartz.

CyclicBarrier

假设有只有的一个场景:每个线程代表一个跑步运动员,当运动员都准备好后,才一起出发,只要有一个人没有准备好,大家都等待.

 

 

Java代码  收藏代码

  1. import java.io.IOException;  

  2. import java.util.Random;  

  3. import java.util.concurrent.BrokenBarrierException;  

  4. import java.util.concurrent.CyclicBarrier;  

  5. import java.util.concurrent.ExecutorService;  

  6. import java.util.concurrent.Executors;  

  7.   

  8. class Runner implements Runnable {  

  9.   

  10.     private CyclicBarrier barrier;  

  11.   

  12.     private String name;  

  13.   

  14.     public Runner(CyclicBarrier barrier, String name) {  

  15.         super();  

  16.         this.barrier = barrier;  

  17.         this.name = name;  

  18.     }  

  19.   

  20.     @Override  

  21.     public void run() {  

  22.         try {  

  23.             Thread.sleep(1000 * (new Random()).nextInt(8));  

  24.             System.out.println(name + " 准备OK.");  

  25.             barrier.await();  

  26.         } catch (InterruptedException e) {  

  27.             e.printStackTrace();  

  28.         } catch (BrokenBarrierException e) {  

  29.             e.printStackTrace();  

  30.         }  

  31.         System.out.println(name + " Go!!");  

  32.     }  

  33. }  

  34.   

  35. public class Race {  

  36.   

  37.     public static void main(String[] args) throws IOException, InterruptedException {  

  38.         CyclicBarrier barrier = new CyclicBarrier(3);  

  39.   

  40.         ExecutorService executor = Executors.newFixedThreadPool(3);  

  41.         executor.submit(new Thread(new Runner(barrier, "zhangsan")));  

  42.         executor.submit(new Thread(new Runner(barrier, "lisi")));  

  43.         executor.submit(new Thread(new Runner(barrier, "wangwu")));  

  44.   

  45.         executor.shutdown();  

  46.     }  

  47.   

  48. }  

 

 

输出:

wangwu 准备OK.
zhangsan 准备OK.
lisi 准备OK.
lisi Go!!
zhangsan Go!!
wangwu Go!!

ThreadPoolExecutor

 

newFixedThreadPool生成一个固定的线程池,顾名思义,线程池的线程是不会释放的,即使它是Idle。这就会产生性能问题,比如如果线程池的大小为200,当全部使用完毕后,所有的线程会继续留在池中,相应的内存和线程切换(while(true)+sleep循环)都会增加。如果要避免这个问题,就必须直接使用ThreadPoolExecutor()来构造。可以像Tomcat的线程池一样设置“最大线程数”、“最小线程数”和“空闲线程keepAlive的时间”。

 

 

ThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue, RejectedExecutionHandler handler)  

 

corePoolSize:池中所保存的线程数,包括空闲线程(非最大同时干活的线程数)。如果池中线程数多于 corePoolSize,则这些多出的线程在空闲时间超过 keepAliveTime 时将会终止。

maximumPoolSize:线程池中最大线程数

keepAliveTime:线程空闲回收的时间

unit:keepAliveTime的单位

workQueue:保存任务的队列,可以如下选择:

 

  •   无界队列: new LinkedBlockingQueue<Runnable>();

  •   有界队列: new ArrayBlockingQueue<Runnable>(8);你不想让客户端无限的请求吃光你的CPU和内存吧,那就用有界队列

handler:当提交任务数大于队列size会抛出RejectedExecutionException,可选的值为:

 

  • ThreadPoolExecutor.CallerRunsPolicy 等待队列空闲

  • ThreadPoolExecutor.DiscardPolicy:丢弃要插入队列的任务

  • ThreadPoolExecutor.DiscardOldestPolicy:删除队头的任务

关于corePoolSize和maximumPoolSize:

 

 Java官方Docs写道:

当新任务在方法 execute(java.lang.Runnable) 中提交时,如果运行的线程少于 corePoolSize,则创建新线程来处理请求(即使存在空闲线程)。如果运行的线程多于 corePoolSize 而少于 maximumPoolSize,则仅当队列(queue)满时才创建新线程。如果设置的 corePoolSize 和 maximumPoolSize 相同,则创建了固定大小的线程池。如果将 maximumPoolSize 设置为基本的无界值(如 Integer.MAX_VALUE),则允许池适应任意数量的并发任务。

 

 

Java代码  收藏代码

  1. public class Test {  

  2.   

  3.     public static void main(String[] args) {  

  4.         BlockingQueue<Runnable> queue = new LinkedBlockingQueue<Runnable>();  

  5.         ThreadPoolExecutor executor = new ThreadPoolExecutor(361, TimeUnit.DAYS, queue);  

  6.   

  7.         for (int i = 0; i < 20; i++) {  

  8.             final int index = i;  

  9.             executor.execute(new Runnable() {  

  10.                 public void run() {  

  11.                     try {  

  12.                         Thread.sleep(4000);  

  13.                     } catch (InterruptedException e) {  

  14.                         e.printStackTrace();  

  15.                     }  

  16.                     System.out.println(String.format("thread %d finished", index));  

  17.                 }  

  18.             });  

  19.         }  

  20.         executor.shutdown();  

  21.     }  

  22. }  

 

原子变量(Atomic )

并发库中的BlockingQueue是一个比较好玩的类,顾名思义,就是阻塞队列。该类主要提供了两个方法put()和take(),前者将一个对象放到队列中,如果队列已经满了,就等待直到有空闲节点;后者从head取一个对象,如果没有对象,就等待直到有可取的对象。

 

下面的例子比较简单,一个读线程,用于将要处理的文件对象添加到阻塞队列中,另外四个写线程用于取出文件对象,为了模拟写操作耗时长的特点,特让线程睡眠一段随机长度的时间。另外,该Demo也使用到了线程池和原子整型(AtomicInteger),AtomicInteger可以在并发情况下达到原子化更新,避免使用了synchronized,而且性能非常高。由于阻塞队列的put和take操作会阻塞,为了使线程退出,在队列中添加了一个“标识”,算法中也叫“哨兵”,当发现这个哨兵后,写线程就退出。

 

 

Java代码  收藏代码

  1. import java.io.File;  

  2. import java.io.FileFilter;  

  3. import java.util.concurrent.BlockingQueue;  

  4. import java.util.concurrent.ExecutorService;  

  5. import java.util.concurrent.Executors;  

  6. import java.util.concurrent.LinkedBlockingQueue;  

  7. import java.util.concurrent.atomic.AtomicInteger;  

  8.   

  9. public class Test {  

  10.   

  11.     static long randomTime() {  

  12.         return (long) (Math.random() * 1000);  

  13.     }  

  14.   

  15.     public static void main(String[] args) {  

  16.         // 能容纳100个文件  

  17.         final BlockingQueue<File> queue = new LinkedBlockingQueue<File>(100);  

  18.         // 线程池  

  19.         final ExecutorService exec = Executors.newFixedThreadPool(5);  

  20.         final File root = new File("D:\\dist\\blank");  

  21.         // 完成标志  

  22.         final File exitFile = new File("");  

  23.         // 读个数  

  24.         final AtomicInteger rc = new AtomicInteger();  

  25.         // 写个数  

  26.         final AtomicInteger wc = new AtomicInteger();  

  27.         // 读线程  

  28.         Runnable read = new Runnable() {  

  29.             public void run() {  

  30.                 scanFile(root);  

  31.                 scanFile(exitFile);  

  32.             }  

  33.   

  34.             public void scanFile(File file) {  

  35.                 if (file.isDirectory()) {  

  36.                     File[] files = file.listFiles(new FileFilter() {  

  37.                         public boolean accept(File pathname) {  

  38.                             return pathname.isDirectory() || pathname.getPath().endsWith(".log");  

  39.                         }  

  40.                     });  

  41.                     for (File one : files)  

  42.                         scanFile(one);  

  43.                 } else {  

  44.                     try {  

  45.                         int index = rc.incrementAndGet();  

  46.                         System.out.println("Read0: " + index + " " + file.getPath());  

  47.                         queue.put(file);  

  48.                     } catch (InterruptedException e) {  

  49.                     }  

  50.                 }  

  51.             }  

  52.         };  

  53.         exec.submit(read);  

  54.         // 四个写线程  

  55.         for (int index = 0; index < 4; index++) {  

  56.             // write thread  

  57.             final int num = index;  

  58.             Runnable write = new Runnable() {  

  59.                 String threadName = "Write" + num;  

  60.   

  61.                 public void run() {  

  62.                     while (true) {  

  63.                         try {  

  64.                             Thread.sleep(randomTime());  

  65.                             int index = wc.incrementAndGet();  

  66.                             File file = queue.take();  

  67.                             // 队列已经无对象  

  68.                             if (file == exitFile) {  

  69.                                 // 再次添加"标志",以让其他线程正常退出  

  70.                                 queue.put(exitFile);  

  71.                                 break;  

  72.                             }  

  73.                             System.out.println(threadName + ": " + index + " " + file.getPath());  

  74.                         } catch (InterruptedException e) {  

  75.                         }  

  76.                     }  

  77.                 }  

  78.   

  79.             };  

  80.             exec.submit(write);  

  81.         }  

  82.         exec.shutdown();  

  83.     }  

  84.   

  85. }  

 

CountDownLatch

 

从名字可以看出,CountDownLatch是一个倒数计数的锁,当倒数到0时触发事件,也就是开锁,其他人就可以进入了。在一些应用场合中,需要等待某个条件达到要求后才能做后面的事情;同时当线程都完成后也会触发事件,以便进行后面的操作。 

CountDownLatch最重要的方法是countDown()和await(),前者主要是倒数一次,后者是等待倒数到0,如果没有到达0,就只有阻塞等待了。

一个CountDouwnLatch实例是不能重复使用的,也就是说它是一次性的,锁一经被打开就不能再关闭使用了,如果想重复使用,请考虑使用CyclicBarrier。

下面的例子简单的说明了CountDownLatch的使用方法,模拟了100米赛跑,10名选手已经准备就绪,只等裁判一声令下。当所有人都到达终点时,比赛结束。

 

 

Java代码  收藏代码

  1. import java.util.concurrent.CountDownLatch;  

  2. import java.util.concurrent.ExecutorService;  

  3. import java.util.concurrent.Executors;  

  4.   

  5. public class Test {  

  6.   

  7.     public static void main(String[] args) throws InterruptedException {  

  8.   

  9.         // 开始的倒数锁  

  10.         final CountDownLatch begin = new CountDownLatch(1);  

  11.   

  12.         // 结束的倒数锁  

  13.         final CountDownLatch end = new CountDownLatch(10);  

  14.   

  15.         // 十名选手  

  16.         final ExecutorService exec = Executors.newFixedThreadPool(10);  

  17.   

  18.         for (int index = 0; index < 10; index++) {  

  19.             final int NO = index + 1;  

  20.             Runnable run = new Runnable() {  

  21.                 public void run() {  

  22.                     try {  

  23.                         begin.await();  

  24.                         Thread.sleep((long) (Math.random() * 10000));  

  25.                         System.out.println("No." + NO + " arrived");  

  26.                     } catch (InterruptedException e) {  

  27.                     } finally {  

  28.                         end.countDown();  

  29.                     }  

  30.                 }  

  31.             };  

  32.             exec.submit(run);  

  33.         }  

  34.         System.out.println("Game Start");  

  35.         begin.countDown();  

  36.         end.await();  

  37.         System.out.println("Game Over");  

  38.         exec.shutdown();  

  39.     }  

  40.   

  41. }  

 

使用Callable和Future实现线程等待和多线程返回值

 

 假设在main线程启动一个线程,然后main线程需要等待子线程结束后,再继续下面的操作,我们会通过join方法阻塞main线程,代码如下:

 

Java代码  收藏代码

  1. Runnable runnable = ...;  

  2. Thread t = new Thread(runnable);  

  3. t.start();  

  4. t.join();  

  5. ......  

 通过JDK1.5线程池管理的线程可以使用Callable和Future实现(join()方法无法应用到在线程池线程)

 

Java代码  收藏代码

  1. import java.util.concurrent.Callable;  

  2. import java.util.concurrent.ExecutionException;  

  3. import java.util.concurrent.ExecutorService;  

  4. import java.util.concurrent.Executors;  

  5. import java.util.concurrent.Future;  

  6.   

  7. public class Test {  

  8.   

  9.     public static void main(String[] args) throws InterruptedException, ExecutionException {  

  10.         System.out.println("start main thread");  

  11.         final ExecutorService exec = Executors.newFixedThreadPool(5);  

  12.           

  13.         Callable<String> call = new Callable<String>() {  

  14.             public String call() throws Exception {  

  15.                 System.out.println("  start new thread.");  

  16.                 Thread.sleep(1000 * 5);  

  17.                 System.out.println("  end new thread.");  

  18.                 return "some value.";  

  19.             }  

  20.         };  

  21.         Future<String> task = exec.submit(call);  

  22.         Thread.sleep(1000 * 2);  

  23.         task.get(); // 阻塞,并待子线程结束,  

  24.         exec.shutdown();  

  25.         exec.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS);  

  26.         System.out.println("end main thread");  

  27.     }  

  28.   

  29. }  

 

Java代码  收藏代码

  1. import java.util.ArrayList;  

  2. import java.util.List;  

  3. import java.util.concurrent.Callable;  

  4. import java.util.concurrent.ExecutionException;  

  5. import java.util.concurrent.ExecutorService;  

  6. import java.util.concurrent.Executors;  

  7. import java.util.concurrent.Future;  

  8.   

  9. /** 

  10. * 多线程返回值测试 

  11. */  

  12. public class ThreadTest {  

  13.   

  14.     public static void main(String[] args) throws InterruptedException, ExecutionException {  

  15.         System.out.println("start main thread");  

  16.         int threadCount = 5;  

  17.         final ExecutorService exec = Executors.newFixedThreadPool(threadCount);  

  18.   

  19.         List<Future<Integer>> tasks = new ArrayList<Future<Integer>>();  

  20.         for (int i = 0; i < threadCount; i++) {  

  21.             Callable<Integer> call = new Callable<Integer>() {  

  22.                 public Integer call() throws Exception {  

  23.                     Thread.sleep(1000);  

  24.                     return 1;  

  25.                 }  

  26.             };  

  27.             tasks.add(exec.submit(call));  

  28.         }  

  29.         long total = 0;  

  30.         for (Future<Integer> future : tasks) {  

  31.             total += future.get();  

  32.         }  

  33.         exec.shutdown();  

  34.         System.out.println("total: " + total);  

  35.         System.out.println("end main thread");  

  36.     }  

  37. }  

 

 

CompletionService

这个东西的使用上很类似上面的example,不同的是,它会首先取完成任务的线程。下面的参考文章里,专门提到这个,大家有兴趣可以看下,例子:

 

 

Java代码  收藏代码

  1. import java.util.concurrent.Callable;  

  2. import java.util.concurrent.CompletionService;  

  3. import java.util.concurrent.ExecutionException;  

  4. import java.util.concurrent.ExecutorCompletionService;  

  5. import java.util.concurrent.ExecutorService;  

  6. import java.util.concurrent.Executors;  

  7. import java.util.concurrent.Future;  

  8.   

  9. public class Test {  

  10.     public static void main(String[] args) throws InterruptedException,  

  11.     ExecutionException {  

  12.         ExecutorService exec = Executors.newFixedThreadPool(10);  

  13.         CompletionService<String> serv =  

  14.         new ExecutorCompletionService<String>(exec);  

  15.         for (int index = 0; index < 5; index++) {  

  16.             final int NO = index;  

  17.             Callable<String> downImg = new Callable<String>() {  

  18.                 public String call() throws Exception {  

  19.                     Thread.sleep((long) (Math.random() * 10000));  

  20.                     return "Downloaded Image " + NO;  

  21.                 }  

  22.             };  

  23.             serv.submit(downImg);  

  24.         }  

  25.         Thread.sleep(1000 * 2);  

  26.         System.out.println("Show web content");  

  27.         for (int index = 0; index < 5; index++) {  

  28.             Future<String> task = serv.take();  

  29.             String img = task.get();  

  30.             System.out.println(img);  

  31.         }  

  32.         System.out.println("End");  

  33.         // 关闭线程池  

  34.         exec.shutdown();  

  35.     }  

  36. }  

 

 

Semaphore信号量

 

拿到信号量的线程可以进入代码,否则就等待。通过acquire()和release()获取和释放访问许可。下面的例子只允许5个线程同时进入执行acquire()和release()之间的代码

 

 

Java代码  收藏代码

  1. import java.util.concurrent.ExecutorService;  

  2. import java.util.concurrent.Executors;  

  3. import java.util.concurrent.Semaphore;  

  4.   

  5. public class Test {  

  6.   

  7.     public static void main(String[] args) {  

  8.         // 线程池  

  9.         ExecutorService exec = Executors.newCachedThreadPool();  

  10.         // 只能5个线程同时访问  

  11.         final Semaphore semp = new Semaphore(5);  

  12.         // 模拟20个客户端访问  

  13.         for (int index = 0; index < 20; index++) {  

  14.             final int NO = index;  

  15.             Runnable run = new Runnable() {  

  16.                 public void run() {  

  17.                     try {  

  18.                         // 获取许可  

  19.                         semp.acquire();  

  20.                         System.out.println("Accessing: " + NO);  

  21.                         Thread.sleep((long) (Math.random() * 10000));  

  22.                         // 访问完后,释放  

  23.                         semp.release();  

  24.                     } catch (InterruptedException e) {  

  25.                     }  

  26.                 }  

  27.             };  

  28.             exec.execute(run);  

  29.         }  

  30.         // 退出线程池  

  31.         exec.shutdown();  

  32.     }  

  33.   

  34. }  

 

 


本文转载自:

浮躁的码农

浮躁的码农

粉丝 71
博文 856
码字总数 156434
作品 0
松江
程序员
私信 提问
java基础巩固笔记(5)-多线程之线程并发库

java基础巩固笔记(5)-多线程之线程并发库 标签: java [TOC] 本文主要概述包下的相关类和使用方法 Package java.util.concurrent 原子性操作类 包下的类: Package java.util.concurrent.atom...

brianway
2016/02/06
1K
0
AsyncTask 学习笔记

Class Overview AsyncTask enables proper and easy use of the UI thread. This class allows to perform background operations and publish results on the UI thread without having to ......

Angels_安杰
2016/02/23
24
0
Java基础巩固笔记(8)-多线程之线程并发库

Contents java基础巩固笔记(5)-多线程之线程并发库 原子性操作类 线程池 Lock&Condition 同步工具 参考资料 本文主要概述包下的相关类和使用方法 Package java.util.concurrent 原子性操作类...

卟想苌亣
2017/12/04
0
0
JAVA并发编程JUC基础学习(简介)

之前写过一篇并发编程的简单实例应用,Future快速实现并发编程,可以很快的在自己的项目中应用,但并不系统,之前说过总结一篇(或者一系列)java.util.concurrent 这个并发编程工具类的学习...

小海bug
02/22
0
0
如何在MyBatis-3.2.7中使用Log4j2 rc2——MyBatis学习笔记之十九

前天我上传了我的MyBatis系列课程(http://edu.51cto.com/course/course_id-1110.html)的第六讲,主要内容是如何使用Log4j2(具体版本为v2.0-rc1)为MyBatis 3.2.7配置日志。实际上目前最新...

NashMaster2011
2014/07/09
0
0

没有更多内容

加载失败,请刷新页面

加载更多

从濒临解散到浴火重生,OceanBase 这十年经历了什么?

阿里妹导读:谈及国产自研数据库,就不得不提 OceanBase。与很多人想象不同的是,OceanBase 并非衔着金钥匙出生的宠儿。相反,它曾无人看好、困难重重,整个团队甚至数度濒临解散。 从危在旦...

阿里云云栖社区
30分钟前
2
0
比特币第三方API大全

在开发比特币应用时,除了使用自己搭建的节点,也可以利用第三方提供的比特币api,来获取市场行情、进行交易支付、查询账户余额等。这些第三方api不一定遵循标准的比特币rpc接口规范,但往往...

汇智网教程
41分钟前
1
0
Dozer:Dozer异常java.lang.ClassCastException

这个问题是个很难发现的问题,因为代码本身没有错误,但就是无法找到报错原因 现记录下这个报错 java.lang.ClassCastException:com.XXX.ObjectA cannot be cast to com.XXX.ObjectA 代码中并...

琴兽
今天
2
0
Feign Retryer的默认重试策略测试

1、Feign配置 @Configurationpublic class FeignConfig { @Value("${coupon_service.url:http://localhost:8081}") private String couponServiceUrl; @Bean publ......

moon888
今天
2
0
关于不同域名下的session共享问题

如果登录,首页,分类,列表,产品都在不同的二级域名下,主域名不变,一定要保证里面的版本问题,不能为了更新而更新,这样哪个下面的session都访问不了。

dragon_tech
今天
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部