文档章节

自己动手写一个线程池

tongqu
 tongqu
发布于 2016/02/15 23:17
字数 1469
阅读 122
收藏 2

阿里云携手百名商业领袖、技术大咖,带您一探行进中的数字新基建!>>>

基础知识:

    http://blog.csdn.net/i_lovefish/article/details/8042708

一。ExcutorService和ThreadGroup的比较

    在使用线程池的时候,大多数情况我们会使用excutors类来创建一个线程池。过程不再赘述。但我们发现,ExcutorService中对线程的操作只有submit、shutdown、shutdownnow等寥寥几个方法。对线程的管理并不细致。而ThreadGroup类中,有很多诸如获取存活线程数量、获取所有线程、中断线程等诸多方法。所以我们考虑用ThreadGroup来模拟一个线程池。

二。通过继承ThreadGroup模拟一个线程池

public class ThreadPoolTest extends ThreadGroup{
    //线程池是否开启
    private boolean isAlive;
    //线程池中的任务队列
    private LinkedList taskQueue;
    //线程池中的线程id
    private int threadID;
    //线程池的id
    private static int threadPoolId;
    public  ThreadPoolTest(int numThreads) {
        super("ThreadPool--"+numThreads);
        //设置为守护线程,表示当线程池中的所有线程都销毁时(interrupt方法执行),该线程池自动销毁
        super.setDaemon(true);
        this.isAlive=true;
        this.taskQueue=new LinkedList(); 
        for(int i=0;i<numThreads;i++){
            //TODO 声明一些线程,并让这些线程运行
        }
    }
  }

这里的注释TODO的部分很重要,需要做的功能如下:

  1. 当任务队列为null时,我们需要让我们声明的线程等待s。

  2. 当任务队列中不为null是,我们需要唤醒我们的线程,拿到任务队列中的任务去执行。

综上,我们需要new一些个性化的线程,让这些线程去任务队列里取任务线程,根据队列的情况执行不同操作。

方便起见,我们直接在ThreadPoolTest里写一个继承了Thread的内部类,Todo部分改为          

new PooledThread().start();

private class PooledThread extends Thread{
        public PooledThread(){
            super(ThreadPoolTest.this,"PooledThread--"+(threadID++));
            
        }
        @Override
        public void run() {
            //如果该线程没有被终止
            while(!isInterrupted()){
                Runnable task=null;
                try {
                    task=getTask();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                if(task==null){
                    //之所以没有用wait()是因为wait和notify需要在同步代码块或者同步方法中执行
                    //之所以没用task.wait()是因为task为null
                    return;
                }
                try {
                    task.run(); 
                } catch (Throwable e) {
                    e.printStackTrace();
                    //当线程族中的线程有未被捕获的异常时,jvm会去调用uncaughtException方法
                    //这个方法也是threadGroup的一个特色
                    uncaughtException(this, e);
                }
            }
        }
    }

上述代码中涉及到了getTask方法,该方法应该写在ThreadPoolTest中,用来获取任务队列中的线程

protected synchronized Runnable getTask() throws InterruptedException{
        while(this.taskQueue.size()==0){
            if(!this.isAlive){
                return null;
            }
            wait();//wait在这那!!说明如果队列中没有任务,则一直处于阻塞状态
        }
        return (Runnable) this.taskQueue.removeFirst();
    }

下面开始实现线程池的submit方法,往队列中添加任务,唤醒getTask

//添加新任务
    public synchronized void submit(Runnable task){
        if(!this.isAlive){
            throw new IllegalStateException();
        }
        if(task!=null){
            //将工作线程放到任务队列的尾部
            this.taskQueue.add(task);
            //通知工作线程取任务
            notify();//getTask被唤醒啦,构造方法中new出来的线程由阻塞状态进入可运行状态了
        }        
    }

重点方法介绍完毕,我们再加一些shutdown,shutdownnow方法就简单多了

    

public synchronized void shutdownNow(){
        if(isAlive){
            this.isAlive=false;
            this.taskQueue.clear();
        }
        //终止线程池中的所有线程
        this.interrupt();
    }
    //关闭线程池,并等待线程池中的所有任务被运行完,但不能接受新的任务
    public void shutdown(){
        synchronized (this) {
            isAlive=false;
            notifyAll();
        }
        
        //将线程池中的活动线程拷贝到新创建的线程数组thread中
        Thread[] threads=new Thread[this.activeCount()];
        //将线程池中活动的线程拷贝到新创建的线程数组中
        int count=this.enumerate(threads);
        for(int i=0;i<count;i++){
            try {
                //等待所有线程执行结束
                threads[i].join();
            } catch (InterruptedException e) {
                     e.printStackTrace();
            }
        }
        System.out.println("所有线程运行完毕");
    }

全部代码如下

package com.me.threadtest;

import java.util.LinkedList;


public class ThreadPoolTest extends ThreadGroup{
    //线程池是否开启
    private boolean isAlive;
    //线程池中的任务队列
    private LinkedList taskQueue;
    //线程池中的线程id
    private int threadID;
    //线程池的id
    private static int threadPoolId;
    public  ThreadPoolTest(int numThreads) {
        super("ThreadPool--"+numThreads);
        //设置为守护线程,表示当线程池中的所有线程都销毁时,该线程池自动销毁
        super.setDaemon(true);
        this.isAlive=true;
        this.taskQueue=new LinkedList(); 
        for(int i=0;i<numThreads;i++){
            //TODO 声明一些线程,并让这些线程运行
            new PooledThread().start();
        }
    }
    //添加新任务
    public synchronized void submit(Runnable task){
        if(!this.isAlive){
            throw new IllegalStateException();
        }
        if(task!=null){
            //将工作线程放到任务队列的尾部
            this.taskQueue.add(task);
            //通知工作线程取任务
            //TODO 不懂
            notify();
        }
        
    }
    //获取任务
    protected synchronized Runnable getTask() throws InterruptedException{
        while(this.taskQueue.size()==0){
            if(!this.isAlive){
                return null;
            }
            wait();
        }
        return (Runnable) this.taskQueue.removeFirst();
    }
    public synchronized void shutdownNow(){
        if(isAlive){
            this.isAlive=false;
            this.taskQueue.clear();
        }
        //终止线程池中的所有线程
        this.interrupt();
    }
    //关闭线程池,并等待线程池中的所有任务被运行完,但不能接受新的任务
    public void shutdown(){
        synchronized (this) {
            isAlive=false;
            notifyAll();
        }
        
        //将线程池中的活动线程拷贝到新创建的线程数组thread中
        Thread[] threads=new Thread[this.activeCount()];
        //将线程池中活动的线程拷贝到新创建的线程数组中
        int count=this.enumerate(threads);
        for(int i=0;i<count;i++){
            try {
                //等待所有线程执行结束
                threads[i].join();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        System.out.println("所有线程运行完毕");
    }
    private class PooledThread extends Thread{
        public PooledThread(){
            super(ThreadPoolTest.this,"PooledThread--"+(threadID++));
            
        }
        @Override
        public void run() {
            //如果该线程没有被终止
            while(!isInterrupted()){
                Runnable task=null;
                try {
                    task=getTask();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                
                if(task==null){
//本想通过下面注释掉的代码实现‘队列为null时等待’,但发现实现不了,所以在gettask中实现吧
//                    task.wait();
//                    wait();
                    return;
                }
                try {
                    task.run(); 
                } catch (Throwable e) {
                    e.printStackTrace();
                    //当线程族中的线程有未被捕获的异常时,jvm会去调用uncaughtException方法
                    uncaughtException(this, e);
                }
            }
        }
    }
}

使用的时候,比如我们设置的线程数为5,添加到队列中的线程数为10,那么同时运行的线程只有5个,ExcutorService中还有一个队列的最大长度,这个在上述代码中在添加一个maxSize的属性,并改动少数代码就可以实现了,非常简单,不再赘述。

© 著作权归作者所有

tongqu
粉丝 38
博文 37
码字总数 26162
作品 0
海淀
私信 提问
加载中

评论(0)

自己动手实现简单web容器二

上一篇博客《自己动手实现简单web容器一》,留下了一些问题,今天我们来解决并行访问的问题,即多个用户同时访问,我们的web服务器能给出响应,而不至于阻塞住。现代计算机越来越好,CPU核心...

冷血狂魔
2016/08/21
81
0
java中线程wait,notify,notifyall

有次给客户写一个数据库连接池的插件,由于本人公司的产品是NoSqL数据库,不能和开源的数据库连接池配合使用,所有只有自己动手写 其中有一个场景是这样的:多个线程从一个池子里面拿连接,超...

1987times
2014/04/04
65
0
死磕 java线程系列之自己动手写一个线程池

(手机横屏看源码更方便) 问题 (1)自己动手写一个线程池需要考虑哪些因素? (2)自己动手写的线程池如何测试? 简介 线程池是Java并发编程中经常使用到的技术,那么自己如何动手写一个线...

彤哥读源码
2019/10/09
5.1K
18
自己动手写把”锁”---终极篇

锁是整个Java并发包的实现基础,通过学习本系列文章,将对你理解Java并发包的本质有很大的帮助。 前边几篇中,我已经把实现锁用到的技术,进行了一一讲述。这其中有原子性、内存模型、LockS...

osc_5s0xzojq
2018/01/12
2
0
自己动手实现分布式任务调度框架(续)

  之前写过一篇:自己动手实现分布式任务调度框架本来是用来闲来分享一下自己的思维方式,时至今日发现居然有些人正在使用了,本着对代码负责任的态度,对代码部分已知bug进行了修改,并增加...

osc_7dhd4ad7
04/16
1
0

没有更多内容

加载失败,请刷新页面

加载更多

css spirit 位置计算

http://www.spritecow.com/

lemos
11分钟前
5
0
比较Git中的两个分支? [重复] - Comparing two branches in Git? [duplicate]

问题: This question already has an answer here: 这个问题在这里已有答案: Showing which files have changed between two revisions 17 answers 显示两个修订版本之间已更改的文件 17个...

javail
18分钟前
19
0
快速理解闭包

闭包是一个很有意思的东西,理解起来很绕,但是理解后很简单,网上的讲解也朦朦胧胧的.根据自己的理解解释一番: 预备知识:函数中的作用域 python中函数的作用域由def关键字界定,函数内的代码访...

hc321
23分钟前
14
0
Portworx on OpenShift 原理讲解视频

Portworx on RedHat OpenShift https://v.qq.com/x/page/g0975mnzln0.html 欢迎回到Portworx技术讲解系列视频。我们今天介绍红帽Openshift上的Portworx。我们讨论基本的OpenShift部署,包括本...

Portworx
46分钟前
164
0
type_traits类型特征萃取

type_traits库提供一组特征(traits)类——元函数,可以在编译期确定类型是否具有某些特征。 根据返回类型type_traits库里的元函数可以分为以下两大类: 检查元数据属性的值元函数:以::val...

零落年华
52分钟前
17
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部