文档章节

Java Streams,第 2 部分: 使用流执行聚合

CasparLi
 CasparLi
发布于 2017/01/16 17:57
字数 5118
阅读 5
收藏 0

Java Streams,第 2 部分: 使用流执行聚合

轻松地分解数据

本系列的 第 1 部分 介绍了 Java SE 8 中添加的 java.util.stream 库。第二期文章将重点介绍 Streams 库的一个最重要的、灵活的方面 — 聚合和汇总数据的能力。

累加器反模式 / The "accumulator antipattern"

第 1 部分中的第 1 个例子使用 Streams 执行了一次简单的汇总,如清单 1 所示。

清单 1. 使用 Streams 声明性地计算聚合值

int totalSalesFromNY = txns.stream()
                           .filter(t -> t.getSeller().getAddr().getState().equals("NY"))
                           .mapToInt(t -> t.getAmount())
                           .sum();

清单 2 展示了如何采用 “老方法” 编写这个示例。

清单 2. 通过命令计算同一个聚合值

int sum = 0;
for (Txn t : txns){
    if (t.getSeller().getAddr().getState().equals("NY")){
        sum += t.getAmount();
    }
}

第 1 部分介绍了尽管新方法比老方法更长,但新方法更可取的一些原因:

  • 代码更加清晰,因为它被简单地构造为一些简单操作的组合。
  • 该代码是通过声明进行表达的(描述想要的结果),而不是通过命令进行表达的(一个计算结果的循序渐进的过程)。
  • 随着表达的查询变得更加复杂,此方法可以更干净地扩展。

应用这个特殊的聚合是有一些额外原因的。清单 2 演示了累加器反模式,其中代码首先声明并初始化一个可变累加器变量 (sum),然后继续在循环中更新累加器。为什么这样做是不正确的?首先,此代码样式难以并行化。没有协调(比如同步),对累加器的每次访问都导致一次数据争用(而借助协调,协调导致的争用会破坏并行性所带来的效率收益)。

累加器方法更不可取的另一个原因是,它在太低的级别上建模计算 — 在各个元素的级别上,而不是在整个数据集的级别上。与 “逐个依次迭代交易金额,将每笔金额添加到一个已初始化为 0 的累加器” 相比,“所有交易金额的总和” 是目标的更抽象、更直接的陈述。

所以,如果命令式累加是错误的工具,那什么才是正确的工具?在这个特定的问题中,您已经看到了答案的线索(sum() 方法),但这只是一个强大的、通用的 缩减 技术的一种特殊情况。缩减技术简单、灵活,而且可并行化,还能在比命令式累加更高的抽象级别上操作。

缩减 / Reduction

缩减技术简单、灵活,而且可并行化,还能在比命令式累加更高的抽象级别上操作。

缩减(也称为 折叠)是一种来自函数编程的技术,它抽象化了许多不同的累加操作。给定一个类型为 T,包含 n 个元素的非空数列 X1, x2, ..., xn 和 T 上的二元运算符(在这里表示为 *),* 下的 X 的缩减 被定义为:

(x1 * x2 * ...* xn)

当使用普通的加法作为二元运算符来应用于某个数列时,缩减就是求和。但其他许多操作也可以使用缩减来描述。如果二元运算符是 “获取两个元素中较大的一个”(这在 Java 中可以使用拉姆达表达式 (x,y) -> Math.max(x,y) 来表示,或者更简单地表示为方法引用 Math::max),则缩减对应于查找最大值。

通过将累加描述为缩减,而不使用累加器反模式,可以采用更 抽象、更 紧凑、更 并行化 的方式来描述计算 —— 只要您的二元运算符满足一个简单条件:结合性。回想一下,如果 a、b 和 c 元素满足以下条件,二元运算符 * 就是结合性的:

((a * b) * c) = (a * (b * c))

结合性意味着 分组无关紧要。如果二元运算符是结合性的,那么可以按照任何顺序安全地执行缩减。在顺序执行中,执行的自然顺序是从左向右;在并行执行中,数据划分为分段,分别缩减每个分段,然后组合结果。结合性可确保这两种方法得到相同的答案。如果将结合性的定义扩展到 4 项,可能更容易理解:

(((a * b) * c) * d) = ((a * b) * (c * d))

左侧对应于典型的顺序计算;右侧对应于表示典型的并行执行的分区执行,其中输入序列被分解为几部分,各部分并行缩减,并使用 * 将各部分的结果组合起来。(或许令人惊奇的是,* 不需要是可交换的,但许多运算符通常都可用于缩减,比如相加和求最大值等。具有结合性但没有可交换性的二元运算符的一个例子是字符串串联。) Streams 库有多种缩减方法,包括:

Optional<T> reduce(BinaryOperator<T> op)
T reduce(T identity, BinaryOperator<T> op)

在这些方法中,最简单的方法仅获得一个结合性二元运算符,在该运算符下计算流元素的缩减结果。结果被描述为 Optional;如果输入流是空的,则缩减结果也是空的。(如果输入只有一个元素,那么缩减结果就是该元素。)如果您有一个字符串集合,您可以将这些元素的串联计算为:

/*原文有误, 这里我改了一下*/
Optional<String> concatenated = strings.stream().reduce(String::concat);

对于这两种方法中的第二种方法,您需要提供一个标识值,在字符串为空时,还可以将该值用作结果。标识值必须满足所有 x 的限制:

身份 * x = x x * 身份 = x

不是所有二元运算符都有标识值,但当它们拥有标识值时,它们可能不会得到您想要的结果。例如,计算最大值时,您可能倾向于使用值 Integer.MIN_VALUE 作为标识(它确实满足要求)。但在空流上使用该身份时,结果可能不是您想要的;您无法确定空输入和仅包含 Integer.MIN_VALUE 的非空输入之间的区别。(有时这不是问题,但有时会导致问题 —— 因此 Streams 库将其留给客户端,由他决定是否指定标识。)

对于字符串串联,标识是空字符串,所以您可以将前面的示例重写为:

String concatenated = strings.stream().reduce("", String::concat);

类似地,您可以将数组上的整数总和描述为:

int sum = Stream.of(ints).reduce(0, (x,y) -> x+y);

(但实际上,您使用了 IntStream.sum() 便捷方法。)

缩减不需要仅应用于整数和字符串,它可以应用于您想要将元素序列缩减为该类型的单个元素的任何情形。例如,您可以通过缩减来计算最高的人:

Comparator<Person> byHeight = Comparators.comparingInt(Person::getHeight);
BinaryOperator<Person> tallerOf = BinaryOperator.maxBy(byHeight);
Optional<Person> tallest = people.stream().reduce(tallerOf);

如果提供的二元运算符不是结合性的,或者提供的标识值实际上不是该二元运算符的标识,那么在并行执行该操作时,结果可能是错的,而且同一个数据集上的不同执行过程可能会生成不同的结果。

可变缩减 / Mutable reduction

缩减获取一个值序列并将它缩减为单个值,比如数列的和或它的最大值。但是有时您不想要单个汇总值;您想将结果组织为类似 List 或 Map 的数据结构,或者将它缩减为多个汇总值。在这种情况下,您应该使用 缩减 的可变类似方法,也称为 收集

考虑将元素累积到一个 List 中的简单情况。使用累加器反模式,您可以这样编写它:

ArrayList<String> list = new ArrayList<>();
for (Person p : people){
    list.add(p.toString());
}

当累加器变量是一个简单值时,缩减是累加的更好替代方法,与此类似,在累加器结果是更复杂的数据结构时,也有一种更好的替代方法。缩减的构建块是一个标识值和一种将两个值组合成新值的途径;可变缩减的类似方法包括:

  • 一种生成空结果容器的途径
  • 一种将新元素合并到结果容器中的途径
  • 一种合并两个结果容器的途径

这些构建块可以轻松地表达为函数。这些函数中的第 3 个支持并行执行可变缩减:您可以对数据集进行分区,为每一部分生成一个中间累加结果,然后合并中间结果。Streams 库有一个 collect() 方法,它接受以下 3 个参数:

<R> R collect(Supplier<R> supplier, BiConsumer<R, ? super T> accumulator, BiConsumer<R, R> combiner);

在前一节中,您看到了一个使用缩减来计算字符串串联的示例。该代码会生成正确的结果,但是,因为 Java 中的字符串是不可变的,而且串联要求复制整个字符串,所以它还有 O(n^2) 运行时(一些字符串将复制多次)。您可以通过将结果收集到 StringBuilder 中,更高效地表达字符串串联:

StringBuilder concat = strings.stream()
                              .collect(() -> new StringBuilder(),
                                       (sb, s) -> sb.append(s),
                                       (sb, sb2) -> sb.append(sb2));

此方法使用 StringBuilder 作为结果容器。传递给 collect() 的 3 个函数使用默认构造函数创建了一个空容器,append(String) 方法将一个元素添加到容器中,append(StringBuilder) 方法将一个容器合并到另一个容器中。使用方法引用可能可以比拉姆达表达式更好地表达此代码:

StringBuilder concat = strings.stream()
                              .collect(StringBuilder::new,
                                       StringBuilder::append,
                                       StringBuilder::append);

类似地,要将一个流收集到一个 HashSet 中,您可以这样做:

Set<String> uniqueStrings = strings.stream()
                                   .collect(HashSet::new,
                                            HashSet::add,
                                            HashSet::addAll);

在这个版本中,结果容器是一个 HashSet 而不是 StringBuilder,但方法是一样的:使用默认构造函数创建一个新的结果容器,使用 add() 方法将一个新元素合并到结果集中,使用 addAll() 方法合并两个结果集。很容易看到如何将此代码调整为其他任何类型的集合。

您可能会想,因为使用了可变结果容器(StringBuilderHashSet),所以这也是累加器反模式的一个例子。但其实不然。累加器反模式在这种情况下采用的类似方法是:

Set<String> set = new HashSet<>();
strings.stream().forEach(s -> set.add(s));

就像只要组合函数是结合性的,且没有相互干扰的副作用,就可以安全地并行化缩减一样,如果满足一些简单的一致性要求(在 collect() 的规范中列出),就可以安全地并行化使用了 Stream.collect() 的可变缩减。关键区别在于,对于 forEach() 版本,多个线程会同时尝试访问一个结果容器,而对于并行 collect(),每个线程拥有自己的本地结果容器,会在以后合并其中的结果。

收集器 / Collectors

传递给 collect() 的 3 个函数(创建、填充和合并结果容器)之间的关系非常重要,所以有必要提供它自己的抽象 Collector 和 collect() 的相应简化版本。字符串串联示例可重写为:

String contact = strings.stream().collect(Collectors.joining());

收集到结果集的示例可重写为:

Set<String> uniqueStrings = strings.stream().collect(Collectors.toSet());

Collectors 类包含许多常见聚合操作的因素,比如累加到集合中、字符串串联、缩减和其他汇总计算,以及创建汇总表(通过 groupingBy())。表 1 包含部分内置收集器的列表,而且如果它们不够用,编写您自己的收集器也很容易(请参阅 “自定义收集器” 部分)。

表 1. 内置收集器

收集器 行为
toList() 将元素收集到一个 List 中。
toSet() 将元素收集到一个 Set 中。
toCollection(Supplier<Collection>) 将元素收集到一个特定类型的 Collection 中。
toMap(Function<T, K>, Function<T, V>) 将元素收集到一个 Map 中,依据提供的映射函数将元素转换为键值。
summingInt(ToIntFunction<T>) 计算将提供的 int 值映射函数应用于每个元素(以及 long 和 double 版本)的结果的总和。
summarizingInt(ToIntFunction<T>) 计算将提供的 int 值映射函数应用于每个元素(以及 long 和 double 版本)的结果的 sum、min、max、count 和 average。
reducing() 向元素应用缩减(通常用作下游收集器,比如用于 groupingBy)(各种版本)。
partitioningBy(Predicate<T>) 将元素分为两组:为其保留了提供的预期的组和未保留预期的组。
partitioningBy(Predicate<T>, Collector) 将元素分区,使用指定的下游收集器处理每个分区。
groupingBy(Function<T,U>) 将元素分组到一个 Map 中,其中的键是所提供的应用于流元素的函数,值是共享该键的元素列表。
groupingBy(Function<T,U>, Collector) 将元素分组,使用指定的下游收集器来处理与每个组有关联的值。
minBy(BinaryOperator<T>) 计算元素的最小值(与 maxBy() 相同)。
mapping(Function<T,U>, Collector) 将提供的映射函数应用于每个元素,并使用指定的下游收集器(通常用作下游收集器本身,比如用于 groupingBy)进行处理。
joining() 假设元素为 String 类型,将这些元素联结到一个字符串中(或许使用分隔符、前缀和后缀)。
counting() 计算元素数量。(通常用作下游收集器。)

将收集器函数分组到 Collector 抽象中在语法上更简单,但实际收益来自您开始将收集器组合在一起时,比如您想要创建复杂的汇总结果(比如 groupingBy() 收集器创建的摘要)的时候,该收集器依据来自元素的一个键将元素收集到 Map 中。例如,要创建超过 1000 美元的交易的 Map,可以使用卖家作为键:

Map<Seller, List<Txn>> bigTxnsBySeller = txns.stream()
                                             .filter(t -> t.getAmount() > 1000)
                                             .collect(groupingBy(Txn::getSeller));

但是,假设您不想要每个卖家的交易 List,而想要来自每个卖家的最大交易。您仍希望使用卖家作为结果的键,但您希望进一步处理与卖家关联的交易,以便将它缩减为最大的交易。可以使用 groupingBy() 的替代版本,无需将每个键的元素收集到列表中,而是将它们提供给另一个收集器(downstream 收集器)。对于下游收集器,您可以选择 maxBy() 等缩减方法:

Map<Seller, Txn> biggestTxnBySeller = txns.stream()
                                          .collect(groupingBy(Txn::getSeller, maxBy(comparing(Txn::getAmount))));

在这里,您将交易分组到以卖家作为键的映射中,但该映射的值是使用 maxBy() 收集器收集该卖家的所有销售的结果。如果您不想要该卖家的最大交易,而想要总和,可以使用 summingInt() 收集器:

Map<Seller, Integer> salesBySeller = txns.stream()
                                         .collect(groupingBy(Txn::getSeller, summingInt(Txn::getAmount)));

要获得多级汇总结果,比如每个区域和卖家的销售,可以使用另一个 groupingBy 收集器作为下游收集器:

Map<Region, Map<Seller, Integer>> salesByRegionAndSeller = txns.stream()
                                                               .collect(groupingBy(Txn::getRegion, groupingBy(Txn::getSeller, summingInt(Txn::getAmount))));

举一个不同领域的例子:要计算一个文档中的词频直方图,可以使用 BufferedReader.lines() 将文档拆分为行,使用 Pattern.splitAsStream() 将它分解为一个单词流,然后使用 collect() 和 groupingBy() 创建一个 Map,后者的键是单词,值是这些单词的数量,如清单 3 所示。

清单 3. 使用 Streams 计算单词数量直方图

Pattern whitespace = Pattern.compile("\\s+");
Map<String, Integer> wordFrequencies = reader.lines()
                                             .flatMap(s -> whitespace.splitAsStream())
                                             .collect(groupingBy(String::toLowerCase), Collectors.counting());

自定义收集器

尽管 JDK 提供的标准的收集器集合非常大,但编写您自己的收集器也非常容易。Collector 接口(如清单 4 所示)非常简单。该接口通过 3 种类型来实现参数化:输入类型 T、累加器类型 A 和最终的返回类型 R(A 和 R 通常是相同的),这些方法返回的函数与之前演示的 collect() 3 参数版本所接受的函数类似。

清单 4. Collector 接口

public interface Collector<T, A, R> {
    /** Return a function that creates a new empty result container */
    Supplier<A> supplier();
    /** Return a function that incorporates an element into a container */
    BiConsumer<A, T> accumulator();
    /** Return a function that merges two result containers */
    BinaryOperator<A> combiner();
    /** Return a function that converts the intermediate result container
        into the final representation */
    Function<A, R> finisher();
    /** Special characteristics of this collector */
    Set<Characteristics> characteristics();
}

Collectors 中的大部分收集器工厂的实现都很简单。例如,toList() 的实现是:

return new CollectorImpl<>(ArrayList::new, List::add, (left, right) -> { left.addAll(right); return left; }, CH_ID);

此实现使用 ArrayList 作为结果容器,使用 add() 合并一个元素,并使用 addAll() 将一个列表合并到另一个中,通过这些特征表明它的完成函数是标识函数(这使得流框架可以优化执行)。

与之前看到的一样,一些一致性要求与缩减中的身份和累加器函数之间的限制类似。这些要求已在 Collector 的规范中列出。

作为一个更复杂的示例,可以考虑在数据集上创建汇总统计数据的问题。很容易使用缩减来计算数字数据集的总和、最小值、最大值或数量(而且您可以使用总和和数量来计算平均值)。在数据上,使用缩减在一轮计算中一次性计算所有这些结果更加困难。但您可以轻松地编写一个 Collector 来高效地(如果愿意,还可并行地)执行此计算。

Collectors 类包含一个 collectingInt() 工厂方法,该方法返回一个 IntSummaryStatistics,后者会执行您想要的准确操作,比如在一轮计算中计算 sum、min、max、count 和 average。IntSummaryStatistics 的实现很简单,而且您可以轻松地编写自己的类似收集器来计算任意数据汇总结果(或扩展此结果)。

清单 5 显示了 IntSummaryStatistics 类。实际实现包含更多细节(包含用于获取汇总统计数据的 getter),但它的核心是简单的 accept() 和 combine() 方法。

清单 5. summarizingInt() 收集器使用的 IntSummaryStatistics 类

public class IntSummaryStatistics implements IntConsumer {
    private long count;
    private long sum;
    private int min = Integer.MAX_VALUE;
    private int max = Integer.MIN_VALUE;

    public void accept(int value) {
        ++count;
        sum += value;
        min = Math.min(min, value);
        max = Math.max(max, value);
    }

    public void combine(IntSummaryStatistics other) {
        count += other.count;
        sum += other.sum;
        min = Math.min(min, other.min);
        max = Math.max(max, other.max);
    }

    // plus getters for count, sum, min, max, and average
}

如您所见,这是一个非常简单的类。在观察每个新数据元素时,会以各种方式更新各种汇总结果,而且会以各种方式组合两个 IntSummaryStatistics 持有者。Collectors.summarizingInt() 的实现(如清单 6 所示)同样很简单;它创建了一个 Collector,以便通过应用一个整数值来提取器函数,并将结果传递给 IntSummaryStatistics.accept() 来合并一个元素。

清单 6. summarizingInt() 收集器工厂

public static <T>
Collector<T, ?, IntSummaryStatistics> summarizingInt(ToIntFunction<? super T> mapper) {
    return new CollectorImpl<T, IntSummaryStatistics, IntSummaryStatistics>(
        IntSummaryStatistics::new,
        (r, t) -> r.accept(mapper.applyAsInt(t)),
        (l, r) -> { l.combine(r); return l; },
        CH_ID
    );
}

组合收集器的容易性(您在 groupingBy() 示例中已看到)和创建新收集器的容易性相结合,可以创建流数据的几乎任何汇总结果,同时保持您的代码紧凑而又清晰。

第 2 部分的小结

聚合工具是 Streams 库的最有用和灵活的部分之一。可以使用缩减操作来轻松地按顺序或并行聚合简单的值;更复杂的汇总结果可通过 collect() 创建。该库附带了一组简单的基本收集器,可以组合它们来执行更复杂的聚合,而且您可以轻松地将自己的收集器添加到组合中。

在 第 3 部分 中,将深入剖析 Streams 的内部结构,以便了解在性能至关重要时如何最高效地使用该库。

本文转载自:https://www.ibm.com/developerworks/cn/java/j-java-streams-2-brian-goetz/index.html

CasparLi
粉丝 15
博文 132
码字总数 26994
作品 0
成都
程序员
私信 提问
Java I/O Streams

原文:https://docs.oracle.com/javase/tutorial/essential/io/streams.html 前言 Java中基础的I/O知识包括两方面:I/O Streams 和 File I/O。这部分学习I/O Streams。 I/O Streams I/O Stre......

琚建飞
03/13
0
0
Reactive Programming 一种技术,各自表述

前言 作为一名 Java 开发人员,尤其是 Java 服务端工程师,对于 Reactive Programming 的概念似乎相对陌生。随着 Java 9 以及 Spring Framework 5 的相继发布,Reactive 技术逐渐开始被广大从...

小马哥mercyblitz
2018/07/23
0
0
Kafka Streams 剖析

1.概述   Kafka Streams 是一个用来处理流式数据的库,属于Java类库,它并不是一个流处理框架,和Storm,Spark Streaming这类流处理框架是明显不一样的。那这样一个库是做什么的,能应用到...

smartloli
2017/09/14
0
0
java9之Reactive Streams

Java 9 Reactive Streams允许我们实现非阻塞异步流处理。这是将响应式编程模型应用于核心java编程的重要一步。   如果您对响应式编程不熟悉,请阅读Reactive Manifesto并阅读Reactive Stre...

李玉长
04/11
65
0
Java 8 Stream.md

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/xidiancoder/article/details/82962933 文章目录 Java 8 Stream入门概述 工作比较忙,平时少有时间进行学习和...

zxca368
2018/10/08
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Eureka应用注册与集群数据同步源码解析

在之前的EurekaClient自动装配及启动流程解析一文中我们提到过,在构造DiscoveryClient类时,会把自身注册到服务端,本文就来分析一下这个注册流程 客户端发起注册 boolean register() t...

Java学习录
18分钟前
4
0
Java描述设计模式(15):责任链模式

本文源码:GitHub·点这里 || GitEE·点这里 一、生活场景描述 1、请假审批流程 公司常见的请假审批流程:请假天数 当 day<=3 天,项目经理审批当 3<day<=5 天,部门经理审批当 day>5 天...

知了一笑
29分钟前
6
0
总结:数组与链表

1、内存申请:数组在内存上是连续的空间;链表,内存地址上可以是不连续的。 2、查询速度:数组可以随机访问,链表必须顺序访问,即从首个元素开始遍历,逐个查找,所以数组查询很快。 3、写入...

浮躁的码农
37分钟前
6
0
HashMap源码分析

read

V丶zxw
56分钟前
5
0
Python字符串或JSON字符串转字典dict、列表list

有3种方法 1、使用ast模块 >>> import ast>>> s = '["test",1]'>>> ast.literal_eval(s)['test',1]>>> s = '{"test":1}'>>> ast.literal_eval(s){'test': 1} 2、eval函数,这个......

编程老陆
今天
7
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部