文档章节

Spark(十):Spark Streaming

berg-dm
 berg-dm
发布于 2016/06/08 21:59
字数 1012
阅读 207
收藏 3

一:Spark Streaming 概览。

1.1    简单了解 Spark Streaming。

         Spark Streaming 是核心 Spark API的一个扩展。具有可扩展性,高吞吐量,容错性,实时性等特征。

        数据从许多来如中摄入数据,如 Kafka, Flume, Twitter, ZeroMQ, Kinesis, or TCP sockets。

也可以使用复杂的算法与高级别的功能像map,reduce,join和window处理。

        最后,也可以将处理过的数据推送到文件系统、数据库。事实上,我们也可以用Spark 的机器学习和图形处理数据流上的算法。用图形表示如下:
        在内部,其工作原理如下。Spark Streaming接收实时输入的数据流和数据划分批次,然后由Spark引擎批处理生成的最终结果流。如图示:  

      另外,Spark Streaming提供一个高级抽象,称为离散的流或 DStream,表示连续的流的数据。DStreams 可以被创建从输入的数据流,如Kafka, Flume, and Kinesis,

        或采用其他的DStreams高级别操作的输入的数据流。

        在内部,DStream 是以 RDDs 的序列来表示。

首先,看看Maven的依赖包(spark-streaming_2.10)管理:

        <dependency>
			<groupId>org.apache.spark</groupId>
			<artifactId>spark-streaming_2.10</artifactId>
			<version>1.6.1</version>
		</dependency>

1.2    eg:从一个数据服务器监听 TCP 套接字接收的文本数据中的单词进行计数

package com.berg.spark.test5.streaming;

import java.util.Arrays;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import scala.Tuple2;

public class SparkStreamingDemo1 {

	public static void main(String[] args) {

		SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
		conf.set("spark.testing.memory", "269522560000");

		JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
		System.out.println("jssc: " + jssc);

		// 创建一个DStream, 将连接 hostname:port, 比如 master:9999
		JavaReceiverInputDStream<String> lines = jssc.socketTextStream("master", 9999);
		System.out.println("lines : " + lines);

		JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

			private static final long serialVersionUID = 1L;

			public Iterable<String> call(String x) {
				return Arrays.asList(x.split(" "));
			}
		});

		// Count each word in each batch
		JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
			public Tuple2<String, Integer> call(String s) {
				return new Tuple2<String, Integer>(s, 1);
			}
		});
		JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
			public Integer call(Integer i1, Integer i2) {
				return i1 + i2;
			}
		});

		// Print the first ten elements of each RDD generated in this DStream to
		// the console
		wordCounts.print();

		jssc.start();  // Start the computation
		jssc.awaitTermination();  // Wait for the computation to terminate
	}
}

至于代码如何运行了,首先在Linux下终端输入:$ nc -lk 9999

然后在Eclipse中运行代码 。

随意输入一行文本单词,单词之间用空格隔开,如下:

hadoop@master:~$ nc -lk 9999
berg hello world berg hello

可以在Eclipse控制台看到如下结果:

Time: 1465386060000 ms
-------------------------------------------
(hello,2)
(berg,2)
(world,1)

 

1.3 将HDFS目录下的某些文件内容当做 输入的数据流。

public class SparkStreamingDemo2 {

	public static void main(String[] args) {

		SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount");
		conf.set("spark.testing.memory", "269522560000");

		JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(10));
		System.out.println("jssc: " + jssc);

		// 创建一个DStream, 读取HDFS上的文件,作为数据源。
		JavaDStream<String> lines = jssc.textFileStream("hdfs://master:9000/txt/sparkstreaming/");

		System.out.println("lines : " + lines);

		// Split each line into words
		JavaDStream<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
			private static final long serialVersionUID = 1L;

			public Iterable<String> call(String x) {
				System.out.println(Arrays.asList(x.split(" ")).get(0));
				return Arrays.asList(x.split(" "));
			}
		});

		// Count each word in each batch
		JavaPairDStream<String, Integer> pairs = words.mapToPair(new PairFunction<String, String, Integer>() {
			private static final long serialVersionUID = 1L;

			public Tuple2<String, Integer> call(String s) {
				return new Tuple2<String, Integer>(s, 1);
			}
		});
		System.out.println(pairs);
		
		JavaPairDStream<String, Integer> wordCounts = pairs.reduceByKey(new Function2<Integer, Integer, Integer>() {
			public Integer call(Integer i1, Integer i2) {
				return i1 + i2;
			}
		});

		// Print the first ten elements of each RDD generated in this DStream to the console
		wordCounts.print();
		
		JavaDStream<Long> count = wordCounts.count();
		count.print(); // 统计
		
		DStream<Tuple2<String, Integer>>  dstream = wordCounts.dstream();
		dstream.saveAsTextFiles("hdfs://master:9000/bigdata/spark/xxxx", "sparkstreaming");
		//wordCounts.dstream().saveAsTextFiles("hdfs://master:9000/bigdata/spark/xxxx", "sparkstreaming");
		
		jssc.start();  
		jssc.awaitTermination();   // Wait for the computation to terminate
	}
}

上述代码完成的操作是,一直监听HDFS即hdfs://master:9000/txt/sparkstreaming/目录下是否有文件存入,如果有,则统计文件中的单词。。。。

尝试运行程序后,然后往该目录中手动添加一个文件,会在控制台看到对该文件内容中的单词统计后的数据。

注意参数的意义:


 public JavaDStream<java.lang.String> textFileStream(java.lang.String directory)
  Create an input stream that monitors a Hadoop-compatible filesystem for 
             new files and reads them as text 
                      files (using key as LongWritable, value as Text and input format as TextInputFormat).
                 Files must be written to the monitored directory 
                 by "moving" them from another location within the same file system. 
                 File names starting with . are ignored.
 Parameters:
 directory - HDFS directory to monitor for new file
 Returns:
 (undocumented)
 

参照官网 和 API学习。

http://spark.apache.org/docs/latest/api/java/index.html?org/apache/spark/streaming/api/java/JavaStreamingContext.html

© 著作权归作者所有

berg-dm
粉丝 26
博文 98
码字总数 88970
作品 0
深圳
程序员
私信 提问
Apache Spark 2.4.0 正式发布

Apache Spark 2.4 与昨天正式发布,Apache Spark 2.4 版本是 2.x 系列的第五个版本。 如果想及时了解 Spark、Hadoop或者Hbase相关的文章,欢迎关注微信公共帐号: itebloghadoop Apache Spa...

Spark
2018/11/09
0
0
Spark(五) -- Spark Streaming介绍与基本执行过程

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/qq1010885678/article/details/45770881 Spark Streaming作为Spark上的四大子框架之一,肩负着实时流计算的重...

jchubby
2015/05/16
0
0
Spark cluster 部署

Spark 框架 Spark与Storm的对比 对于Storm来说: 1、建议在那种需要纯实时,不能忍受1秒以上延迟的场景下使用,比如实时金融系统,要求纯实时进行金融交易和分析 2、此外,如果对于实时计算的...

meteor_hy
2018/06/27
0
0
Spark—4(Spark核心组件)

1、Spark Streaming Spark Sreaming基于Spark Core实现了可扩展、高吞吐和容错的实时数据流处理。现在支持的数据源和处理后的结果存储如下图所示。 Spark Streaming将流式计算分解成一系列短...

叶枫啦啦
07/09
51
0
[Kafka与Spark集成系列一] Spark入门

版权声明:本文为博主原创文章,未经博主朱小厮允许不得转载。 https://blog.csdn.net/u013256816/article/details/82081946 Spark是一个用来是实现快速而通用的集群计算的平台。Spark是UC ...

朱小厮
2018/08/26
0
0

没有更多内容

加载失败,请刷新页面

加载更多

秒杀系统思路

业务分析 技术挑战 请求响应要快:无论成功失败,需要尽快返回给用户 架构设计   前端:静态化   站点层:限制请求数   服务层:乐观锁写缓存   数据库CAP:读写高可用,一致性,扩容...

雷开你的门
24分钟前
10
0
最全的教育行业大数据解决方案,个个针对痛点

大数据的悄然兴起也带动了教育行业的革新,移动教育、云课堂等的出现,使得教育行业再次成为了可以中长期保持高景气的行业。然而,初涉数据领域的教育行业同时也面临着相当大的难题,还需要更...

朕想上头条
28分钟前
7
0
预约模块设计分析

1.预约功能描述: 预约是小程序中常见的一种商品管理系统,商家可根据商品或服务的特性,灵活设置预约细节,为用户提供线上预约服务,如场地预约,商品预定等,实现高效经营。 预约场景: ...

鱼煎
31分钟前
5
0
阿里云日志服务构建网站实时分析大盘实战

场景分析 挖掘数据价值是当前企业级网站共同面临的问题。买买网是一个电商平台网站,每天拥有大量的用户访问和购买记录。为了引导用户直接消费,提升购买率和转化率,不同的用户类别需要推荐...

阿里云官方博客
32分钟前
5
0
TL665xF-EasyEVM开发板硬件处理器、NAND FLASH、RAM

广州创龙结合TI KeyStone系列多核架构TMS320C665x及Xilinx Artix-7系列FPGA设计的TL665xF-EasyEVM开发板是一款DSP+FPGA高速大数据采集处理平台,其底板采用沉金无铅工艺的6层板设计,适用于高...

Tronlong创龙
35分钟前
11
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部