java 线程池使用与结构解析

原创
2021/02/27 11:44
阅读数 129
  1. 线程池什么情况下使用,解决什么问题
  2. 线程池的使用
  3. 线程池的结构解析
  • 线程池什么情况下使用,解决什么问题

  1. 线程的创建和销毁是要占用一定的资源的,创建线程会直接向系统申请,调用系统函数进行分配资源。操作系统给线程分配内存、列入调度,同时线程还要进行上下文的切换。
  2. 在Java中,线程的线程栈所占用的内存在Java堆外,不受Java程序控制,只受系统资源限制,默认一个线程的线程栈大小是1M(当让这个可以通过设置-Xss属性设置,但是要注意栈溢出问题)。如果每个请求都新建线程,1024个线程就会占用1个G内存,系统很容易崩溃。
  3. 处理器单元内多个线程执行的问题,减少处理器单元的闲置时间,增加处理器单元的吞吐能力。

线程池:  线程池做的工作主要是控制运行的线程的数量,处理过程中将任务加入队列,然后在线程创建后启动这些任务,如果线程超过了最大数量,超出的数量的线程排队等候,等其他线程执行完毕,再从队列中取出任务来执行。

合理利用线程池能够带来三个好处。

  1. 降低资源消耗。通过重复利用已创建的线程降低线程创建和销毁造成的消耗。
  2. 提高响应速度。当任务到达时,任务可以不需要的等到线程创建就能立即执行。
  3. 提高线程的可管理性。线程是稀缺资源,如果无限制的创建,不仅会消耗系统资源,还会降低系统的稳定性,使用线程池可以进行统一的分配,调优和监控。
  • 线程池的使用

package com.pimee.thread.pool;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

/**
 * 线程池使用
 */
public class ThreadPoolTest {
    public static void main(String[] args) {
//        ExecutorService executorService = Executors.newFixedThreadPool(10); //固定线程
//        ExecutorService executorService = Executors.newCachedThreadPool();// 无限制创建的线程池
//        ExecutorService executorService = Executors.newSingleThreadExecutor();// 单个线程线程池
//        ExecutorService executorService = Executors.newSingleThreadScheduledExecutor();// 单个线程定时调度线程池
//        ExecutorService executorService = Executors.newScheduledThreadPool(1); // 指定数量定时调度线程池
        ExecutorService executorService = Executors.newWorkStealingPool();// 工作窃取的线程池
        executorService.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("Task is created!!");
            }
        });
        executorService.shutdown();
    }
}

  • 线程池的结构解析

// 创建线程池构造函数
public ThreadPoolExecutor(int corePoolSize,
                              int maximumPoolSize,
                              long keepAliveTime,
                              TimeUnit unit,
                              BlockingQueue<Runnable> workQueue,
                              ThreadFactory threadFactory,
                              RejectedExecutionHandler handler) {
	if (corePoolSize < 0 ||
		maximumPoolSize <= 0 ||
		maximumPoolSize < corePoolSize ||
		keepAliveTime < 0)
		throw new IllegalArgumentException();
	if (workQueue == null || threadFactory == null || handler == null)
		throw new NullPointerException();
	this.acc = System.getSecurityManager() == null ?
			null :
			AccessController.getContext();
	this.corePoolSize = corePoolSize;
	this.maximumPoolSize = maximumPoolSize;
	this.workQueue = workQueue;
	this.keepAliveTime = unit.toNanos(keepAliveTime);
	this.threadFactory = threadFactory;
	this.handler = handler;
}

参数解析:

  1. corePoolSize:线程池的基本大小,线程池初始化创建的线程个数。

  2. maximumPoolSize:线程池创建线程最大个数。 如果队列满了,并且已创建的线程数小于最大线程数,则线程池会再创建新的线程执行任务。值得注意的是如果使用了无界的任务队列这个参数就没什么效果。

  3. keepAliveTime:空闲线程存活时间。 线程没有任务执行时,继续存活的时间。默认情况下,该参数只在线程数大于corePoolSize时才有用

  4. TimeUnit:空间线程存活时间单位,对应keepAliveTime的时间单位

  5. BlockingQueue<Runnable> workQueue:工作队列,是一种阻塞队列。

几个阻塞队列 :

1. ArrayBlockingQueue:是一个基于数组结构的有界阻塞队列,此队列按 FIFO(先进先出)原则对元素进行排序。
2. LinkedBlockingQueue:一个基于链表结构的阻塞队列,此队列按FIFO (先进先出) 排序元素,吞吐量通常要高于
3. ArrayBlockingQueue。静态工厂方法Executors.newFixedThreadPool()使用了这个队列。
4. SynchronousQueue:一个不存储元素的阻塞队列。每个插入操作必须等到另一个线程调用移除操作,
否则插入操作一直处于阻塞状态,吞吐量通常要高于LinkedBlockingQueue,
静态工厂方法Executors.newCachedThreadPool使用了这个队列。
PriorityBlockingQueue:一个具有优先级得无限阻塞队列。

  1. ThreadFactory:用于设置创建线程的工厂,可以通过线程工厂给每个创建出来的线程设置更有意义的名字,Debug和定位问题时非常又帮助。

  2. RejectedExecutionHandler(饱和策略):当队列和线程池都满了,说明线程池处于饱和状态,那么必须采取一种策略处理提交的新任务。

1.  AbortPolicy:直接抛出异常。
	
2.  CallerRunsPolicy:只用调用者所在线程来运行任务。
	
3.  DiscardOldestPolicy:丢弃队列里最老的一个任务,并加入当前任务。
	
4.  DiscardPolicy:不处理,丢弃掉。
	

执行任务的方法:

/**
 * Executes the given task sometime in the future.  The task
 * may execute in a new thread or in an existing pooled thread.
 *
 * If the task cannot be submitted for execution, either because this
 * executor has been shutdown or because its capacity has been reached,
 * the task is handled by the current {@code RejectedExecutionHandler}.
 *
 * @param command the task to execute
 * @throws RejectedExecutionException at discretion of
 *         {@code RejectedExecutionHandler}, if the task
 *         cannot be accepted for execution
 * @throws NullPointerException if {@code command} is null
 */
public void execute(Runnable command) {
	if (command == null)
		throw new NullPointerException();
	/*
	 * Proceed in 3 steps:
	 *
	 * 1. If fewer than corePoolSize threads are running, try to
	 * start a new thread with the given command as its first
	 * task.  The call to addWorker atomically checks runState and
	 * workerCount, and so prevents false alarms that would add
	 * threads when it shouldn't, by returning false.
	 * 1.首先线程池判断基本线程池是否已满?没满,创建一个工作线程来执行任务
	 *
	 * 2. If a task can be successfully queued, then we still need
	 * to double-check whether we should have added a thread
	 * (because existing ones died since last checking) or that
	 * the pool shut down since entry into this method. So we
	 * recheck state and if necessary roll back the enqueuing if
	 * stopped, or start a new thread if there are none.
	 * 2. 其次线程池判断工作队列是否已满?没满,则将新提交的任务存储在工作队列里
	 *
	 * 3. If we cannot queue task, then we try to add a new
	 * thread.  If it fails, we know we are shut down or saturated
	 * and so reject the task.
	 * 3. 最后线程池判断整个线程池是否已满?没满,则创建一个新的工作线程来执行任务,
     * 满了,则交给饱和策略来处理这个任务
	 */
	int c = ctl.get();
	if (workerCountOf(c) < corePoolSize) {
		if (addWorker(command, true))
			return;
		c = ctl.get();
	}
	if (isRunning(c) && workQueue.offer(command)) {
		int recheck = ctl.get();
		if (! isRunning(recheck) && remove(command))
			reject(command);
		else if (workerCountOf(recheck) == 0)
			addWorker(null, false);
	}
	else if (!addWorker(command, false))
		reject(command);
}

参考资料:

https://zhuanlan.zhihu.com/p/166345405

https://ifeve.com/java-threadpool/

展开阅读全文
加载中

作者的其它热门文章

打赏
0
0 收藏
分享
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部