ThreadPoolExecutor线程池

原创
2020/08/05 10:52
阅读数 81

阿里开发手册,要求使用ThreadPoolExecutor来创建线程池,不允许使用Executors.newSingleThreadExecutor等创建线程池,因为任务队列最大值是Integer.MAX_VALUE,任务数量可能超过最大值,会OOM,拒绝策略一般自定义,把来不及处理的任务放到MQ等中间件,缓冲一下;

有两种线程池,

ThreadPoolExecutors:所有线程维护一个任务队列

ForkJoinPool:每个线程维护一个任务队列

Executors线程池工厂,用于产生线程池的,线程池的不同根据任务队列区分,线程池有:

public static ExecutorService newSingleThreadExecutor() {//核心线程数与最大线程数都是1
        return new FinalizableDelegatedExecutorService
            (new ThreadPoolExecutor(1, 1,
                                    0L, TimeUnit.MILLISECONDS,
                                    new LinkedBlockingQueue<Runnable>()));//链表,任务队列上线是Integer.MAX_VALUE

public static ExecutorService newCachedThreadPool() {
        return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                      60L, TimeUnit.SECONDS,
                                      new SynchronousQueue<Runnable>());//相当于手递手的任务队列,里面不放任务,有任务了创建线程获取任务,如果没有任务则阻塞

  public static ExecutorService newFixedThreadPool(int nThreads) {//设置了核心线程数与最大线程数
        return new ThreadPoolExecutor(nThreads, nThreads,
                                      0L, TimeUnit.MILLISECONDS,
                                      new LinkedBlockingQueue<Runnable>());//链表,任务队列上线是Integer.MAX_VALUE
    }

public ScheduledThreadPoolExecutor(int corePoolSize) {
        super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
              new DelayedWorkQueue());//该任务队列有时间,执行定时任务
    }

public static ExecutorService newWorkStealingPool() {
        return new ForkJoinPool
            (Runtime.getRuntime().availableProcessors(),
             ForkJoinPool.defaultForkJoinWorkerThreadFactory,
             null, true);
    }

public class HelloThreadPool {

    static class Task implements Runnable {
        private int i;

        public Task(int i) {
            this.i = i;
        }

        @Override
        public void run() {
            System.out.println(Thread.currentThread().getName() + " Task " + i);
            try {
                System.in.read();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }

        @Override
        public String toString() {
            return "Task{" +
                    "i=" + i +
                    '}';
        }
    }

    public static void main(String[] args) {


       //当有多个任务执行时,线程数已经是最大值,任务队列满了,就会执行拒绝策略,jdk提供了4中拒绝策略
        ThreadPoolExecutor tpe= new ThreadPoolExecutor(
                2,//核心线程数
                4,//最大线程数
                10,//存活时间
                TimeUnit.SECONDS,//存活时间单位
                new LinkedBlockingQueue<Runnable>(),//任务队列,可以实例化ArrayBlockingQueue
                Executors.defaultThreadFactory(),//线程工厂
                new ThreadPoolExecutor.DiscardPolicy());//拒接策略

        for (int i = 0; i < 8; i++) {
            tpe.execute(new Task(i));
        }

        System.out.println(tpe.getQueue());

        tpe.execute(new Task(100));

        System.out.println(tpe.getQueue());

        tpe.shutdown();
    }
}

public class ForkJoinPool {
    static int[] nums = new int[1000000];
    static final int MAX_NUM = 50000;
    static Random r = new Random();
    
    static {
        for(int i=0; i<nums.length; i++) {
            nums[i] = r.nextInt(100);
        }
        
        System.out.println("---" + Arrays.stream(nums).sum()); //stream api
    }
    
    
    static class AddTaskRet extends RecursiveTask<Long> {
        
        private static final long serialVersionUID = 1L;
        int start, end;
        
        AddTaskRet(int s, int e) {
            start = s;
            end = e;
        }

        @Override
        protected Long compute() {
            
            if(end-start <= MAX_NUM) {
                long sum = 0L;
                for(int i=start; i<end; i++) sum += nums[i];
                return sum;
            } 
            
            int middle = start + (end-start)/2;
            
            AddTaskRet subTask1 = new AddTaskRet(start, middle);
            AddTaskRet subTask2 = new AddTaskRet(middle, end);
            subTask1.fork();
            subTask2.fork();
            
            return subTask1.join() + subTask2.join();
        }
        
    }
    
    public static void main(String[] args) throws IOException {


        T12_ForkJoinPool temp = new T12_ForkJoinPool();

        ForkJoinPool fjp = new ForkJoinPool();
        AddTaskRet task = new AddTaskRet(0, nums.length);
        fjp.execute(task);
        long result = task.join();
        System.out.println(result);

    }
}
 

展开阅读全文
jdk
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部