文档章节

多线程编程之两阶段终止模式

爱宝贝丶
 爱宝贝丶
发布于 2018/05/27 23:51
字数 1941
阅读 718
收藏 24

       对于多线程编程,如何优雅的终止子线程,始终是一个值得考究的问题。如果直接终止线程,可能会产生三个问题:

  • 子线程当前执行的任务可能必须要原子的执行,即其要么成功执行,要么就不执行;
  • 当前任务队列中还有未执行完的任务,直接终止线程可能导致这些任务被丢弃;
  • 当前线程占用了某些外部资源,比如打开了某个文件,或者使用了某个Socket对象,这些都是无法被垃圾回收的对象,必须由调用方进行清理。

       由此可见,如何优雅的终止一个线程,并不是一个简单的问题。常见的终止线程的方式是,声明一个标志位,如果调用方要终止其创建的线程的执行,就将该标志位设置为需要终止状态,子线程每次执行任务之前会检查该标志位,如果为需要终止状态,就不继续执行任务,而是进行当前线程所占用资源的一些清理工作,如关闭Socket和备份当前未完成的任务,清理完成之后结束当前线程的调用。

       两阶段终止模式使用的就是上述方式进行多线程的终止的,只不过其将线程的终止封装为一个特定的框架,使用者只需要关注特定的任务执行方式即可,从而实现了线程的终止与任务的执行的关注点的分离。两阶段终止模式的UML图如下:

两阶段终止模式类图

       其各角色的作用如下:

  • ThreadOwner:客户端程序,由其创建线程并执行任务,Terminatable提供的终止方法也是由其调用;
  • Terminatable:终止方法提供的一个抽象接口,提供了一个terminate()方法供外部调用;
  • TerminatableSupport:实现了Terminatable接口的抽象类,封装了具体的终止模板,其doRun()是一个抽象方法,子类必须实现,用于编写相关的任务的代码,doTermiate()和doCleanup()方法都是钩子方法,提供了空的实现,子类根据具体情况判断是否需要实现该方法;
  • ConcreteTerminatable:用户具体的终止类,其doRun()方法用于实现具体的任务;
  • TerminationToken:包含了一个标志位,并且记录了当前线程还需要执行的任务数量,默认情况下,只有其标志位为true,并且剩余需要执行的任务数为0时才会真正的终止当前线程的执行。

       如下是两阶段终止模式各个类的实现,我们首先看看Terminatable接口及其抽象实现TerminatableSupport:

public interface Terminatable {
  void terminate();
}
public abstract class TerminatableSupport extends Thread implements Terminatable {
  public final TerminationToken terminationToken;  // 记录当前的标志位

  public TerminatableSupport() {
    this(new TerminationToken());  // 初始化当前标志位
  }

  public TerminatableSupport(TerminationToken terminationToken) {
    super();
    this.terminationToken = terminationToken;  // 初始化标志位
    terminationToken.register(this);  // 注册当前对象的标志位
  }

  protected abstract void doRun() throws Exception;  // 供子类实现具体任务的方法

  // 钩子方法,用于子类进行一些清理工作
  protected void doCleanup(Exception cause) {}

  // 钩子方法,用于子类进行终止时的一些定制化操作
  protected void doTerminate() {}

  @Override
  public void run() {
    Exception ex = null;
    try {
      // 在当前线程中执行任务时,会判断是否标识为终止,并且剩余任务数小于等于0,是才会真正终止当前线程
      while (!terminationToken.isToShutdown() || terminationToken.reservations.get() > 0) {
        doRun();
      }
    } catch (Exception e) {
      ex = e;
    } finally {
      try {
        doCleanup(ex);  // 当前线程终止后需要执行的操作
      } finally {
        terminationToken.notifyThreadTermination(this);
      }
    }
  }

  @Override
  public void interrupt() {
    terminate();
  }

  @Override
  public void terminate() {
    terminationToken.setToShutdown(true);  // 设置终止状态
    try {
      doTerminate();  // 执行客户端定制的终止操作
    } finally {
      if (terminationToken.reservations.get() <= 0) {
        super.interrupt();  // 如果当前线程处于终止状态,则强制终止当前线程
      }
    }
  }

  // 提供给客户端调用的,即客户端线程必须等待终止完成之后才会继续往下执行
  public void terminate(boolean waitUntilThreadTerminated) {
    terminate();
    if (waitUntilThreadTerminated) {
      try {
        this.join();
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
      }
    }
  }
}

       当客户端调用termiante()方法时,其首先会将当前的终止状态设置为true,然后调用doTerminate()方法,这里需要注意的一点是,如果当前线程在doRun()方法中处于等待状态,比如Thread.sleep()、Thread.wait()方法等,那么即使设置了终止状态,也无法使其被唤醒,因为其无法运行到检测终止状态的代码处,其只能使用intertupt()方法才能使其被唤醒并终止,但是对于Socket.read()方法,即使调用了interrupt()方法,也无法使其终止,因而这里设置了doTerminate()方法,用于子类在该方法中关闭Socket。最后在finally块中,调用super.interrupt()方法,该调用的作用也即如果当前线程在doRun()方法中被阻塞,就强制终止其执行。

public class TerminationToken {
  protected volatile boolean toShutdown = false;  // 终止状态的标志位
  public final AtomicInteger reservations = new AtomicInteger(0);  // 记录当前剩余任务数

  // 记录了所有注册了TerminationToken的实例,这里使用Queue是因为可能会有多个
  // Terminatable实例共享同一个TeraminationToken,如果是共享的,那么reservations
  // 实例就保存了所有共享当前TerminationToken实例的线程所需要执行的任务总数
  private final Queue<WeakReference<Terminatable>> coordinatedThreads;

  public TerminationToken() {
    coordinatedThreads = new ConcurrentLinkedQueue<>();
  }

  public boolean isToShutdown() {
    return toShutdown;
  }

  public void setToShutdown(boolean toShutdown) {
    this.toShutdown = toShutdown;
  }

  // 将当前Terminatable实例注册到当前TerminationToken中
  protected void register(Terminatable thread) {
    coordinatedThreads.add(new WeakReference<>(thread));
  }

  // 如果是多个Terminatable实例注册到当前TerminationToken中,
  // 则广播当前的终止状态,使得这些实例都会终止
  protected void notifyThreadTermination(Terminatable thread) {
    WeakReference<Terminatable> wrThread;
    Terminatable otherThread;
    while (null != (wrThread = coordinatedThreads.poll())) {
      otherThread = wrThread.get();
      if (null != otherThread && otherThread != thread) {
        otherThread.terminate();
      }
    }
  }
}

       关于Terminatable和TerminationToken的关系是一对多的关系,即多个Terminatable实例可共用一个TerminationToken实例,而其reservations属性所保存的则是这多个Terminatable实例所共同要完成的任务数量。这里典型的多个Terminatable共用一个TerminationToken实例的例子是当有多个工作者线程时,这几个线程所消费的任务是共用的,因而其TermiantionToken实例也需要共用。

       两阶段终止模式的使用场景非常的多,基本上只要是使用了子线程的位置都需要使用一定的方式来优雅的终止该线程的执行。我们这里使用生产者和消费者的例子来演示两阶段终止模式的使用,如下是该例子的代码:

public class SomeService {
  private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(100);

  private final Producer producer = new Producer();
  private final Consumer consumer = new Consumer();

  public static void main(String[] args) throws InterruptedException {
    SomeService ss = new SomeService();
    ss.init();
    TimeUnit.SECONDS.sleep(500);
    ss.shutdown();
  }

  // 停止生产者和消费者的执行
  public void shutdown() {
    producer.terminate(true);  // 先停止生产者,只有在生产者完全停止之后才会停止消费者
    consumer.terminate();  // 停止消费者
  }

  // 启动生产者和消费者
  public void init() {
    producer.start();
    consumer.start();
  }

  // 生产者
  private class Producer extends TerminatableSupport {
    private int i = 0;

    @Override
    protected void doRun() throws Exception {
      queue.put(String.valueOf(i++));  // 将任务添加到任务队列中
      consumer.terminationToken.reservations.incrementAndGet();  // 更新需要执行的任务数量
    }
  }

  // 消费者
  private class Consumer extends TerminatableSupport {
    @Override
    protected void doRun() throws Exception {
      String product = queue.take();  // 获取任务
      System.out.println("Processing product: " + product);
      try {
        TimeUnit.SECONDS.sleep(new Random().nextInt(100));  // 模拟消费者对任务的执行
      } catch (InterruptedException e) {
        // ignore
      } finally {
        terminationToken.reservations.decrementAndGet();  // 更新需要执行的任务数量
      }
    }
  }
}

       可以看到,在子类使用两阶段终止模式时,其只需要实现各自所需要执行的任务,并且更新当前任务的数量即可。在某些情况下,当前任务的数量也可以不进行更新,比如在进行终止时,不关心当前剩余多少任务需要执行。

© 著作权归作者所有

爱宝贝丶

爱宝贝丶

粉丝 340
博文 136
码字总数 456051
作品 0
武汉
程序员
私信 提问
加载中

评论(1)

引鸩怼孑
引鸩怼孑
����
Java多线程19 两阶段终止模式(Two-Phase Termination Patter)

Java多线程目录 有时候,我们希望提前结束线程,但安全可靠地停止线程,并不是一件容易的事情,如果立即停止线程,会使共享的数据结构处于不一致的状态,如目前已经废弃使用的Thread类的sto...

香沙小熊
02/12
0
0
异步编程解决方案

前言 本文节选朴灵《深入浅出的nodejs》第四章异步编程,整理加分析而得。 异步编程难点 1 异常处理 我们通常用try catch进行异常捕获,但是异步编程包括两个阶段,提交请求和处理,而方法通...

Tolonger
2017/10/31
0
0
读书笔记之《Java并发编程的艺术》-并发编程基础

读书笔记部分内容来源书出版书,版权归本书作者,如有错误,请指正。 欢迎star、fork,读书笔记系列会同步更新 git https://github.com/xuminwlt/j360-jdk module j360-jdk-thread/me.j360....

Hi徐敏
2015/11/11
4K
8
python之多线程并发处理模块-threading

thread:多线程的底层支持模块,一般不建议使用; threading:对thread进行了封装,将一些线程的操作对象化,一般采用这种方法实现多线程编程 多线程实现有两种模式: 1.创建线程要执行的函数...

zhpfxl
2016/12/27
0
0
Python3基础之学习笔记(九)-线程-进程-协程

文章目录 1. 线程与进程 2. 协程 1. 线程与进程 进程:是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础。 线程:是操作系统能够...

GoldenKitten
01/15
0
0

没有更多内容

加载失败,请刷新页面

加载更多

SSH安全加强两步走

从 OpenSSH 6.2 开始已经支持 SSH 多因素认证,本文就来讲讲如何在 OpenSSH 下启用该特性。 OpenSSH 6.2 以后的版本多了一个配置项 AuthenticationMethods。该配置项可以让 OpenSSH 同时指定...

xiangyunyan
45分钟前
5
0
C或C++不是C/C++

http://www.voidcn.com/article/p-mucdruqa-ws.html

shzwork
今天
6
0
OSChina 周六乱弹 —— 如何将梳子卖给和尚

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @for_ :划水五分钟,专注两小时。分享Various Artists的单曲《贝多芬第8号钢琴奏鸣曲悲伤的第三乐章》: 《贝多芬第8号钢琴奏鸣曲悲伤的第三乐...

小小编辑
今天
323
8
ES5

什么是ES5:比普通js运行要求更加严格的模式 为什么:js语言本身有很多广受诟病的缺陷 如何:在当前作用域的顶部添加:"use strict" 要求: 1、禁止给未声明的变量赋值 2、静默失败升级为错误...

wytao1995
今天
7
0
c++ 内联函数调用快的原因

见图片分析

天王盖地虎626
今天
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部