文档章节

自己动手写一个线程池

tongqu
 tongqu
发布于 2016/02/15 23:17
字数 1469
阅读 50
收藏 2

基础知识:

    http://blog.csdn.net/i_lovefish/article/details/8042708

一。ExcutorService和ThreadGroup的比较

    在使用线程池的时候,大多数情况我们会使用excutors类来创建一个线程池。过程不再赘述。但我们发现,ExcutorService中对线程的操作只有submit、shutdown、shutdownnow等寥寥几个方法。对线程的管理并不细致。而ThreadGroup类中,有很多诸如获取存活线程数量、获取所有线程、中断线程等诸多方法。所以我们考虑用ThreadGroup来模拟一个线程池。

二。通过继承ThreadGroup模拟一个线程池

public class ThreadPoolTest extends ThreadGroup{
    //线程池是否开启
    private boolean isAlive;
    //线程池中的任务队列
    private LinkedList taskQueue;
    //线程池中的线程id
    private int threadID;
    //线程池的id
    private static int threadPoolId;
    public  ThreadPoolTest(int numThreads) {
        super("ThreadPool--"+numThreads);
        //设置为守护线程,表示当线程池中的所有线程都销毁时(interrupt方法执行),该线程池自动销毁
        super.setDaemon(true);
        this.isAlive=true;
        this.taskQueue=new LinkedList(); 
        for(int i=0;i<numThreads;i++){
            //TODO 声明一些线程,并让这些线程运行
        }
    }
  }

这里的注释TODO的部分很重要,需要做的功能如下:

  1. 当任务队列为null时,我们需要让我们声明的线程等待s。

  2. 当任务队列中不为null是,我们需要唤醒我们的线程,拿到任务队列中的任务去执行。

综上,我们需要new一些个性化的线程,让这些线程去任务队列里取任务线程,根据队列的情况执行不同操作。

方便起见,我们直接在ThreadPoolTest里写一个继承了Thread的内部类,Todo部分改为          

new PooledThread().start();

private class PooledThread extends Thread{
        public PooledThread(){
            super(ThreadPoolTest.this,"PooledThread--"+(threadID++));
            
        }
        @Override
        public void run() {
            //如果该线程没有被终止
            while(!isInterrupted()){
                Runnable task=null;
                try {
                    task=getTask();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                if(task==null){
                    //之所以没有用wait()是因为wait和notify需要在同步代码块或者同步方法中执行
                    //之所以没用task.wait()是因为task为null
                    return;
                }
                try {
                    task.run(); 
                } catch (Throwable e) {
                    e.printStackTrace();
                    //当线程族中的线程有未被捕获的异常时,jvm会去调用uncaughtException方法
                    //这个方法也是threadGroup的一个特色
                    uncaughtException(this, e);
                }
            }
        }
    }

上述代码中涉及到了getTask方法,该方法应该写在ThreadPoolTest中,用来获取任务队列中的线程

protected synchronized Runnable getTask() throws InterruptedException{
        while(this.taskQueue.size()==0){
            if(!this.isAlive){
                return null;
            }
            wait();//wait在这那!!说明如果队列中没有任务,则一直处于阻塞状态
        }
        return (Runnable) this.taskQueue.removeFirst();
    }

下面开始实现线程池的submit方法,往队列中添加任务,唤醒getTask

//添加新任务
    public synchronized void submit(Runnable task){
        if(!this.isAlive){
            throw new IllegalStateException();
        }
        if(task!=null){
            //将工作线程放到任务队列的尾部
            this.taskQueue.add(task);
            //通知工作线程取任务
            notify();//getTask被唤醒啦,构造方法中new出来的线程由阻塞状态进入可运行状态了
        }        
    }

重点方法介绍完毕,我们再加一些shutdown,shutdownnow方法就简单多了

    

public synchronized void shutdownNow(){
        if(isAlive){
            this.isAlive=false;
            this.taskQueue.clear();
        }
        //终止线程池中的所有线程
        this.interrupt();
    }
    //关闭线程池,并等待线程池中的所有任务被运行完,但不能接受新的任务
    public void shutdown(){
        synchronized (this) {
            isAlive=false;
            notifyAll();
        }
        
        //将线程池中的活动线程拷贝到新创建的线程数组thread中
        Thread[] threads=new Thread[this.activeCount()];
        //将线程池中活动的线程拷贝到新创建的线程数组中
        int count=this.enumerate(threads);
        for(int i=0;i<count;i++){
            try {
                //等待所有线程执行结束
                threads[i].join();
            } catch (InterruptedException e) {
                     e.printStackTrace();
            }
        }
        System.out.println("所有线程运行完毕");
    }

全部代码如下

package com.me.threadtest;

import java.util.LinkedList;


public class ThreadPoolTest extends ThreadGroup{
    //线程池是否开启
    private boolean isAlive;
    //线程池中的任务队列
    private LinkedList taskQueue;
    //线程池中的线程id
    private int threadID;
    //线程池的id
    private static int threadPoolId;
    public  ThreadPoolTest(int numThreads) {
        super("ThreadPool--"+numThreads);
        //设置为守护线程,表示当线程池中的所有线程都销毁时,该线程池自动销毁
        super.setDaemon(true);
        this.isAlive=true;
        this.taskQueue=new LinkedList(); 
        for(int i=0;i<numThreads;i++){
            //TODO 声明一些线程,并让这些线程运行
            new PooledThread().start();
        }
    }
    //添加新任务
    public synchronized void submit(Runnable task){
        if(!this.isAlive){
            throw new IllegalStateException();
        }
        if(task!=null){
            //将工作线程放到任务队列的尾部
            this.taskQueue.add(task);
            //通知工作线程取任务
            //TODO 不懂
            notify();
        }
        
    }
    //获取任务
    protected synchronized Runnable getTask() throws InterruptedException{
        while(this.taskQueue.size()==0){
            if(!this.isAlive){
                return null;
            }
            wait();
        }
        return (Runnable) this.taskQueue.removeFirst();
    }
    public synchronized void shutdownNow(){
        if(isAlive){
            this.isAlive=false;
            this.taskQueue.clear();
        }
        //终止线程池中的所有线程
        this.interrupt();
    }
    //关闭线程池,并等待线程池中的所有任务被运行完,但不能接受新的任务
    public void shutdown(){
        synchronized (this) {
            isAlive=false;
            notifyAll();
        }
        
        //将线程池中的活动线程拷贝到新创建的线程数组thread中
        Thread[] threads=new Thread[this.activeCount()];
        //将线程池中活动的线程拷贝到新创建的线程数组中
        int count=this.enumerate(threads);
        for(int i=0;i<count;i++){
            try {
                //等待所有线程执行结束
                threads[i].join();
            } catch (InterruptedException e) {
                // TODO Auto-generated catch block
                e.printStackTrace();
            }
        }
        System.out.println("所有线程运行完毕");
    }
    private class PooledThread extends Thread{
        public PooledThread(){
            super(ThreadPoolTest.this,"PooledThread--"+(threadID++));
            
        }
        @Override
        public void run() {
            //如果该线程没有被终止
            while(!isInterrupted()){
                Runnable task=null;
                try {
                    task=getTask();
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                
                if(task==null){
//本想通过下面注释掉的代码实现‘队列为null时等待’,但发现实现不了,所以在gettask中实现吧
//                    task.wait();
//                    wait();
                    return;
                }
                try {
                    task.run(); 
                } catch (Throwable e) {
                    e.printStackTrace();
                    //当线程族中的线程有未被捕获的异常时,jvm会去调用uncaughtException方法
                    uncaughtException(this, e);
                }
            }
        }
    }
}

使用的时候,比如我们设置的线程数为5,添加到队列中的线程数为10,那么同时运行的线程只有5个,ExcutorService中还有一个队列的最大长度,这个在上述代码中在添加一个maxSize的属性,并改动少数代码就可以实现了,非常简单,不再赘述。

© 著作权归作者所有

共有 人打赏支持
tongqu
粉丝 39
博文 37
码字总数 26162
作品 0
海淀
私信 提问
自己动手实现简单web容器二

上一篇博客《自己动手实现简单web容器一》,留下了一些问题,今天我们来解决并行访问的问题,即多个用户同时访问,我们的web服务器能给出响应,而不至于阻塞住。现代计算机越来越好,CPU核心...

冷血狂魔
2016/08/21
26
0
java中线程wait,notify,notifyall

有次给客户写一个数据库连接池的插件,由于本人公司的产品是NoSqL数据库,不能和开源的数据库连接池配合使用,所有只有自己动手写 其中有一个场景是这样的:多个线程从一个池子里面拿连接,超...

1987times
2014/04/04
0
0
c#执行定时计算限制操作(计时器)

在.Net Framework Class Library(FCL)中,System.Threading命名空间下定义了一个Timer类,这就是常用的一个计时器。实际上FCL总共提供了如下几种计时器: 1、System.Threading.Timer 在实际...

嗯哼9925
2017/11/22
0
0
堆栈、堆、方法区介绍

堆栈、堆、方法区介绍 终于开始看java啦…不知道有没有很多人跟我一样想法,先把安卓看完了再去看java,因为安卓直接跟工资挂钩而java更多的是内功.直到前段时间我和我们这边后台大佬对接开发w...

zly921112
2017/03/10
0
0
Android 中的子线程解析

Android 中线程可分为和两类,其中主线程也就是,它的主要这作用就是运行四大组件、处理界面交互。子线程则主要是处理耗时任务,也是我们要重点分析的。 首先 Java 中的各种线程在 Android ...

SheHuan
07/27
0
0

没有更多内容

加载失败,请刷新页面

加载更多

小白带你认识netty(二)之netty服务端启动(上)

上一章 中的标准netty启动代码中,ServerBootstrap到底是如何启动的呢?这一章我们来瞅下。 server.group(bossGroup, workGroup);server.channel(NioServerSocketChannel.class).optio...

天空小小
35分钟前
1
0
聊聊storm trident batch的分流与聚合

序 本文主要研究一下storm trident batch的分流与聚合 实例 TridentTopology topology = new TridentTopology(); topology.newStream("spout1", spout) .p......

go4it
昨天
3
0
3分钟总结Mybatis别名

1.系统内置别名: 把类型全小写(resultType/paramType) 2.给某个类起别名 2.1 alias=”自定义” <typeAliases> <typeAlias type="com.bjsxt.pojo.People" alias="peo"/> </typeAli......

KingFightingAn
昨天
2
0
JAVA设计模式之模板方法模式和建造者模式

一、前期回顾 上一篇《Java 设计模式之工厂方法模式与抽象工厂模式》介绍了三种工厂模式,分别是工厂方法模式,简单工厂方法模式,抽象工厂模式,文中详细根据实际场景介绍了三种模式的定义,...

木木匠
昨天
8
0
C中的宏的使用(宏嵌套/宏展开/可变参数宏)

基本原则: 在展开当前宏函数时,如果形参有#或##则不进行宏参数的展开,否则先展开宏参数,再展开当前宏。 #是在定义两边加上双引号 #define _TOSTR(s) #sprintf(_TOSTR(test ABC))pr...

SamXIAO
昨天
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部