文档章节

JDK7 ThreadPoolExecutor execute(Runnable command) 方法解析

w
 wannshan
发布于 2017/03/29 17:04
字数 1653
阅读 41
收藏 1
/**
     * 通过这个方法提交的线程,将在新的线程,或者已有的(线程池)线程中执行
     * 
     *
     * If the task cannot be submitted for execution, either because this
     * executor has been shutdown or because its capacity has been reached,
     * the task is handled by the current {@code RejectedExecutionHandler}.
     *
     * @param command the task to execute
     * @throws RejectedExecutionException at discretion of
     *         {@code RejectedExecutionHandler}, if the task
     *         cannot be accepted for execution
     * @throws NullPointerException if {@code command} is null
     */
    public void execute(Runnable command) {
        if (command == null)
            throw new NullPointerException();
        /*
         * Proceed in 3 steps:
         *
         * 1. If fewer than corePoolSize threads are running, try to
         * start a new thread with the given command as its first
         * task.  The call to addWorker atomically checks runState and
         * workerCount, and so prevents false alarms that would add
         * threads when it shouldn't, by returning false.
         *
         * 2. If a task can be successfully queued, then we still need
         * to double-check whether we should have added a thread
         * (because existing ones died since last checking) or that
         * the pool shut down since entry into this method. So we
         * recheck state and if necessary roll back the enqueuing if
         * stopped, or start a new thread if there are none.
         *
         * 3. If we cannot queue task, then we try to add a new
         * thread.  If it fails, we know we are shut down or saturated
         * and so reject the task.
         */
        int c = ctl.get(); //c是线程池运行状态值和线程数目|的值,通过c既可以取得线程数目也可以取得线程池运行状态。
        if (workerCountOf(c) < corePoolSize) {//如果当前线程数量小于corePoolSize
            if (addWorker(command, true)) //调用addWorker 第二个参数传true,以corePoolSize作为线程池边界,创建一个新线程
                return;
            c = ctl.get();
        }
	//否则,加入等待队列
        if (isRunning(c) && workQueue.offer(command)) {//如果把任务放入处理队列成功
            int recheck = ctl.get();
            if (! isRunning(recheck) && remove(command))//线程池不在运行,从队列中删除任务,执行拒绝策略
                reject(command);
            else if (workerCountOf(recheck) == 0)//若果线程池为空,创建一个空任务线程,去执行等待队列中的任务
                addWorker(null, false);
        }
	//如果添加失败,第二个参数传false,以maximumPoolSize作为线程池边界添加线程
        else if (!addWorker(command, false))//,如果添加失败,执行拒绝策略
            reject(command);
    }

//具体看下addWorker
     /*
     * Methods for creating, running and cleaning up after workers
     */

    /**
     * Checks if a new worker can be added with respect to current
     * pool state and the given bound (either core or maximum). If so,
     * the worker count is adjusted accordingly, and, if possible, a
     * new worker is created and started, running firstTask as its
     * first task. This method returns false if the pool is stopped or
     * eligible to shut down. It also returns false if the thread
     * factory fails to create a thread when asked.  If the thread
     * creation fails, either due to the thread factory returning
     * null, or due to an exception (typically OutOfMemoryError in
     * Thread#start), we roll back cleanly.
     *
     * @param firstTask the task the new thread should run first (or
     * null if none). Workers are created with an initial first task
     * (in method execute()) to bypass queuing when there are fewer
     * than corePoolSize threads (in which case we always start one),
     * or when the queue is full (in which case we must bypass queue).
     * Initially idle threads are usually created via
     * prestartCoreThread or to replace other dying workers.
     *
     * @param core if true use corePoolSize as bound, else
     * maximumPoolSize. (A boolean indicator is used here rather than a
     * value to ensure reads of fresh values after checking other pool
     * state).
     * @return true if successful
     */
    private boolean addWorker(Runnable firstTask, boolean core) {
        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN &&
                ! (rs == SHUTDOWN &&
                   firstTask == null &&
                   ! workQueue.isEmpty()))
                return false;

            for (;;) {
                int wc = workerCountOf(c);
                if (wc >= CAPACITY ||
                    wc >= (core ? corePoolSize : maximumPoolSize))
                    return false;
                if (compareAndIncrementWorkerCount(c))//可以添加,先把线程数目加1
                    break retry;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }
        }

        boolean workerStarted = false;
        boolean workerAdded = false;
        Worker w = null;
        try {
            final ReentrantLock mainLock = this.mainLock;
            w = new Worker(firstTask);//给任务创建新线程
            final Thread t = w.thread;
            if (t != null) {
                mainLock.lock();
                try {
                    // Recheck while holding lock.
                    // Back out on ThreadFactory failure or if
                    // shut down before lock acquired.
                    int c = ctl.get();
                    int rs = runStateOf(c);

                    if (rs < SHUTDOWN ||
                        (rs == SHUTDOWN && firstTask == null)) {
                        if (t.isAlive()) // precheck that t is startable
                            throw new IllegalThreadStateException();
                        workers.add(w);//把新线程加入线程池
                        int s = workers.size();
                        if (s > largestPoolSize)
                            largestPoolSize = s;
                        workerAdded = true;
                    }
                } finally {
                    mainLock.unlock();
                }
                if (workerAdded) {
                    t.start();//开始用新线程执行任务。
                    workerStarted = true;
                }
            }
        } finally {
            if (! workerStarted)//如果添加执行失败,把线程从线程池删除。
                addWorkerFailed(w);
        }
        return workerStarted;
    }

    //接着看下Worker类, 它是一个私有内部类
        private final class Worker
        extends AbstractQueuedSynchronizer
        implements Runnable
    {
        /**
         * This class will never be serialized, but we provide a
         * serialVersionUID to suppress a javac warning.
         */
        private static final long serialVersionUID = 6138294804551838833L;

        /** Thread this worker is running in.  Null if factory fails. */
        final Thread thread;
        /** Initial task to run.  Possibly null. */
        Runnable firstTask;
        /** Per-thread task counter */
        volatile long completedTasks;

        /**
         * Creates with given first task and thread from ThreadFactory.
         * @param firstTask the first task (null if none)
         */
        Worker(Runnable firstTask) {
	///两个私有变量 具体执行任务的线程thread,由线程工厂创建和要执行的任务firstTask
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }

        /** Delegates main run loop to outer runWorker  */
        public void run() {//开始执行任务,重点看下面执行方法,它是父类的runWorker方法
            runWorker(this);
        }

        // Lock methods
        //
        // The value 0 represents the unlocked state.
        // The value 1 represents the locked state.

        protected boolean isHeldExclusively() {
            return getState() != 0;
        }

        protected boolean tryAcquire(int unused) {
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        protected boolean tryRelease(int unused) {
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        public void lock()        { acquire(1); }
        public boolean tryLock()  { return tryAcquire(1); }
        public void unlock()      { release(1); }
        public boolean isLocked() { return isHeldExclusively(); }

        void interruptIfStarted() {
            Thread t;
            if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
                try {
                    t.interrupt();
                } catch (SecurityException ignore) {
                }
            }
        }
    }

     ///worker 执行方法调用
      final void runWorker(Worker w) {
        Thread wt = Thread.currentThread();
        Runnable task = w.firstTask;
        w.firstTask = null;
        w.unlock(); // allow interrupts
        boolean completedAbruptly = true;
        try {
            while (task != null || (task = getTask()) != null) {//如果创建线程时,没指定具体任务,就调用getTask()从等待队列中取任务。具体看下面getTask()方法。
                w.lock();
                // If pool is stopping, ensure thread is interrupted;
                // if not, ensure thread is not interrupted.  This
                // requires a recheck in second case to deal with
                // shutdownNow race while clearing interrupt
                if ((runStateAtLeast(ctl.get(), STOP) ||
                     (Thread.interrupted() &&
                      runStateAtLeast(ctl.get(), STOP))) &&
                    !wt.isInterrupted())
                    wt.interrupt();
                try {
                    beforeExecute(wt, task);//任务执行前调用,子类重写可做监控
                    Throwable thrown = null;
                    try {
                        task.run();//具体任务的方法 
                    } catch (RuntimeException x) {
                        thrown = x; throw x;
                    } catch (Error x) {
                        thrown = x; throw x;
                    } catch (Throwable x) {
                        thrown = x; throw new Error(x);
                    } finally {
                        afterExecute(task, thrown);//任务执行后调用,子类重写可做监控
                    }
                } finally {
                    task = null;
                    w.completedTasks++;
                    w.unlock();
                }
            }
            completedAbruptly = false;
        } finally {
            processWorkerExit(w, completedAbruptly);//清除执行完任务的线程,补充一个新的空线程(如果需要)
        }
    }

      /**
     * Performs blocking or timed wait for a task, depending on
     * current configuration settings, or returns null if this worker
     * must exit because of any of:
     * 1. There are more than maximumPoolSize workers (due to
     *    a call to setMaximumPoolSize).
     * 2. The pool is stopped.
     * 3. The pool is shutdown and the queue is empty.
     * 4. This worker timed out waiting for a task, and timed-out
     *    workers are subject to termination (that is,
     *    {@code allowCoreThreadTimeOut || workerCount > corePoolSize})
     *    both before and after the timed wait.
     *
     * @return task, or null if the worker must exit, in which case
     *         workerCount is decremented
     */
    private Runnable getTask() {
        boolean timedOut = false; // Did the last poll() time out?

        retry:
        for (;;) {
            int c = ctl.get();
            int rs = runStateOf(c);

            // Check if queue empty only if necessary.
            if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
                decrementWorkerCount();
                return null;
            }

            boolean timed;      // Are workers subject to culling?

            for (;;) {
                int wc = workerCountOf(c);
                timed = allowCoreThreadTimeOut || wc > corePoolSize;//如果线程池有存活超时设置,或者,线程数目超过了corePoolSize,就设置true

                if (wc <= maximumPoolSize && ! (timedOut && timed))
                    break;
                if (compareAndDecrementWorkerCount(c))
                    return null;
                c = ctl.get();  // Re-read ctl
                if (runStateOf(c) != rs)
                    continue retry;
                // else CAS failed due to workerCount change; retry inner loop
            }

            try {
	    //从队列尾端取一个任务。是超时等待,还是阻塞等待,前面定了策略。
                Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();
                if (r != null)
                    return r;
                timedOut = true;
            } catch (InterruptedException retry) {
                timedOut = false;
            }
        }
    }

 

© 著作权归作者所有

w
粉丝 21
博文 42
码字总数 79122
作品 0
浦东
技术主管
私信 提问
ThreadPoolExecutor JAVA1.8源码解析

1. 构造函数 corePoolSize:池里维持的最小线程数,即使它们是空闲线程,也不会进行销毁 maximumPoolSize:最大线程数 keepAliveTime:当池里的线程数量超过了corePoolSize时,如果额外线程在...

pomer_huang的博客
2017/12/22
0
0
ThreadPoolExecutor 创建Worker Thread的策略

ThreadPoolExecutor创建新线程的条件: coreThreadCount < 已创建线程数 < maxThreadCount, 或者 已创建线程数 < maxThreadCount 且 阻塞队列已满 因此如果 使用无界阻塞队列,那么线程数将只会...

风荷举
2016/04/23
85
0
ThreadPoolExecutor源码分析及阻塞提交任务方法

ThreadPoolExecutor源码 ThreadPoolExecutor 基本使用参考:ThreadPoolExecutor执行过程分析 线程池状态标志 ctl 保存了线程池的运行状态(runState)和线程池内有效线程数量(workerCount)...

4rnold
2018/06/28
0
0
聊聊Elasticsearch的EsThreadPoolExecutor

序 本文主要研究一下Elasticsearch的EsThreadPoolExecutor EsThreadPoolExecutor elasticsearch-7.0.1/server/src/main/java/org/elasticsearch/common/util/concurrent/EsThreadPoolExecuto......

go4it
06/02
0
0
ThreadPoolExecutor源码详解

我之前一篇文章谈到了ThreadPoolExecutor的作用(http://my.oschina.net/xionghui/blog/494004),这篇文章介绍下它的原理,并根据原理分析下它的实现源码。 我们先来查看一下ThreadPoolExe...

xionghuiCoder
2015/08/19
0
3

没有更多内容

加载失败,请刷新页面

加载更多

蚂蚁区块链BaaS:开放在云端,落地于实体

自 2018 年 6 月上线以来,蚂蚁区块链 BaaS 不断在技术上实现突破,形成自身独特的优势,并做为行业领军者在不同领域的几十个场景实现落地。在区块链的商用时代加速到来之际,通过开放自身的...

阿里云官方博客
31分钟前
3
0
Idea cannot access【好使】

Idea中无法访问类中public方法。。 解决: idea点击左下角电脑图标,打开右边栏各种功能键 右边栏Maven Projects中点一下刷新,就是那个两个蓝的箭头组成的环 ----------------------------...

Airship
32分钟前
3
0
Spark内置图像数据源初探

作者:林武康,花名知瑕, 阿里巴巴计算平台事业部EMR团队的高级开发工程师,Apache HUE Contributor, 参与了多个开源项目的研发工作,对于分布式系统设计应用有较丰富的经验,目前主要专注于...

阿里云云栖社区
33分钟前
1
0
【面试被虐】游戏中的敏感词过滤是如何实现的?

小秋今天去面试了,面试官问了一个与敏感词过滤算法相关的问题,然而小秋对敏感词过滤算法一点也没听说过。于是,有了以下事情的发生….. 面试官开怼 面试官:玩过王者荣耀吧?了解过敏感词过...

爱编程的浪子
35分钟前
4
0
springboot之maven属性引入

<plugin><groupId>org.apache.maven.plugins</groupId><artifactId>maven-resources-plugin</artifactId><version>2.7</version><configuration><delimiters><delimite......

Online_Reus
36分钟前
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部