文档章节

Spark学习笔记-SparkStreaming简单用法

Endless2010
 Endless2010
发布于 2017/07/24 22:28
字数 367
阅读 11
收藏 0

简单应用

SparkStreaming把数据流分割成一个一个的小批次进行处理,下面的简单示例程序,每5秒钟从端口4567取数据打印出来,在spark-shell运行中

import org.apache.spark._
import org.apache.spark.streaming._  
//第二个参数为流数据分割为块的时间间隔
val ssc = new StreamingContext(sc, Seconds(5))
val stream =ssc.socketTextStream("localhost", 4567)
stream .print()
ssc.start()
ssc.awaitTermination()

输入图片说明

window操作

reduceByKeyAndWindow是一个window操作,后面两个参数为windowDuration和slideDuration,本例中窗口长度是6S,滑动间隔是4S,就是每隔4S统计前6S的数据,这样执行的话,相邻2次会有2S的数据重合。 测试程序每秒钟打印1到2次当前时间,Streaming统计每个时间出现次数

import org.apache.spark._
import org.apache.spark.streaming._  
val ssc = new StreamingContext(sc, Seconds(2))
val stream =ssc.socketTextStream("localhost", 4567)
val lines= stream.map(x=>(x,1)) //每秒随机产生几条记录,计算windows内的行数
//窗口长度以及滑动间隔,这两个参数值都必须是batch间隔的整数倍,就是2的整数倍
val linecount=lines.reduceByKeyAndWindow((a:Int,b:Int)=>(a + b),Seconds(6), Seconds(4))
linecount.print()
ssc.start()
ssc.awaitTermination()

输入图片说明 输入图片说明

测试程序

public class SocketAPP {
	public static void main(String[] args) {
		int index=1;
		try {
			ServerSocket serverSocket = new ServerSocket(4567);
			Socket clientSocket = serverSocket.accept();
			OutputStream outputStream = clientSocket.getOutputStream();
			PrintWriter writer = new PrintWriter(outputStream, true);
			Random rand=new Random();
			SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
			int times=50;
			while (times-->0) {
				String time=formatter.format(new Date());
				long seconds=System.currentTimeMillis();
				for(int i=0;i<1+rand.nextInt(2);++i){
					String message=time+" "+seconds;
					writer.println(message);
					System.out.println(message);
				}
				index++;
				TimeUnit.SECONDS.sleep(1);
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

© 著作权归作者所有

Endless2010
粉丝 1
博文 36
码字总数 23027
作品 0
南京
程序员
私信 提问
Spark 数据分析导论-笔记

Spark Core Spark Core 实现了Spark 的基本功能,包含任务调度、内存管理、错误恢复、与存储系统交互等模块。 Spark Core 中还包含了 对弹性分布式数据集(resilient distributed dataset,简...

Java搬砖工程师
2018/12/26
28
0
Spark_Streaming源码解析之思维脑图

sparkStreaming源码思维脑图: 脑图详解: 此博文共分为四个部分: DAG定义 Job动态生成 数据的产生与导入 容错 脑图制作参考

freeli
2018/12/07
33
0
Spark Streaming流式处理

Spark Streaming介绍 Spark Streaming概述 Spark Streaming makes it easy to build scalable fault-tolerant streaming applications. 它可以非常容易的构建一个可扩展、具有容错机制的流式......

jiFeng丶
2018/07/26
0
0
Spark cluster 部署

Spark 框架 Spark与Storm的对比 对于Storm来说: 1、建议在那种需要纯实时,不能忍受1秒以上延迟的场景下使用,比如实时金融系统,要求纯实时进行金融交易和分析 2、此外,如果对于实时计算的...

meteor_hy
2018/06/27
0
0
[Spark]Spark Streaming 指南一 Example

1. 概述 Spark streaming是Spark核心API的一个扩展,它对实时流式数据的处理具有可扩展性、高吞吐量、可容错性等特点。数据可以从诸如Kafka,Flume,Kinesis或TCP套接字等许多源中提取,并且...

sjf0115
2017/03/01
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Taro 兼容 h5 踩坑指南

最近一周在做 Taro 适配 h5 端,过程中改改补补,好不酸爽。 本文记录📝遇到的问题,希望为有相同需求的哥们👬节约点时间。 Taro 版本:1.3.9。 解决跨域问题 h5 发请求会报跨域问题,需...

dkvirus
57分钟前
4
0
Spring boot 静态资源访问

0. 两个配置 spring.mvc.static-path-patternspring.resources.static-locations 1. application中需要先行的两个配置项 1.1 spring.mvc.static-path-pattern 这个配置项是告诉springboo......

moon888
今天
3
0
hash slot(虚拟桶)

在分布式集群中,如何保证相同请求落到相同的机器上,并且后面的集群机器可以尽可能的均分请求,并且当扩容或down机的情况下能对原有集群影响最小。 round robin算法:是把数据mod后直接映射...

李朝强
今天
4
0
Kafka 原理和实战

本文首发于 vivo互联网技术 微信公众号 https://mp.weixin.qq.com/s/bV8AhqAjQp4a_iXRfobkCQ 作者简介:郑志彬,毕业于华南理工大学计算机科学与技术(双语班)。先后从事过电子商务、开放平...

vivo互联网技术
今天
19
0
java数据类型

基本类型: 整型:Byte,short,int,long 浮点型:float,double 字符型:char 布尔型:boolean 引用类型: 类类型: 接口类型: 数组类型: Byte 1字节 八位 -128 -------- 127 short 2字节...

audience_1
今天
9
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部