文档章节

SparkStreaming实时wordcount案例

别寒
 别寒
发布于 2017/08/02 14:01
字数 595
阅读 8
收藏 0
点赞 0
评论 0
package cn.hhb.spark.streaming;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;
import scala.actors.threadpool.Arrays;

/**
 * Created by dell on 2017/8/1.
 */
public class WordCount {
    public static void main(String[] args) {

        // 创建SparkConf
        SparkConf conf = new SparkConf()
                .setAppName("WordCount").setMaster("local[2]")
                .set("spark.testing.memory", "2147480000");

        // 创建JavaStreamingContext
        // 该对象除了接收sparkconf对象之外,还必须接收一个batch interval参数,
        // 就是说,每收集多长时间的数据,划分一个batch,进行处理
        // 这里设置一秒
        JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(1));

        // 首先创建DStream,代表了一个从数据源来的持续不断的实时数据流
        // 调用JavaStreamingContext的socketTextStream方法,可以创建一个数据源为socket网络端口的
        // 数据流,JavaReceiverInputDStream代表了一个输入的DStream
        // socketTextStream()方法接收两个基本参数,第一个是监听哪个主机上的端口,第二个是监听哪个端口
        JavaReceiverInputDStream<String> lines = jssc.socketTextStream("localhost", 9999);

        // 到这里为止,可以理解为JavaReceiverInputDStream中的,每隔一秒,会有一个RDD,其中封装了
        // 这一秒发送过来的数据,RDD的元素类型为String,即一行一行的文本,所以
        // 这里JavaReceiverInputDStream的泛型类型<String>,其实就代表了它底层的RDD的泛型类型

        // 开始对接收到的数据执行计算,使用sparkcore提供的算子,执行应用在dstream中即可
        // 在底层,实际上是会对DStream中的一个一个的rdd,执行我们应用在dstream上的算子
        // 产生的新RDD,会作为新DStream中的RDD
        JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
            @Override
            public Iterable<String> call(String s) throws Exception {
                return Arrays.asList(s.split(" "));
            }
        });

        // 这个时候,每秒的数据,一行一行的文本,就会被拆分为多个单词,words DStream中的RDD的元素类型
        // 即为一个一个的单词
        // 接着,开始进行flatmap、reduceByKey操作
        JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {
                return new Tuple2<String, Integer>(s, 1);
            }
        });

        JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        });

        // 到此为止,我们就实现了实时的wordcount程序
        // 每秒中发送到指定socket端口上的数据,都会被lines DStream接收到
        wordCounts.print();
        try {
            Thread.sleep(5000);
        } catch (InterruptedException e) {
            e.printStackTrace();
        }

        // 首先对JavaStreamingContext进行一下后续处理
        // 必须调用JavaStreamingContext的start方法,整个spark streaming application才会启动执行
        // 否则是不会执行的
        jssc.start();
        jssc.awaitTermination();
        jssc.close();
    }
}

© 著作权归作者所有

共有 人打赏支持
别寒
粉丝 30
博文 269
码字总数 136120
作品 0
永州
程序员
Spark Streaming

一、spark的野心: 1、取代mapreduce--->Batch Processing 2、spark Sql --->hive 3、stream processing --->strom 吞吐量比storm大,处理速度是storm的2到5倍,但是延迟是秒级别的 sparkstr......

captainliu
2016/07/24
34
0
第1课:通过案例对Spark Streaming透彻理解

一.SparkStreaming在线另类实验 如何清晰的看到数据的流入、被处理的过程?使用一个小技巧,通过调节放大BatchInterval的方式,来降低批处理次数,以方便看清楚各个环节。我们从已写过的广告...

天蓝一枫
2016/05/08
2.2K
0
【源码追踪】SparkStreaming 中用 Direct 方式每次从 Kafka 拉取多少条数据(offset取值范围)

我们知道 SparkStreaming 用 Direct 的方式拉取 Kafka 数据时,是根据 kafka 中的 fromOffsets 和 untilOffsets 来进行获取数据的,而 fromOffsets 一般都是需要我们自己管理的,而每批次的 ...

lin_wj1995
04/19
0
0
spark spark streaming + kafka direct方式消费消息

kafka + spark streaming direct方式消费消息 前提: spark 安装成功,spark 1.6.0 zookeeper 安装成功 kafka 安装成功 集群环境见:《0423_SparkStreaming数据源kafka解析和安装配置及测试实...

柯里昂
2016/04/25
959
0
11.updateStateByKey以及基于缓存的实时wordcount程序

updateStateByKey updateStateByKey操作,可以让我们为每个key维护一份state,并持续不断的更新该state。 1、首先,要定义一个state,可以是任意的数据类型; 2、其次,要定义state更新函数—...

weixin_32265569
2017/11/18
0
0
使用Flume+Kafka+SparkStreaming进行实时日志分析

每个公司想要进行数据分析或数据挖掘,收集日志、ETL都是第一步的,今天就讲一下如何实时地(准实时,每分钟分析一次)收集日志,处理日志,把处理后的记录存入Hive中,并附上完整实战代码 ...

Trigl
2017/05/24
0
0
SparkStreaming入门及例子

看书大概了解了下Streaming的原理,但是木有动过手啊。。。万事开头难啊,一个wordcount 2小时怎么都运行不出结果。是我太蠢了,好了言归正传。 SparkStreaming是一个批处理的流式计算框架,...

雪童子
2015/11/14
0
0
sparkStreaming wordcount累加

1.目的 对netcat输出的单词按时间5s分片进行单词计数 2.素材 启动linux上的netcat程序 nc -lk 9999 不断输入字符 hello world hi world 3.代码 4.输出 ------------------------------------...

JPblog
2016/08/19
4
0
第3课:通过案例对SparkStreaming 透彻理解三板斧之三:解密SparkStreaming

第一部份 课堂的第一部份是用IMF 晚上案例实战课的程序再运行一次,把数据再次输入数据库里面,从图一你可以看出里面有很多运行细节,例如receiver.ReceiverSupervisor,receiver.BlockManag...

jcchoiling
2016/05/09
56
0
大数据经典学习路线(及供参考)之 三

3.Storm实时计算部分阶段 实时课程分为两个部分:流式计算核心技术和流式计算计算案例实战。 1.流式计算核心技术 流式计算核心技术主要分为两个核心技术点:Storm和Kafka,学完此阶段能够掌握...

柯西带你学编程
05/22
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

spring-@RequestBody

@RequestMapping("/login")    public void login(@RequestBody String userName,@RequestBody String pwd){      System.out.println(userName+" :"+pwd);    }    ......

说回答
6分钟前
0
0
Redis安装

大家可以通过该链接获取安装详情(这是一个Word文档,支持下载): http://note.youdao.com/noteshare?id=7a327ed6c58fb2037ba537e58ecf7510&sub=480DB8EF349747C3983B73AE94D45BB1 其他参考...

一梦心草
6分钟前
0
0
MySQL按天,按周,按月,按时间段统计【转载】

https://blog.csdn.net/qq_28056641/article/details/78306870 select DATE_FORMAT(create_time,'%Y%m%d') days,count(caseid) count from tc_case group by days; select DATE_FORMAT(creat......

李道福
8分钟前
0
0
浅谈parallelStream

parallelStream是什么,它是一个集合的并发处理流.其作用是把一个集合中的数据分片,进行一个多线程的处理,增快运行速度. 比如说这样一段代码 private Set<SysRole> sysRoles;private Set<St...

算法之名
10分钟前
3
0
器者,道之所载

形而上者谓之道,形而下者谓之器,化而裁之谓之变;推而行之谓之通,举而措之天下之民,谓之事业。—— 《道德经》

了凡川
11分钟前
0
0
C#命名规范中文版/C#编码规范中文版

最新文档地址https://github.com/hiramtan/CSharpNamingGuidelines_Chinese C#命名规范中文版/C#编码规范中文版 示例 /*****************************************************************......

海贝Hibey
13分钟前
0
0
刚从eclipse转到Intellij IDEA,分享一些配置经验

刚从eclipse转到Intellij IDEA,分享一些配置经验,IntelliJ IDEA作为最好的Java开发工具,在智能代码助手、代码自动提示、重构、J2EE支持、Ant、JUnit、CVS整合、代码审查、 创新的GUI设计等...

舒文joven
14分钟前
1
0
lombok 引入后,测试类始终找不到get,set方法。

开发环境为idea,jdk1.7,maven3.5. 网上直接搜出来的方法有: 1、在setting里安装lombok的plugins; 2、如下图,勾选enable annocation processing选项 3、升级maven plugins插件 我尝试了以...

Kidult
20分钟前
0
0
Duang,HUAWEI DevEco IDE全面升级啦

想感受全新UI带来的视觉及交互体验、 HiKey970开发板调测、 HiAI API推荐和收藏、 深度AI模型分析等新功能, 体验高清晰度和流畅度的远程AI真机调测吗? 全新的UI设计 采用最优秀的视觉及交互...

华为终端开放实验室
29分钟前
1
0
阻止事件冒泡,阻止默认事件

1.event.stopPropagation()方法 这是阻止事件的冒泡方法,不让事件向documen上蔓延,但是默认事件任然会执行,当你掉用这个方法的时候,如果点击一个连接,这个连接仍然会被打开, 2.event....

闫亚亚
31分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部