前言:
线程池,在我的理解当中,其实是典型的消费者生产者模型
我们实现简单的线程池,其实不难,java的并发库中,有我们可以直接拿来用的阻塞队列使用,用来存储任务以及消费者,而不需要我们做额外的同步跟阻塞操作,而消费者会通过自旋的形式,不断的从任务阻塞队列获取任务,如果没有获取到任务,则阻塞,直到有任务来进行消费,下面是代码
-----------------------------------我是分割线-----------------------------------
首先,我们定义一个接口,定义下,线程池的一些简单操作,下面是代码
public interface Pool {
/**
* 创建人:贺小五
* 创建时间:2017-08-19 00:21:53
* 描述:
* 添加任务
*/
void execute(Runnable runnable);
/**
* 创建人:贺小五
* 创建时间:2017-08-19 00:22:03
* 描述:
* 停止线程池(让线程执行完任务在停止)
*/
void shutDown();
/**
* 创建人:贺小五
* 创建时间:2017-08-19 00:22:39
* 描述:
* 添加工作者
*/
void addWorker(int num);
/**
* 创建人:贺小五
* 创建时间:2017-08-19 00:22:55
* 描述:
* 移除工作者
*/
void removeWorker(int num);
/**
* 创建人:贺小五
* 创建时间:2017-08-19 00:23:04
* 描述:
* 线程池大小
*/
int poolSize();
/**
* 创建人:贺小五
* 创建时间:2017-08-19 00:23:15
* 描述:
* 停止线程池(不管是否有任务,都停止)
*/
void shutDownNow();
}
既然定义好接口了,我们来定义一下实现
public class DefaultThreadPool implements Pool{
/**
* 使用java并发库下的阻塞队列来做,这样我们就不需要做额外的同步跟阻塞操作
*/
private final BlockingQueue<Runnable> jobs = new LinkedBlockingQueue<>();
private final BlockingQueue<Worker> workers = new LinkedBlockingQueue<>();
/**
* 创建人:贺小五
* 创建时间:2017-08-19 00:24:22
* 描述:
* 初始化线程池大小
*/
public DefaultThreadPool(int num) {
initPool(num);
}
@Override
public int poolSize() {
return workers.size();
}
private void initPool(int num){
for (int i = 0; i < num; i++) {
Worker worker = new Worker();
workers.add(worker);
worker.start();
}
}
@Override
public void execute(Runnable runnable) {
if(runnable!=null){
jobs.add(runnable);
}
}
@Override
public void shutDown() {
/**
* 通过不断的循环来判断,任务队列是否已经清空,
* 如果队列任务清空了,将工作者队列的线程停止
* 打破循环,清空工作者队列
*/
while(true){
if(jobs.size()==0){
for (Worker worker : workers) {
worker.stopRunning();
}
break;
}
}
workers.clear();
}
@Override
public void shutDownNow() {
/**
* 清空任务队列,然后调用停止线程池的方法
*/
jobs.clear();
shutDown();
}
@Override
public void addWorker(int num){
/**
* 添加新的工作者到工作者队列尾部
*/
for (int i = 0; i < num; i++) {
Worker worker = new Worker();
workers.offer(worker);
worker.start();
}
}
@Override
public void removeWorker(int num) {
/**
* 移除工作者阻塞队列头部的线程
*/
for (int i = 0; i < num; i++) {
try {
workers.take().stopRunning();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
private class Worker extends Thread{
//通过 volatile修饰的变量,保证变量的可见性,从而能让线程马上得知状态
private volatile boolean isRunning = true;
@Override
public void run() {
//通过自旋不停的从任务队列中获取任务,
while (isRunning){
Runnable runnable = null;
try {
//如果工作队列为空,则紫色
runnable = jobs.take();
} catch (InterruptedException e) {
e.printStackTrace();
}
if(runnable!=null){
System.out.print(getName()+"-->");
runnable.run();
}
// 睡眠 100毫秒,验证 shutdown 是否是在任务执行完毕后才会关闭线程池
try {
Thread.sleep(100);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
System.out.println(getName()+"销毁...");
}
public void stopRunning(){
this.isRunning = false;
}
}
}
上面接口实现很简单,工作者通过私有内部类,继承Thread 类来当工作者,内部定义两个阻塞队列,用于存储任务跟工作者,其它线程最大数,最小数,任务队列最大数暂时不定义了,这只是一个简单的实现,各位看官有兴趣,可以自己进行扩展
接下来就是测试代码,来验证下接口定义的方法
public class PoolTest {
public static void main(String[] args){
//构建一个只有10个线程的线程池
Pool pool = new DefaultThreadPool(10);
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
//放500个任务进去,让线程池进行消费
for (int i = 0; i < 500; i++) {
int finalI = i;
pool.execute(new Runnable() {
@Override
public void run() {
System.out.println("打印数字:"+ finalI);
}
});
}
/**
* 验证线程池的消费完任务停止以及不等任务队列清空就停止任务
*/
System.out.println("停止线程池");
pool.shutDown();
//pool.shutDownNow();
/**
* 移除2个工作者
*/
//pool.removeWorker(2);
//System.out.println("线程池大小:"+pool.poolSize());
/**
* 添加5个工作者
*/
//pool.addWorker(5);
//System.out.println("线程池大小:"+pool.poolSize());
}
}
以上,就是自己动手实现的一个简单线程池,通过自旋锁,让工作线程不停的从任务队列获取任务,非常简单的实现,如果不想使用java提供的阻塞队列,想自己做,可以使用 java关键字 synchronize 配合 wait() notify()方法来实现或者是 lock接口配合 condition的 await()跟signal() 方法来实现一个简单的阻塞队列
到这,文章就结束了!
以上,均为本人个人理解,比较简单的理解,或许跟各位看官理解的有出入,欢迎指正交流
欢迎转载,请注明出处跟作者,谢谢!