文档章节

SparkStreaming实时wordcount案例

别寒
 别寒
发布于 2017/08/02 14:01
字数 595
阅读 8
收藏 0
package cn.hhb.spark.streaming;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
import scala.actors.threadpool.Arrays;

/**
 * Created by dell on 2017/8/1.
 */
public class WordCount {
    public static void main(String[] args) {

        // 创建SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("WordCount").setMaster("local[2]")
                .set("spark.testing.memory", "2147480000");

        // 创建JavaStreamingContext
        // 该对象除了接收sparkconf对象之外,还必须接收一个batch interval参数,
        // 就是说,每收集多长时间的数据,划分一个batch,进行处理
        // 这里设置一秒
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

        // 首先创建DStream,代表了一个从数据源来的持续不断的实时数据流
        // 调用JavaStreamingContext的socketTextStream方法,可以创建一个数据源为socket网络端口的
        // 数据流,JavaReceiverInputDStream代表了一个输入的DStream
        // socketTextStream()方法接收两个基本参数,第一个是监听哪个主机上的端口,第二个是监听哪个端口
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

        // 到这里为止,可以理解为JavaReceiverInputDStream中的,每隔一秒,会有一个RDD,其中封装了
        // 这一秒发送过来的数据,RDD的元素类型为String,即一行一行的文本,所以
        // 这里JavaReceiverInputDStream的泛型类型<String>,其实就代表了它底层的RDD的泛型类型

        // 开始对接收到的数据执行计算,使用sparkcore提供的算子,执行应用在dstream中即可
        // 在底层,实际上是会对DStream中的一个一个的rdd,执行我们应用在dstream上的算子
        // 产生的新RDD,会作为新DStream中的RDD
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" "));
            }
        });

        // 这个时候,每秒的数据,一行一行的文本,就会被拆分为多个单词,words DStream中的RDD的元素类型
        // 即为一个一个的单词
        // 接着,开始进行flatmap、reduceByKey操作
        JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        // 到此为止,我们就实现了实时的wordcount程序
        // 每秒中发送到指定socket端口上的数据,都会被lines DStream接收到
        wordCounts.print();
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 首先对JavaStreamingContext进行一下后续处理
        // 必须调用JavaStreamingContext的start方法,整个spark streaming application才会启动执行
        // 否则是不会执行的
        jssc.start();
        jssc.awaitTermination();
        jssc.close();
    }
}

© 著作权归作者所有

共有 人打赏支持
别寒
粉丝 30
博文 271
码字总数 137605
作品 0
永州
程序员
SparkStreaming+Kafka 实现基于缓存的实时wordcount程序

我的原创地址:https://dongkelun.com/2018/06/14/updateStateBykeyWordCount/ 前言 本文利用SparkStreaming和Kafka实现基于缓存的实时wordcount程序,什么意思呢,因为一般的SparkStreamin...

董可伦
07/12
0
0
Spark Streaming

一、spark的野心: 1、取代mapreduce--->Batch Processing 2、spark Sql --->hive 3、stream processing --->strom 吞吐量比storm大,处理速度是storm的2到5倍,但是延迟是秒级别的 sparkstr......

captainliu
2016/07/24
34
0
第1课:通过案例对Spark Streaming透彻理解

一.SparkStreaming在线另类实验 如何清晰的看到数据的流入、被处理的过程?使用一个小技巧,通过调节放大BatchInterval的方式,来降低批处理次数,以方便看清楚各个环节。我们从已写过的广告...

天蓝一枫
2016/05/08
2.2K
0
Spark Streaming流式处理

Spark Streaming介绍 Spark Streaming概述 Spark Streaming makes it easy to build scalable fault-tolerant streaming applications. 它可以非常容易的构建一个可扩展、具有容错机制的流式......

jiFeng丶
07/26
0
0
SparkStreaming下Python报net.jpountz.lz4.LZ4BlockInputStream的解决

这几天在测试SparkStreaming,连接Kafka一直报这个错, 连接同事用java写进去的都OK,用client端或NiFi都抱着个错. 开始一直怀疑jar包版本的问题, 用了最新的spark-streaming-kafka-0-8-assembl...

seng
08/30
0
0

没有更多内容

加载失败,请刷新页面

加载更多

arts-week10

Algorithm 905. Sort Array By Parity - LeetCode Review Who’s Afraid of the Big Bad Preloader? 一文读懂前端缓存 一个网络请求3个步骤:请求,处理,响应,而前端缓存主要在请求处响应这两步...

yysue
53分钟前
0
0
00.编译OpenJDK-8u40的整个过程

前言 历经2天的折腾总算把OpenJDK给编译成功了,要说为啥搞这个,还得从面试说起,最近出去面试经常被问到JVM的相关东西,总感觉自己以前学的太浅薄,所以回来就打算深入学习,目标把《深入理...

凌晨一点
今天
4
0
python: 一些关于元组的碎碎念

初始化元组的时候,尤其是元组里面只有一个元素的时候,会出现一些很蛋疼的情况: def checkContentAndType(obj): print(obj) print(type(obj))if __name__=="__main__": tu...

Oh_really
昨天
6
2
jvm crash分析工具

介绍一款非常好用的jvm crash分析工具,当jvm挂掉时,会产生hs_err_pid.log。里面记录了jvm当时的运行状态以及错误信息,但是内容量比较庞大,不好分析。所以我们要借助工具来帮我们。 Cras...

xpbob
昨天
126
0
Qt编写自定义控件属性设计器

以前做.NET开发中,.NET直接就集成了属性设计器,VS不愧是宇宙第一IDE,你能够想到的都给你封装好了,用起来不要太爽!因为项目需要自从全面转Qt开发已经6年有余,在工业控制领域,有一些应用...

飞扬青云
昨天
6
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部