文档章节

Java8流Stream中间操作、终止操作运行流程源码分析

aronykl
 aronykl
发布于 2017/08/12 15:16
字数 2054
阅读 102
收藏 2

通过前面的博客的介绍,我们知道Stream有一个源,0个或者多个中间操作,以及一个终止操作。Stream只有遇到终止操作,它的源才开始执行遍历操作,而且只会进行一次遍历,而不是每个操作都执行一次遍历。今天,我们就从源码的层面来分析一下JDK这一块是怎么实现的。

首先看下面一段代码,下面将以这一段代码来进行分析:

List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8);
list.stream().filter(i -> i % 2 == 0)
        .limit(3)
        .map(String::valueOf)
        .forEach(System.out::println);

上面这段代码,中间操作,短路操作,终止操作全部都有了。

首先list.stream()会返回一个Stream对象。我们可以跟进去,看看返回的到底是个什么对象。

default Stream<E> stream() {
    return StreamSupport.stream(spliterator(), false);
}

继续跟到StreamSupport.stream:

public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
        Objects.requireNonNull(spliterator);
        return new ReferencePipeline.Head<>(spliterator, StreamOpFlag.fromCharacteristics(spliterator), parallel);
}

可以看到返回的是一个ReferencePipeline.Head的实例。再跟进去,可以发现Head是ReferencePipeline类的一个静态内部类,并且Head还继承了ReferencePipeline,JDK注释说它表示的是ReferencePipeline的一个源阶段。

再来看看ReferencePipeline这个类。它是一个抽象类,继承了AbstractPipeline这个类并且实现了Stream接口。我们将上面的操作跟进去看实现,会发现filter,limit,map,forEach这些方法都是在这个类中实现的。也就是说ReferencePipeline这个类提供了大量的对流的操作的实现。

继续分析ReferencePipeline这个类,它的父类AbstractPipeline中定义了三个个AbstractPipeline类型的变量:sourceStage(源阶段),previousStage(上游pipeline,前一阶段),nextStage(下一阶段)。根据JDK文档的对着三个属性的说明可以知道:ReferencePipeline实际上是一个双向链表的数据结构。而ReferencePipeline对Stream的操作做了实现,每一个中间操作都会返回一个Stream对象,实际上就是ReferencePipeline对象,因此可以得到结论:Stream底层是通过双向链表来实现的。

经过上面的分析,我们就知道了流的源阶段返回的Stream和中间阶段返回的Stream到底是什么了。源阶段返回的是ReferencePipeline.Head对象,而中间操作阶段返回的是ReferencePipeline对象。在流的源阶段和中间阶段仅仅只是返回了ReferencePipeline对象,并没有做其他的方法调用操作,这也是为什么流在执行中间操作并不会有任何的输出或者结果产生。

那么这些中间操作在执行终止操作的时候是怎么被调用的呢?还有流的短路是怎么实现的呢?

接着上面,跟进filter方法的实现:

public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
        Objects.requireNonNull(predicate);
        return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE, StreamOpFlag.NOT_SIZED) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                    @Override
                    public void begin(long size) {
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(P_OUT u) {
                        if (predicate.test(u))
                            downstream.accept(u);
                    }
                };
            }
        };
    }

可以看到仅仅只是new一个StatelessOp对象进行返回。跟进去StatelessOp发现它也是ReferencePipeline的一个静态内部类,并且也继承了ReferencePipeline,也印证了上面说的中间操作返回的都是ReferencePipeline对象。上面的代码还可以看到,StatelessOp对象对opWrapSink这个方法做了实现:返回一个Sink.ChainedReference对象。Sink.ChainedReference又是一个静态内部类,并且继承了Sink。并且Sink.ChainedReference中有一个Sink类型的属性downstream,可以知道:Sink.ChainedReference是一个单链表的数据结构。

跟到父类里面查看JDK对于opWrapSink这个方法的说明:接受一个用于接受操作结果的Sink对象并且返回一个接受当前操作输入类型元素的Sink。有点迷惑,查看一下这个Sink到底是什么。跟进去看JDK说明:Sink是Cosumer的一个子类,用来引导值来通过流管道的各个阶段,通过提供一些额外的方法实现来管理大小信息、控制流动等等。可以看到里面提供了一个accept方法,参数接受的是流中的值,用来定义一种行为,这个行为也就是流的操作。因此可以总结出的是:对于流的各种操作,底层其实是被封装成一个个的Sink(对于引用类型其实是Sink.ChainedReference)对象来进行操作的。而Sink.ChainedReference又是一种单链表的数据结构,所以,流中的操作会以单链表的形式被链接起来。而且查看各种中间操作的实现,可以发现opWrapSink这个方法返回的Sink.ChainedReference对象的accept方法实现中,都会调用下游Sink的accept方法。这无疑就是行为的一种串联。这也就解释了,流在遇到终止操作过后,是怎么实现的遍历的时候是每个元素经历一连串的操作,然后再遍历下一个元素。

最后在分析一下流的终止操作是什么样的以及流的短路是怎么实现的。

最后的forEach方法跟进去,这里肯定进入的是ReferencePipeline的forEach方法,因为Head是源的Pipeline。

public void forEach(Consumer<? super P_OUT> action) {
    evaluate(ForEachOps.makeRef(action, false));
}

看下ForEachOps.makeRef的实现:

public static <T> TerminalOp<T, Void> makeRef(Consumer<? super T> action, boolean ordered) {
        Objects.requireNonNull(action);
        return new ForEachOp.OfRef<>(action, ordered);
}

看到最终返回一个ForEachOp.OfRef对象,跟进去发现ForEachOp.OfRef是ForEachOp的一个静态内部类并且继承了ForEachOp。并且可以查看到ForEachOp这个类有一个祖先是Sink。那说明ForEachOps.makeRef这个方法的作用就是把forEach操作封装成Sink对象。

在查看evaluate方法实现:

final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
        assert getOutputShape() == terminalOp.inputShape();
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;

        return isParallel() 
            ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags())) 
            : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
}

在跟到串行流的实现:

public <S> Void evaluateSequential(PipelineHelper<T> helper, Spliterator<S> spliterator) {
    return helper.wrapAndCopyInto(this, spliterator).get();
}

这里的第一个参数PipelineHelper类型其实是AbstractPipeline的父类,而AbstractPipeline又是ReferencePipeline的父类。再跟进helper.wrapAndCopyInto方法:

final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
    copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
    return sink;
}
final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
    Objects.requireNonNull(sink);

    for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
        sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
    }
    return (Sink<P_IN>) sink;
}
final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
    Objects.requireNonNull(wrappedSink);

    if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
        wrappedSink.begin(spliterator.getExactSizeIfKnown());
        spliterator.forEachRemaining(wrappedSink);
        wrappedSink.end();
    }
    else {
        copyIntoWithCancel(wrappedSink, spliterator);
    }
}

首先看wrapSink方法,可以看到通过ReferencePipeline的双向链表,从最后一个操作(也就是终止操作)往前遍历,将所有的操作都串联起来,最终返回一个指向第一个操作的Sink引用。这里的p.opWrapSink调用的其实就是每个操作返回的ReferencePipeline对象的opWrapSink方法。前面对这个方法做过了分析。

再来分析copyInto方法。可以看到逻辑分为短路操作和非短路操作,如果有短路操作就会执行下面的copyIntoWithCancel方法,否则指向上面的逻辑,最终会对封装好的sink执行accept,而每一个sink的accept方法里优惠调用下游sink的accept方法,来实现操作串联。

最后分析下短路:

final <P_IN> void copyIntoWithCancel(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
        @SuppressWarnings({"rawtypes","unchecked"})
        AbstractPipeline p = AbstractPipeline.this;
        while (p.depth > 0) {
            p = p.previousStage;
        }
        wrappedSink.begin(spliterator.getExactSizeIfKnown());
        p.forEachWithCancel(spliterator, wrappedSink);
        wrappedSink.end();
    }
final void forEachWithCancel(Spliterator<P_OUT> spliterator, Sink<P_OUT> sink) {
        do { } while (!sink.cancellationRequested() && spliterator.tryAdvance(sink));
    }

可以看到短路流在执行遍历的时候会调用Sink封装的cancellationRequested方法,如果返回出就不会进行后面的操作。也就是说短路流在操作的时候都要重写这个方法,例如limit操作,肯定是对传入的参数m进行自减– 操作,等到m<=0,cancellationRequested这个方法肯定会返回true。
可以跟进去看下limit操作的实现:

public final Stream<P_OUT> limit(long maxSize) {
        if (maxSize < 0)
            throw new IllegalArgumentException(Long.toString(maxSize));
        return SliceOps.makeRef(this, 0, maxSize);
    }

下面是SliceOps.makeRef返回的ReferencePipeline.StatefulOp对象对于opWrapSink方法的实现:

Sink<T> opWrapSink(int flags, Sink<T> sink) {
    return new Sink.ChainedReference<T, T>(sink) {
        long n = skip;
        long m = limit >= 0 ? limit : Long.MAX_VALUE;

        @Override
        public void begin(long size) {
            downstream.begin(calcSize(size, skip, m));
        }

        @Override
        public void accept(T t) {
            if (n == 0) {
                if (m > 0) {
                    m--;
                    downstream.accept(t);
                }
             }
             else {
                 n--;
             }
        }

        @Override
        public boolean cancellationRequested() {
            return m == 0 || downstream.cancellationRequested();
        }
    };
}

本文转载自:http://blog.csdn.net/zw19910924/article/details/77018798

aronykl
粉丝 1
博文 10
码字总数 0
作品 0
杭州
程序员
私信 提问
学习Java8 中的 Stream(一)

不想再用for嵌套for操作了,java8 带来了新的API —— Stream,非常强大! Stream中文翻译成流,是一个支持串行和并行操作元素的序列,也是Lambda表达式配合使用的强大工具。 源码在java.ut...

MaxZing
2018/01/21
0
0
Java8新特性系列(Stream)

上期我们分析了Java8中的引用,本期我们将分析Java8中的另一个重要的新特性:流Stream。 Stream是什么? 在Java8源代码中,是这么定义Stream的: A sequence of elements supporting sequen...

码上论剑
2018/02/03
0
0
Java8新特性及实战视频教程完整版

百度网盘 资料 第1讲:课程介绍 第2讲:课程介绍续 第3讲:Lambda表达式初步与函数式接口 第4讲:深入函数式接口与方法引用 第5讲:Lambda表达式深入与流初步 第6讲:Function接口详解 第7讲...

远近高低各不同
2018/11/09
6
0
Java8-Stream-创建流

友情提示:如果要看懂Stream,得先把Lambda先搞明白,可以去Java8-Lambda表达式瞧一瞧 什么是Stream Stream 是 Java8 中处理集合的关键抽象概念,它可以指定你希望对集合进行的操作,可以执行...

特拉仔
2018/02/24
62
0
java8 -函数式编程之Stream

在 java8 -函数式编程之Lambda表达式、 java8 -函数式编程之四个基本接口 、java8 -函数式编程之Optional 三篇文章中,我们已经对函数式编程有了充分的了解,接下来,我们将会运用之前学到的...

细肉云吞
2018/12/24
335
1

没有更多内容

加载失败,请刷新页面

加载更多

OpenStack 简介和几种安装方式总结

OpenStack :是一个由NASA和Rackspace合作研发并发起的,以Apache许可证授权的自由软件和开放源代码项目。项目目标是提供实施简单、可大规模扩展、丰富、标准统一的云计算管理平台。OpenSta...

小海bug
昨天
5
0
DDD(五)

1、引言 之前学习了解了DDD中实体这一概念,那么接下来需要了解的就是值对象、唯一标识。值对象,值就是数字1、2、3,字符串“1”,“2”,“3”,值时对象的特征,对象是一个事物的具体描述...

MrYuZixian
昨天
6
0
数据库中间件MyCat

什么是MyCat? 查看官网的介绍是这样说的 一个彻底开源的,面向企业应用开发的大数据库集群 支持事务、ACID、可以替代MySQL的加强版数据库 一个可以视为MySQL集群的企业级数据库,用来替代昂贵...

沉浮_
昨天
6
0
解决Mac下VSCode打开zsh乱码

1.乱码问题 iTerm2终端使用Zsh,并且配置Zsh主题,该主题主题需要安装字体来支持箭头效果,在iTerm2中设置这个字体,但是VSCode里这个箭头还是显示乱码。 iTerm2展示如下: VSCode展示如下: 2...

HelloDeveloper
昨天
7
0
常用物流快递单号查询接口种类及对接方法

目前快递查询接口有两种方式可以对接,一是和顺丰、圆通、中通、天天、韵达、德邦这些快递公司一一对接接口,二是和快递鸟这样第三方集成接口一次性对接多家常用快递。第一种耗费时间长,但是...

程序的小猿
昨天
9
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部