文档章节

JUC系列四:任务的取消与关闭

那位先生_
 那位先生_
发布于 2015/08/09 20:27
字数 2556
阅读 114
收藏 6

在大多数情况下,我们创建一个任务,都会让它运行直到结束。但有时,又需要在某种情况下取消任务,比如用户请求取消,有时间限制的任务,任务运行时出现错误等等。在Java中,没有一种安全的抢占式方式来停止线程(什么意思?),因此也没有安全的抢占式方法来停止任务。

###标识 在前面的例子中,我们曾使用volatile来修饰一个变量作为方法退出的一种标识,而在任务中,我们同样可以使用它来使得任务在需要的情况下退出。在下面的例子中,PrimeGenerator每次在生成素数之前都会检查canceled标识,如果为true,则立即退出任务。

public class PrimeGenerator implements Runnable{
	private final List<BigInteger> primes=new ArrayList<>();
	private volatile boolean cancelled;
	public void run(){
		BigInteger p=BigInteger.ONE;
		while(!cancelled){
			p=p.nextProbablePrime();
			synchronized(primes){
				primes.add(p);
			}
		}
	}
	public void cancel(){
		cancelled=true;
	}
	public synchronized List<BigInteger> get(){
		return new ArrayList<BigInteger>(primes);
	}
	public static void main(String []args) throws Exception{
		PrimeGenerator generator=new PrimeGenerator();
		new Thread(generator).start();
		try{
			Thread.sleep(100);
		}finally{
			generator.cancel();
		}
		List<BigInteger> results=generator.get();
		for(BigInteger i:results){
			System.out.println(i);
		}
	}
}

但这种方式只能满足一部分需求,如果在任务执行代码中存在线程阻塞的方法(sleep(),wait()...),那么就可能存在一个很严重的问题,任务不能如期望的那样及时退出,甚至可能永远不会退出。

public class BrokenPrimeProducer extends Thread{
	private final BlockingQueue<BigInteger> queue;
	private volatile boolean cancelled=false;
	public BrokenPrimeProducer(BlockingQueue<BigInteger> queue){
		this.queue=queue;
	}
	public void run(){
		try{
			BigInteger p=BigInteger.ONE;
			while(!cancelled){
				queue.put(p=p.nextProbablePrime());
			}
		}catch(InterruptedException e){}
	}
	public void cancel(){
		cancelled=true;
	}
	public static void main(String []args) throws Exception{
		BlockingQueue<BigInteger> primes=new LinkedBlockingQueue<BigInteger>(10);
		BrokenPrimeProducer producer=new BrokenPrimeProducer(primes);
		producer.start();
		int count=0;
		try{
			while(count<10){
				count++;
				Thread.sleep(1000);
				System.out.println(primes.take());
			}
		}finally{
			producer.cancel();
		}
		System.out.println("over..");
	}
}

在上面的例子中,BrokenPrimeProducer用于生产素数,并将结果保存在阻塞队列中,而main方法则在不断的从队列中读取素数。但是程序在运行到producer.cancel()之后,生产者线程并没有如期的停止下来。这是因为,当队列已满时,queue.put()将会阻塞,而此时count>=10,不再执行primes.take(),那么在调用producer.cancel()时,由于producer一直阻塞在queue.put方法处,使得线程不能检查到cancelled标识,导致线程永远不会结束。

###线程中断

对于需要取消 存在阻塞操作的任务,则不能使用检查标识的方式,而是通过线程中断机制。每个线程都有一个boolean类型的中断状态,当中断线程时,该状态会被设置为true。在Thread类中,定义interrupt方法来中断线程目标,而isInterrupted方法能返回中断状态。静态的interrupted方法将清除当前线程的中断状态并返回它之前的值。

需要注意的是:

  • 线程中断是一种协作机制,线程可以通过这种机制来通知另一个线程,告诉它在合适的或者可能的情况下停止当前的工作。

  • 线程中断并不代表线程终止,线程的中断只是改变了线程的中断状态,这个中断状态改变后带来的结果取决于这个程序本身

  • 调用interrupt方法并不是立即停止线程,而是发出了一个中断请求,然后有线程本身在某个合适的时间点中断自己。对于wait(),sleep()等阻塞方法来说,将严格处理这种请求,当他们收到中断请求或者开始执行发现中断状态被设置了,将抛出一个异常并将中断状态复位。

  • 通常,中断是实现取消的最合理的方式

通过中断实现取消操作

public class PrimeProducer extends Thread{
	private final BlockingQueue<BigInteger> queue;
	public PrimeProducer(BlockingQueue<BigInteger> queue){
		this.queue=queue;
	}
	public void run(){
		try{
			BigInteger p=BigInteger.ONE;
			while(!Thread.currentThread().isInterrupted()){
				queue.put(p=p.nextProbablePrime());
			}
		}catch(InterruptedException e){
			
		}
	}
	public void cancel(){
		interrupt();
	}
}

###响应中断 当调用可阻塞的方法时,例如Thread.sleep()或BlockingQueue.put等,有两种实用策略可用于处理InterruptedException:

  • 传递异常(抛出异常),从而使你的方法也成为可中断的阻塞方法

  • 恢复中断,从而使调用栈中的上层代码能够对其进行处理

####传递异常 传递InterruptedException的方法包括根本就不捕获该异常,直接向上抛出,与将InterruptedException添加到throws子句中一样(如程序清单getNextTask所示)。或者捕获异常,在执行完某些操作后(清理),再抛出该异常。

BlockingQueue<Task> queue;
...
public Task getNextTask() throws InterruptedException{
	return queue.take();
}

####恢复中断状态

有时候不能抛出InterruptedException,例如在Runnable的run方法中,则必须捕获该异常,并通过调用当前线程的interrupt方法恢复中断状态。这样,在调用栈中更高层的代码将看到引发了一个中断。

public void run() {   
    while (!Thread.currentThread().isInterrupted()) {   
        try {   
            ...   
            sleep(delay);   
        } catch (InterruptedException e) {
	    //抛出InterruptedException异常后,中断标示位会自动清除
            Thread.currentThread().interrupt();//重新设置中断标示   
        }   
    }   
} 

另外一种情况是

public void mySubTask() {   
    ...   
    try {   
        sleep(delay);   
    } catch (InterruptedException e) {   
	//抛出InterruptedException异常后,中断标示位会自动清除
        Thread.currentThread().interrupted();   
    }   
}
public void test(){
	while(!Thread.currentThread().isInterrupted()){
		mySubTask();
	}
}

抛出InterruptedException异常后,中断标示位会自动清除,需要恢复中断状态,这样,在test方法中才能看到mySubTask引发了一个中断。否则,test将继续执行while。

###Future

在concurrent包中,ExecutorService.submit将返回一个Future来描述任务。Future拥有一个cancel方法,该方法带有一个boolean类型的参数mayInterruptIfRunning。如果为true并且这个线程正在运行,则线程能被中断。如果为false并且任务没有运行,则该任务不会被启动。

public static void timeOut(Runnable r,long timeout,TimeUnit unit) throws InterruptedException{
	Future<?> task=executorService.submit(r);
	try{
		task.get(timeout,unit);
	}catch(TimeoutException e){
		
	}catch(ExecutionException e){
		
	}finally{
		//如果任务已经结束,那么执行取消操作也不会有任何影响
		task.cancel(true);//如果任务正在运行,则将被中断
	}
}

###不可中断的阻塞

在前面的例子中,只是针对某些可阻塞的方法做中断请求,在Java库中,并非所有的可阻塞方法或者阻塞机制都能响应中断;如果一个线程由于执行同步的Socket I/O或者等待获得内置锁而阻塞,那么中断请求只能设置线程的中断状态,除此之外没有任何其它用处。那些由于执行不可中断操作操作而被阻塞的线程,可以使用类似于中断的手段来停止这些线程,但这要求我们必须知道线程阻塞的原因;

Socket I/O:在服务器应用程序中,常见的阻塞IO形式就是对套接字进行读取和写入。虽然InputStream和OutputStream中的read和write等方法都不会响应中断,但通过关闭底层套接字,可以使得由于执行read或write等方法而被阻塞的线程抛出一个SocketException

同步 I/O:当中断一个正在InterruptibleChannel上等待的线程时,将抛出ClosedByInterruptException并关闭链路。当关闭一个InterruptibleChannel时,将导致所有在链路操作上阻塞的线程都抛出AsynchronousCloseException。大多数标准的Channel都实现了InterruptibleChannel

Selector异步I/O:如果一个线程在调用Selector.select方法时阻塞,那么调用close或者wakeup方法会使线程抛出CloseSelectorException并提前返回

获取某个锁:如果一个线程由于等待某个内置锁而阻塞,那么将无法响应中断。但可以使用Lock类做替代

下面给出一个中断套接字阻塞的例子

import java.io.IOException;
import java.net.ServerSocket;

public class ThreadTest extends Thread {
    volatile ServerSocket socket;

    public static void main(String args[]) throws Exception {
        ThreadTest1 thread = new ThreadTest1();
        System.out.println("Starting thread...");
        thread.start();
        Thread.sleep(3000);
        System.out.println("Asking thread to stop...");
        thread.socket.close();// 再调用close方法,此句去掉将则不会
        System.out.println("Stopping application...");
    }

    public void run() {
        try {
            socket = new ServerSocket(3036);
        } catch (IOException e) {
            System.out.println("Could not create the socket...");
            return;
        }
        while (!Thread.currentThread().isInterrupted()) {
            System.out.println("Waiting for connection...");
            try {
                socket.accept();
            } catch (IOException e) {
                System.out.println("accept() failed or interrupted...");
                Thread.currentThread().interrupt();// 重新设置中断标示位
            }
        }
        //判断线程是否被阻塞,如果被阻塞则无法打印此句
        System.out.println("Thread exiting under request...");
    }
}

###日志服务 在代码清单LogWriter中给出了一个简单的日志服务示例,其中日志操作在单独的日志线程中执行。产生日志消息的线程并不会将消息直接输出,而是将其保存在一个阻塞队列中。这是一种多生产者单消费者的设计方式。

public class LogWriter{
	private final BlockingQueue<String> queue;
	private final LoggerThread logger;
	public LogWriter(Writer writer){
		this.queue=new LinkedBlockingQueue<String>(10);
		this.logger=new LoggerThread(writer);
	}
	public void start(){
		logger.start();
	}
	public void put(String msg) throws InterruptedException{
		queue.put(msg);
	}
	private class LoggerThread extends Thread{
		private final Writer writer;
		public void run(){
			try{
				while(true){
					writer.println(queue.take());
				}
			}catch(InterruptedException e){
				
			}finally{
				writer.close();
			}
		}
	}
}

当我们想要停止日志服务时,只需要在queue.take()方法捕获抛出的InterruptedException异常,退出日志服务即可。但这种退出方式是不完备的,首先,对于正在等待被写入到日志的信息将会丢失,其次,由于队列已满时put操作会阻塞,所以等待put的线程也会被阻塞。这种状态下,生产者和消费者需要同时被取消。由于生产者不是专门的线程,因此要取消他们将非常困难。

另一种关闭LogWriter的方式是设置一个“已请求关闭”的标识,以避免进一步提交日志。

public class LogService{
	private final BlockingQueue<String> queue;
	private fina LoggerThread loggerThread;
	private final PrintWriter writer;
	private boolean isShutDown;
	private int reservations;

	public void start(){
		loggerThread.start();
	}
	public void stop(){
		synchronized(this){
			isShutDown=true;
		}
		loggerThread.interrupt();
	}
	public void log(String msg) throws InterruptedException{
		synchronized(this){
			if(isShutDown){
				throw new InterruptedException("");
			}
			++reservations;
		}
		//因为put操作本来就是同步的,所以不需要再加内置锁
		queue.put(msg);
	}
	private class LoggerThread extends Thread{
		public void run(){
			try{
				while(true){
					try{
						//如果处理完了阻塞队列中的日志则退出
						synchronized(LogService.this){
							if(isShutdown&&reservations==0){
								break;
							}
						}
						String msg=queue.take();
						synchronized(LogService.this){
							--reservations;
						}
						writer.println(msg);
					}catch(InterruptedException e){
						
					}
				}
			}finally{
				writer.close();
			}
		}
	}
}

更简单的使用ExecutorService来管理

public class LogService{
	private final ExecutorService executorService=...
	...
	public void start(){
		
	}
	public void stop() throws InterruptedException{
		try{
			executorService.shutdown();//不再接收新的任务
			executorService.awaitTermination(TIMEOUT,UNIT);//等待关闭时间
		}finally{
			writer.close();
		}
	}
	public void log(String msg){
		try{
			exectorService.submit(new Task(msg));
		}catch(RejectedExecutionException e){
			
		}
	}
}

参考

© 著作权归作者所有

那位先生_

那位先生_

粉丝 131
博文 109
码字总数 242433
作品 0
深圳
后端工程师
私信 提问
加载中

评论(0)

Java 多线程系列目录(共43篇)

Java多线程系列目录(共43篇) 最近,在研究Java多线程的内容目录,将其内容逐步整理并发布。 (一) 基础篇 01. Java多线程系列--“基础篇”01之 基本概念 02. Java多线程系列--“基础篇”02之 ...

foxeye
2016/02/29
346
0
java程序猿技术栈

一、java 基础知识 1.1 java基础集合类 1.2 jdk1.5、1.6、1.7、1.8 特效比较 1.3 java异常处理 1.4 jvm原理及常见问题 1.5 log4j等日志收集 1.6 jdbc驱动 1.7 jdk反射机制使用和原理 1.8 ja...

南寒之星
2016/11/30
17
0
java并发编程实战------阅读笔记第七章 取消与关闭

java没有提供安全的机制来安全的终止线程,但它提供了中断(Interruption),中断是一种协作机制,能够使一个线程终止另一个线程的当前工作。 一、任务取消任务取消的应用场景:用户请求取消;...

treenewtreenew
2016/11/14
13
0
【死磕Java并发】—–J.U.C之线程池:线程池的基础架构

原文出处http://cmsblogs.com/ 『chenssy』 经历了Java内存模型、JUC基础之AQS、CAS、Lock、并发工具类、并发容器、阻塞队列、atomic类后,我们开始JUC的最后一部分:线程池。在这个部分你将...

chenssy
2017/10/06
0
0
Java 并发编程-不懂原理多吃亏(送书福利)

作者 加多 关注阿里巴巴云原生公众号,后台回复关键字“并发”,即可参与送书抽奖!** 导读:并发编程与 Java 中其他知识点相比较而言学习门槛较高,从而导致很多人望而却步。但无论是职场面...

阿里巴巴云原生
2019/08/30
113
0

没有更多内容

加载失败,请刷新页面

加载更多

OSChina 周三乱弹 —— 提高不了工作效率和脸有关系

Osc乱弹歌单(2020)请戳(这里) 【今日歌曲】 @薛定谔的兄弟 :分享洛神有语创建的歌单「我喜欢的音乐」: 1 《夏令时记录(piano.ver)》- ゆめこ 手机党少年们想听歌,请使劲儿戳(这里) ...

小小编辑
今天
67
2
List的一波操作

public static void main(String[] args) { List<Entity> list = new ArrayList<>(); list.add(new Entity(1)); list.add(new Entity(2)); list.add(new Entity(3)); ......

那个猩猩很亮
今天
75
0
Spring基础

主要用于service层; 轻量级java开发框架; 各层 web层:struts,spring-MVC service层:spring dao层:hibernate,mybatis , jdbcTemplate --> spring-data Spring核心:控制反转IOC 切面编...

七宝1
今天
30
0
解决overflow+border-radius+transform圆角问题

网上还有其他版本,但是对我来说都不好使,下面是我在Chrome上的代码。overflow:hidden依然是不能正常使用,换成unset就可以,读者如果有更好的解决方案,请留言,谢谢。 <figure> <img...

hi懒喵
今天
53
0
《C语言》—— 数组

书籍使我变成了一个幸福的人,使我的生活变成轻松而舒适的诗。——高尔基 本文已经收录至我的GitHub,欢迎大家踊跃star 和 issues。 https://github.com/midou-tech/articles 点关注,不迷路!...

龙跃十二
今天
84
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部