文档章节

SparkStreaming 读取Hdfs

momisabuilder
 momisabuilder
发布于 2016/07/29 15:31
字数 145
阅读 69
收藏 0

一、代码

SparkConf conf = new SparkConf().setAppName("spark streaming tst").setMaster("local");

JavaStreamingContext javaStreamingContext = new JavaStreamingContext(conf, Durations.seconds(60));


//TODO 切记这是目录 目录 目录 然后动态的往里面加文件
JavaDStream<String> wordRDD = javaStreamingContext.textFileStream("/lwj/second/");


JavaPairDStream<String, Integer> wordsRDD = wordRDD.mapToPair(new PairFunction<String, String, Integer>() {
            @Override
            public Tuple2<String, Integer> call(String s) throws Exception {

                return new Tuple2<String, Integer>(s, 1);

            }
        }).reduceByKey(new Function2<Integer, Integer, Integer>() {
            @Override
            public Integer call(Integer v1, Integer v2) throws Exception {
                return v1 + v2;
            }
        }).persist(StorageLevel.MEMORY_ONLY());


        wordsRDD.print();

        javaStreamingContext.start();

        javaStreamingContext.awaitTermination();

二、结果

三、注意事项

HDFS:一定要是目录、其次是目录下的文件动态添加、可以在比如./bin/hadoop fs -put a.txt b.txt c.txt /lwj/second/

 

 

© 著作权归作者所有

共有 人打赏支持
momisabuilder

momisabuilder

粉丝 14
博文 66
码字总数 31440
作品 0
西安
程序员
私信 提问
使用Flume+Kafka+SparkStreaming进行实时日志分析

每个公司想要进行数据分析或数据挖掘,收集日志、ETL都是第一步的,今天就讲一下如何实时地(准实时,每分钟分析一次)收集日志,处理日志,把处理后的记录存入Hive中,并附上完整实战代码 ...

Trigl
2017/05/24
0
0
Spark Streaming源码解析之容错

---title: sparkStreaming源码解析之容错subtitle: sparkStream的数据容错机制description: sparkStream的数据容错思维脑图keywords: [spark,streaming,源码,容错]date: 2018-12-09tags: [s......

freeli
2018/12/07
0
0
对spark中RDD的partition通俗易懂的介绍

我们要想对spark中RDD的分区进行一个简单的了解的话,就不免要先了解一下hdfs的前世今生。 众所周知,hdfs是一个非常不错的分布式文件系统,这是这么多年来大家有目共睹的。 hdfs文件为分布式...

飞叔Brother
2018/06/06
0
0
Spark Streaming流式处理

Spark Streaming介绍 Spark Streaming概述 Spark Streaming makes it easy to build scalable fault-tolerant streaming applications. 它可以非常容易的构建一个可扩展、具有容错机制的流式......

jiFeng丶
2018/07/26
0
0
sparkStreaming基本概念

概述 Spark Streaming 是 Spark Core API 的扩展, 它支持弹性的, 高吞吐的, 容错的实时数据流的处理. 数据可以通过多种数据源获取, 例如 Kafka, Flume, Kinesis 以及 TCP sockets, 也可以通过...

freeli
2018/11/20
0
0

没有更多内容

加载失败,请刷新页面

加载更多

C++ vector和list的区别

1.vector数据结构 vector和数组类似,拥有一段连续的内存空间,并且起始地址不变。 因此能高效的进行随机存取,时间复杂度为o(1); 但因为内存空间是连续的,所以在进行插入和删除操作时,会造...

shzwork
今天
3
0
Spring之invokeBeanFactoryPostProcessors详解

Spring的refresh的invokeBeanFactoryPostProcessors,就是调用所有注册的、原始的BeanFactoryPostProcessor。 相关源码 public static void invokeBeanFactoryPostProcessors(Configu......

cregu
昨天
4
0
ibmcom/db2express-c_docker官方使用文档

(DEPRECIATED) Please check DB2 Developer-C Edition for the replacement. What is IBM DB2 Express-C ? ``IBM DB2 Express-C``` is the no-charge community edition of DB2 server, a si......

BG2KNT
昨天
3
0
Ubuntu 18.04.2 LTS nvidia-docker2 : 依赖: docker-ce (= 5:18.09.0~3-0~ubuntu-bionic)

平台:Ubuntu 18.04.2 LTS nvidia-docker2 版本:2.0.3 错误描述:在安装nvidia-docker2的时候报dpkg依赖错误 nvidia-docker2 : 依赖: docker-ce (= 5:18.09.0~3-0~ubuntu-bionic) 先看一下依......

Pulsar-V
昨天
4
0
学习笔记1-goland结构体(struct)

写在前面:若有侵权,请发邮件by.su@qq.com告知。 转载者告知:如果本文被转载,但凡涉及到侵权相关事宜,转载者需负责。请知悉! 本文永久更新地址:https://my.oschina.net/bysu/blog/3036...

不最醉不龟归
昨天
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部