文档章节

Spark Streaming(5):Spark Window function in Java

Joe_Wu
 Joe_Wu
发布于 2017/08/10 17:58
字数 287
阅读 27
收藏 0

#程序员薪资揭榜#你做程序员几年了?月薪多少?发量还在么?>>>

首先,看下window函数的图解:

下面这个代码是计算一分钟之内的单词数量统计,每两秒获取一次数据,同时处理数据时间也是两秒,窗口大小为1分钟

1.数据源

package com.ssm.test;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

public class SocketTest {

	public static void main(String[] args) {
		try{
            ServerSocket server=new ServerSocket(9999);
            System.out.println("The Server has started");
            
            Socket socket=server.accept();
            System.out.println("There was a socket client connected");
            
            BufferedReader br=new BufferedReader(new InputStreamReader(System.in));
            String line=br.readLine();
            PrintWriter writer=new PrintWriter(socket.getOutputStream());
            
            while(!line.equals("end")){
            	writer = new PrintWriter(socket.getOutputStream());
    			writer.println("If you are not brave enough, no one will back you up.");
                writer.flush();
                System.out.println(System.currentTimeMillis() + "\nServer send:\t"+line);
                Thread.sleep(2000);
            }
            writer.close();
            socket.close();
            server.close();
        }catch(Exception e) {
        	e.printStackTrace();
        }
	}
}

2.spark streaming处理代码

package com.ssm.test;

import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public class WordsCountTest {

	public static void main(String[] args) throws Exception {
		
		SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("WordsCount");
		JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(2));
		JavaDStream<String> lines = jssc.socketTextStream("127.0.0.1", 9999).window(new Duration(60000));
		
		lines.flatMap(new FlatMapFunction<String, String>(){
			public Iterator<String> call(String x){
				return Arrays.asList(x.split(" ")).iterator();
			}
		}).mapToPair(new PairFunction<String, String, Integer>(){
			public Tuple2<String, Integer> call(String s){
				return new Tuple2<String, Integer>(s, 1);
			}
		}).reduceByKey(new Function2<Integer, Integer, Integer>(){
			public Integer call(Integer i1, Integer i2){
				return i1+i2;
			}
		}).print();
		
		jssc.start();
		jssc.awaitTermination();

	}

}

 

© 著作权归作者所有

Joe_Wu
粉丝 1
博文 9
码字总数 6567
作品 0
徐汇
程序员
私信 提问
加载中

评论(0)

spark streaming基本概念一

在学习spark streaming时,建议先学习和掌握RDD。spark streaming无非是针对流式数据处理这个场景,在RDD基础上做了一层封装,简化流式数据处理过程。 spark streaming 引入一些新的概念和方...

osc_yo4ttf5q
2018/03/06
2
0
Spark(一)—— 大数据处理入门

一、Spark介绍 Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that suppor......

osc_joco7w0s
04/16
3
0
【Spark】SparkStreaming-Kafka-Redis-集成-基础参考资料

SparkStreaming-Kafka-Redis-集成-基础参考资料 Overview - Spark 2.2.0 Documentation Spark Streaming + Kafka Integration Guide - Spark 2.2.0 Documentation Spark Streaming + Kafka I......

osc_qvqynsub
2018/01/11
5
0
Spark 学习资源收集【Updating】

(一)spark 相关安装部署、开发环境 1、Spark 伪分布式 & 全分布式 安装指南 http://my.oschina.net/leejun2005/blog/394928 2、Apache Spark探秘:三种分布式部署方式比较 http://dongxic...

大数据之路
2014/09/08
5.6K
0
【Spark】SparkStreaming-Kafka-集成-终极参考资料

SparkStreaming-Kafka-集成-终极参考资料 Spark Streaming和Kafka整合开发指南(二) – 过往记忆 Streamingkafka零丢失 | 等英博客 spark-streaming 读取kafka数据不丢失(一) | 等英博客 sp...

osc_qvqynsub
2018/01/11
6
0

没有更多内容

加载失败,请刷新页面

加载更多

PHP实战之文件上传与下载

目录 1. 前言 2.代码实战 2.1客户端页面配置说明 2.2 $_FILES预定义变量解析 2.3文件的移动方式 2.3.1第一种移动形式 2.3.2第二种移动形式 2.4文件上传配置及解析 2.5 错误信息说明 3. 文件上...

六道木
39分钟前
34
0
rebar3 的使用

安装 $ git clone https://github.com/erlang/rebar3.git$ cd rebar3$ ./bootstrap $ ./rebar3 local install===> Extracting rebar3 libs to ~/.cache/rebar3/lib...===> Writing r......

SummerGao
41分钟前
20
0
聊聊nifi的AbstractBinlogTableEventWriter

序 本文主要研究一下nifi的AbstractBinlogTableEventWriter AbstractBinlogTableEventWriter nifi-1.11.4/nifi-nar-bundles/nifi-cdc/nifi-cdc-mysql-bundle/nifi-cdc-mysql-processors/src......

go4it
42分钟前
18
0
如何解决Git中的合并冲突 - How to resolve merge conflicts in Git

问题: 如何解决Git中的合并冲突? 解决方案: 参考一: https://stackoom.com/question/g5t/如何解决Git中的合并冲突 参考二: https://oldbug.net/q/g5t/How-to-resolve-merge-conflicts-...

fyin1314
45分钟前
23
0
最常用的linux命令

查看磁盘挂载情况: df -h 查看当前目录下每个文件夹的大小 du -lh --max-depth=1 清空特定文件root >root 查看安装的linux发型版本 cat /proc/version *******lsb_release -a uname --m 查看...

fairy1674
49分钟前
22
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部