文档章节

Spark Streaming(5):Spark Window function in Java

Joe_Wu
 Joe_Wu
发布于 2017/08/10 17:58
字数 287
阅读 7
收藏 0

首先,看下window函数的图解:

下面这个代码是计算一分钟之内的单词数量统计,每两秒获取一次数据,同时处理数据时间也是两秒,窗口大小为1分钟

1.数据源

package com.ssm.test;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

public class SocketTest {

	public static void main(String[] args) {
		try{
            ServerSocket server=new ServerSocket(9999);
            System.out.println("The Server has started");
            
            Socket socket=server.accept();
            System.out.println("There was a socket client connected");
            
            BufferedReader br=new BufferedReader(new InputStreamReader(System.in));
            String line=br.readLine();
            PrintWriter writer=new PrintWriter(socket.getOutputStream());
            
            while(!line.equals("end")){
            	writer = new PrintWriter(socket.getOutputStream());
    			writer.println("If you are not brave enough, no one will back you up.");
                writer.flush();
                System.out.println(System.currentTimeMillis() + "\nServer send:\t"+line);
                Thread.sleep(2000);
            }
            writer.close();
            socket.close();
            server.close();
        }catch(Exception e) {
        	e.printStackTrace();
        }
	}
}

2.spark streaming处理代码

package com.ssm.test;

import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public class WordsCountTest {

	public static void main(String[] args) throws Exception {
		
		SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("WordsCount");
		JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(2));
		JavaDStream<String> lines = jssc.socketTextStream("127.0.0.1", 9999).window(new Duration(60000));
		
		lines.flatMap(new FlatMapFunction<String, String>(){
			public Iterator<String> call(String x){
				return Arrays.asList(x.split(" ")).iterator();
			}
		}).mapToPair(new PairFunction<String, String, Integer>(){
			public Tuple2<String, Integer> call(String s){
				return new Tuple2<String, Integer>(s, 1);
			}
		}).reduceByKey(new Function2<Integer, Integer, Integer>(){
			public Integer call(Integer i1, Integer i2){
				return i1+i2;
			}
		}).print();
		
		jssc.start();
		jssc.awaitTermination();

	}

}

 

© 著作权归作者所有

共有 人打赏支持
Joe_Wu
粉丝 1
博文 9
码字总数 6567
作品 0
徐汇
程序员
Spark 学习资源收集【Updating】

(一)spark 相关安装部署、开发环境 1、Spark 伪分布式 & 全分布式 安装指南 http://my.oschina.net/leejun2005/blog/394928 2、Apache Spark探秘:三种分布式部署方式比较 http://dongxic...

大数据之路
2014/09/08
0
1
Spark2.1.0之基础知识

在阅读本文之前,读者最好已经阅读了《Spark2.1.0之初识Spark》一文,本文将对Spark的基础知识进行介绍。但在此之前,读者先跟随本人来一次简单的时光穿梭,最后还将对Java与Scala在语言上进...

beliefer
05/24
0
0
Spark Streaming + Kafka Integration Guide

The Spark Streaming integration for Kafka 0.10 is similar in design to the 0.8 Direct Stream approach. It provides simple parallelism, 1:1 correspondence between Kafka partition......

刺猬一号
07/18
0
0
Apache Flink和Apache Spark有什么异同?它们的发展前景分别怎样?

============================= object WordCount { def main(args: Array[String]) {val env = new SparkContext("local","wordCount")val data = List("hi","how are you","hi")val dataSe......

justlpf
05/12
0
0
Spark Streaming 基本概念

介绍 Spark Streaming架构图 the micro-batch architecture of Spark Streaming Execution of Spark Streaming within Spark’s components JAVA代码示例 执行方式 1:修改log4j的日志级别为......

cloud-coder
2015/06/18
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

002,zabbix-agent的安装 监控Linux主机

2.1.安装zabbix-agent $ rpm -i http://repo.zabbix.com/zabbix/3.4/rhel/7/x86_64/zabbix-release-3.4-2.el7.noarch.rpm$ yum install -y zabbix-agent zabbix-get 2.2.配置zabbix-agent ......

happyeveryday32
34分钟前
2
0
docker learn :swarm

swarm是什么 swarm是一组运行docker服务的集群,之后,还是使用那些命令去操作docker,但是是通过swarm manager来执行的。 swarm中的机器可以是实体的也可以是虚拟的,加入swarm后,他们被当作...

writeademo
34分钟前
1
0
Golang + vscode 开发环境配置

GOPATH 环境变量的配置 https://my.oschina.net/xinxingegeya/blog/718305 安装vscode go 扩展 下载vscode ,安装go的扩展https://github.com/Microsoft/vscode-go 该扩展对 Golang 支持非常......

秋风醉了
37分钟前
0
0
idea jar包

我自己用idea新建一个springboot项目,打包一直有问题,百度了下,又总结了下。 方式一: 找到Project Structure菜单,然后点开,照下图点击 然后出现下图所示 红框1:为你项目的main函数所在...

朝如青丝暮成雪
38分钟前
0
0
Vue scoped CSS 与深度作用选择器 /deep/

使用 scoped 后,父组件的样式将不会渗透到子组件中。 例如(无效): <template> <div id="app"> <el-input class="text-box" v-model="text"></el-input> </div></template><......

不负好时光
38分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部