java并发编程(六): 取消与关闭

原创
2014/03/30 17:12
阅读数 2.7K

取消与关闭:

  • 如何正确安全地取消或关闭任务。

任务取消:

  • 若外部代码能在某个操作正常完成之前将其置入“完成”状态,则还操作是可取消的
  • 取消操作的原因:

       1. 用户请求取消。

       2. 有时间限制的操作,如超时设定

       3. 应用程序事件。

       4. 错误。

       5. 关闭。

如下面这种取消操作实现:

/**
 * 一个可取消的素数生成器
 * 使用volatile类型的域保存取消状态
 * 通过循环来检测任务是否取消
 */
@ThreadSafe
public class PrimeGenerator implements Runnable {
	private final List<BigInteger> primes = new ArrayList<>();
	private volatile boolean canceled;
	
	@Override
	public void run() {
		BigInteger p = BigInteger.ONE;
		while (!canceled){
			p = p.nextProbablePrime();
			synchronized (this) { //同步添加素数
				primes.add(p);
			}
		}
	}
	
	/**
	 * 取消生成素数
	 */
	public void cancel(){
		canceled = true;
	}
	
	/**
	 * 同步获取素数
	 * @return 已经生成的素数
	 */
	public synchronized List<BigInteger> get(){
		return new ArrayList<>(primes);
	}
}

其测试用例:

public class PrimeGeneratorTest {
	public static void main(String[] args) {
		PrimeGenerator pg = new PrimeGenerator();
		new Thread(pg).start();
		
		try {
			Thread.sleep(1000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		} finally{
			pg.cancel(); //始终取消
		}
		
		System.out.println("all primes: " + pg.get());
	}
}

中断:

  • 调用interrupt并不意味者立即停止目标线程正在进行的工作,而只是传递了请求中断的消息。会在下一个取消点中断自己,如wait, sleep,join等。
  • 通常,中断是实现取消的合理方式

下面通过中断实现取消功能:

/**
 * 通过中断来实现取消
 * 不采用boolean变量,
 * 防止在queue.put()时由于阻塞,不能检查到boolean变量而无法取消
 * 但使用interrupt就可以,
 * 即使queue.put()阻塞, 也会检查到interrupt信号,从而抛出IntteruptedException
 * 从而达到取消的目的
 */
public class PrimeProducer extends Thread {
	private final BlockingQueue<BigInteger> queue;
	
	public PrimeProducer(BlockingQueue<BigInteger> queue){
		this.queue = queue;
	}

	@Override
	public void run() {
		try {
			BigInteger p = BigInteger.ONE;
			while (!Thread.currentThread().isInterrupted()){
				queue.put(p = p.nextProbablePrime());
			}
		} catch (InterruptedException e) {
			// thread exit
		}
	}
	/**
	 * 取消
	 */
	public void cancel(){
		interrupt(); //中断当前线程
	}
}

中断策略:

  • 由于每个线程拥有各自的中断策略,因此除非你知道中断对该线程的含义,否则就不应该中断这个线程。

响应中断:

  • 处理InterruptedException的实用策略:

      1. 传递异常。

      2. 恢复中断状态,从而事调用栈的上层代码能够对其进行处理。

  • 只有实现了线程中断策略的代码才可以屏蔽中断请求,在常规的任务和库代码中都不应该屏蔽中断请求。

通过Future实现取消:

public void timedRun(Runnable r, long timeout, TimeUnit unit)
			throws InterruptedException {
	Future<?> task = taskExec.submit(r);
	try {
		task.get(timeout, unit);
	} catch (ExecutionException e) {
		//任务执行中抛出异常
	} catch (TimeoutException e) {
		//任务超时处理
	} finally{
		//任务执行完毕,没有影响; 任务执行中会中断任务
		if (task != null) task.cancel(true);
	}
}
  • 当Future.get抛出InterruptedExceptionTimeoutException时 ,如果你知道不再需要结果,那么就可以调用Future.cancel来取消任务。

处理不可中断的阻塞:

  • 造成线程阻塞的原因:

       1. java.io包中的同步Socket I/O。如套接字中进行读写操作read, write方法。

       2. java.io包中的同步I/O如当中断或关闭正在InterruptibleChannel上等待的线程时,会对应抛出ClosedByInterruptException或                         AsynchronousCloseException

       3. Selector的异步I/O。如果一个线程在调用Selector.select时阻塞了,那么调用close, wakeup会使线程抛出ClosedSelectorException

       4. 获取某个锁。当一个线程等待某个锁而阻塞时,不会响应中断。但Lock类的lockInterruptibly允许在等待锁时响应中断。

/**
 * 通过改写interrupt方法将非标准的取消操作封装在Thread中
 */
public class ReaderThread extends Thread {
	private final Socket socket;
	private final InputStream in;
	private int bufferSize;
	
	public ReaderThread(Socket socket, InputStream in) {
		this(socket, in, 1024);
	}
	
	public ReaderThread(Socket socket, InputStream in, int bufferSize) {
		this.socket = socket;
		this.in = in;
		this.bufferSize = bufferSize;
	}

	@Override
	public void interrupt() {
		try {
			socket.close(); //中断前关闭socket
		} catch (IOException e) {
			
		} finally{
			super.interrupt();
		}
	}

	@Override
	public void run() {
		try {
			byte[] buf = new byte[bufferSize];
			while (true) {
				int count = in.read(buf);
				if (count < 0) {
					break;
				} else if (count > 0) {
					processBuffer(buf, count);
				}
			}
		} catch (IOException e) {
			// 线程中断处理
		}
	}
       ...
}

采用newTaskFor来封装非标准的取消:

/**
 * 可取消的任务接口
 */
public interface CancellableTask<T> extends Callable<T> {
	void cancel();
	RunnableFuture<T> newTask();
}

/**
 * 使用了Socket的任务
 * 在取消时需要关闭Socket
 */
public abstract class SocketUsingTask<T> implements CancellableTask<T> {
	private Socket socket;

	public void setSocket(Socket socket) {
		this.socket = socket;
	}

	@Override
	public T call() throws Exception {
		//do working
	       ...
	}

	@Override
	public synchronized void cancel() {
		try {
			if (socket != null){
				socket.close();
			}
		} catch (IOException ignored) {
		}
	}

	@Override
	public RunnableFuture<T> newTask() {
		return new FutureTask<T>(this){
			@Override
			public boolean cancel(boolean mayInterruptIfRunning) {
				try {
					SocketUsingTask.this.cancel();
				} catch (Exception ignored) {
				}
				return super.cancel(mayInterruptIfRunning);
			}
		};
	}
}

/**
 * 通过newTaskFor将非标准的取消操作封装在任务中
 */
public class CancellingExecutor extends ThreadPoolExecutor {

	public CancellingExecutor(int corePoolSize, int maximumPoolSize,
			long keepAliveTime, TimeUnit unit, BlockingQueue<Runnable> workQueue) {
		super(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue);
	}

	@Override
	protected <T> RunnableFuture<T> newTaskFor(Callable<T> callable) {
		if (callable instanceof CancellableTask){ //若是我们定制的可取消任务
			return ((CancellableTask<T>)callable).newTask();
		}
		return super.newTaskFor(callable);
	}
}

停止基于线程的服务:

  • 正确的封装原则:除非拥有某个线程,否则不能对该线程进行操控。如中断线程修改线程优先级等。
  • 对于持有线程的服务,只要服务的存在时间大于创建线程的方法的存在时间,那么就应该提供生命周期的方法。如ExecutorService提供的shutdown(), shutdownNow()
/**
 * 不支持关闭的生产者-消费者日志服务
 */
public class LogWriter {
	private final BlockingQueue<String> queue;
	private final LoggerThread logger;
	
	public LogWriter(Writer writer){
		this.queue = new LinkedBlockingDeque<String>();
		this.logger = new LoggerThread(writer);
	}
	
	public void start(){
		logger.start();
	}
	
	public void log(String msg) throws InterruptedException{
		queue.put(msg);
	}

	private class LoggerThread extends Thread{
		private final Writer writer;
		
		public LoggerThread(Writer writer) {
			this.writer = writer;
		}

		@Override
		public void run() {
			try {
				while(true){
					writer.write(queue.take());
				}
			} catch (IOException e) {
				// io exception handle
			} catch (InterruptedException e) {
				// interrupt exceptino handle
			} finally{
				try {
					writer.close();
				} catch (IOException e) {
					e.printStackTrace();
				}
			}
		}
	}
}
  • 向LogWriter添加可靠的取消操作
/**
 * 为LoggerWriter添加可靠的取消操作
 */
public class LogService {
	private final BlockingQueue<String> queue;
	private final LoggerThread logger;
	private final PrintWriter writer;
	private boolean isShutdown; //用于终止生产者
	private int reservations; //队列中的消息数
	
	public LogService(PrintWriter writer){
		this.queue = new LinkedBlockingDeque<String>();
		this.logger = new LoggerThread();
		this.writer = writer;
	}
	
	/**
	 * 产生日志
	 * @param msg 日志内容
	 * @throws InterruptedException
	 */
	public void log(String msg) throws InterruptedException{
		synchronized (this) {
			if (isShutdown){
				throw new IllegalStateException("can't log, service has stopped.");
			}
			++reservations;
		}
		queue.put(msg);
	}
	
	/**
	 * 启动日志香妃
	 */
	public void start(){
		logger.start();
	}
	/**
	 * 停止日志服务
	 */
	public void stop(){
		synchronized(this){
			isShutdown = true;
		}
		logger.interrupt(); //中断日志线程
	}
	
	/**
	 * 消费日志线程
	 */
	private class LoggerThread extends Thread{
		@Override
		public void run() {
			try {
				while(true){
					try {
						synchronized (LogService.this) {
							if (isShutdown)
								break;
						}
						String msg = queue.take();
						synchronized (LogService.this) {
							--reservations;
						}
						writer.println(msg);
					} catch (InterruptedException e) {
						// retry
					}
				}
			}finally{
				writer.close();
			}
		}
	}
}

关闭ExecutorService:

  • ExecutorService提供两种关闭服务的方法:

      1. shutdown: 安全关闭。不再接受新任务提交,待所有队列中的任务执行完成再关闭。

      2. shutdownNow: 强行关闭。不再接受新任务提交,停止正在执行的任务,并返回未开始执行的任务列表。

/**
 * 封装ExecutorService实现日志服务
 */
public class LogService2 {
	private final ExecutorService exec = Executors.newSingleThreadExecutor();
	private final PrintWriter writer;
	
	public LogService2(PrintWriter writer){
		this.writer = writer;
	}
	
	/**
	 * 产生日志
	 * @param msg 日志内容
	 * @throws InterruptedException
	 */
	public void log(String msg) throws InterruptedException{
		exec.execute(new WriteTask(msg));
	}
	
	/**
	 * 停止日志服务
	 * @throws InterruptedException 
	 */
	public void stop(long timeout, TimeUnit unit) throws InterruptedException{
		try {
			exec.shutdown(); //平缓关闭服务
			//关闭服务后, 阻塞到所有任务被执行完毕或者超时发生,或当前线程被中断
			exec.awaitTermination(timeout, unit); 
		} finally{
			writer.close();
		}
	}
	...
}

"毒丸"对象:

  • 毒丸:指一个放在队列上的对象,当得到这个对象时,就立即停止

通过毒丸对象来关闭服务:

/**
 * 索引服务
 * 通过一个毒丸对象来关闭服务
 */
public class IndexingService {
	private static final File POISON = new File(""); //毒丸对象
	private final IndexerThread consumer = new IndexerThread(); //消费者
	private final CrawlerThread producer = new CrawlerThread(); //生产者
	private final BlockingQueue<File> queue = new LinkedBlockingDeque<File>();
	private final File root;
	
	public IndexingService(File root) {
		this.root = root;
	}

	/**
	 * 启动索引服务
	 */
	public void start(){
		producer.start();
		consumer.start();
	}
	
	public void stop(){
		producer.interrupt(); //中断爬虫线程
	}
	
	public void awaitTermination() throws InterruptedException{
		consumer.join(); //等待消费者线程结束
	}
	
	/**
	 * 爬虫线程
	 */
	private class CrawlerThread extends Thread{
		@Override
		public void run() {
			try {
				crawl(root);
			} catch (InterruptedException e) {
				// handle the exception
			} 
			try {
				while(true){
					queue.put(POISON);
					break;
				}
			} catch (InterruptedException e) {
				// retry
			}
		}

		private void crawl(File root) throws InterruptedException{
			// crawl from web
		}
	}
	
	/**
	 * 建立索引的线程
	 */
	private class IndexerThread extends Thread{
		@Override
		public void run() {
			try {
				while (true){
					File file = queue.take();
					if (file == POISON){ //若是毒丸对象
						break;
					} else{
						indexFile(file); //建立索引文件
					}
				}
			} catch (InterruptedException e) {
				// handle exception
			}
		}

		private void indexFile(File file) {
			
		}
	}
}

shutdownNow的局限性:

  • 在关闭服务过程中,我们无法通过常规方法来得知哪些任务已经开始但未结束。
/**
 * 在ExecutorService中跟踪在关闭之后被取消的任务
 */
public class TrackingExecutor extends AbstractExecutorService {
	private final ExecutorService exec;
	private final Set<Runnable> tasksCancelledAtShutdown = 
			Collections.synchronizedSet(new HashSet<Runnable>());
	
	public TrackingExecutor(ExecutorService exec) {
		this.exec = exec;
	}

	/**
	 * 获取关闭后取消的任务
	 */
	public List<Runnable> getCancelledTasks(){
		if (!exec.isTerminated()){
			throw new IllegalStateException("service doesn't stop");
		}
		return new ArrayList<>(tasksCancelledAtShutdown);
	}
	
	@Override
	public void execute(final Runnable command) {
		exec.execute(new Runnable() {
			@Override
			public void run() {
				try {
					command.run();
				} finally{ //有可能出现误报: 任务执行完毕了, 线程池
					if (isShutdown() && //若Executor已经关闭了
							Thread.currentThread().isInterrupted()){ //且当前线程被中断了
						tasksCancelledAtShutdown.add(command);
					}
				}
			}
		});
	}
}

处理非正常的线程终止:

  • 当一个线程由于未捕获异常而退出时, jvm会把这个事件报告给应用程序提供的UncaughtExceptionHandler异常处理器。若没有提供任何异常处理器,则默认行为是将栈追踪信息输出到System.err
/**
 * 将异常写入日志的UncaughtExceptionHandler
 */
public class UEHLogger implements UncaughtExceptionHandler {

	@Override
	public void uncaughtException(Thread t, Throwable e) {
		Logger logger = Logger.getAnonymousLogger();
		logger.log(Level.SEVERE, "the thread with exceptoin: "+t.getName(), e);
	}
}
  • 在运行时间较长的应用程序中,通常会为所有线程的未捕获异常制定同一个异常处理器,并且该处理器至少会将异常信息记录到日志中。

JVM关闭:

关闭钩子:

  • 关闭钩子:通过Runtime.addShutdownHook注册的但尚未开始的线程。
  • jvm不保证关闭钩子的调用顺序
  • 强制关闭jvm时,不会运行关闭钩子。
  • 最后对所有服务使用同一个关闭钩子,防止多个钩子之间的出现共享资源竞争

守护线程:

  • 守护线程(Daemon Thread):执行一些辅助工作,不会阻碍JVM的关闭
  • 线程分为普通线程守护线程。jvm启动时创建的所有线程中,除了主线程,其他线程都是守护线程(例如垃圾回收器以及其他执行辅助工作的线程)。
  • 新创建的线程,默认会继承创建它的线程的守护状态,所以默认时,主线程创建的所有线程都是普通线程
  • 普通线程与守护线程之间的差异:当一个线程退出时,jvm会检查其他正在运行的线程,如果这些线程是守护线程,那么jvm会正常退出操作。当jvm停止时,所有仍然存在的守护线程都会被抛弃-不执行finally块不执行回卷栈,而直接退出。所有尽可能少用守护线程,特别是包含一些I/O操作的任务。
  • 守护线程通常不能用来替代应用程序管理程序中各个服务的生命周期

终结器:

  • 避免使用终结器

不吝指正。

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
0 评论
9 收藏
0
分享
返回顶部
顶部