Java并发-线程池

原创
2021/02/12 12:20
阅读数 67

Java中线程池是运用场景最多的并发框架,几乎所有需要异步或者并发执行任务的程序都可以使用线程池。

合理使用线程池可以带来3个好处:

  • 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  • 提高响应速度:当任务到达时,任务可以不需要等到线程创建就能立即执行。
  • 提高线程的可管理性:使用线程池可以进行统一分配、调优和监控。

1 线程池的使用

1.1 线程池的创建

推荐使用ThreadPoolExecutor创建线程池。

/**
 * Creates a new {@code ThreadPoolExecutor} with the given initial
 * parameters.
 *
 * @param corePoolSize the number of threads to keep in the pool, even
 *        if they are idle, unless {@code allowCoreThreadTimeOut} is set
 * @param maximumPoolSize the maximum number of threads to allow in the
 *        pool
 * @param keepAliveTime when the number of threads is greater than
 *        the core, this is the maximum time that excess idle threads
 *        will wait for new tasks before terminating.
 * @param unit the time unit for the {@code keepAliveTime} argument
 * @param workQueue the queue to use for holding tasks before they are
 *        executed.  This queue will hold only the {@code Runnable}
 *        tasks submitted by the {@code execute} method.
 * @param threadFactory the factory to use when the executor
 *        creates a new thread
 * @param handler the handler to use when execution is blocked
 *        because the thread bounds and queue capacities are reached
 * @throws IllegalArgumentException if one of the following holds:<br>
 *         {@code corePoolSize < 0}<br>
 *         {@code keepAliveTime < 0}<br>
 *         {@code maximumPoolSize <= 0}<br>
 *         {@code maximumPoolSize < corePoolSize}
 * @throws NullPointerException if {@code workQueue}
 *         or {@code threadFactory} or {@code handler} is null
 */
public ThreadPoolExecutor(int corePoolSize,
                          int maximumPoolSize,
                          long keepAliveTime,
                          TimeUnit unit,
                          BlockingQueue<Runnable> workQueue,
                          ThreadFactory threadFactory,
                          RejectedExecutionHandler handler) {
  if (corePoolSize < 0 ||
      maximumPoolSize <= 0 ||
      maximumPoolSize < corePoolSize ||
      keepAliveTime < 0)
    throw new IllegalArgumentException();
  if (workQueue == null || threadFactory == null || handler == null)
    throw new NullPointerException();
  this.corePoolSize = corePoolSize;
  this.maximumPoolSize = maximumPoolSize;
  this.workQueue = workQueue;
  this.keepAliveTime = unit.toNanos(keepAliveTime);
  this.threadFactory = threadFactory;
  this.handler = handler;
}

线程池创建参数解释:

  1. corePoolSize(核心线程数):当线程池线程数量小于核心线程数时,即使有空闲线程也会创建线程,只有达到核心线程数时才不会创建。可以调用prestartCoreThreadprestartAllCoreThreads来预先创建一个或全部核心线程。核心线程默认不会被销毁,除非主动调用allowCoreThreadTimeOut(true)
  2. maximumPoolSize(最大线程数):线程池允许的最大线程数。核心线程数+非核心线程数=最大线程数。如果队列满了,并且已创建的线程数小于最大线程数,则会创建新线程执行新任务。如果使用了无界队列,则该参数基本无效。
  3. keepAliveTime,unit(存活时间):线程池的工作线程空闲后,保持的存活时间。默认仅对非核心线程有效,除非主动调用allowCoreThreadTimeOut(true)
  4. workQueue(工作队列):线程阻塞队列,只会存放由execute提交的Runnable任务。①ArrayBlockingQueue:基于数组的有界阻塞队列。②LinkedBlockingQueue:基于链表的阻塞队列,吞吐量通常高于ArrayBlockingQueue。③SynchronousQueue:不存储元素的阻塞队列,每个插入必须等待另一个线程调用移除操作。④PriorityBlockingQueue:具有优先级的无限阻塞队列。
  5. threadFactory(线程工厂):设置创建线程的工厂,创建线程池时要指定有意义的线程名称,方便出错时回溯。
  6. handler(拒绝策略):当队列和线程池都满,或者线程池不处于RUNNING状态时,会使用该策略,默认有4种实现:①AbortPolicy(默认):直接抛出RejectedExecutionException异常。②DiscardPolicy:静默丢弃当前提交的任务。③DiscardOldestPolicy:线程池未关闭的情况下,丢弃队列中最久未被执行的任务,并执行当前任务。④CallerRunsPolicy:线程池未关闭的情况下,使用调用者的线程去执行当前任务。

1.2 提交任务

线程池提交任务两个方法。

  • execute方法用于提交不需要返回值的任务,所以无法判断任务是否被线程池执行成功。
  • submit方法用于提交需要有返回值的任务。线程池会返回一个Future类型对象,该对象可以判断任务是否执行成功,并且可以通过Future.get()来获取返回值,get()会阻塞当前线程直到任务完成,get(long timeout, TimeUnit unit)会阻塞当前线程一段时间后立即返回。

1.3 线程池的关闭

可调用shutdown或者shutdownNow方法来关闭线程池。**原理:**遍历线程池中的线程,逐个调用线程的interrupt方法来中断线程,所以无法响应中断的任务可能永远无法终止。shutdownNow首先向线程池置为STOP状态,然后停止所有的正在执行或暂停任务的线程,并返回等待执行任务的列表;shutdown只是将线程池的状态设置成SHUTDOWN状态,然后中断所有没有正在执行任务的线程,之前提交的任务(正在执行的和队列中的)会被执行。

只要调用了shutdown或者shutdownNow任意方法,isShutdown都会返回true。当所有任务都已关闭后,才表示线程池关闭,isTerminated方法返回true。通常调用shutdown来关闭线程池,如果不一定要任务执行完,则可以调用shutdownNow方法。

1.4 线程池的配置

要想合理的配置线程池,先要对任务特性进行分析。

  • 任务性质:CPU密集、IO密集、混合型
  • 任务执行时间:长、中、短
  • 任务依赖性:是否依赖其他系统资源,如数据库连接、外部系统API调用
  • 任务优先级:高、中、低

性质不同的任务可以使用不同规模的线程池分开处理。CPU密集型任务应配置尽可能小的线程,如N或N+1。IO密集型的任务并不是一直在执行任务,则应配置尽可能多的线程。可以大概预估请求等待时间(WT)和服务时间(ST)之间的比例。线程池大小设置为N*(1+WT/ST)。

优先级不同的任务可以使用优先级队列PriorityBlockingQueue来处理,可以让高优先级的任务先执行。为了防止优先级低的任务可能永远都不能执行。可以将等待时间加入权重计算优先级。

建议使用有界队列,能增加系统的稳定性和预警能力。

1.5 线程池的监控

如果系统中存在大量使用线程池,则由必要对线程池进行监控,方便在出现问题时,可根据线程池使用情况进行快速定位。可根据线程池提供的参数进行监控,常用属性如下:

  • getPoolSize:线程池当前线程数
  • getCorePoolSize:线程池核心线程数
  • getActiveCount:正在执行任务的线程数量
  • getCompletedTaskCount:已完成任务的大致总数(任务和线程的状态可能在计算过程中有所变化)
  • getTaskCount:全部任务的大致总数
  • getQueue:当前线程池的任务队列
  • getLargestPoolSize:线程池曾经最大线程数量
  • getMaximumPoolSize:线程池允许最大线程数
  • getKeepAliveTime:线程池线程存活时间
  • isShutdown:线程池是否为关闭(SHUTDOWN状态)
  • isTerminated:线程池是否为TERMINATED状态

通过扩展线程池进行监控。可以通过继承的方式来自定义线程池,重写beforeExecute(Thread t, Runnable r)afterExecute(Runnable r, Throwable t)terminated(),可以在任务执行前、执行后和线程池关闭前执行一些代码进行监控。

class TPE extends ThreadPoolExecutor {
	// 记录Runnable任务起始执行时间
  private ConcurrentHashMap<Integer, Long> beginTimeMaps = new ConcurrentHashMap<>();
  
  @Override
  protected void beforeExecute(Thread t, Runnable r) {
    // 设置起始时间
    beginTimeMaps.put(r.hashCode(), new Date().getTime());
  }
  
  @Override
  protected void afterExecute(Runnable r, Throwable t) {
    // 获取起始时间  
    Long begin = beginTimeMaps.remove(r.hashCode());
    Long end = new Date().getTime();
    System.out.println(end-begin);
  }
  
  @Override
  protected void terminated() {
    // code
  }

  @Override
  public void shutdown() {
    // code
    super.shutdown();
  }

  @Override
  public List<Runnable> shutdownNow() {
    // code
    return super.shutdownNow();
  }
}

2 线程池原理

2.1 线程池状态及线程数

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));
private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;

// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

// Packing and unpacking ctl
// 运行状态
private static int runStateOf(int c)     { return c & ~CAPACITY; }
// 工作线程数
private static int workerCountOf(int c)  { return c & CAPACITY; }
// 运行状态和工作线程数
private static int ctlOf(int rs, int wc) { return rs | wc; }

线程池控制状态,ctl是一个原子integer包含两个字段:workerCount线程池内有效线程数量,runState线程池状态。高3位存储runState,低29位存储workerCount

线程池的状态如下:

  • RUNNING:接受新任务,并且能够处理队列任务
  • SHUTDOWN:不接收新任务,但能处理队列任务
  • STOP:不接收新任务,不处理队列任务
  • TIDYING:所有任务都已终止,workCount为0,线程进入该状态后会调用terminated()钩子函数
  • TERMINATED:terminated()函数已经调用完毕

2.2 关键方法

2.2.1 execute方法代码

在将来的某个时刻执行给定的任务,该任务可能被新线程执行也可能被线程池中已存在的线程执行。如果无法提交任务执行,因为执行器已经关闭或者达到最大容量,则该任务由当前的RejectedExecutionHandler处理。

public void execute(Runnable command) {
  if (command == null)
    throw new NullPointerException();
  /*
   * Proceed in 3 steps:
   *
   * 1. If fewer than corePoolSize threads are running, try to
   * start a new thread with the given command as its first
   * task.  The call to addWorker atomically checks runState and
   * workerCount, and so prevents false alarms that would add
   * threads when it shouldn't, by returning false.
   *
   * 2. If a task can be successfully queued, then we still need
   * to double-check whether we should have added a thread
   * (because existing ones died since last checking) or that
   * the pool shut down since entry into this method. So we
   * recheck state and if necessary roll back the enqueuing if
   * stopped, or start a new thread if there are none.
   *
   * 3. If we cannot queue task, then we try to add a new
   * thread.  If it fails, we know we are shut down or saturated
   * and so reject the task.
   */
  // 获取线程池的runState和workerCount
  int c = ctl.get();
  // 若当前线程数小于核心线程数;则新建一个Worker并执行当前任务
  if (workerCountOf(c) < corePoolSize) {
    // 新增线程(command, core) core表示新增的是否为核心线程
    if (addWorker(command, true))
      return;
    // 添加核心线程失败,重新获取线程池状态
    c = ctl.get();
  }
  // 若当前线程数不小于核心线程数并且处于运行状态,则将任务放到阻塞队列中
  if (isRunning(c) && workQueue.offer(command)) {
    // 添加队列成功后,再次获取线程池状态
    int recheck = ctl.get();
    // 线程池不处于运行状态,并且成功移除任务。则执行拒绝策略
    if (! isRunning(recheck) && remove(command))
      // 执行拒绝策略
      reject(command);
    else if (workerCountOf(recheck) == 0)
      // 如果工作线程为0(线程池无线程),则新建一个无任务的线程
      addWorker(null, false);
  }
  // 非运行状态或放队列失败时,直接拒绝策略
  else if (!addWorker(command, false))
    // 执行拒绝策略
    reject(command);
}
2.2.2 addWorker方法

检查是否可根据当前线程池状态以及给定的边界(核心线程或最大线程)创建新的worker。firstTask为新线程需先执行的任务,如果为null的话则不执行。core如果为true则使用corePoolSize作为边界界定条件,为false则使用maximumPoolSize作为边界界定条件。

private boolean addWorker(Runnable firstTask, boolean core) {
  retry:
  for (;;) {
    // 线程池状态及线程池运行状态
    int c = ctl.get();
    int rs = runStateOf(c);

    // Check if queue empty only if necessary.
    // 当rs>=SHUTDOWN时 即SHUTDOWN(0)、STOP(1)、TIDYING(2)、TERMINATED(3),此状态不再接受新任务
    // 当rs=SHUTDOWN时,此时可创建线程条件如下:
    // 1. rs==SHUTDOWN,此时不再接受新任务,但可
    // 2. firstTask为空,不可再继续提交任务
    // 3. !workQueue.isEmpty(),为空的话则不再需要新线程
    if (rs >= SHUTDOWN &&
        ! (rs == SHUTDOWN &&
           firstTask == null &&
           ! workQueue.isEmpty()))
      return false;

    for (;;) {
      // 当前工作线程数
      int wc = workerCountOf(c);
      // 当线程数达到CAPACITY,
      // 或者core为true时达到核心线程数,
      // 或者core为false时达到最大线程数,
      // 不再创建新线程
      if (wc >= CAPACITY ||
          wc >= (core ? corePoolSize : maximumPoolSize))
        return false;
      // 使用CAS增加workerCount
      if (compareAndIncrementWorkerCount(c))
        // 成功修改workerCount,跳出最外层for循环
        break retry;
      
      // 重新获取线程池状态
      c = ctl.get();  // Re-read ctl
      // 线程池状态已被修改,继续外传for循环
      if (runStateOf(c) != rs)
        continue retry;
      // else CAS failed due to workerCount change; retry inner loop
    }
  }

  boolean workerStarted = false;
  boolean workerAdded = false;
  Worker w = null;
  try {
    // 根据firstTask创建新worker
    w = new Worker(firstTask);
    // 拿到当前worker的线程
    final Thread t = w.thread;
    if (t != null) {
      final ReentrantLock mainLock = this.mainLock;
      mainLock.lock();
      try {
        // Recheck while holding lock.
        // Back out on ThreadFactory failure or if
        // shut down before lock acquired.
        int rs = runStateOf(ctl.get());

        // rs<SHUTDOWN即为RUNNING,线程池处于运行状态
        // rs==SHUTDOWN时,因为可以继续执行队列中的任务,故允许添加无任务的worker
        if (rs < SHUTDOWN ||
            (rs == SHUTDOWN && firstTask == null)) {
          if (t.isAlive()) // precheck that t is startable
            throw new IllegalThreadStateException();
          workers.add(w);
          int s = workers.size();
          // 更新getLargestPoolSize,线程池中出现过最大线程数
          if (s > largestPoolSize)
            largestPoolSize = s;
          workerAdded = true;
        }
      } finally {
        mainLock.unlock();
      }
      if (workerAdded) {
        // worker添加成功,启动线程
        t.start();
        workerStarted = true;
      }
    }
  } finally {
    // worker启动失败,则roll back cleanly.
    if (! workerStarted)
      addWorkerFailed(w);
  }
  return workerStarted;
}

t.start()即为worker.thread.start()Worker类本身实现了Runnable接口,在Worker初始化时,会执行this.thread = getThreadFactory().newThread(this);。也就是t.start()-->worker.run()

2.2.3 Worker类

线程池中的线程都会被封装成Worker实例,Worker继承于AbstractQueuedSynchronizer(AQS),实现了Runnalbe接口。

private final class Worker
  extends AbstractQueuedSynchronizer
  implements Runnable
{
  /**
   * This class will never be serialized, but we provide a
   * serialVersionUID to suppress a javac warning.
   */
  private static final long serialVersionUID = 6138294804551838833L;

  /** Thread this worker is running in.  Null if factory fails. */
  // ThreadFactory.newThread创建的线程实例
  final Thread thread;
  /** Initial task to run.  Possibly null. */
  // 要运行的初始任务,可能为空
  Runnable firstTask;
  /** Per-thread task counter */
  // 每个线程都会记录的完成任务数
  volatile long completedTasks;

  /**
   * Creates with given first task and thread from ThreadFactory.
   * @param firstTask the first task (null if none)
   */
  Worker(Runnable firstTask) {
    // 禁止中断,直到runWorker方法执行
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    // 通过ThreadFactory创建线程实例,Worker自身作为Runnable来创建线程实例
    this.thread = getThreadFactory().newThread(this);
  }

  /** Delegates main run loop to outer runWorker  */
  // t.start() 将会调用该方法
  public void run() {
    // 执行任务核心方法
    runWorker(this);
  }

  // Lock methods
  //
  // The value 0 represents the unlocked state.
  // The value 1 represents the locked state.

  protected boolean isHeldExclusively() {
    return getState() != 0;
  }

  protected boolean tryAcquire(int unused) {
    if (compareAndSetState(0, 1)) {
      setExclusiveOwnerThread(Thread.currentThread());
      return true;
    }
    return false;
  }

  protected boolean tryRelease(int unused) {
    setExclusiveOwnerThread(null);
    setState(0);
    return true;
  }

  public void lock()        { acquire(1); }
  public boolean tryLock()  { return tryAcquire(1); }
  public void unlock()      { release(1); }
  public boolean isLocked() { return isHeldExclusively(); }

  void interruptIfStarted() {
    Thread t;
    if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {
      try {
        t.interrupt();
      } catch (SecurityException ignore) {
      }
    }
  }
}
2.2.4 runWorker方法

主工作线程循环,不断的从队列获取任务并且执行他们。

final void runWorker(Worker w) {
  Thread wt = Thread.currentThread();
  Runnable task = w.firstTask;
  w.firstTask = null;
  // unlock 将state由-1置为0。
  w.unlock(); // allow interrupts
  boolean completedAbruptly = true;
  try {
    // 初始任务不为null,或者从队列中获取到了任务
    while (task != null || (task = getTask()) != null) {
      // worker加锁
      w.lock();
      // If pool is stopping, ensure thread is interrupted;
      // if not, ensure thread is not interrupted.  This
      // requires a recheck in second case to deal with
      // shutdownNow race while clearing interrupt
      // 若线程池正在停止,要保证当前线程是中断状态
      // 若不是的话,则要保证当前线程不是中断状态
      if ((runStateAtLeast(ctl.get(), STOP) ||
           (Thread.interrupted() &&
            runStateAtLeast(ctl.get(), STOP))) &&
          !wt.isInterrupted())
        wt.interrupt();
      try {
        // 钩子函数,任务执行前执行
        beforeExecute(wt, task);
        Throwable thrown = null;
        try {
          // 执行任务 =======>
          task.run();
        } catch (RuntimeException x) {
          thrown = x; throw x;
        } catch (Error x) {
          thrown = x; throw x;
        } catch (Throwable x) {
          thrown = x; throw new Error(x);
        } finally {
          // 钩子函数,任务执行后执行
          afterExecute(task, thrown);
        }
      } finally {
        // 任务执行完,清空task任务
        task = null;
        // 已完成任务+1
        w.completedTasks++;
        // worker解锁
        w.unlock();
      }
    }
    // 线程正常退出
    completedAbruptly = false;
  } finally {
    // 线程退出 completedAbruptly标识正常退出/非正常退出
    processWorkerExit(w, completedAbruptly);
  }
}

Thread.interrupted()判断线程是否中断,同时复位中断状态。

runWorker()方法的执行流程:

  1. while循环调用getTask()方法,从任务队列中获取任务(Worker新建时可能有firstTask)
  2. 若线程池正在停止,要保证当前线程是中断状态,否则要保证当前线程不是中断状态
  3. 调用task.run()执行任务
  4. 若task为null则退出循环,执行processWorkerExit
  5. runWorker执行完,即代表Worker中run方法执行完毕,销毁线程
2.2.5 getTask方法

根据当前配置,阻塞或者超时等待任务。发生以下情况时,会返回null。task为null时,则销毁线程。

  1. 当前线程池线程数超过最大线程数(maximumPoolSize)。调用setMaximumPoolSize方法。
  2. 线程池处于STOP状态。调用了shutdownNow方法。
  3. 线程池处于SHUTDOWN状态,并且workQueue队列为空。
  4. 当前worker获取任务等待超时(有可能是allowCoreThreadTimeOutworkerCount > corePoolSize)。
private Runnable getTask() {
  boolean timedOut = false; // Did the last poll() time out?

  for (;;) {
    // 获取当前线程池状态、运行状态
    int c = ctl.get();
    int rs = runStateOf(c);

    // Check if queue empty only if necessary.
    // rs>=STOP,线程池调用`shutdownNow`后,不再处理新任务
    // rs>=SHUTDOWN,workQueue消费完后,不再接受新任务
    if (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {
      decrementWorkerCount();
      return null;
    }

    // 当前工作线程数
    int wc = workerCountOf(c);

    // Are workers subject to culling?
    // 是否存在超时校验标识
    // `allowCoreThreadTimeOut`允许核心线程超时 或 工作线程数大于核心线程数
    boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

    // wc>maximumPoolSize,可能调用了`setMaximumPoolSize`,修改了最大线程数
    // timed&&timedOut,当前线程需要进行超时控制,并且上次发生了超时
    // 线程数大于1,或者阻塞队列为空
    if ((wc > maximumPoolSize || (timed && timedOut))
        && (wc > 1 || workQueue.isEmpty())) {
      // 尝试扣减工作线程
      if (compareAndDecrementWorkerCount(c))
        return null;
      continue;
    }

    try {
      // 是否需要进行超时判断 timed
      // 若为true,则需要进行超时判断,通过阻塞队列的poll方法来进行超时控制,超时则返回null
      // 若为false,则通过take获取,阻塞队列直到workQueue不为空
      Runnable r = timed ?
        workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
      workQueue.take();
      if (r != null)
        return r;
      // r为null,已经超时,设置标识位
      timedOut = true;
    } catch (InterruptedException retry) {
      // 发生了中断,未发生超时,设置标识位
      timedOut = false;
    }
  }
}

getTask方法返回null时,跳出循环,然后执行processWorkerExit方法进行退出。

2.2.6 processWorkerExit方法
private void processWorkerExit(Worker w, boolean completedAbruptly) {
  // completedAbruptly是否出现异常,将workerCount减1
  if (completedAbruptly) // If abrupt, then workerCount wasn't adjusted
    decrementWorkerCount();

  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    // 已完成任务数
    completedTaskCount += w.completedTasks;
    workers.remove(w);
  } finally {
    mainLock.unlock();
  }

  tryTerminate();

  int c = ctl.get();
  // 若线程处于RUNNING或SHUTDOWN状态时,若worker异常结束,则直接添加空任务worker
  if (runStateLessThan(c, STOP)) {
    if (!completedAbruptly) {
      // 允许核心线程超时时,如果阻塞队列不为空,则至少保留一个worker
      int min = allowCoreThreadTimeOut ? 0 : corePoolSize;
      if (min == 0 && ! workQueue.isEmpty())
        min = 1;
      // 若不允许核心线程池超时,则workerCount不少于corePoolSize
      if (workerCountOf(c) >= min)
        return; // replacement not needed
    }
    addWorker(null, false);
  }
}
2.2.7 shutdown方法
public void shutdown() {
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    // 安全策略校验
    checkShutdownAccess();
    // 切换状态为SHUTDOWN
    advanceRunState(SHUTDOWN);
    // 中断空闲线程
    interruptIdleWorkers();
    // 钩子
    onShutdown(); // hook for ScheduledThreadPoolExecutor
  } finally {
    mainLock.unlock();
  }
  // 尝试结束线程池
  tryTerminate();
}
2.2.8 shutdownNow方法
public List<Runnable> shutdownNow() {
  List<Runnable> tasks;
  final ReentrantLock mainLock = this.mainLock;
  mainLock.lock();
  try {
    checkShutdownAccess();
    advanceRunState(STOP);
    // 中断所有线程(空闲、非空闲)
    interruptWorkers();
    // 取出队列中所有未被执行的任务
    tasks = drainQueue();
  } finally {
    mainLock.unlock();
  }
  tryTerminate();
  return tasks;
}

3 几种常见线程池

3.1 Exectors线程池

Executors提供了几个静态方法来创建线程池。

3.1.1 newFixedThreadPool
public static ExecutorService newFixedThreadPool(int nThreads) {
  return new ThreadPoolExecutor(nThreads, nThreads,
                                0L, TimeUnit.MILLISECONDS,
                                new LinkedBlockingQueue<Runnable>());
}

核心线程数量和总线程数相同,都是传入的参数nThreads,所以只能创建核心线程。因为LinkedBlockingQueue的默认大小是Integer.MAX_VALUE,故核心线程空闲则由其处理,否则入队等待直到核心线程空闲。keepAliveTime设置为0L,多余的线程将会被立即停止。

适用于为满足资源管理需求,需要限制当前线程数量的应用场景,适用于负载比较高的服务器。

3.1.2 newSingleThreadExecutor
public static ExecutorService newSingleThreadExecutor() {
  return new FinalizableDelegatedExecutorService
    (new ThreadPoolExecutor(1, 1,
                            0L, TimeUnit.MILLISECONDS,
                            new LinkedBlockingQueue<Runnable>()));
}

核心线程数和最大线程数都为1,使用无界队列。任意时刻最多只有一个线程执行任务,多余任务会被缓冲至队列中。

适用于保证顺序的执行各任务;并且在任意时间点,不会有多线程是活动的应用场景。

3.1.3 newCachedThreadPool
public static ExecutorService newCachedThreadPool() {
  return new ThreadPoolExecutor(0, Integer.MAX_VALUE,
                                60L, TimeUnit.SECONDS,
                                new SynchronousQueue<Runnable>());
}

核心线程数为0,最大线程数为Integer.MAX_VALUEkeepAliveTime为60秒,空闲线程超时将会被终止。阻塞队列SynchronousQueue是一个没有容量的阻塞队列,插入数据时必须等待一个线程来获取数据、否则就会阻塞。没有了队列的缓冲,提交的任务会被尽快的分配线程执行。

适用于很多短期的异步任务的小程序,或者是负载较轻的服务器。

3.1.4 newScheduledThreadPool
public static ScheduledExecutorService newScheduledThreadPool(int corePoolSize) {
  return new ScheduledThreadPoolExecutor(corePoolSize);
}

public ScheduledThreadPoolExecutor(int corePoolSize) {
  super(corePoolSize, Integer.MAX_VALUE, 0, NANOSECONDS,
        new DelayedWorkQueue());
}

支持定时或周期性的任务执行。阻塞队列使用DelayedWorkQueue

适用于需要多个后台线程执行周期任务,同时为了满足资源管理的需求而需要限制后台线程数量的应用场景。

3.2 Exectors弊端

  • newFixedThreadPoolnewSingleThreadExecutor:阻塞队列使用LinkedBlockingQueue,其默认容量为Integer.MAX_VALUE,若任务处理较慢,则会引起消息堆积问题,消耗大量内存甚至触发OOM。
  • newCachedThreadPoolnewScheduledThreadPool:最大线程数为Integer.MAX_VALUE,可能会创建很多的线程,甚至导致OOM。

4 线程池注意事项

  • 自定义线程工厂ThreadFactory,指定有意义的线程名称,方便出错时回溯。

  • 使用Exectors时,避免出现任务堆积线程堆积情况。最好使用ThreadPoolExecutor显示的创建线程池。

  • 若线程池中使用到ThreadLocal,必须主动回收。

  • 最好有有效的监控、日志等记录信息。方便异常处理。

  • 线程池大小设置要根据任务类型进行设置,根据任务运行情况、系统负载、资源利用率进行调整。

展开阅读全文
加载中

作者的其它热门文章

打赏
0
0 收藏
分享
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部