文档章节

基于HDFS的实时wordcount程序

别寒
 别寒
发布于 2017/08/02 14:41
字数 219
阅读 5
收藏 0
package cn.hhb.spark.streaming;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.util.Arrays;

/**
 * Created by dell on 2017/8/2.
 */
public class HDFSWordCount {

    public static void main(String[] args) {

        // 创建SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("HDFSWordCount").setMaster("local[2]")
                .set("spark.testing.memory", "2147480000");

        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

        JavaDStream<String> lines = jssc.textFileStream("hdfs://spark1:9000/wordcount_dir");

        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" "));
            }
        });

        JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        wordCounts.print();

        jssc.start();
        jssc.awaitTermination();
        jssc.close();
    }
}

scala版:

package com.hhb.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by dell on 2017/8/2.
  */
object HDFSWordCount {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("HDFSWordCount")
      .setMaster("local[2]")
      .set("spark.testing.memory", "2147480000")
    val ssc = new StreamingContext(conf, Seconds(5))

    val lines = ssc.textFileStream("hdfs://spark1:9000/wordcount_dir")
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map((_, 1))
    val wordCounts = pairs.reduceByKey(_ + _)

    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()

  }
}

© 著作权归作者所有

共有 人打赏支持
别寒
粉丝 30
博文 271
码字总数 137605
作品 0
永州
程序员
7.Spark Streaming:输入DStream之基础数据源以及基于HDFS的实时wordcount程序

输入DStream之基础数据源 HDFS文件 基于HDFS文件的实时计算,其实就是,监控一个HDFS目录,只要其中有新文件出现,就实时处理。相当于处理实时的文件流。 streamingContext.fileStream(data...

weixin_32265569
2017/11/16
0
0
SparkStreaming+Kafka 实现统计基于缓存的实时uv

我的原创地址:https://dongkelun.com/2018/06/25/KafkaUV/ 前言 本文利用SparkStreaming+Kafka实现实时的统计uv,即独立访客,一个用户一天内访问多次算一次,这个看起来要对用户去重,其实...

董可伦
07/12
0
0
SparkStreaming+Kafka 实现基于缓存的实时wordcount程序

我的原创地址:https://dongkelun.com/2018/06/14/updateStateBykeyWordCount/ 前言 本文利用SparkStreaming和Kafka实现基于缓存的实时wordcount程序,什么意思呢,因为一般的SparkStreamin...

董可伦
07/12
0
0
11.updateStateByKey以及基于缓存的实时wordcount程序

updateStateByKey updateStateByKey操作,可以让我们为每个key维护一份state,并持续不断的更新该state。 1、首先,要定义一个state,可以是任意的数据类型; 2、其次,要定义state更新函数—...

weixin_32265569
2017/11/18
0
0
写给大数据开发初学者的话 | 附教程

导读: 第一章:初识Hadoop 第二章:更高效的WordCount 第三章:把别处的数据搞到Hadoop上 第四章:把Hadoop上的数据搞到别处去 第五章:快一点吧,我的SQL 第六章:一夫多妻制 第七章:越来...

小数点
2017/12/07
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Android JNI 开发系列(九)JNI调用Java的静态方法&实例方法

JNI调用Java的静态方法&实例方法 package org.professor.jni.bean;import android.util.Log;/** * Created by peng on 2018/10/11. */ public class Person { /*C/CPP 调用......

蔡小鹏
31分钟前
4
0
Flink 原理与实现:Window 机制

Flink 认为 Batch 是 Streaming 的一个特例,所以 Flink 底层引擎是一个流式引擎,在上面实现了流处理和批处理。而窗口(window)就是从 Streaming 到 Batch 的一个桥梁。Flink 提供了非常完...

xtof
33分钟前
1
0
Fabric.js高级点的教程2--添加表格的方法

有时候我们先在画布上画一个表格类似一下设计软件那种。一方面作为参考线,一方面给人一直专业的赶脚。。。 先贴上一段网上(不是我写的)可以运行的代码 https://codepen.io/rodan8888/pen/e...

xmqywx
33分钟前
1
0
Java中Lambda表达式的使用

Java中Lambda表达式的使用 简介 (译者注:虽然看着很先进,其实Lambda表达式的本质只是一个"语法糖",由编译器推断并帮你转换包装为常规的代码,因此你可以使用更少的代码来实现同样的功能。本人...

DemonsI
37分钟前
2
0
深入理解Java中停止线程

一.停止线程会带来什么? 对于单线程中,停止单线程就是直接使用关键字return或者break,但是在停止多线程时是让线程在完成任务前去开启另外一条线程,必须放弃当前任务,而这个过程是不可预...

Ala6
46分钟前
23
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部