文档章节

Spark Streaming(5):Spark Window function in Java

Joe_Wu
 Joe_Wu
发布于 2017/08/10 17:58
字数 287
阅读 7
收藏 0

首先,看下window函数的图解:

下面这个代码是计算一分钟之内的单词数量统计,每两秒获取一次数据,同时处理数据时间也是两秒,窗口大小为1分钟

1.数据源

package com.ssm.test;

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.io.PrintWriter;
import java.net.ServerSocket;
import java.net.Socket;

public class SocketTest {

	public static void main(String[] args) {
		try{
            ServerSocket server=new ServerSocket(9999);
            System.out.println("The Server has started");
            
            Socket socket=server.accept();
            System.out.println("There was a socket client connected");
            
            BufferedReader br=new BufferedReader(new InputStreamReader(System.in));
            String line=br.readLine();
            PrintWriter writer=new PrintWriter(socket.getOutputStream());
            
            while(!line.equals("end")){
            	writer = new PrintWriter(socket.getOutputStream());
    			writer.println("If you are not brave enough, no one will back you up.");
                writer.flush();
                System.out.println(System.currentTimeMillis() + "\nServer send:\t"+line);
                Thread.sleep(2000);
            }
            writer.close();
            socket.close();
            server.close();
        }catch(Exception e) {
        	e.printStackTrace();
        }
	}
}

2.spark streaming处理代码

package com.ssm.test;

import java.util.Arrays;
import java.util.Iterator;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public class WordsCountTest {

	public static void main(String[] args) throws Exception {
		
		SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("WordsCount");
		JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.seconds(2));
		JavaDStream<String> lines = jssc.socketTextStream("127.0.0.1", 9999).window(new Duration(60000));
		
		lines.flatMap(new FlatMapFunction<String, String>(){
			public Iterator<String> call(String x){
				return Arrays.asList(x.split(" ")).iterator();
			}
		}).mapToPair(new PairFunction<String, String, Integer>(){
			public Tuple2<String, Integer> call(String s){
				return new Tuple2<String, Integer>(s, 1);
			}
		}).reduceByKey(new Function2<Integer, Integer, Integer>(){
			public Integer call(Integer i1, Integer i2){
				return i1+i2;
			}
		}).print();
		
		jssc.start();
		jssc.awaitTermination();

	}

}

 

© 著作权归作者所有

共有 人打赏支持
Joe_Wu
粉丝 1
博文 9
码字总数 6567
作品 0
徐汇
程序员
Spark 学习资源收集【Updating】

(一)spark 相关安装部署、开发环境 1、Spark 伪分布式 & 全分布式 安装指南 http://my.oschina.net/leejun2005/blog/394928 2、Apache Spark探秘:三种分布式部署方式比较 http://dongxic...

大数据之路
2014/09/08
0
1
Spark2.1.0之基础知识

在阅读本文之前,读者最好已经阅读了《Spark2.1.0之初识Spark》一文,本文将对Spark的基础知识进行介绍。但在此之前,读者先跟随本人来一次简单的时光穿梭,最后还将对Java与Scala在语言上进...

beliefer
05/24
0
0
Spark Streaming + Kafka Integration Guide

The Spark Streaming integration for Kafka 0.10 is similar in design to the 0.8 Direct Stream approach. It provides simple parallelism, 1:1 correspondence between Kafka partition......

刺猬一号
07/18
0
0
Spark Streaming 基本概念

介绍 Spark Streaming架构图 the micro-batch architecture of Spark Streaming Execution of Spark Streaming within Spark’s components JAVA代码示例 执行方式 1:修改log4j的日志级别为......

cloud-coder
2015/06/18
0
0
Apache Flink和Apache Spark有什么异同?它们的发展前景分别怎样?

============================= object WordCount { def main(args: Array[String]) {val env = new SparkContext("local","wordCount")val data = List("hi","how are you","hi")val dataSe......

justlpf
05/12
0
0

没有更多内容

加载失败,请刷新页面

加载更多

关于Jackson默认丢失Bigdecimal精度问题分析

问题描述 最近在使用一个内部的RPC框架时,发现如果使用Object类型,实际类型为BigDecimal的时候,作为传输对象的时候,会出现丢失精度的问题;比如在序列化前为金额1.00,反序列化之后为1.0...

ksfzhaohui
21分钟前
0
0
vue less安装

$ npm install less less-loader --save 安装成功后修改文件:build>webpack.base.conf.js 在model.rules添加对象: { test: /\.less$/, loader: "style-loader!css-loader!less-loade......

shawnDream
26分钟前
0
0
kolla-ansible部署容器ceph

kolla是从openstack孵化出的一个项目,kolla项目可以制作镜像包括openstack、ceph等容器镜像, ansible是自动化部署工具,执行playbook中的任务。 kolla-ansible是容器部署工具,部署opensta...

zrz11
31分钟前
0
0
【三 异步HTTP编程】 1. 处理异步results

异步results 事实上整个Play框架都是异步的。Play非阻塞地处理每个request请求。 默认的配置适配的正是异步的controller。因此开发者应该尽力避免在在controller中阻塞,如在controller方法中...

Landas
33分钟前
0
0
Android Studio 3.1.4 buildApk遇到问题 Connection reset

打开设置,找到Android Studio选项卡,把下图选项打上勾就ok

lanyu96
34分钟前
1
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部