文档章节

java8 Stream之原理

my_juke
 my_juke
发布于 06/01 14:10
字数 1579
阅读 34
收藏 6

Stream

java8的Stream很重要,spring-reactor里面用到reactor-core,而java8的stream与之很相似,搞懂了再看reactor-core必定事半功倍。
先看一下它的强大,这里只是冰山一角:
从List<Student> 列表中取出name,将name组成一个List。
老代码

List<String> nameList = new ArrayList();
if(null != list){
	for(Student stu : list){
		nameList.add(stu.getName());
	}
}

JAVA8

List<String> nameList = Optional.ofNullable(list).orElse(Collections.emptyList()).stream()
                 .map(Stu::getName).collect(Collectors.toList());

Stream.of 创建Stream

这里给大家演示一下通过Stream.of创建Stream。
常见的集合通过stream()方法都可以创建Stream。 其实他们最终都是调用以下方法创建的。

    public static <T> Stream<T> stream(Spliterator<T> spliterator, boolean parallel) {
        Objects.requireNonNull(spliterator);
        return new ReferencePipeline.Head<>(spliterator,
                                            StreamOpFlag.fromCharacteristics(spliterator),
                                            parallel);
    }

Stream.of有两种创建Stream的方法。
第一种

Stream.of("a1")

第二种

Stream.of("a1","a2"); //这种通过Arrays.stream 构建

这里介绍两个相关的类:


如果是单个元素,直接使用Spliterator进行构建。 如果是多个元素,会有一个优化,使用SpineBuffer构建。
如果是大数组,使用SpineBuffer,小数组是使用ArrayList。 如何使用SpineBuffer构建?

Stream.builder().add("a1").add("a2").build();

Stream 相关概念

stream的操作分为两种:
一种是中间操作,就是不需要结果,只需要记录这个过程,一般返回Stream对象都是属于这种
一种是终极操作,就是立即需要返回结果,一般返回非Stream对象,都是属于这种。
stream的状态分为三种:
第一种:Head,第一次创建的时候就是这种
第二种:Stateless,无状态,每个对象的操作是独立的。
第三种:Stateful,有状态,需要联合多个象才能得出结果。
stream操作特性:
操作特性是指:该stream有固定大小,大小不固定,操作有序,数据有序等。

Stream.filter

顾名思义:对 Stream进行filter,然后返回新的Stream。 由前一节我们知道,stream的具体数据存储在Spliterator中。而它本身可以理解为只是一个算法。
filter只是一个中间操作,我们只需要记录这一个过程就OK了。然后返回新的Stream。如果再次调用fileter,会再次返回一个新Stream。

上面是一个流程图,Sink是包装算子的一个类,比如调用filter,从Head里面拿到对象,经过第一个Sink,再经过第二个Sink的运算,最终得到结果。
下面是Strea.filter的源码实现:

public final Stream<P_OUT> filter(Predicate<? super P_OUT> predicate) {
        Objects.requireNonNull(predicate);
        return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SIZED) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                    @Override
                    public void begin(long size) {
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(P_OUT u) {
                        //如果通过当前filter,就进入下一个算子
                        if (predicate.test(u))
                            downstream.accept(u);
                    }
                };
            }
        };
    }

Stream.peek

这个方法可以理解为调试方法,它不对结果产生任何影响,将数据原封不动的传给下一个算子

    public final Stream<P_OUT> peek(Consumer<? super P_OUT> action) {
        Objects.requireNonNull(action);
        return new StatelessOp<P_OUT, P_OUT>(this, StreamShape.REFERENCE,
                                     0) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<P_OUT> sink) {
                return new Sink.ChainedReference<P_OUT, P_OUT>(sink) {
                    @Override
                    public void accept(P_OUT u) {
                        action.accept(u);
                        downstream.accept(u);
                    }
                };
            }
        };
    }

Stream.flatMap

算子应该是通过一个对象映身成一个Stream,然后调用foreach,将每个元素传递到下一个算子。

public final <R> Stream<R> flatMap(Function<? super P_OUT, ? extends Stream<? extends R>> mapper) {
        Objects.requireNonNull(mapper);
        // We can do better than this, by polling cancellationRequested when stream is infinite
        return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT | StreamOpFlag.NOT_SIZED) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                return new Sink.ChainedReference<P_OUT, R>(sink) {
                    @Override
                    public void begin(long size) {
                        downstream.begin(-1);
                    }

                    @Override
                    public void accept(P_OUT u) {
                        try (Stream<? extends R> result = mapper.apply(u)) {
                            // We can do better that this too; optimize for depth=0 case and just grab spliterator and forEach it
                            if (result != null)
                                result.sequential().forEach(downstream);
                        }
                    }
                };
            }
        };
    }

Stream.map

与上面的类似,只是映射成另一个对象

    public final <R> Stream<R> map(Function<? super P_OUT, ? extends R> mapper) {
        Objects.requireNonNull(mapper);
        return new StatelessOp<P_OUT, R>(this, StreamShape.REFERENCE,
                                     StreamOpFlag.NOT_SORTED | StreamOpFlag.NOT_DISTINCT) {
            @Override
            Sink<P_OUT> opWrapSink(int flags, Sink<R> sink) {
                return new Sink.ChainedReference<P_OUT, R>(sink) {
                    @Override
                    public void accept(P_OUT u) {
                        downstream.accept(mapper.apply(u));
                    }
                };
            }
        };
    }

Stream.limit

这是一个有状态的操作,因为它返回一定数据的数据组成的Stream。 这里只贴一段核心算法:

Sink<T> opWrapSink(int flags, Sink<T> sink) {
                return new Sink.ChainedReference<T, T>(sink) {
                    long n = skip;
                    long m = limit >= 0 ? limit : Long.MAX_VALUE;

                    @Override
                    public void begin(long size) {
                        downstream.begin(calcSize(size, skip, m));
                    }

                    @Override
                    public void accept(T t) {
                        if (n == 0) {
                            if (m > 0) {
                                m--;
                                downstream.accept(t);
                            }
                        }
                        else {
                            n--;
                        }
                    }

                    @Override
                    public boolean cancellationRequested() {
                        return m == 0 || downstream.cancellationRequested();
                    }
                };
            }

Stream.skip

这个与Stram.limit类似,两个联合起来就可以分面查询了。

Stream.sorted

排序,如果没传比较器就用默认的。
如果有顺序,就不用排序了,如果给定大小了就用一个固定大小的数组来排序,否则用一个列来来排序。

public Sink<T> opWrapSink(int flags, Sink<T> sink) {
            Objects.requireNonNull(sink);

            // If the input is already naturally sorted and this operation
            // also naturally sorted then this is a no-op
            if (StreamOpFlag.SORTED.isKnown(flags) && isNaturalSort)
                return sink;
            else if (StreamOpFlag.SIZED.isKnown(flags))
                return new SizedRefSortingSink<>(sink, comparator);
            else
                return new RefSortingSink<>(sink, comparator);
        }

通过排序,分页,说明这个算子需要支持开始,结束方法。还需要一个取消方法,为什么了,比如第一个Stream有20个对象,但是后面只需要第一个,所以我第一个算子给到你一个数据时,第一个算子就需要终止了。

Stream.anyMatch

下面看一个anyMatch是怎么实现的。

    @Override
    public final boolean anyMatch(Predicate<? super P_OUT> predicate) {
        return evaluate(MatchOps.makeRef(predicate, MatchOps.MatchKind.ANY));
    }

第二步,主要是用当前stream,和原始的数据容器spliterator

    final <R> R evaluate(TerminalOp<E_OUT, R> terminalOp) {
        assert getOutputShape() == terminalOp.inputShape();
        if (linkedOrConsumed)
            throw new IllegalStateException(MSG_STREAM_LINKED);
        linkedOrConsumed = true;

        return isParallel()
               ? terminalOp.evaluateParallel(this, sourceSpliterator(terminalOp.getOpFlags()))
               : terminalOp.evaluateSequential(this, sourceSpliterator(terminalOp.getOpFlags()));
    }

第三步,最后一个算子和原始容器

        @Override
        public <S> Boolean evaluateSequential(PipelineHelper<T> helper,
                                              Spliterator<S> spliterator) {
            return helper.wrapAndCopyInto(sinkSupplier.get(), spliterator).getAndClearState();
        }

第四步 包装算子

    final <P_IN> Sink<P_IN> wrapSink(Sink<E_OUT> sink) {
        Objects.requireNonNull(sink);

        for ( @SuppressWarnings("rawtypes") AbstractPipeline p=AbstractPipeline.this; p.depth > 0; p=p.previousStage) {
            sink = p.opWrapSink(p.previousStage.combinedFlags, sink);
        }
        return (Sink<P_IN>) sink;
    }

第五步 数据传递

    @Override
    final <P_IN, S extends Sink<E_OUT>> S wrapAndCopyInto(S sink, Spliterator<P_IN> spliterator) {
        copyInto(wrapSink(Objects.requireNonNull(sink)), spliterator);
        return sink;
    }
	
    final <P_IN> void copyInto(Sink<P_IN> wrappedSink, Spliterator<P_IN> spliterator) {
        Objects.requireNonNull(wrappedSink);
     //满足要求后,是否需要停止计算
        if (!StreamOpFlag.SHORT_CIRCUIT.isKnown(getStreamAndOpFlags())) {
            wrappedSink.begin(spliterator.getExactSizeIfKnown());
            spliterator.forEachRemaining(wrappedSink);
            wrappedSink.end();
        }
        else {
		//需要停止计算
            copyIntoWithCancel(wrappedSink, spliterator);
        }
    }

Stream.spliterator

只需要一个Sink,然后调用wrapSink,再copyInto就可以实现了

    final <P_IN> Spliterator<P_OUT> wrap(PipelineHelper<P_OUT> ph,
                                     Supplier<Spliterator<P_IN>> supplier,
                                     boolean isParallel) {
        return new StreamSpliterators.WrappingSpliterator<>(ph, supplier, isParallel);
    }

© 著作权归作者所有

my_juke
粉丝 4
博文 34
码字总数 22261
作品 0
深圳
私信 提问
跟上Java8 - Stream API快速入门

跟上Java8 - Stream API快速入门 王爵的技术博客2017-07-1821 阅读 streamfilterJava8 在前面我们简单介绍了 表达式,Java8旨在帮助程序员写出更好的代码, 其对核心类库的改进也是关键的一部...

王爵的技术博客
2017/07/18
0
0
Java8 Stream详解 & 原理深度解析

Java8中提供了Stream对集合操作作出了极大的简化,学习了Stream之后,我们以后不用使用for循环就能对集合作出很好的操作。 一、流的初始化与转换: Java中的Stream的所有操作都是针对流的,所...

勇恒
2018/06/08
124
0
Java8 Stream原理深度解析

常用的流操作 在深入原理之前,我们有必要知道关于Stream的一些基础知识,关于Stream的操作分类,如表1-1所示。 表1-1 Stream的常用操作分类(表格引自这里) 如表1-1中所示,Stream中的操作可...

长河
06/03
0
0
Java8新特性(一)_interface中的static方法和default方法

为什么要单独写个Java8新特性,一个原因是我目前所在的公司用的是jdk8,并且框架中用了大量的Java8的新特性,如上篇文章写到的stream方法进行过滤map集合。stream方法就是接口Collection中的...

阳光温暖
2018/07/31
0
0
Java8新特性系列(Stream)

上期我们分析了Java8中的引用,本期我们将分析Java8中的另一个重要的新特性:流Stream。 Stream是什么? 在Java8源代码中,是这么定义Stream的: A sequence of elements supporting sequen...

码上论剑
2018/02/03
0
0

没有更多内容

加载失败,请刷新页面

加载更多

postman批量测试

postman批量调用: 先单个调用,成功了 再save为collection 再点击三角形,点击run 设置1000次,run就可以 见《postman批量测试.docx》

Danni3
34分钟前
8
0
js 对象操作 js 对象和对象赋值 去除关联性 对象原型操作 把一个对象A赋值给另一个对象B 并且对象B 修改 不会影响 A对象

当我们在项目需要 复制一个对象到另一个对象并且 被复制的对象不能受复制后的对象的影响。 我先总结下 我们哪些方法可以复制对象 // 直接赋值var obj1 = { a: 1 };var obj2 = obj1;...

xiaogg
35分钟前
7
0
Go微服务全链路跟踪详解

在微服务架构中,调用链是漫长而复杂的,要了解其中的每个环节及其性能,你需要全链路跟踪。 它的原理很简单,你可以在每个请求开始时生成一个唯一的ID,并将其传递到整个调用链。 该ID称为C...

倚天码农
49分钟前
8
0
QML笔记-对QML中信号与槽的基本认识

版权声明:本文为博主原创文章,遵循 CC 4.0 BY-SA 版权协议,转载请附上原文出处链接和本声明。 本文链接:https://blog.csdn.net/qq78442761/article/details/90753986 目录 基本概念 演示...

shzwork
56分钟前
5
0
SSH安全加强两步走

从 OpenSSH 6.2 开始已经支持 SSH 多因素认证,本文就来讲讲如何在 OpenSSH 下启用该特性。 OpenSSH 6.2 以后的版本多了一个配置项 AuthenticationMethods。该配置项可以让 OpenSSH 同时指定...

xiangyunyan
今天
8
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部