Fork/Join粗解
Fork/Join粗解
多弗哥 发表于5个月前
Fork/Join粗解
  • 发表于 5个月前
  • 阅读 2
  • 收藏 0
  • 点赞 0
  • 评论 0

腾讯云 新注册用户 域名抢购1元起>>>   

    这些天自我感觉知识比较落后,在上网找了一些资料后,复习了一下Java7的特性同时也在Java8中优化较多的框架——Fork/Join。原理和设计方面,Doug Lea写的《A Java Fork/Join Framework》,网上有免费完整版可供下载 ①,这里我只做自己的笔记。

 

1. Fork/Join介绍

    Fork/Join是Java7以来提供的一个用于并行执行任务的框架,它把一个大任务分解成若干个小任务,最终汇总每个小任务结果后获得完整结果的框架。简单的说,Fork/Join核心思想就是分治:Fork分解任务,Join收集结果。

    同其他分治算法一样,Fork/Join总是会递归的、反复的去分解任务,直到这些子任务可以用足够简单、短小的方式来执行。所以需要恰当地选取子任务的大小,太大的子任务不利于通过并行方式来提高性能,而太小的子任务则会带来较大的额外开销。

    

 

2. 工作窃取算法

    工作窃取算法(work-stealing)是指某个线程从其他队列里窃取任务来执行。意思就是,当任务正在等待其它的任务结束时,正在执行它的工作线程可以允许执行其他正在等待的任务。这种轻量级调度机制是Fork/Join框架的核心。在这种行为下,Fork/Join框架比Runnable和Callable对象本身提供一种更高效的任务管理。

    

    举个例子,假设我们要做一个大任务,我们可以把这个大任务分解为若干互不依赖的子任务。为了减少线程间的竞争,我们把这些子任务分别放到不同的队列里,并为每个队列创建一个单独的线程来执行队列里的任务,线程和队列一一对应。但是有的线程会先把自己队列里的子任务做完,而其他队列里还有任务在等待处理。为了尽可能的使用CPU资源,于是这个线程就去其他队列里窃取一个子任务来执行,但这意味着此时两个线程可能会访问同一个队列,为了减少窃取任务线程和被窃取任务线程之间的竞争,我们使用双端队列WorkQueue,工作线程永远从双端队列的头部获取任务执行(LIFO,最早优先),而窃取任务的线程永远从双端队列的尾部获取任务执行(FIFO)。

    使用后进先出(LIFO)来处理每个工作线程自己的任务,而先进先出(FIFO)则用于获取其它的任务,这是一种被广泛使用的进行分治递归的一种调优手段。让窃取任务的线程从队列拥有者相反的方向进行操作,这也体现了递归分治算法的大任务优先策略。因此,更早期被窃取的任务有可能会提供一个更大的单元任务,从而使得窃取线程不至于“频繁窃取”,因为对于一些基础的操作而言,使用相对较小粒度的任务比那些仅仅使用粗粒度划分的任务以及那些没有使用递归分解的任务的运行速度要快。

    需要注意的是,子任务的使用其实有一定的约定:子任务中应该避免使用synchronized关键词或其他方式方式的同步,也不应该是一阻塞IO或过多的访问共享变量。在理想情况下,每个子任务的实现,都应该只进行CPU相关的计算,并且只使用每个子任务的内部对象。唯一的同步应该只发生在子任务和创建它的父任务之间。

    work-stealing算法的优点是充分利用线程进行并行计算,并减少了线程间的竞争,提高了性能。但也有缺点:

  1. 经常会在处理任务的同步上遇到问题,比如双端队列里只有一个任务的时候。在Fork/Join框架中,这种情况下的线程竞争有时会导致线程强制睡眠;
  2. 从其他队列窃取任务的开销其实要比在自己队列中执行pop操作的开销大;
  3. 消耗了更多的系统资源,比如创建多个线程和多个双端队列。

 

3. Fork/Join的使用

    

  • ForkJoinTask

    ForkJoinTask表示一个可以并行、合并的任务,它也实现了Future接口,可以按照Future接口的方式来使用。我们通常并不直接继承 ForkJoinTask,它包含了太多的抽象。针对特定的问题,我们可以选择ForkJoinTask不同的子类来完成任务:RecurisiveTask或RecurisiveAction类。它们之间的区别在于,RecurisiveTask类代表的是一类带有返回值的任务,而RecursiveAction类则代表了一类最简单的不需要返回值的ForkJoinTask。

    ForkJoinTask有两个主要的方法:
    * fork() – 该方法决定了ForkJoinTask的异步执行,凭借这个方法可以创建新的任务;
    * join()  – 该方法负责在任务完成后并返回指向结果,因此允许一个任务等待另一任务执行完成。

    Fork/Join框架任务的执行除由ForkJoinTask类的对象之外,还可以使用一般的Callable和Runnable接口来表示任务。

  • ForkJoinPool

    ForkJoinPool类是ForkJoinTask实例的执行者,它是ExecutorService的实现类,表示一种特殊的线程池。ForkJoinPool的主要任务就是工作窃取(work-stealing),其线程尝试发现和执行其他任务创建的子任务。ForkJoinPool默认的线程数通过Runtime.availableProcessors()获得,因为在计算密集型的任务中,获得多于处理性核心数的线程并不能获得更多性能提升。

    在ForkJoinPool对象中执行的任务大体可以分为两类,一类通过execute()、invoke()或submit()提交的任务,一类是ForkJoinTask对象在执行过程中产生的子任务,并通过fork方法来运行。其中,前者从关系上还可以区分出,一个是从ExecutorService继承过来的接口进来的任务,一个是ForkJoinPool自己重载的接口进来的任务。这里有几个细节:

客户端调用 内部调用
execute(),submit() ForkJoinTask.fork()
invoke(),invokeAny(),invokeAll() ForkJoinTask.invoke()

    1)通过ExecutorService接口进来的任务(Runnable、Callback的接口实现),ForkJoinPool类不会对它们使用work-stealing算法。该算法只适用于ForkJoinTask对象;

    2)与ExecutorService接口使用保持一致,不论入参是Runnable、Callback还是ForkJoinTask实现,execute()、submit()均是异步调用,invoke()、invokeAny()、invokeAll()则使用同步调用(不会立即返回,直到传递的参数任务完成它的执行);

    3)值得一提的是,ForkJoinTask类同样提供了invokeAll()方法,它接收一个ForkJoinTask和一个可变范型参数T,或者接收一个泛型类型T的集合并要求,这个泛型T必须是ForkJoinTask类或其子类。

    另外,《Java 7 Concurrency Cookbook》也有提到,译文 ②如下:

当你在ForkJoinPool中执行ForkJoinTask时,你可以使用同步或异步方式来实现。当你使用同步方式时,提交任务给池直到提交的任务执行完成,才会返回结果。当你使用异步方式时,提交任务给执行者并且立即返回,所以这个任务可以并行执行。你应该意识到这两个方法有很大的区别,当你使用同步方法,调用这些方法(比如:invokeAll()方法)的任务将被阻塞,直到提交给池的任务执行完成。这允许ForkJoinPool类使用work-stealing算法,分配一个新的任务给正在执行睡眠任务的工作线程。反之,当你使用异步方法(比如:fork()方法),这个任务将继续它的执行,所以ForkJoinPool类不能使用work-stealing算法来提高应用程序的性能。在这种情况下,只有当你调用join()或get()方法来等待任务的完成时,ForkJoinPool才能使用work-stealing算法。

 

你可以使用ForkJoinTask#join()方法来等待任务的结束,并获得它们的结果。对于这个目的,你也可以使用get()方法的两个版本之一:
 - get():这个版本的get()方法,如果ForkJoinTask已经结束它的执行,则返回compute()方法的返回值,否则,等待直到它完成。
 - get(long timeout, TimeUnit unit):这个版本的get()方法,如果任务的结果不可用,则在指定的时间内等待它。如果超时并且任务的结果仍不可用,这个方法返回null值。

而get()和join()有两个主要的区别:
 - join()方法不能被中断。如果你中断调用join()方法的线程,这个方法将抛出InterruptedException异常。
 - 如果执行的任务抛出任何未受检异常,get()方法将返回一个ExecutionException异常,而join()方法将返回一个RuntimeException异常。

  • 异常处理

    ForkJoinTask在执行时可能会抛出异常,但是没办法在主线程里直接捕获它,所以ForkJoinTask提供了isCompletedAbnormally()方法来检查任务是否已经抛出异常或已经被取消了,并且可以通过ForkJoinTask的getException方法获取异常。 

    getException方法返回Throwable对象,如果任务被取消了则返回CancellationException。如果任务没有完成或者没有抛出异常则返回null。

    《Java 7 Concurrency Cookbook》中的译文如下:

 在Java中有两种异常:
  - 已检查异常(Checked exceptions):这些异常必须在一个方法的throws从句中指定或在内部捕捉它们。比如:IOException或ClassNotFoundException。
  - 未检查异常(Unchecked exceptions):这些异常不必指定或捕捉。比如:NumberFormatException。

在ForkJoinTask类的compute()方法中,你不能抛出任何已检查异常,因为在这个方法的实现中,它没有包含任何抛出异常的声明。你必须包含必要的代码来处理异常。但是,你可以抛出(或者它可以被任何方法或使用内部方法的对象抛出)一个未检查异常。ForkJoinTask和ForkJoinPool类的行为与你可能的期望不同。程序不会结束执行,并且你将不会在控制台看到任何关于异常的信息。它只是被吞没,好像它没抛出异常。你可以使用ForkJoinTask类的一些方法,得知一个任务是否抛出异常及其异常种类。

当你在一个任务中抛出一个未检查异常时,它也影响到它的父任务(把它提交到ForkJoinPool类的任务)和父任务的父任务,以此类推。如果你修订程序的所有输出,你将会看到一些任务结束没有输出信息。这些任务是那些及其父任务抛出异常的任务。它们全部异常地完成。考虑到这一点,当你使用ForkJoinPool和ForkJoinTask对象开发一个程序,当你不想要这种行为时,可以抛出异常。如果不是抛出异常,你可以使用ForkJoinTask类的completeExceptionally()方法。

  • 取消任务

当你在一个ForkJoinPool类中执行ForkJoinTask对象时,在它们开始执行之前,你可以取消执行它们。ForkJoinTask类提供cancel()方法用于这个目的。当你想要取消一个任务时,有几点是你必须要考虑的,如下:
 - ForkJoinPool类并没有提供任何方法来取消正在池中运行或等待的所有任务。
 - 当你取消一个任务时,你不能取消一个已经执行的任务。

ForkJoinTask提供cancel()方法,允许你取消一个还未执行的任务,这是一个非常重要的点。如果任务已经开始它的执行,那么调用cancel()方法对它没有影响。这个方法接收一个Boolean值,名为mayInterruptIfRunning的参数。这个名字可能让你觉得,如果你传入一个true值给这个方法,这个任务将被取消,即使它正在运行。
Java API指出,在ForkJoinTask类的默认实现中,这个属性不起作用。任务只能在它们还未开始执行时被取消。一个任务的取消不会影响到已经提到到池的(其他)任务。它们继续它们的执行。 Fork/Join框架的一个局限性是,它不允许取消在ForkJoinPool中的所有任务。如果一个任务由于它正在运行或已经完成而不能被取消,cancel()方法返回false值,所以,你可以尝试取消所有任务,而不用担心可能有间接的影响。

 

注:

    ① 《A Java Fork/Join Framework》,原文:http://gee.cs.oswego.edu/dl/papers/fj.pdf,译文:http://ifeve.com/a-java-fork-join-framework/

   ② 《Java 7 Concurrency Cookbook》,原文:http://it-ebooks.info/book/1116/,译文:http://ifeve.com/java-7-concurrency-cookbook/

 

标签: Java
共有 人打赏支持
粉丝 6
博文 29
码字总数 106691
×
多弗哥
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: