Spark学习笔记-SparkStreaming简单用法

原创
2017/07/24 22:28
阅读数 12

简单应用

SparkStreaming把数据流分割成一个一个的小批次进行处理,下面的简单示例程序,每5秒钟从端口4567取数据打印出来,在spark-shell运行中

import org.apache.spark._
import org.apache.spark.streaming._  
//第二个参数为流数据分割为块的时间间隔
val ssc = new StreamingContext(sc, Seconds(5))
val stream =ssc.socketTextStream("localhost", 4567)
stream .print()
ssc.start()
ssc.awaitTermination()

输入图片说明

window操作

reduceByKeyAndWindow是一个window操作,后面两个参数为windowDuration和slideDuration,本例中窗口长度是6S,滑动间隔是4S,就是每隔4S统计前6S的数据,这样执行的话,相邻2次会有2S的数据重合。 测试程序每秒钟打印1到2次当前时间,Streaming统计每个时间出现次数

import org.apache.spark._
import org.apache.spark.streaming._  
val ssc = new StreamingContext(sc, Seconds(2))
val stream =ssc.socketTextStream("localhost", 4567)
val lines= stream.map(x=>(x,1)) //每秒随机产生几条记录,计算windows内的行数
//窗口长度以及滑动间隔,这两个参数值都必须是batch间隔的整数倍,就是2的整数倍
val linecount=lines.reduceByKeyAndWindow((a:Int,b:Int)=>(a + b),Seconds(6), Seconds(4))
linecount.print()
ssc.start()
ssc.awaitTermination()

输入图片说明 输入图片说明

测试程序

public class SocketAPP {
	public static void main(String[] args) {
		int index=1;
		try {
			ServerSocket serverSocket = new ServerSocket(4567);
			Socket clientSocket = serverSocket.accept();
			OutputStream outputStream = clientSocket.getOutputStream();
			PrintWriter writer = new PrintWriter(outputStream, true);
			Random rand=new Random();
			SimpleDateFormat formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
			int times=50;
			while (times-->0) {
				String time=formatter.format(new Date());
				long seconds=System.currentTimeMillis();
				for(int i=0;i<1+rand.nextInt(2);++i){
					String message=time+" "+seconds;
					writer.println(message);
					System.out.println(message);
				}
				index++;
				TimeUnit.SECONDS.sleep(1);
			}
		} catch (Exception e) {
			e.printStackTrace();
		}
	}
}

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部