文档章节

线程池整理

算法之名
 算法之名
发布于 10/18 11:46
字数 3630
阅读 54
收藏 0

一般在生产环境中,我们都不会直接new一个Thread,然后再去start(),因为这么做会不断频繁的创建线程,销毁线程,过大的线程会耗尽CPU和内存资源,大量的垃圾回收,也会给GC带来压力,延长GC停顿时间.

1、固定大小线程池

public class ThreadPoolDemo {
    public static class MyTask implements Runnable {
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
            try {
                Thread.sleep(1000);
            }catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        MyTask task = new MyTask();
        ExecutorService es = Executors.newFixedThreadPool(5);
        for (int i = 0;i < 10;i++) {
            es.submit(task);
        }
        es.shutdown();
    }
}

运行结果:

1539134496389:Thread ID:11
1539134496389:Thread ID:12
1539134496389:Thread ID:13
1539134496389:Thread ID:14
1539134496389:Thread ID:15
1539134497390:Thread ID:14
1539134497390:Thread ID:12
1539134497390:Thread ID:15
1539134497390:Thread ID:13
1539134497390:Thread ID:11

结果解读:运行结果并不是一次刷出来的,而是刷出了5个,中间会停顿1秒,再刷出5个,说明,并行处理是5个线程执行一次,然后再并行处理5个。

将Executors.newFixedThreadPool改成Executors.newCachedThreadPool()

public class ThreadPoolDemo {
    public static class MyTask implements Runnable {
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
            try {
                Thread.sleep(1000);
            }catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) {
        MyTask task = new MyTask();
        ExecutorService es = Executors.newCachedThreadPool();
        for (int i = 0;i < 10;i++) {
            es.submit(task);
        }
        es.shutdown();
    }
}

结果相同,但是是同时并行处理的,中间没有停顿,说明newCachedThreadPool()是根据需要来分配线程数的。

2、计划任务

newScheduledThreadPool()有两个方法来调用线程对象,scheduleAtFixedRate()跟scheduleWithFixedDelay().他们之间的差别就是scheduleAtFixedRate()总共只占用调度时间,而scheduleWithFixedDelay()占用的是线程执行时间加调度时间.但如果scheduleAtFixedRate()的线程执行时间大于调度时间,也不会出现重复调度(即一个线程还没有执行完,另外一个线程会启动),而是一个线程执行完,另一个线程马上启动.

public class ScheduledExecutorServiceDemo {
    public static void main(String[] args) {
        ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
        ses.scheduleAtFixedRate(new Runnable() {
            public void run() {
                try {
                    long start = System.currentTimeMillis();
                    Thread.sleep(2000);
                    System.out.println((System.currentTimeMillis() -start) +":" + Thread.currentThread().getName());
                }catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },0,2, TimeUnit.SECONDS);
    }
}

运行结果(部分截取)

2001:pool-1-thread-1
2000:pool-1-thread-1
2000:pool-1-thread-2
2000:pool-1-thread-1
2001:pool-1-thread-3
2000:pool-1-thread-2

结果解读:尽管有时间调度,他们依然是不同的线程来运行的,每显示一条中间停顿2秒(线程运行时间也是2秒)

public class ScheduledExecutorServiceDemo {
    public static void main(String[] args) {
        ScheduledExecutorService ses = Executors.newScheduledThreadPool(10);
        ses.scheduleWithFixedDelay(new Runnable() {
            public void run() {
                try {
                    long start = System.currentTimeMillis();
                    Thread.sleep(2000);
                    System.out.println((System.currentTimeMillis() -start) +":" + Thread.currentThread().getName());
                }catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
        },0,2, TimeUnit.SECONDS);
    }
}

运行结果与之前相同,但是每显示一条的时间间隔为4秒(线程运行时间依然为2秒),其中2秒为调度时间,2秒为运行时间.

3、核心线程池的内部实现。

其实不论是Executors工厂的哪种实现,都是调用了同一个类ThreadPoolExecutor,使用了不同的构造参数罢了.不同的构造参数可以产生不同种类的线程池,因此我们也可以自定义线程池.

JDK实现

public static ExecutorService newFixedThreadPool(int nThreads) {
    return new ThreadPoolExecutor(nThreads, nThreads,
                                  0L, TimeUnit.MILLISECONDS,
                                  new LinkedBlockingQueue<Runnable>());
}
public static ExecutorService newSingleThreadExecutor() {
    return new FinalizableDelegatedExecutorService
        (new ThreadPoolExecutor(1, 1,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>()));
}
public static ExecutorService newCachedThreadPool() {
    return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                  60L, TimeUnit.SECONDS,
                                  new SynchronousQueue<Runnable>());
}

拒绝策略

当线程池任务数量超过系统实际承载能力时,可以启用拒绝策略。

直接中断策略

public class RejectThreadPoolDemo {
    public static class MyTask implements Runnable {
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MyTask task = new MyTask();
        ExecutorService es = new ThreadPoolExecutor(5,5,0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(10),
                Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy());
//                new RejectedExecutionHandler() {
//                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//                        System.out.println(r.toString() + " is discard");
//                    }
//                });
        for (int i = 0;i < Integer.MAX_VALUE;i++) {
            es.submit(task);
            Thread.sleep(10);
        }
    }
}

ThreadPoolExecutor()的最后一个参数为中断策略,上面的new ThreadPoolExecutor.AbortPolicy()为直接中断!

参数说明:

第一个参数corePoolSize:指定了线程池中的线程数量.

第二个参数maximumPoolSize:指定了线程池中的最大线程数量.

第三个参数KeepAliveTime:当线程池线程数量超过了corePoolSize时,多余的空闲线程的存活时间.即超过corePoolSize的空闲线程,在多长时间内会被销毁.

第四个参数unit:keepAliveTime的单位.

第五个参数workQueue:任务队列,被提交但尚未被执行的任务.

1,直接提交的队列:SynchronousQueue,无容量,每一个插入操作都要等待一个删除操作,提交的任务不会被真实保存,总是将新任务提交给线程执行.如果没有空闲进程,则尝试创建新的进程.如果进程数量达到最大,则执行拒绝策略.

2,有界的任务队列:ArrayBlockingQueue,必须带一个容量参数,表示该队列的最大容量.当线程池的实际线程数小于corePoolSize,会优先创建新的线程,若大于corePoolSize,则会将新任务加入到等待队列.若等待队列满的时候,无法加入,则在总线程数不大于maximumPoolSize的前提下,创建新的进程执行任务.若大于maximumPoolSize,执行拒绝策略.

3,无界的任务队列:LinkedBlockingQueue,除非系统资源耗尽,不存在任务入队失败的情况.当线程池的实际线程数小于corePoolSize,会优先创建新的线程,若大于corePoolSize,则会将新任务加入到等待队列,若任务的创建和处理的速度差异很大,无界队列会保持快速增长,直到耗尽系统内存.

4,优先任务队列:PriorityBlockingQueue,可以控制任务执行的先后顺序.是一个特殊的无界队列.无论是ArrayBlockingQueue还是LinkedBlockingQueue都是按照先进先出算法处理任务的,而PriorityBlockingQueue则可以根据任务自身的优先级顺序先后执行,总是确保高优先级的任务先执行.

第六个参数threadFactory:线程工厂,用于创建线程,一般用默认的即可.

第七个参数handler:拒绝策略,当任务太多,来不及处理,如何拒绝任务.

运行结果

1539153799420:Thread ID:11
1539153799430:Thread ID:12
1539153799440:Thread ID:13
1539153799450:Thread ID:14
1539153799460:Thread ID:15
1539153799520:Thread ID:11
1539153799530:Thread ID:12
1539153799540:Thread ID:13
1539153799550:Thread ID:14
1539153799560:Thread ID:15
1539153799621:Thread ID:11
Exception in thread "main" java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.FutureTask@45ee12a7 rejected from java.util.concurrent.ThreadPoolExecutor@330bedb4[Running, pool size = 5, active threads = 5, queued tasks = 9, completed tasks = 6]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
    at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1369)
    at java.util.concurrent.AbstractExecutorService.submit(AbstractExecutorService.java:112)
    at com.guanjian.RejectThreadPoolDemo.main(RejectThreadPoolDemo.java:31)

结果解读:由于并发线程数量太大,Integer.MAX_VALUE,我们线程池的最大线程数只有5个,而无界任务队列LinkedBlockingQueue<Runnable>只有10个,无法满足快速的线程数量增长,拒绝策略发挥作用,抛出异常,阻止系统正常工作.

public class RejectThreadPoolDemo {
    public static class MyTask implements Runnable {
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MyTask task = new MyTask();
        ExecutorService es = new ThreadPoolExecutor(5,5,0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(10),
                Executors.defaultThreadFactory(), new ThreadPoolExecutor.CallerRunsPolicy());
//                new RejectedExecutionHandler() {
//                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//                        System.out.println(r.toString() + " is discard");
//                    }
//                });
        for (int i = 0;i < Integer.MAX_VALUE;i++) {
            es.submit(task);
            Thread.sleep(10);
        }
    }
}

new ThreadPoolExecutor.CallerRunsPolicy()只要线程池未关闭,该策略直接在调用者线程中,运行当前被丢弃的任务,但性能极有可能会急剧下降.

public class RejectThreadPoolDemo {
    public static class MyTask implements Runnable {
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MyTask task = new MyTask();
        ExecutorService es = new ThreadPoolExecutor(5,5,0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(10),
                Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardOldestPolicy());
//                new RejectedExecutionHandler() {
//                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//                        System.out.println(r.toString() + " is discard");
//                    }
//                });
        for (int i = 0;i < Integer.MAX_VALUE;i++) {
            es.submit(task);
            Thread.sleep(10);
        }
    }
}

ThreadPoolExecutor.DiscardOldestPolicy()该策略将丢弃最老的一个请求,也就是即将被执行的一个任务,并尝试再次提交当前任务.

public class RejectThreadPoolDemo {
    public static class MyTask implements Runnable {
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MyTask task = new MyTask();
        ExecutorService es = new ThreadPoolExecutor(5,5,0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(10),
                Executors.defaultThreadFactory(), new ThreadPoolExecutor.DiscardPolicy());
//                new RejectedExecutionHandler() {
//                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
//                        System.out.println(r.toString() + " is discard");
//                    }
//                });
        for (int i = 0;i < Integer.MAX_VALUE;i++) {
            es.submit(task);
            Thread.sleep(10);
        }
    }
}

ThreadPoolExecutor.DiscardPolicy()丢弃无法处理的任务,不予任何处理.

自定义拒绝策略

public class RejectThreadPoolDemo {
    public static class MyTask implements Runnable {
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MyTask task = new MyTask();
        ExecutorService es = new ThreadPoolExecutor(5,5,0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>(10),
                Executors.defaultThreadFactory(),
                new RejectedExecutionHandler() {
                    public void rejectedExecution(Runnable r, ThreadPoolExecutor executor) {
                        System.out.println(r.toString() + " is discard");
                    }
                });
        for (int i = 0;i < Integer.MAX_VALUE;i++) {
            es.submit(task);
            Thread.sleep(10);
        }
    }
}

运行结果:

1539159379178:Thread ID:11
1539159379187:Thread ID:12
1539159379197:Thread ID:13
1539159379207:Thread ID:14
1539159379217:Thread ID:15
1539159379279:Thread ID:11
1539159379288:Thread ID:12
1539159379301:Thread ID:13
1539159379308:Thread ID:14
1539159379318:Thread ID:15
1539159379379:Thread ID:11
1539159379388:Thread ID:12
1539159379401:Thread ID:13
java.util.concurrent.FutureTask@45ee12a7 is discard
1539159379408:Thread ID:14
1539159379418:Thread ID:15
java.util.concurrent.FutureTask@330bedb4 is discard
java.util.concurrent.FutureTask@2503dbd3 is discard
java.util.concurrent.FutureTask@4b67cf4d is discard
java.util.concurrent.FutureTask@7ea987ac is discard

这里只是比ThreadPoolExecutor.DiscardPolicy()多了打印出丢弃的任务.

自定义线程创建

public class RejectThreadPoolDemo {
    public static class MyTask implements Runnable {
        public void run() {
            System.out.println(System.currentTimeMillis() + ":Thread ID:" + Thread.currentThread().getId());
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        MyTask task = new MyTask();
        ExecutorService es = new ThreadPoolExecutor(5, 5, 0L, TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>(),
                new ThreadFactory() {
                    public Thread newThread(Runnable r) {
                        Thread t = new Thread(r);
                        t.setDaemon(true);
                        System.out.println("create " + t);
                        return t;
                    }
                });
        for (int i = 0; i < 5; i++) {
            es.submit(task);
        }
        Thread.sleep(2000);
    }
}

就是可以自己定义线程,如守护线程等等

运行结果:

create Thread[Thread-0,5,main]
create Thread[Thread-1,5,main]
create Thread[Thread-2,5,main]
create Thread[Thread-3,5,main]
create Thread[Thread-4,5,main]
1539159694414:Thread ID:11
1539159694414:Thread ID:12
1539159694414:Thread ID:13
1539159694414:Thread ID:14
1539159694414:Thread ID:15

扩展线程池

线程池可以扩展出线程执行前,执行后,终止的后续处理

public class ExtThreadPool {
    public static class MyTask implements Runnable {
        public String name;
        public MyTask(String name) {
            this.name = name;
        }
        public void run() {
            System.out.println("正在执行" + ":Thread ID" + Thread.currentThread().getId() + ",Task Name=" + name);
            try {
                Thread.sleep(100);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        ExecutorService es = new ThreadPoolExecutor(5,5,0L, TimeUnit.MILLISECONDS,
                new LinkedBlockingQueue<Runnable>()) {
            @Override
            protected void beforeExecute(Thread t, Runnable r) {
                System.out.println("准备执行:" + ((MyTask)r).name);
            }

            @Override
            protected void afterExecute(Runnable r, Throwable t) {
                System.out.println("执行完成:" + ((MyTask)r).name);
            }

            @Override
            protected void terminated() {
                System.out.println("线程池退出!");
            }
        };
        for (int i = 0;i < 5;i++) {
            MyTask task = new MyTask("TASK-GEYM-" + i);
            es.execute(task);
            Thread.sleep(10);
        }
        es.shutdown();
    }
}

运行结果:

准备执行:TASK-GEYM-0
正在执行:Thread ID11,Task Name=TASK-GEYM-0
准备执行:TASK-GEYM-1
正在执行:Thread ID12,Task Name=TASK-GEYM-1
准备执行:TASK-GEYM-2
正在执行:Thread ID13,Task Name=TASK-GEYM-2
准备执行:TASK-GEYM-3
正在执行:Thread ID14,Task Name=TASK-GEYM-3
准备执行:TASK-GEYM-4
正在执行:Thread ID15,Task Name=TASK-GEYM-4
执行完成:TASK-GEYM-0
执行完成:TASK-GEYM-1
执行完成:TASK-GEYM-2
执行完成:TASK-GEYM-3
执行完成:TASK-GEYM-4
线程池退出!

在线程池中寻找堆栈

有时候线程执行时会出现Bug,抛出异常,如果使用submit()来提交线程时,不会打印异常信息,而使用execute()来执行线程时可以打印异常信息.

public class DivTask implements Runnable {
    int a,b;
    public DivTask(int a,int b) {
        this.a = a;
        this.b = b;
    }
    public void run() {
        double re = a / b;
        System.out.println(re);
    }

    public static void main(String[] args) {
//        ExecutorService pools = new TraceThreadPoolExecutor(0,Integer.MAX_VALUE,0L,TimeUnit.MILLISECONDS,
//                new SynchronousQueue<Runnable>());
        ExecutorService pools = new ThreadPoolExecutor(0,Integer.MAX_VALUE,0L,TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
        for (int i = 0;i < 5;i++) {
            pools.submit(new DivTask(100,i));
        }
    }
}

这段代码中,5个并发线程会有一个线程有除0错误

运行结果:

100.0
50.0
33.0
25.0

结果没有任何提示,异常抛出.

public class DivTask implements Runnable {
    int a,b;
    public DivTask(int a,int b) {
        this.a = a;
        this.b = b;
    }
    public void run() {
        double re = a / b;
        System.out.println(re);
    }

    public static void main(String[] args) {
//        ExecutorService pools = new TraceThreadPoolExecutor(0,Integer.MAX_VALUE,0L,TimeUnit.MILLISECONDS,
//                new SynchronousQueue<Runnable>());
        ExecutorService pools = new ThreadPoolExecutor(0,Integer.MAX_VALUE,0L,TimeUnit.SECONDS,
                new SynchronousQueue<Runnable>());
        for (int i = 0;i < 5;i++) {
            pools.execute(new DivTask(100,i));
        }
    }
}

运行结果:

100.0
Exception in thread "pool-1-thread-1" java.lang.ArithmeticException: / by zero
50.0
33.0
    at com.guanjian.DivTask.run(DivTask.java:18)
25.0
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

有异常抛出

重写跟踪线程池,自定义跟踪

public class TraceThreadPoolExecutor extends ThreadPoolExecutor {
    public TraceThreadPoolExecutor(int corePoolSize, int maximumPoolSize, long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
        super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
    }

    @Override
    public void execute(Runnable task) {
        super.execute(wrap(task,clientTrace(),Thread.currentThread().getName()));
    }

    @Override
    public Future<?> submit(Runnable task) {
        return super.submit(wrap(task,clientTrace(),Thread.currentThread().getName()));
    }

    private Exception clientTrace() {
        return new Exception("Client stack trace");
    }

    private Runnable wrap(final Runnable task,final Exception clientStack,String clientThreadName) {
        return new Runnable() {
            public void run() {
                try {
                    task.run();
                } catch (Exception e) {
                    clientStack.printStackTrace();
                    try {
                        throw e;
                    } catch (Exception e1) {
                        e1.printStackTrace();
                    }
                }
            }
        };
    }
}
public class DivTask implements Runnable {
    int a,b;
    public DivTask(int a,int b) {
        this.a = a;
        this.b = b;
    }
    public void run() {
        double re = a / b;
        System.out.println(re);
    }

    public static void main(String[] args) {
        ExecutorService pools = new TraceThreadPoolExecutor(0,Integer.MAX_VALUE,0L,TimeUnit.MILLISECONDS,
                new SynchronousQueue<Runnable>());
//        ExecutorService pools = new ThreadPoolExecutor(0,Integer.MAX_VALUE,0L,TimeUnit.SECONDS,
//                new SynchronousQueue<Runnable>());
        for (int i = 0;i < 5;i++) {
            pools.execute(new DivTask(100,i));
        }
    }
}

运行结果:

100.0
java.lang.Exception: Client stack trace
50.0
    at com.guanjian.TraceThreadPoolExecutor.clientTrace(TraceThreadPoolExecutor.java:27)
33.0
25.0
    at com.guanjian.TraceThreadPoolExecutor.execute(TraceThreadPoolExecutor.java:18)
    at com.guanjian.DivTask.main(DivTask.java:28)
java.lang.ArithmeticException: / by zero
    at com.guanjian.DivTask.run(DivTask.java:18)
    at com.guanjian.TraceThreadPoolExecutor$1.run(TraceThreadPoolExecutor.java:34)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

这样我们就可以知道是在哪里出了错.

4、分而治之,Fork/Join框架

将一个大任务拆分成各种较小规模的任务,进行并行处理,也许按照约定条件拆分的任务还是大于约定条件就继续拆分.有两种线程类型,一种是有返回值的RecursiveTask<T>,一种是没有返回值的RecursiveAction,他们都继承于ForkJoinTask<>,一个带泛型<T>,一个是<Void>.

/**
 * Created by Administrator on 2018/10/11.
 * 可以理解成一个Runnable(线程类)
 */
public class CountTask extends RecursiveTask<Long> {
    private static final int THRESHOLD = 10000;
    private long start;
    private long end;

    public CountTask(long start, long end) {
        this.start = start;
        this.end = end;
    }

    /**
     * 可以理解成run()方法
     * @return
     */
    @Override
    protected Long compute() {
        long sum = 0;
        boolean canCompute = (end - start) < THRESHOLD;
        //最终计算,所有的最终拆分都是在这里计算
        if (canCompute) {
            for (long i = start;i <= end;i++) {
                sum += i;
            }
        }else {
            //并行计算的规模,拆分成100个并行计算
            long step = (start + end) /100;
            //创建子任务线程集合
            List<CountTask> subTasks = new ArrayList<CountTask>();
            //每个并行子任务的开始值
            long pos = start;
            //并行执行100个分叉线程
            for (int i = 0;i < 100;i++) {
                //每个并行子任务的结束值
                long lastOne = pos + step;
                if (lastOne > end) {
                    lastOne = end;
                }
                //建立一个子任务的线程
                CountTask subTask = new CountTask(pos,lastOne);
                //创建下一个并行子任务的开始值
                pos += step + 1;
                //将当前子任务线程添加到线程集合
                subTasks.add(subTask);
                //执行该线程,其实是一个递归,判断lastOne-pos是否小于THRESHOLD,小于则真正执行,否则继续分叉100个子线程
                subTask.fork();
            }
            for (CountTask t:subTasks) {
                //阻断每一次分叉前的上一级线程进行等待,并将最终并行的结果进行层层累加
                sum += t.join();
            }
        }
        return sum;
    }

    public static void main(String[] args) {
        ForkJoinPool forkJoinPool = new ForkJoinPool();
        CountTask task = new CountTask(0,200000L);
        ForkJoinTask<Long> result = forkJoinPool.submit(task);
        try {
            long res = result.get();
            System.out.println("sum: " + res);
        }catch (InterruptedException e) {
            e.printStackTrace();
        }catch (ExecutionException e) {
            e.printStackTrace();
        }
    }
}

运行结果:

sum: 20000100000

© 著作权归作者所有

共有 人打赏支持
算法之名
粉丝 13
博文 107
码字总数 103286
作品 0
广州
私信 提问
Concurrent包学习(一)

java.util.concurrent包下面提供了很多多并发编程的工具和框架,locks 下面提供了锁相关的工具,例如ReentrantLock(可重入锁)、condition等在其他的类中经常有使用,提供了HashMap、Queue...

alvaDing
2016/09/24
27
0
演化理解 Android 异步加载图片

在学习"Android异步加载图像小结"这篇文章时, 发现有些地方没写清楚,我就根据我的理解,把这篇文章的代码重写整理了一遍,下面就是我的整理。 下面测试使用的layout文件: 简单来说就是 Li...

Koon.LY
2012/05/04
0
1
【Android框架进阶〖02〗】ThinkAndroid线程池机制

/** * 本博客为CSDN博主【MK】原创,博客地址:http://blog.csdn.net/mkrcpp/article/details/14166627 **/ TAApplication是ThinkAndroid的灵魂,整个框架的初始化工作都在这里进行了。 其中...

JayPark不作死
2014/04/09
0
0
线程互相组合,互相依赖,设计方法

整理于服务器管理系统部分设计思想. 做系统始终坚持一个原则,首先保证工期,保证系统正确运行,然后系统经过一段时间的运行,肯定会有一些新的改变,新的变化,同时系统还有一些忽略的bug都可以再...

乌鸦哥
2013/07/29
0
0
Java中高级面试必问之多线程TOP50(含答案)

以下为大家整理了今年一线大厂面试被问频率较高的多线程面试题,由于本人的见识局限性,所以可能不是很全面,也欢迎大家在后面留言补充,谢谢。 1、什么是线程? 2、什么是线程安全和线程不安...

老道士
08/28
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Qt那些事0.0.9

关于QThread,无F*k说的。文档说的差不多,更多的是看到很多人提到Qt开发者之一的“你TM的做错了(You're doing it wrong...)”,这位大哥2010年写的博客,下面评论很多,但主要还是集中在2...

Ev4n
12分钟前
0
0
constructor / destructor

_attribute__表示属性,是Clang提供的一种源码注释,方便开发者向编译器表达诉求,一般以__attribute__(*)的方式出现在代码中。为了方便使用,一些常用属性被定义成了宏,经常出现在系统头文...

HeroHY
12分钟前
0
0
大数据教程(7.6)shell脚本定时采集日志数据到hdfs

上一篇博客博主分享了hadoop内置rpc的使用案例,本节博主将为小伙伴们分享一个在实际生产中使用的日志搜集案例。前面的文章我们有讲到过用户点击流日志分析的流程,本节就是要完成这个分析流...

em_aaron
41分钟前
1
0
wave和pcm互转

wav->pcm pcm->wav c#代码: using System;using System.Collections.Generic;using System.ComponentModel;using System.Data;using System.Drawing;using System.IO;using Sys......

whoisliang
43分钟前
1
0
Win10:默认的图片打开应用,打开图片时速度明显很慢的解决办法

首先,我们随便地打开一张图片。然后,点击右上角的三个小点,最后点击弹出菜单最下面的“设置”。如下图: 在“设置”中找到下面的“人物”,把它关掉就好了。 原来,默认情况下,Win 10的图...

LivingInFHL
今天
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部