文档章节

基于HDFS的实时wordcount程序

别寒
 别寒
发布于 2017/08/02 14:41
字数 219
阅读 6
收藏 0
package cn.hhb.spark.streaming;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

import java.util.Arrays;

/**
 * Created by dell on 2017/8/2.
 */
public class HDFSWordCount {

    public static void main(String[] args) {

        // 创建SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("HDFSWordCount").setMaster("local[2]")
                .set("spark.testing.memory", "2147480000");

        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

        JavaDStream<String> lines = jssc.textFileStream("hdfs://spark1:9000/wordcount_dir");

        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" "));
            }
        });

        JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        wordCounts.print();

        jssc.start();
        jssc.awaitTermination();
        jssc.close();
    }
}

scala版:

package com.hhb.spark.streaming

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

/**
  * Created by dell on 2017/8/2.
  */
object HDFSWordCount {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("HDFSWordCount")
      .setMaster("local[2]")
      .set("spark.testing.memory", "2147480000")
    val ssc = new StreamingContext(conf, Seconds(5))

    val lines = ssc.textFileStream("hdfs://spark1:9000/wordcount_dir")
    val words = lines.flatMap(_.split(" "))
    val pairs = words.map((_, 1))
    val wordCounts = pairs.reduceByKey(_ + _)

    wordCounts.print()

    ssc.start()
    ssc.awaitTermination()

  }
}

© 著作权归作者所有

共有 人打赏支持
别寒
粉丝 29
博文 271
码字总数 137605
作品 0
永州
程序员
私信 提问
7.Spark Streaming:输入DStream之基础数据源以及基于HDFS的实时wordcount程序

输入DStream之基础数据源 HDFS文件 基于HDFS文件的实时计算,其实就是,监控一个HDFS目录,只要其中有新文件出现,就实时处理。相当于处理实时的文件流。 streamingContext.fileStream(data...

weixin_32265569
2017/11/16
0
0
SparkStreaming+Kafka 实现统计基于缓存的实时uv

我的原创地址:https://dongkelun.com/2018/06/25/KafkaUV/ 前言 本文利用SparkStreaming+Kafka实现实时的统计uv,即独立访客,一个用户一天内访问多次算一次,这个看起来要对用户去重,其实...

董可伦
2018/07/12
0
0
SparkStreaming+Kafka 实现基于缓存的实时wordcount程序

我的原创地址:https://dongkelun.com/2018/06/14/updateStateBykeyWordCount/ 前言 本文利用SparkStreaming和Kafka实现基于缓存的实时wordcount程序,什么意思呢,因为一般的SparkStreamin...

董可伦
2018/07/12
0
0
11.updateStateByKey以及基于缓存的实时wordcount程序

updateStateByKey updateStateByKey操作,可以让我们为每个key维护一份state,并持续不断的更新该state。 1、首先,要定义一个state,可以是任意的数据类型; 2、其次,要定义state更新函数—...

weixin_32265569
2017/11/18
0
0
写给大数据开发初学者的话 | 附教程

导读: 第一章:初识Hadoop 第二章:更高效的WordCount 第三章:把别处的数据搞到Hadoop上 第四章:把Hadoop上的数据搞到别处去 第五章:快一点吧,我的SQL 第六章:一夫多妻制 第七章:越来...

小数点
2017/12/07
0
0

没有更多内容

加载失败,请刷新页面

加载更多

c++ 定义新的异常

#include <iostream> #include <exception> using namespace std; struct MyException : public exception { const char * what () const throw () { return "C++ Exception"; } }; int main......

天王盖地虎626
40分钟前
1
0
PDMan-2.1.1 发布:用心开源,免费的国产数据库建模工具(春节前最后一个版本)

一、软件介绍 PDMan 是一款开源免费的数据库模型建模工具,是PowerDesigner之外另一种更好的选择。支持Windows,Mac,Linux等操作系统,具有上手容易,使用简单的特点。 2018年获得码云GVP (Gi...

O龙猫O
今天
12
0
OSChina 周二乱弹 —— 以后我偷小鱼干养你

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @庞巴哥 :只有这节奏瞬间变得轻松。。。。。。。。。分享Talking Eyes的单曲《In the sun (Extended Version)》: 《In the sun (Extended Ve...

小小编辑
今天
133
6
多表查询

第1章 多表关系实战 1.1 实战1:省和市  方案1:多张表,一对多  方案2:一张表,自关联一对多 1.2 实战2:用户和角色 (比如演员和扮演人物)  多对多关系 1.3 实战3:角色和权限 (比如...

stars永恒
今天
9
0
求推广,德邦快递坑人!!!!

完全没想好怎么来吐槽自己这次苦逼的德邦物流过程了,只好来记一个流水账。 从寄快递开始: 2019年1月15日从 德邦物流 微信小app上下单,截图如下: 可笑的是什么,我预约的是17号上门收件,...

o0无忧亦无怖
昨天
13
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部