文档章节

终止阻塞的线程

摆渡者
 摆渡者
发布于 2015/10/08 09:17
字数 2572
阅读 1048
收藏 6

线程状态

    我们知道,一个线程可以处于以下四种状态之一:

    1. 新建(New):当线程被创建时,它只会短暂地处于这种状态。此时它已经分配了必须的系统资源,并执行了初始化。此刻线程已经有资格获取CPU时间了,之后调度器将把这个线程转变为可运行状态或阻塞状态。

    2. 就绪(Runnable):在这种状态下,只要调度器将CPU时间片分给线程,线程就可以运行。也就是说,在任意时刻,线程可以运行也可以不运行。

    3. 阻塞(Blocked):线程能够运行,但有某个或多个条件阻止它运行。当线程处于阻塞状态时,调度器将忽略线程,不会分配给线程任何CPU时间片。直到线程重新进入了就绪状态,它才有可能执行操作。

    4. 死亡(Dead):处于死亡或终止状态的线程将不再是可调度的,并且再也不会得到CPU时间,它的任务已经结束,或不再是可运行的。任务死亡的通常方式是从run()方法返回,但是任务的线程还可以不被中断。

进入线程状态

    而一个任务进入阻塞状态,可能由以下原因造成:

    1. 通过调用sleep(milliseconds)方法使任务进入休眠状态,在这种情况下,任务在指定的时间内不会运行。

    2. 通过调用wait()方法使线程挂起。直到线程得到了notify()或notifyAll()消息(或者在JavaSE5的java.util.concurrent类库中等价的signal()活signalAll()消息),线程才会进入就绪状态。

    3. 任务在等待某个I/O操作完成。

    4. 任务试图在某个对象上调用其同步控制方法,但是对象锁不可用,因为另一个任务已经获取了这个锁。

    在较早的代码中,也可能会看到用suspend()和resume()方法来阻塞和唤醒线程,但是在Java新版本中这些方法被废弃了,因为它们可能导致死锁。stop()方法也已经被废弃了,因为它不释放线程获得的锁,并且如果线程处于不一致的状态,其他任务可以在这种状态下浏览并修改它们。

    现在我们需要查看的问题是:有事你希望能够终止处于阻塞状态的任务。如果对于阻塞装填的任务,你不能等待其到达代码中可以检查其状态值的某一点,因而决定让它主动终止,那么你就必须强制这个任务跳出阻塞状态。

中断

    正如你所想象的,在Runnable.run()方法的中间打断它,与到达程序准备好离开该方法的其他一些地方相比,要复杂得多。因为当你打断被阻塞的任务时,可能需要清理资源。正因为这一点,在任务的run()方法中间打断,更像是抛出的异常,因此在Java线程中的这种类型的异常中断中用到了异常。为了在以这种方式终止任务时返回良好的状态,你必须仔细考虑代码的执行路径,并仔细编写catch字句以便正确的清除所有事物。

    Thread类包含了interrupt()方法,因此你可以终止被阻塞的任务,这个方法将设置线程的中断状态。如果一个线程已经被阻塞,或者试图执行一个阻塞操作,那么设置这个线程的中断状态将抛出InterruptedException。当抛出该异常或者该任务调用Thread.interrupted()时,中断状态将被复位。正如你将看到的,Thread.interrupted()提供了离开run()循环而不抛出异常的第二种方式。

    为了调用interrupt(),你必须持有Thread对象。你可能已经注意到了,新的concurrent类库似乎在避免对Thread对象上的直接操作,转而尽量的通过Executor来执行所有操作。如果你在Executor上调用shutdownNow(),那么它将发送一个interrupt()调用给它启动的线程。这么做是有意义的,因为当你完成工程中的某个部分或者整个程序时,通常会希望同时关闭某个特定Executor的所有任务。然而,你有时也会希望只中断某个单一任务。如果使用Executor,那么通过调用submit()方法而不是execute()方法来启动任务,就可以持有该任务的上下文。submit()将返回一个泛型Future<?>,其中有一个未修饰的参数,因为你永远都不会在其上调用get()——持有这种Future的关键在于你可以在其上调用cancel(),并因此可以使用它来中断某个特定任务。如果你将true传递给cancel(),那么它就会拥有在该线程上调用interrupt()以停止这个线程的能力。因此,cancel是一种中断由Executor启动的单个线程的方式。

    下面的示例使用Executor展示了基本的interrupt()用法:

import java.io.IOException;
import java.io.InputStream;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class SleepBlocked implements Runnable {
    @Override
    public void run() {
        try {
            TimeUnit.SECONDS.sleep(2);
        } catch (InterruptedException e) {
            System.out.println("InterruptedException");
        }
        System.out.println("Exiting SleepBlocked.run()");
    }
}

class IOBlocked implements Runnable {
    private InputStream in;
    public IOBlocked(InputStream is) {
        in = is;
    }
    @Override
    public void run() {
        try {
            System.out.println("Waiting for read():");
            in.read();
        } catch (IOException e) {
            if (Thread.currentThread().isInterrupted()) {
                System.out.println("Interrupted from Blocked I/O");
            } else {
                throw new RuntimeException(e);
            }
        }
        System.out.println("Exiting IOBlocked.run()");
    }
}

class SynchronizedBlocked implements Runnable {
    public synchronized void f() {
        while(true) {
            //永不释放获得的锁
            Thread.yield();
        }
    }
    public SynchronizedBlocked() {
        //在构造的时候就获取该对象的锁
        new Thread(){
            @Override
            public void run() {
                f();
            }
        };
    }
    
    @Override
    public void run() {
        System.out.println("Trying to call f()");
        f();
        System.out.println("Exiting SynchronizedBlocked.run()");
    }
}

public class Interrupting {
    private static ExecutorService exec = Executors.newCachedThreadPool();
    static void test(Runnable r) throws InterruptedException {
        Future<?> future = exec.submit(r);
        TimeUnit.SECONDS.sleep(1);
        System.out.println("Interrupting " + r.getClass().getName());
        future.cancel(true);//如果在运行的话,中断该线程。
        System.out.println("Interrupting sent to " + r.getClass().getName());
    }
    
    public static void main(String[] args) throws Exception {
        test(new SleepBlocked());
        test(new IOBlocked(System.in));
        test(new SynchronizedBlocked());
        TimeUnit.SECONDS.sleep(3);
        System.out.println("Aborting with System.exit(0);");
        //强行停止退出
        System.exit(0);
    }
}

执行结果:

Interrupting SleepBlocked
Interrupting sent to SleepBlocked
InterruptedException
Exiting SleepBlocked.run()
Waiting for read():
Interrupting IOBlocked
Interrupting sent to IOBlocked
Trying to call f()
Interrupting SynchronizedBlocked
Interrupting sent to SynchronizedBlocked
Aborting with System.exit(0);

    上面的每个任务都表示了一种不同类型的阻塞。SleepBlock是可中断的阻塞示例,而IOBlocked和SynchronizedBlocked是不可中断的阻塞示例。这个程序证明I/O和在synchronized块上的等待是不可中断的,但是通过浏览代码,你也可以预见到这一点——无论是I/O还是尝试调用synchronized方法,都不需要任何InterruptedException处理器。

    两个类很简单直观:在第一个类中run()方法调用了sleep(),在第二个类中调用了read()。但是为了演示SynchronizedBlock,我们必须首先获得锁。这是通过在构造器中创建匿名的Thread类的实例来实现的,这个匿名Thread类的对象通过调用f()获得了对象锁(这个线程必须有别于为启动SynchronizedBlock.run()的线程,因为同一个线程可以多次获得某个对象锁,你将在稍后看见)。由于f()永远都不反回,因此这个锁永远不会释放,而SynchronizedBlock.run()在试图调用f(),并阻塞以等待这个锁被释放。

    从输出中可以看到,你能够中断对sleep()的调用(或者任何要求抛出InterruptedException的调用)。但是你不能中断正在试图获取synchronized锁或者正在试图执行I/O操作的线程。这有点令人烦恼,特别是在创建执行I/O任务时,因为这意味着I/O具有锁住你的多线程程序的潜在可能。特别是对于急于Web的程序,这更是关乎厉害。

    对于这类问题,有一个略显笨拙但是确实行之有效的解决方案,那就是关闭任务在其上发生阻塞的资源

import java.io.InputStream;
import java.net.ServerSocket;
import java.net.Socket;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CloseResource {
    public static void main(String[] args) throws Exception {
        ExecutorService service = Executors.newCachedThreadPool();
        ServerSocket server = new ServerSocket(8080);
        InputStream stream = new Socket("localhost", 8080).getInputStream();
        service.execute(new IOBlocked(stream));
        TimeUnit.MILLISECONDS.sleep(100);
        System.out.println("Shutting down all threads");
        service.shutdownNow();//尝试停止所有正在执行的任务
        TimeUnit.SECONDS.sleep(1);
        System.out.println("Closing " + stream.getClass().getName());
        stream.close();//通过关闭线程操作的资源来释放阻塞的线程
    }
}

执行结果:

Waiting for read():
Shutting down all threads
Closing java.net.SocketInputStream
Interrupted from Blocked I/O
Exiting IOBlocked.run()

    在shutdownNow()被调用之后以及在输入流上调用close()之前的延迟强调的是一旦底层资源被关闭,任务将解除阻塞。

    幸运的是,各种NIO类提供了更人性化的I/O中断。被阻塞的nio通道会自动地响应中断

import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.nio.ByteBuffer;
import java.nio.channels.AsynchronousCloseException;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.SocketChannel;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

class NIOBlocked implements Runnable {
    private final SocketChannel channel;
    
    public NIOBlocked(SocketChannel channel) {
        this.channel = channel;
    }
    
    @Override
    public void run() {
        try {
            System.out.println("Waiting for read() in " + this);
            channel.read(ByteBuffer.allocate(1));//阻塞当前任务
        } catch (ClosedByInterruptException e) {
            System.out.println("ClosedByInterruptException");
        } catch (AsynchronousCloseException e) {
            System.out.println("AsynchronousCloseException");
        } catch (IOException e) {
            throw new RuntimeException(e);
        }
        System.out.println("Exiting NIOBlocked.run() " + this);
    }
}

public class NIOInterruption {
    public static void main(String[] args) throws Exception {
        ExecutorService service = Executors.newCachedThreadPool();
        ServerSocket server = new ServerSocket(8080);
        InetSocketAddress isa = new InetSocketAddress("localhost", 8080);
        SocketChannel sc1 = SocketChannel.open(isa);
        SocketChannel sc2 = SocketChannel.open(isa);
        Future<?> f = service.submit(new NIOBlocked(sc1));
        service.execute(new NIOBlocked(sc2));
        //尝试关闭任务,但由于任务处于阻塞状态,关闭不了。
        service.shutdown();
        TimeUnit.SECONDS.sleep(1);
        // 通过在channel1上调用cancel来产生中断
        f.cancel(true);
        TimeUnit.SECONDS.sleep(1);
        // 释放channel2
        sc2.close();
    }
}

执行结果:

Waiting for read() in NIOBlocked@18b8914
Waiting for read() in NIOBlocked@1d49247
ClosedByInterruptException
Exiting NIOBlocked.run() NIOBlocked@18b8914
AsynchronousCloseException
Exiting NIOBlocked.run() NIOBlocked@1d49247

    如你所见,你还可以关闭底层资源以释放锁,尽管这种做法一般不是必须的。注意,使用execute()来启动两个任务,并调用service.shutdownNow()将可以很容易的终止所有事物,而对于捕获上面示例中的Future,只有在将中断发送给一个线程,同时不发送给另一个线程时才是必须的。

© 著作权归作者所有

摆渡者
粉丝 345
博文 171
码字总数 206504
作品 0
成都
程序员
私信 提问
c#线程-线程同步

线程同步 上一篇介绍了如何开启线程,线程间相互传递参数,及线程中本地变量和全局共享变量区别。 本篇主要说明线程同步。 如果有多个线程同时访问共享数据的时候,就必须要用线程同步,防止...

zsdnr
2017/07/21
0
0
c#线程-线程同步

线程同步 上一篇介绍了如何开启线程,线程间相互传递参数,及线程中本地变量和全局共享变量区别。 本篇主要说明线程同步。 如果有多个线程同时访问共享数据的时候,就必须要用线程同步,防止...

JoeSnail
2017/07/12
0
0
Thread线程终止interrupt

interrupt()的字面意思是中断一个线程,那么它是怎么使用来达到中断当前线程的呢?我们来看几个例子。 一、终止处于“阻塞状态”的线程 通过中断方式终止处于阻塞状态的线程,当线程由于被调...

激情的狼王丶21
2017/12/19
0
0
详解 ManualResetEvent(转)

原文:http://www.cnblogs.com/li-peng/p/3291306.html 今天详细说一下ManualResetEvent 它可以通知一个或多个正在等待的线程已发生事件,允许线程通过发信号互相通信,来控制线程是否可心访...

老朱教授
2017/10/01
0
0
关于ManualResetEvent的实例分析

最近用WPF开发时使用多个定时器处理时需要实例化N多个DispatcherTimer,而且全部暴露在程序外部,显得很冗杂,例如下面的例子:用到的两个定时器就要实例化两个DispatcherTimer,不知道各位看...

煮酒东篱下
06/04
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Spring使用ThreadPoolTaskExecutor自定义线程池及实现异步调用

多线程一直是工作或面试过程中的高频知识点,今天给大家分享一下使用 ThreadPoolTaskExecutor 来自定义线程池和实现异步调用多线程。 一、ThreadPoolTaskExecutor 本文采用 Executors 的工厂...

CREATE_17
今天
5
0
CSS盒子模型

CSS盒子模型 组成: content --> padding --> border --> margin 像现实生活中的快递: 物品 --> 填充物 --> 包装盒 --> 盒子与盒子之间的间距 content :width、height组成的 内容区域 padd......

studywin
今天
7
0
修复Win10下开始菜单、设置等系统软件无法打开的问题

因为各种各样的原因导致系统文件丢失、损坏、被修改,而造成win10的开始菜单、设置等系统软件无法打开的情况,可以尝试如下方法解决 此方法只在部分情况下有效,但值得一试 用Windows键+R打开...

locbytes
昨天
8
0
jquery 添加和删除节点

本文转载于:专业的前端网站➺jquery 添加和删除节点 // 增加一个三和一节点function addPanel() { // var newPanel = $('.my-panel').clone(true) var newPanel = $(".triple-panel-con......

前端老手
昨天
8
0
一、Django基础

一、web框架分类和wsgiref模块使用介绍 web框架的本质 socket服务端 与 浏览器的通信 socket服务端功能划分: 负责与浏览器收发消息(socket通信) --> wsgiref/uWsgi/gunicorn... 根据用户访问...

ZeroBit
昨天
10
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部