自己动手写一个线程池
博客专区 > tongqu 的博客 > 博客详情
自己动手写一个线程池
tongqu 发表于2年前
自己动手写一个线程池
  • 发表于 2年前
  • 阅读 36
  • 收藏 2
  • 点赞 1
  • 评论 0

腾讯云 技术升级10大核心产品年终让利>>>   

摘要: 比较ExcutorService和ThreadGroup,并通过继承ThreadGroup写一个线程池

基础知识:

    http://blog.csdn.net/i_lovefish/article/details/8042708

一。ExcutorService和ThreadGroup的比较

    在使用线程池的时候,大多数情况我们会使用excutors类来创建一个线程池。过程不再赘述。但我们发现,ExcutorService中对线程的操作只有submit、shutdown、shutdownnow等寥寥几个方法。对线程的管理并不细致。而ThreadGroup类中,有很多诸如获取存活线程数量、获取所有线程、中断线程等诸多方法。所以我们考虑用ThreadGroup来模拟一个线程池。

二。通过继承ThreadGroup模拟一个线程池

public class ThreadPoolTest extends ThreadGroup{
    //线程池是否开启
    private boolean isAlive;
    //线程池中的任务队列
    private LinkedList taskQueue;
    //线程池中的线程id
    private int threadID;
    //线程池的id
    private static int threadPoolId;
    public  ThreadPoolTest(int numThreads) {
        super("ThreadPool--"+numThreads);
        //设置为守护线程,表示当线程池中的所有线程都销毁时(interrupt方法执行),该线程池自动销毁
        super.setDaemon(true);
        this.isAlive=true;
        this.taskQueue=new LinkedList(); 
        for(int i=0;i<numThreads;i++){
            //TODO 声明一些线程,并让这些线程运行
        }
    }
  }

这里的注释TODO的部分很重要,需要做的功能如下:

  1. 当任务队列为null时,我们需要让我们声明的线程等待s。

  2. 当任务队列中不为null是,我们需要唤醒我们的线程,拿到任务队列中的任务去执行。

综上,我们需要new一些个性化的线程,让这些线程去任务队列里取任务线程,根据队列的情况执行不同操作。

方便起见,我们直接在ThreadPoolTest里写一个继承了Thread的内部类,Todo部分改为          

new PooledThread().start();

private class PooledThread extends Thread{
        public PooledThread(){
            super(ThreadPoolTest.this,"PooledThread--"+(threadID++));
            
        }
        @Override
        public void run() {
            //如果该线程没有被终止
            while(!isInterrupted()){
                Runnable task=null;
                try {
                    task=getTask();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                if(task==null){
                    //之所以没有用wait()是因为wait和notify需要在同步代码块或者同步方法中执行
                    //之所以没用task.wait()是因为task为null
                    return;
                }
                try {
                    task.run(); 
                } catch (Throwable e) {
                    e.printStackTrace();
                    //当线程族中的线程有未被捕获的异常时,jvm会去调用uncaughtException方法
                    //这个方法也是threadGroup的一个特色
                    uncaughtException(this, e);
                }
            }
        }
    }

上述代码中涉及到了getTask方法,该方法应该写在ThreadPoolTest中,用来获取任务队列中的线程

protected synchronized Runnable getTask() throws InterruptedException{
        while(this.taskQueue.size()==0){
            if(!this.isAlive){
                return null;
            }
            wait();//wait在这那!!说明如果队列中没有任务,则一直处于阻塞状态
        }
        return (Runnable) this.taskQueue.removeFirst();
    }

下面开始实现线程池的submit方法,往队列中添加任务,唤醒getTask

//添加新任务
    public synchronized void submit(Runnable task){
        if(!this.isAlive){
            throw new IllegalStateException();
        }
        if(task!=null){
            //将工作线程放到任务队列的尾部
            this.taskQueue.add(task);
            //通知工作线程取任务
            notify();//getTask被唤醒啦,构造方法中new出来的线程由阻塞状态进入可运行状态了
        }        
    }

重点方法介绍完毕,我们再加一些shutdown,shutdownnow方法就简单多了

    

public synchronized void shutdownNow(){
        if(isAlive){
            this.isAlive=false;
            this.taskQueue.clear();
        }
        //终止线程池中的所有线程
        this.interrupt();
    }
    //关闭线程池,并等待线程池中的所有任务被运行完,但不能接受新的任务
    public void shutdown(){
        synchronized (this) {
            isAlive=false;
            notifyAll();
        }
        
        //将线程池中的活动线程拷贝到新创建的线程数组thread中
        Thread[] threads=new Thread[this.activeCount()];
        //将线程池中活动的线程拷贝到新创建的线程数组中
        int count=this.enumerate(threads);
        for(int i=0;i<count;i++){
            try {
                //等待所有线程执行结束
                threads[i].join();
            } catch (InterruptedException e) {
                     e.printStackTrace();
            }
        }
        System.out.println("所有线程运行完毕");
    }

全部代码如下

package com.me.threadtest;

import java.util.LinkedList;


public class ThreadPoolTest extends ThreadGroup{
    //线程池是否开启
    private boolean isAlive;
    //线程池中的任务队列
    private LinkedList taskQueue;
    //线程池中的线程id
    private int threadID;
    //线程池的id
    private static int threadPoolId;
    public  ThreadPoolTest(int numThreads) {
        super("ThreadPool--"+numThreads);
        //设置为守护线程,表示当线程池中的所有线程都销毁时,该线程池自动销毁
        super.setDaemon(true);
        this.isAlive=true;
        this.taskQueue=new LinkedList(); 
        for(int i=0;i<numThreads;i++){
            //TODO 声明一些线程,并让这些线程运行
            new PooledThread().start();
        }
    }
    //添加新任务
    public synchronized void submit(Runnable task){
        if(!this.isAlive){
            throw new IllegalStateException();
        }
        if(task!=null){
            //将工作线程放到任务队列的尾部
            this.taskQueue.add(task);
            //通知工作线程取任务
            //TODO 不懂
            notify();
        }
        
    }
    //获取任务
    protected synchronized Runnable getTask() throws InterruptedException{
        while(this.taskQueue.size()==0){
            if(!this.isAlive){
                return null;
            }
            wait();
        }
        return (Runnable) this.taskQueue.removeFirst();
    }
    public synchronized void shutdownNow(){
        if(isAlive){
            this.isAlive=false;
            this.taskQueue.clear();
        }
        //终止线程池中的所有线程
        this.interrupt();
    }
    //关闭线程池,并等待线程池中的所有任务被运行完,但不能接受新的任务
    public void shutdown(){
        synchronized (this) {
            isAlive=false;
            notifyAll();
        }
        
        //将线程池中的活动线程拷贝到新创建的线程数组thread中
        Thread[] threads=new Thread[this.activeCount()];
        //将线程池中活动的线程拷贝到新创建的线程数组中
        int count=this.enumerate(threads);
        for(int i=0;i<count;i++){
            try {
                //等待所有线程执行结束
                threads[i].join();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        System.out.println("所有线程运行完毕");
    }
    private class PooledThread extends Thread{
        public PooledThread(){
            super(ThreadPoolTest.this,"PooledThread--"+(threadID++));
            
        }
        @Override
        public void run() {
            //如果该线程没有被终止
            while(!isInterrupted()){
                Runnable task=null;
                try {
                    task=getTask();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                
                if(task==null){
//本想通过下面注释掉的代码实现‘队列为null时等待’,但发现实现不了,所以在gettask中实现吧
//                    task.wait();
//                    wait();
                    return;
                }
                try {
                    task.run(); 
                } catch (Throwable e) {
                    e.printStackTrace();
                    //当线程族中的线程有未被捕获的异常时,jvm会去调用uncaughtException方法
                    uncaughtException(this, e);
                }
            }
        }
    }
}

使用的时候,比如我们设置的线程数为5,添加到队列中的线程数为10,那么同时运行的线程只有5个,ExcutorService中还有一个队列的最大长度,这个在上述代码中在添加一个maxSize的属性,并改动少数代码就可以实现了,非常简单,不再赘述。

共有 人打赏支持
粉丝 39
博文 37
码字总数 26162
×
tongqu
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: