文档章节

基于HDFS的实时wordcount程序

别寒
 别寒
发布于 2017/08/02 14:41
字数 219
阅读 26
收藏 0

钉钉、微博极速扩容黑科技,点击观看阿里云弹性计算年度发布会!>>>

package cn.hhb.spark.streaming;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.util.Arrays;

/**
 * Created by dell on 2017/8/2.
 */
public class HDFSWordCount {

    public static void main(String[] args) {

        // 创建SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("HDFSWordCount").setMaster("local[2]")
                .set("spark.testing.memory", "2147480000");

        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

        JavaDStream<String> lines = jssc.textFileStream("hdfs://spark1:9000/wordcount_dir");

        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" "));
            }
        });

        JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        wordCounts.print();

        jssc.start();
        jssc.awaitTermination();
        jssc.close();
    }
}

scala版:

package com.hhb.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by dell on 2017/8/2.
  */
object HDFSWordCount {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("HDFSWordCount")
      .setMaster("local[2]")
      .set("spark.testing.memory", "2147480000")
    val ssc = new StreamingContext(conf, Seconds(5))

    val lines = ssc.textFileStream("hdfs://spark1:9000/wordcount_dir")
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map((_, 1))
    val wordCounts = pairs.reduceByKey(_ + _)

    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()

  }
}

别寒
粉丝 30
博文 273
码字总数 155300
作品 0
永州
程序员
私信 提问
加载中
请先登录后再评论。
7.Spark Streaming:输入DStream之基础数据源以及基于HDFS的实时wordcount程序

输入DStream之基础数据源 HDFS文件 基于HDFS文件的实时计算,其实就是,监控一个HDFS目录,只要其中有新文件出现,就实时处理。相当于处理实时的文件流。 streamingContext.fileStream(data...

weixin_32265569
2017/11/16
0
0
7.Spark Streaming:输入DStream之基础数据源以及基于HDFS的实时wordcount程序

输入DStream之基础数据源 HDFS文件 基于HDFS文件的实时计算,其实就是,监控一个HDFS目录,只要其中有新文件出现,就实时处理。相当于处理实时的文件流。 Spark Streaming会监视指定的HDFS目...

独立小桥风满袖
06/23
0
0
SparkStreaming+Kafka 实现基于缓存的实时wordcount程序

我的原创地址:https://dongkelun.com/2018/06/14/updateStateBykeyWordCount/ 前言 本文利用SparkStreaming和Kafka实现基于缓存的实时wordcount程序,什么意思呢,因为一般的SparkStreamin...

董可伦
2018/07/12
0
0
SparkStreaming+Kafka 实现统计基于缓存的实时uv

我的原创地址:https://dongkelun.com/2018/06/25/KafkaUV/ 前言 本文利用SparkStreaming+Kafka实现实时的统计uv,即独立访客,一个用户一天内访问多次算一次,这个看起来要对用户去重,其实...

董可伦
2018/07/12
0
0
SparkStreaming+Kafka 实现统计基于缓存的实时uv

版权声明:本文由董可伦首发于https://dongkelun.com,非商业转载请注明作者及原创出处。商业转载请联系作者本人。 https://blog.csdn.net/dkl12/article/details/80943294 我的原创地址:h...

董可伦
2018/07/06
0
0

没有更多内容

加载失败,请刷新页面

加载更多

JDK路径设置无效问题解决

JDK 配置环境无效的两种情况 第 ① 种:输入java -version,显示:'java' 不是内部或外部命令,也不是可运行的程序或批处理文件。这个问题一般出现在电脑第一次配置环境的时候。 第 ② 种:输...

osc_s3ka07m5
28分钟前
27
0
代理模式

一 代理模式简介 代理(Proxy)是一种设计模式 提供了对目标对象另外的访问方式 代理对象代理目标对象 达到增强目标对象功能的目的 二 静态代理 需要定义接口或者父类 代理对象与目标对象一起实...

osc_5w65ebjo
29分钟前
23
0
软件测试基础自学之测试基础理论,先看完这篇你再做测试

第一章、系统测试之测试基础 软件测试不只只只是测试源代码 软件测试人交流社群313782132,内有学习资料、面试技巧、内推机会。 1、测试定义: 通过人工或自动的手段, 对被测对象进行检阅的...

osc_aowxrpuv
32分钟前
17
0
软件测试基础之手工测试,你能想到的都在这

手工测试是传统的测试方法,由测试人员手工编写测试用例、执行、观察结果。软件测试中发现问题最多的都是手工测试,占整个项目的百分之九十五左右,所以说手工测试是软件测试基础。但手工测试...

osc_sfl7wfr9
33分钟前
23
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部