文档章节

自己动手写一个线程池

tongqu
 tongqu
发布于 2016/02/15 23:17
字数 1469
阅读 49
收藏 2

基础知识:

    http://blog.csdn.net/i_lovefish/article/details/8042708

一。ExcutorService和ThreadGroup的比较

    在使用线程池的时候,大多数情况我们会使用excutors类来创建一个线程池。过程不再赘述。但我们发现,ExcutorService中对线程的操作只有submit、shutdown、shutdownnow等寥寥几个方法。对线程的管理并不细致。而ThreadGroup类中,有很多诸如获取存活线程数量、获取所有线程、中断线程等诸多方法。所以我们考虑用ThreadGroup来模拟一个线程池。

二。通过继承ThreadGroup模拟一个线程池

public class ThreadPoolTest extends ThreadGroup{
    //线程池是否开启
    private boolean isAlive;
    //线程池中的任务队列
    private LinkedList taskQueue;
    //线程池中的线程id
    private int threadID;
    //线程池的id
    private static int threadPoolId;
    public  ThreadPoolTest(int numThreads) {
        super("ThreadPool--"+numThreads);
        //设置为守护线程,表示当线程池中的所有线程都销毁时(interrupt方法执行),该线程池自动销毁
        super.setDaemon(true);
        this.isAlive=true;
        this.taskQueue=new LinkedList(); 
        for(int i=0;i<numThreads;i++){
            //TODO 声明一些线程,并让这些线程运行
        }
    }
  }

这里的注释TODO的部分很重要,需要做的功能如下:

  1. 当任务队列为null时,我们需要让我们声明的线程等待s。

  2. 当任务队列中不为null是,我们需要唤醒我们的线程,拿到任务队列中的任务去执行。

综上,我们需要new一些个性化的线程,让这些线程去任务队列里取任务线程,根据队列的情况执行不同操作。

方便起见,我们直接在ThreadPoolTest里写一个继承了Thread的内部类,Todo部分改为          

new PooledThread().start();

private class PooledThread extends Thread{
        public PooledThread(){
            super(ThreadPoolTest.this,"PooledThread--"+(threadID++));
            
        }
        @Override
        public void run() {
            //如果该线程没有被终止
            while(!isInterrupted()){
                Runnable task=null;
                try {
                    task=getTask();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                if(task==null){
                    //之所以没有用wait()是因为wait和notify需要在同步代码块或者同步方法中执行
                    //之所以没用task.wait()是因为task为null
                    return;
                }
                try {
                    task.run(); 
                } catch (Throwable e) {
                    e.printStackTrace();
                    //当线程族中的线程有未被捕获的异常时,jvm会去调用uncaughtException方法
                    //这个方法也是threadGroup的一个特色
                    uncaughtException(this, e);
                }
            }
        }
    }

上述代码中涉及到了getTask方法,该方法应该写在ThreadPoolTest中,用来获取任务队列中的线程

protected synchronized Runnable getTask() throws InterruptedException{
        while(this.taskQueue.size()==0){
            if(!this.isAlive){
                return null;
            }
            wait();//wait在这那!!说明如果队列中没有任务,则一直处于阻塞状态
        }
        return (Runnable) this.taskQueue.removeFirst();
    }

下面开始实现线程池的submit方法,往队列中添加任务,唤醒getTask

//添加新任务
    public synchronized void submit(Runnable task){
        if(!this.isAlive){
            throw new IllegalStateException();
        }
        if(task!=null){
            //将工作线程放到任务队列的尾部
            this.taskQueue.add(task);
            //通知工作线程取任务
            notify();//getTask被唤醒啦,构造方法中new出来的线程由阻塞状态进入可运行状态了
        }        
    }

重点方法介绍完毕,我们再加一些shutdown,shutdownnow方法就简单多了

    

public synchronized void shutdownNow(){
        if(isAlive){
            this.isAlive=false;
            this.taskQueue.clear();
        }
        //终止线程池中的所有线程
        this.interrupt();
    }
    //关闭线程池,并等待线程池中的所有任务被运行完,但不能接受新的任务
    public void shutdown(){
        synchronized (this) {
            isAlive=false;
            notifyAll();
        }
        
        //将线程池中的活动线程拷贝到新创建的线程数组thread中
        Thread[] threads=new Thread[this.activeCount()];
        //将线程池中活动的线程拷贝到新创建的线程数组中
        int count=this.enumerate(threads);
        for(int i=0;i<count;i++){
            try {
                //等待所有线程执行结束
                threads[i].join();
            } catch (InterruptedException e) {
                     e.printStackTrace();
            }
        }
        System.out.println("所有线程运行完毕");
    }

全部代码如下

package com.me.threadtest;

import java.util.LinkedList;


public class ThreadPoolTest extends ThreadGroup{
    //线程池是否开启
    private boolean isAlive;
    //线程池中的任务队列
    private LinkedList taskQueue;
    //线程池中的线程id
    private int threadID;
    //线程池的id
    private static int threadPoolId;
    public  ThreadPoolTest(int numThreads) {
        super("ThreadPool--"+numThreads);
        //设置为守护线程,表示当线程池中的所有线程都销毁时,该线程池自动销毁
        super.setDaemon(true);
        this.isAlive=true;
        this.taskQueue=new LinkedList(); 
        for(int i=0;i<numThreads;i++){
            //TODO 声明一些线程,并让这些线程运行
            new PooledThread().start();
        }
    }
    //添加新任务
    public synchronized void submit(Runnable task){
        if(!this.isAlive){
            throw new IllegalStateException();
        }
        if(task!=null){
            //将工作线程放到任务队列的尾部
            this.taskQueue.add(task);
            //通知工作线程取任务
            //TODO 不懂
            notify();
        }
        
    }
    //获取任务
    protected synchronized Runnable getTask() throws InterruptedException{
        while(this.taskQueue.size()==0){
            if(!this.isAlive){
                return null;
            }
            wait();
        }
        return (Runnable) this.taskQueue.removeFirst();
    }
    public synchronized void shutdownNow(){
        if(isAlive){
            this.isAlive=false;
            this.taskQueue.clear();
        }
        //终止线程池中的所有线程
        this.interrupt();
    }
    //关闭线程池,并等待线程池中的所有任务被运行完,但不能接受新的任务
    public void shutdown(){
        synchronized (this) {
            isAlive=false;
            notifyAll();
        }
        
        //将线程池中的活动线程拷贝到新创建的线程数组thread中
        Thread[] threads=new Thread[this.activeCount()];
        //将线程池中活动的线程拷贝到新创建的线程数组中
        int count=this.enumerate(threads);
        for(int i=0;i<count;i++){
            try {
                //等待所有线程执行结束
                threads[i].join();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        System.out.println("所有线程运行完毕");
    }
    private class PooledThread extends Thread{
        public PooledThread(){
            super(ThreadPoolTest.this,"PooledThread--"+(threadID++));
            
        }
        @Override
        public void run() {
            //如果该线程没有被终止
            while(!isInterrupted()){
                Runnable task=null;
                try {
                    task=getTask();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                
                if(task==null){
//本想通过下面注释掉的代码实现‘队列为null时等待’,但发现实现不了,所以在gettask中实现吧
//                    task.wait();
//                    wait();
                    return;
                }
                try {
                    task.run(); 
                } catch (Throwable e) {
                    e.printStackTrace();
                    //当线程族中的线程有未被捕获的异常时,jvm会去调用uncaughtException方法
                    uncaughtException(this, e);
                }
            }
        }
    }
}

使用的时候,比如我们设置的线程数为5,添加到队列中的线程数为10,那么同时运行的线程只有5个,ExcutorService中还有一个队列的最大长度,这个在上述代码中在添加一个maxSize的属性,并改动少数代码就可以实现了,非常简单,不再赘述。

© 著作权归作者所有

共有 人打赏支持
tongqu
粉丝 39
博文 37
码字总数 26162
作品 0
海淀
java中线程wait,notify,notifyall

有次给客户写一个数据库连接池的插件,由于本人公司的产品是NoSqL数据库,不能和开源的数据库连接池配合使用,所有只有自己动手写 其中有一个场景是这样的:多个线程从一个池子里面拿连接,超...

1987times
2014/04/04
0
0
自己动手实现简单web容器二

上一篇博客《自己动手实现简单web容器一》,留下了一些问题,今天我们来解决并行访问的问题,即多个用户同时访问,我们的web服务器能给出响应,而不至于阻塞住。现代计算机越来越好,CPU核心...

冷血狂魔
2016/08/21
26
0
c#执行定时计算限制操作(计时器)

在.Net Framework Class Library(FCL)中,System.Threading命名空间下定义了一个Timer类,这就是常用的一个计时器。实际上FCL总共提供了如下几种计时器: 1、System.Threading.Timer 在实际...

嗯哼9925
2017/11/22
0
0
堆栈、堆、方法区介绍

堆栈、堆、方法区介绍 终于开始看java啦…不知道有没有很多人跟我一样想法,先把安卓看完了再去看java,因为安卓直接跟工资挂钩而java更多的是内功.直到前段时间我和我们这边后台大佬对接开发w...

zly921112
2017/03/10
0
0
linux通过c++实现线程池类

目录 线程池的实现 前言 线程池的概念 使用原因及适用场合 线程池的实现原理 程序测试 线程池的实现 前言 初学C++,想封装点常用的C++类,已经写好了mutex,cond,thread的类,想用起来写点东西,...

艾露米婭娜
08/01
0
0

没有更多内容

加载失败,请刷新页面

加载更多

多线程

1. 多线程概念。并发和并行的概念。 多线程指的是一段时间内cpu同时执行多个线程。一个程序至少运行>=1个进程,进程就是运行中的程序,而一个进程至少运行>=1个线程,线程是操作系统能调度的...

鱼想吃肉
今天
0
0
HBase 表修复在线方式和离线方式

一、在线修复 1.1 使用检查命令 $ ./bin/hbase hbck 该命令可完整修复 HBase 元数据信息;存在有错误信息会进行输出; 也可以通过如下命令查看详细信息: $ ./bin/hbase hbck -details 1.2 ...

Ryan-瑞恩
今天
3
0
redis 系列二 -- 常用命令

1.基础命令 info ping quit save dbsize select flushdb flushall 2.键命令 2.1 set 直接赋值 set a a 2.2 get 取值 get a 2.3 exists 是否存在 exists a 2.4 expire 设置剩余时间 秒 expire......

imbiao
今天
2
0
php foreach

<?php// 数组的引用$a=array(1,2,3,4,5);foreach($a as $key=>&$value){$value=$value*2;}print_r($a);echo " $key -------------------$value\r\n";/** * ...

小张525
今天
3
0
12-利用思维导图梳理JavaSE-多线程

12-利用思维导图梳理JavaSE-多线程 主要内容 1.线程概念 2.线程开发 3.线程的状态 4.线程的同步和死锁 5.Java5.0并发库类 QQ/知识星球/个人WeChat/公众号二维码 本文为原创文章,如果对你有一...

飞鱼说编程
今天
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部