Java中线程池如何实现复用

原创
2018/05/08 17:01
阅读数 7.5K

对于从事Java语言开发者对于线程池大家应该都不会陌生,Executors里面的各种线程池也是顺手拈来。但突然某一次,某人问了句“复用”如何实现的。想了想线程执行完就释放了,如何复用不甚了解。

Java线程池优点:

    降低资源消耗。java中所有的池化技术都有一个好处,就是通过复用池中的对象,降低系统资源消耗。设想一下如果我们有n多个子任务需要执行,如果我们为每个子任务都创建一个执行线程,而创建线程的过程是需要一定的系统消耗的,最后肯定会拖慢整个系统的处理速度。而通过线程池我们可以做到复用线程,任务有多个,但执行任务的线程可以通过线程池来复用,这样减少了创建线程的开销,系统资源利用率得到了提升。

    降低管理线程的难度。多线程环境下对线程的管理是最容易出现问题的,而线程池通过框架为我们降低了管理线程的难度。我们不用再去担心何时该销毁线程,如何最大限度的避免多线程的资源竞争。这些事情线程池都帮我们代劳了。

    提升任务处理速度。线程池中长期驻留了一定数量的活线程,当任务需要执行时,我们不必先去创建线程,线程池会自己选择利用现有的活线程来处理任务。

 

资源复用分析:

    当我们往线程池添加任务的时候使用ThreadPollExcutor对象的execute(Runnable command)方法来完成的。那我们就来看一下这个逻辑部分的代码。

代码逻辑如下:

    public void execute(Runnable command) {
        int c = ctl.get();
        if (workerCountOf(c) < corePoolSize) {
            //当前工作线程小于corePoolSize,新建work线程并返回
            if (addWorker(command, true))
                return;
            c = ctl.get();
        }
        //如果大于等于corePoolSize,添加任务到队列。并进行二次确认(确认队列是否关闭,进行回滚)
        if (isRunning(c) && workQueue.offer(command)) {
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))
                reject(command);
            else if (workerCountOf(recheck) == 0)
                addWorker(null, false);
        }
        //添加队列失败后,则尝试新建非core线程,失败则拒绝任务。
        else if (!addWorker(command, false))
            reject(command);
    }

关于逻辑的一二三不再文字赘述,感兴趣可以看下文档英文注释。       

其中我们可以看到核心逻辑是执行addWorker,下面我们来分析下 ThreadPoolExecutor的这个方法。

    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                //判断是否有能力继续处理,没有直接返回false。
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                //worker数目+1,开始跳出双层循环,执行真正的Worker添加
                if (compareAndIncrementWorkerCount(c))
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            w = new Worker(firstTask);
            final Thread t = w.thread;
            if (t != null) {
                workers.add(w);
                workerAdded = true;
                t.start();

            }
        } finally {
            if (! workerStarted)
                addWorkerFailed(w);
        }
        return workerStarted;
    }

上面代码逻辑到新建Worker对象,其实可以猜测到就是新建线程,并启动。可以到Worker类看一下,是一个内部私有类,实现了Runnable接口。在run方面里面只有一句runWorker。

下面简化下runWorker方法:

final void runWorker(Worker w) {
    Runnable task = w.firstTask;
    w.firstTask = null;
    while (task != null || (task = getTask()) != null) {
            try {
                task.run();
            } finally {
                task = null;
            }
        }
}

参数w是新建的自身。task是新建worker的时候的Runnable任务。见到看到这,觉得task.run()之后线程就没了。没有发现说线程池到最后的重用啊。

其实while逻辑有个task == getTask()。笔者也是看了别人分析,理解了这点。其实就是条件成立了的话,对于一个Thread的来说,就不只执行一个任务了。就也就实现了复用。再往下看看getTask方法就明了了,

主要两点:

1,判断队列是否为空,返回NULL。如果当前活动线程数大于最大线程数或者等待超时,则进行Worker数目减少,也就是大于核心线程的就这样被销毁掉了。

2,如果当前活动线程数小于等于核心线程数,同样也是去缓存队列中取任务,但当缓存队列中没任务了,就会进入阻塞状态,直到能取出任务为止,因此这个线程是处于阻塞状态的,并不会因为缓存队列中没有任务了而被销毁。这样就保证了线程池有N个线程是活的,可以随时处理任务,从而达到重复利用的目的。
 

小结

关于有些线程池的队列状态和有些判断条件不是很明白,参考他人的解释。只是读完了官方的实现方案,理解的如果自己来实现线程池,如何来完成的疑惑。

简单说是下面两点,水平有限,错误之处欢迎指出:

1,实现对Thead数目的管理,不是有任务就去创建线程,而是根据任务的数量和线程的数量来判断要做什么。

2,对于多于corePoolSize的任务,放到队列等待。

展开阅读全文
打赏
0
1 收藏
分享
加载中
更多评论
打赏
0 评论
1 收藏
0
分享
返回顶部
顶部