Java SE 7新特性

原创
2014/03/25 11:53
阅读数 145

java SE 7对juc包进行了更新,增加了新的轻量级任务执行框架fork/join和多阶段thread同步工具。

lightweight task execute framework- ---fork/join

    该框架的目的是为了更好的利用底层平台上的多核cpu和多处理器来进行并行处理,解决问题时通过使用分治(divide and conquer)算法或map/reduce算法来进行。该名称来源于使用时的两个基本的操作fork,join,可以类比于map/reduce中的map和reduce操作。fork操作是把一个大的问题划分成若干个较小的问题,这个划分过程一般是递归进行的,直到得到可以直接进行计算的粒度合适的子问题。在划分时需要恰当选取子问题的大小。太大的子问题不利于通过并行方式来提高性能,而太小子问题则会带来较大的额外开销。每个子问题在计算完成之后,可以得到关于整个问题的部分解。join操作就是把这些部分解或结果收集并组织起来,得到最终完整的结果。调用join也可能是递归进行的,与fork操作相对应。

    相对于一般threadpool的实现,fork/join优势体现在对其中包含的任务的处理方式上。一般threadpool中如果一个thread正在执行的任务由于某些原因无法继续运行,则该thread牌等待状态。而在fork/join框架实现中,如果某个子问题由于等待另外一个子问题的完成而无法运行,则处理该子问题的thread会主动寻找其他尚未运行的子问题来执行,这种方式减少了thread等待时间,提高了性能。为了高效运行,每个fork/join子问题中应该避免使用synchronized关键词或其他方式来进行同步,也不应使用阻塞式io操作或过多的访问共享变量。在理想情况下,每个子问题实现中都应该只进行cpu去处,而且只使用每个问题的内部对象,唯一的同步应该只发生在子问题和创建它的父问题之间。

    一个fork/join框架执行的任务由ForkJoinTask类表示。它实现了Future接口,可以照future接口的方式来使用。中间有两个最重要的方法form和join,fork方法以异步方式启动任务的执行,而join则等待任务完成并返回执行的结果。创建自己的任务时,最好不要直接继承自ForkJoinTask,而是继承自它的子类RecursiveTask或RecursiveAction类。两者区别是前者表示的任务可以返回结果,后者则不可以。

    任务的执行由ForkJoinPool类的对象来完成。它实现了ExecutorService接口,还可以使用一般的Callable/Runnable接口来表示任务。forkjoinpool中执行的任务可以两类,一是通过execute,invoke,submit方法直接提交的任务,另外一类是在执行过程中产生的子任务,并通过fork方法来运行。一般做法是表示整个问题的forkjointask类的对象用第一类形式提交,而在执行过程中产生的子任务并不需要进行处理,forkjoinpool类对象会负责子任务的执行。

    示例使用fork/join查找数组中最大值。每个查找任务都在一定数组序号区间范围内进行。如果当前区间范围较大,则将其划分成两个子区间,对每个子区间创建子任务来进行查找,递归整个过程。直到范围足够小,再在这个区间中进行顺序比较来查找最大值。通过fork启动子任务,通过join方法来获取子任务的执行结果。子任务执行完成后,再把得到的部分结果合并到最终结果中,这就是一典型的使用分治算法实现的方式。

    public class MaxVAlue{
        private static final int RANGE_LENGTH=2000;
        private final ForkJoinPool forkJoinPool = new ForkJoinPool();
        private static class MaxValueTask extends RecursiveTask<Long>{
            private final long[] array;
            private final int start;
            private final int end;
            MaxValueTask(long[] array,int start,int end){
                this.array = array;
                this.start = start;
                this.end = end;
            }
            protected Long compute(){
                long max = Long.MIN_VALUE;
                if(end - start <= RANG_LENGTH){
                    for(int i = start;i < end;i++){
                        if(array[i]>max){
                            max = array[i];
                        }
                    }
                }else{
                    int mid = (start+end)/2;
                    MaxValueTask lowTask = new MaxValueTask(array,start,mid);
                    MaxValueTask highTask = new MaxValueTask(array,mid,end);
                    lowTask.fork();
                    highTask.fork();
                    max= Math.max(max,lowTask.join());
                    max = Math.max(max,highTask.join());
                    
                }
                return max;
            }
        }
        //
        public void calculate(long[] array){
            MaxValueTask task = new MaxValueTask(array,0,array.length);
            Long result = forkJoinPool.invoke(task);
            System.out.println(result);
        }
    }

    以上代码实现方式的效率要低于直接对整个数组进行顺序比较的方式。只用来做一个演示。因为多thread所带来的额外开销过大。实际应用场景中,在一个目录中包含所有文本文件中搜索某个关键词时,可以为每个文件创建一个子任务。这种实现方式性能要优于单thread的查找方法。如果相关功能可以用递归和分治算法来解决就适合使用fork/join框架。


多阶段thread同步工具

    Phaser类是新增一个实用同步工具。所提供的功能和灵活性比之前介绍的倒数闸门和循环屏障要强很多。在fork/join框架中子任务间进行同步时,要优先使用Phaser类对象。它的特点是把多thread协作执行的任务切分为多个阶段phase,每个阶段上都可以有任意个参与者参与。thread可以随时注册并参与到某个阶段的执行中来。当一阶段中所有thread都成功完成后,phaser类对象会自动进入下一阶段。如果循环下去直到phaser类对象中不再包含任何参与者,此phaser对象运行自动结束。该对象被创建出来后,其初始阶段编号为0,在构造方法中可以指定初始的参与者的个数也可以在创建后使用register方法或bulkregister方法动态添加一个或多个参与者。当某参与者完成其任务后调用arriave方法来进行声明。有的参与者完成一次执行后不再继续参与,可以调用arriveandderegister方法在声明完成后取消自己的注册。如果还需要等待其他参与者完成,可以调用arriveandawaitadvance方法,此时thread会阻塞,直到phaser类对象成功进入下个阶段。




展开阅读全文
打赏
0
7 收藏
分享
加载中
更多评论
打赏
0 评论
7 收藏
0
分享
返回顶部
顶部