/**
* 同时开始并行 执行任务,如果有一个异常则退出
* @author Administrator
*
*/
public class MyThreadStartTogether {
public static void main(String[] args) throws Exception {
testThread(50);
}
public static boolean testThread(int num) throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(num);
final AtomicBoolean flag = new AtomicBoolean(false);
final CountDownLatch down = new CountDownLatch(num);
final Semaphore se = new Semaphore(0);
for(int i =0; i < num; i++) {
executorService.execute(new Runnable() {
@Override
public void run() {
try {
se.acquire();
if(!flag.get()) {
TimeUnit.MICROSECONDS.sleep(100);
if(Thread.currentThread().getId() == 40) {
throw new RuntimeException("测试异常");
}
System.out.println("Thread=="+Thread.currentThread().getId()+":www.ebnew.com");
}
}catch (Exception e) {
flag.set(true);
e.printStackTrace();
} finally {
down.countDown();
}
}
});
}
se.release(num);
System.out.println("==================开始=================");
down.await();
executorService.shutdown();
return flag.get();
}
}
public class TestInvokeAny {
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
// invokeAny1();
// invokeAny2();
// invokeAny3();
invokeAnyTimeout();
}
/**
* 还没有到超时之前,所以的任务都已经异常完成,抛出ExecutionException<br>
* 如果超时前满,还没有没有完成的任务,抛TimeoutException
*/
public static void invokeAnyTimeout() throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(3);
List<Callable<String>> tasks = new ArrayList<Callable<String>>();
tasks.add(new ExceptionCallable());
tasks.add(new ExceptionCallable());
tasks.add(new ExceptionCallable());
tasks.add(new ExceptionCallable());
String result = executorService.invokeAny(tasks, 2, TimeUnit.SECONDS);
System.out.println("result=" + result);
executorService.shutdown();
}
/**
* 有异常的任务,有正常的任务,invokeAny()不会抛异常,返回最先正常完成的任务
*/
public static void invokeAny3() throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(3);
List<Callable<String>> tasks = new ArrayList<Callable<String>>();
tasks.add(new ExceptionCallable());
tasks.add(new ExceptionCallable());
tasks.add(new ExceptionCallable());
tasks.add(new ExceptionCallable());
tasks.add(new SleepSecondsCallable("t1", 2));
String result = executorService.invokeAny(tasks);
System.out.println("result=" + result);
executorService.shutdown();
}
/**
* 没有1个正常完成的任务,invokeAny()方法抛出ExecutionException,封装了任务中元素的异常
*
*/
public static void invokeAny2() throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(3);
List<Callable<String>> tasks = new ArrayList<Callable<String>>();
tasks.add(new ExceptionCallable());
tasks.add(new ExceptionCallable());
tasks.add(new ExceptionCallable());
String result = executorService.invokeAny(tasks);
System.out.println("result=" + result);
executorService.shutdown();
}
/**
* 提交的任务集合,一旦有1个任务正常完成(没有抛出异常),会终止其他未完成的任务
*/
public static void invokeAny1() throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(3);
List<Callable<String>> tasks = new ArrayList<Callable<String>>();
tasks.add(new SleepSecondsCallable("t1", 2));
tasks.add(new SleepSecondsCallable("t2", 1));
String result = executorService.invokeAny(tasks);
System.out.println("result=" + result);
executorService.shutdown();
}
}
public class TestInvokeAll {
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
// testInvokeAll();
// testInvokeAllTimeout();
testInvokeAllWhenInterrupt();
}
/**
* 如果线程在等待invokeAll()执行完成的时候,调用线程被中断,会抛出InterruptedException<br>
* 此时线程池会终止没有完成的任务,这主要是为了减少资源的浪费.
*/
public static void testInvokeAllWhenInterrupt() throws Exception {
final ExecutorService executorService = Executors.newFixedThreadPool(5);
// 调用invokeAll的线程
Thread invokeAllThread = new Thread() {
@Override
public void run() {
List<Callable<String>> tasks = new ArrayList<Callable<String>>();
tasks.add(new SleepSecondsCallable("t1", 2));
tasks.add(new SleepSecondsCallable("t2", 2));
tasks.add(new RandomTenCharsTask());
// 调用线程会阻塞,直到tasks全部执行完成(正常完成/异常退出)
try {
List<Future<String>> results = executorService
.invokeAll(tasks);
System.out.println("wait for the result." + results.size());
} catch (InterruptedException e) {
System.out
.println("I was wait,but my thread was interrupted.");
e.printStackTrace();
}
}
};
invokeAllThread.start();
Thread.sleep(200);
invokeAllThread.interrupt();
executorService.shutdown();
}
/**
* 可以通过Future.isCanceled()判断任务是被取消,还是完成(正常/异常)<br>
* Future.isDone()总是返回true,对于invokeAll()的调用者来说,没有啥用
*/
public static void testInvokeAllTimeout() throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(5);
List<Callable<String>> tasks = new ArrayList<Callable<String>>();
tasks.add(new SleepSecondsCallable("t1", 2));
tasks.add(new SleepSecondsCallable("t2", 2));
tasks.add(new SleepSecondsCallable("t3", 1));
// tasks.add(new RandomTenCharsTask());
List<Future<String>> results = executorService.invokeAll(tasks, 1,
TimeUnit.SECONDS);
System.out.println("wait for the result." + results.size());
for (Future<String> f : results) {
System.out.println("isCanceled=" + f.isCancelled() + ",isDone="
+ f.isDone());
}
executorService.shutdown();
}
/**
* 程序的执行结果和一些结论,已经直接写在代码注释里面了。invokeAll是一个阻塞方法,会等待任务列表中的所有任务都执行完成。不管任务是正常完成,
* 还是异常终止,Future.isDone()始终返回true。通过Future.isCanceled()可以判断任务是否在执行的过程中被取消。
* 通过Future.get()可以获取任务的返回结果,或者是任务在执行中抛出的异常。
*
* @throws Exception
*/
public static void testInvokeAll() throws Exception {
ExecutorService executorService = Executors.newFixedThreadPool(5);
List<Callable<String>> tasks = new ArrayList<Callable<String>>();
tasks.add(new SleepSecondsCallable("t1", 2));
tasks.add(new SleepSecondsCallable("t2", 2));
tasks.add(new RandomTenCharsTask());
tasks.add(new ExceptionCallable());
// 调用该方法的线程会阻塞,直到tasks全部执行完成(正常完成/异常退出)
List<Future<String>> results = executorService.invokeAll(tasks);
// 任务列表中所有任务执行完毕,才能执行该语句
System.out.println("wait for the result." + results.size());
executorService.shutdown();
for (Future<String> f : results) {
// isCanceled=false,isDone=true
System.out.println("isCanceled=" + f.isCancelled() + ",isDone="
+ f.isDone());
// ExceptionCallable任务会报ExecutionException
System.out.println("task result=" + f.get());
}
}
}
public class SleepSecondsCallable implements Callable<String> {
private String name;
private int seconds;
public SleepSecondsCallable(String name, int seconds) {
this.name = name;
this.seconds = seconds;
}
public String call() throws Exception {
System.out.println(name + ",begin to execute");
try {
TimeUnit.SECONDS.sleep(seconds);
} catch (InterruptedException e) {
System.out.println(name + " was disturbed during sleeping.");
e.printStackTrace();
return name + "_SleepSecondsCallable_failed";
}
System.out.println(name + ",success to execute");
return name + "_SleepSecondsCallable_succes";
}
}
public class RandomTenCharsTask implements Callable<String> {
@Override
public String call() throws Exception {
System.out.println("RandomTenCharsTask begin to execute...");
StringBuffer content = new StringBuffer();
String base = "ssssssssssssssssssssssss";
Random random = new Random();
for (int i = 0; i < 10; i++) {
int number = random.nextInt(base.length());
content.append(base.charAt(number));
}
System.out.println("RandomTenCharsTask complete.result=" + content);
return content.toString();
}
}
public class ExceptionCallable implements Callable<String> {
private String name = null;
public ExceptionCallable() {
}
public ExceptionCallable(String name) {
this.name = name;
}
@Override
public String call() throws Exception {
System.out.println("开始执行...");
System.out.println(name.length());
System.out.println("结束执行.");
return name;
}
}