Spark的JavaWordCount例子

原创
2017/03/04 14:24
阅读数 437

JavaWordCount的例子很简单,就是利用spark的API统计单词出现的次数。网络上很多人都拿它来当案例。但是,我没看到有人解释spark的API怎么做并发处理的细节。于是,我就自己动手做实验,增加了一些打印信息。果然,看到了多线程并发处理的情况。Spark程序不是串行地处理,而是在收集了两行数据之后就开始多线程并发处理。有些和CUDA程序类似。

JAVA的主要代码:

//处理每一行数据的函数

JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

      @Override

      public Iterator<String> call(String s) {

      System.out.println("FlatMapFunction s="+s);

        return Arrays.asList(SPACE.split(s)).iterator();

      }

    });

//分割每一行成多个key

    JavaPairRDD<String, Integer> ones = words.mapToPair(

      new PairFunction<String, String, Integer>() {

        @Override

        public Tuple2<String, Integer> call(String s) {

        System.out.println("PairFunction s="+s);

          return new Tuple2<>(s, 1);

        }

      });

//根据key归约统计数字

    JavaPairRDD<String, Integer> counts = ones.reduceByKey(

      new Function2<Integer, Integer, Integer>() {

        @Override

        public Integer call(Integer i1, Integer i2) {

        System.out.println("reduceByKey count="+(i1+i2));

          return i1 + i2;

        }

      });

整段代码看起来像是串行执行的,实际上是并发执行的。

 

需要统计的文件内容:

good bad cool

hadoop spark mlib good

good spark mlib spark

cool spark bad

 

JavaWordCount的运行输出:

FlatMapFunction s=good bad cool

PairFunction s=good

PairFunction s=bad

PairFunction s=cool

FlatMapFunction s=hadoop spark mlib good

PairFunction s=hadoop

PairFunction s=spark

PairFunction s=mlib

PairFunction s=good

reduceByKey count=2

FlatMapFunction s=good spark mlib spark

PairFunction s=good

reduceByKey count=3

PairFunction s=spark

reduceByKey count=2

PairFunction s=mlib

reduceByKey count=2

PairFunction s=spark

reduceByKey count=3

FlatMapFunction s=cool spark bad

PairFunction s=cool

reduceByKey count=2

PairFunction s=spark

reduceByKey count=4

PairFunction s=bad

reduceByKey count=2

 

从上面的打印信息可以看到,当FlatMapFunction处理完第二行数据的时候,reduceByKey线程就开始工作了。直接进入并发模式,不需要等到FlatMapFunction处理完所有的行。这样计算的效率大幅提高,节省了很多等待时间。

值得注意的是,hadoop这个单词只出现了一次,结果它没有触发reduceByKey的call函数调用。说明这个call函数是针对两个输入数据进行归约处理的。

 

展开阅读全文
加载中
点击引领话题📣 发布并加入讨论🔥
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部