文档章节

SparkStreaming实时wordcount案例

别寒
 别寒
发布于 2017/08/02 14:01
字数 595
阅读 8
收藏 0
package cn.hhb.spark.streaming;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
import scala.actors.threadpool.Arrays;

/**
 * Created by dell on 2017/8/1.
 */
public class WordCount {
    public static void main(String[] args) {

        // 创建SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("WordCount").setMaster("local[2]")
                .set("spark.testing.memory", "2147480000");

        // 创建JavaStreamingContext
        // 该对象除了接收sparkconf对象之外,还必须接收一个batch interval参数,
        // 就是说,每收集多长时间的数据,划分一个batch,进行处理
        // 这里设置一秒
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

        // 首先创建DStream,代表了一个从数据源来的持续不断的实时数据流
        // 调用JavaStreamingContext的socketTextStream方法,可以创建一个数据源为socket网络端口的
        // 数据流,JavaReceiverInputDStream代表了一个输入的DStream
        // socketTextStream()方法接收两个基本参数,第一个是监听哪个主机上的端口,第二个是监听哪个端口
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

        // 到这里为止,可以理解为JavaReceiverInputDStream中的,每隔一秒,会有一个RDD,其中封装了
        // 这一秒发送过来的数据,RDD的元素类型为String,即一行一行的文本,所以
        // 这里JavaReceiverInputDStream的泛型类型<String>,其实就代表了它底层的RDD的泛型类型

        // 开始对接收到的数据执行计算,使用sparkcore提供的算子,执行应用在dstream中即可
        // 在底层,实际上是会对DStream中的一个一个的rdd,执行我们应用在dstream上的算子
        // 产生的新RDD,会作为新DStream中的RDD
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" "));
            }
        });

        // 这个时候,每秒的数据,一行一行的文本,就会被拆分为多个单词,words DStream中的RDD的元素类型
        // 即为一个一个的单词
        // 接着,开始进行flatmap、reduceByKey操作
        JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        // 到此为止,我们就实现了实时的wordcount程序
        // 每秒中发送到指定socket端口上的数据,都会被lines DStream接收到
        wordCounts.print();
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 首先对JavaStreamingContext进行一下后续处理
        // 必须调用JavaStreamingContext的start方法,整个spark streaming application才会启动执行
        // 否则是不会执行的
        jssc.start();
        jssc.awaitTermination();
        jssc.close();
    }
}

© 著作权归作者所有

共有 人打赏支持
别寒
粉丝 30
博文 271
码字总数 137605
作品 0
永州
程序员
私信 提问
SparkStreaming+Kafka 实现基于缓存的实时wordcount程序

我的原创地址:https://dongkelun.com/2018/06/14/updateStateBykeyWordCount/ 前言 本文利用SparkStreaming和Kafka实现基于缓存的实时wordcount程序,什么意思呢,因为一般的SparkStreamin...

董可伦
07/12
0
0
Spark Streaming

一、spark的野心: 1、取代mapreduce--->Batch Processing 2、spark Sql --->hive 3、stream processing --->strom 吞吐量比storm大,处理速度是storm的2到5倍,但是延迟是秒级别的 sparkstr......

captainliu
2016/07/24
34
0
第1课:通过案例对Spark Streaming透彻理解

一.SparkStreaming在线另类实验 如何清晰的看到数据的流入、被处理的过程?使用一个小技巧,通过调节放大BatchInterval的方式,来降低批处理次数,以方便看清楚各个环节。我们从已写过的广告...

天蓝一枫
2016/05/08
2.2K
0
Spark Streaming流式处理

Spark Streaming介绍 Spark Streaming概述 Spark Streaming makes it easy to build scalable fault-tolerant streaming applications. 它可以非常容易的构建一个可扩展、具有容错机制的流式......

jiFeng丶
07/26
0
0
SparkStreaming下Python报net.jpountz.lz4.LZ4BlockInputStream的解决

这几天在测试SparkStreaming,连接Kafka一直报这个错, 连接同事用java写进去的都OK,用client端或NiFi都抱着个错. 开始一直怀疑jar包版本的问题, 用了最新的spark-streaming-kafka-0-8-assembl...

seng
08/30
0
0

没有更多内容

加载失败,请刷新页面

加载更多

小程序异步操作 跨js执行 在微信小程序里面实现跨页面通信

我们知道,在小程序里面一个页面的变化,是通过调用 setData 函数来实现的。所以想做到在二级页面里让一级页面产生变化,最 Quick And Dirty 的做法就是把一级页面的 this 传入到二级页面去,...

xiaogg
16分钟前
1
0
授于管理员登录其它用户

1.沙盒中,授予管理员登录 安全性控制==>登录访问权限政策

在山的那边
18分钟前
2
0
线程安全的CopyOnWriteArrayList介绍

证明CopyOnWriteArrayList是线程安全的 先写一段代码证明CopyOnWriteArrayList确实是线程安全的。 ReadThread.java import java.util.List; public class ReadThread implements Runnable {......

绝地逢生
20分钟前
1
0
Java重写的7个规则

几年前你可能会遇到这样一个面试题:“重写和重载的区别”、而现在随着科技的更迭、面试的问题越来越高级、面试官的问题也越来越深入、此文是上述面试题的一个延伸、让你从简单的重写规则中更...

architect刘源源
20分钟前
2
0
JavaScript异步编程:Generator与Async

从Promise开始,JavaScript就在引入新功能,来帮助更简单的方法来处理异步编程,帮助我们远离回调地狱。 Promise是下边要讲的Generator/yield与async/await的基础,希望你已经提前了解了它。...

前端攻城老湿
21分钟前
10
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部