文档章节

Spark Streaming(4):Spark updateStateByKey in Java

Joe_Wu
 Joe_Wu
发布于 2017/08/10 12:29
字数 349
阅读 45
收藏 0
package com.pyrrha.examples;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import kafka.serializer.StringDecoder;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public class updateStateByKey {
	
	private static final String KAFKA_TOPIC = "TopicA";
	
	public static void main(String[] args) throws Exception {
		System.setProperty("hadoop.home.dir", "D:\\checkpoint\\hadoop-common-2.2.0-bin-master");
		SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("WordsCount");
		JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.milliseconds(2000));

		Map<String, String> kafkaParams = new HashMap<String, String>();
		kafkaParams.put("bootstrap.servers", "127.0.0.1:9092");
		kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		kafkaParams.put("group.id", "lingroup");
		//kafkaParams.put("auto.offset.reset", "latest");
		
		Set<String> topics = new HashSet<String>();
        topics.add(KAFKA_TOPIC);
		
        JavaPairInputDStream<String, String> stream = org.apache.spark.streaming.kafka.KafkaUtils.
			createDirectStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);
		
        JavaPairDStream<String, Integer> transDStream = stream.flatMap(new FlatMapFunction<Tuple2<String,String>, String>() {
			public Iterator<String> call(Tuple2<String, String> t) throws Exception {
				return Arrays.asList(t._2.split(" ")).iterator();
			}
		}).filter(new Function<String, Boolean>() {
			public Boolean call(String v1) throws Exception {
				return v1.equals("a") ? false :true;
			}
		}).mapToPair(new PairFunction<String, String, Integer>() {
			public Tuple2<String, Integer> call(String t) throws Exception {
				return new Tuple2<String, Integer>(t, 1);
			}
		}).reduceByKey(new Function2<Integer, Integer, Integer>() {
			public Integer call(Integer v1, Integer v2) throws Exception {
				return v1 + v2;
			}
		}).window(new Duration(6000), new Duration(6000));
		
        /**
         * @param v1表示从上面获取的transDStream的所有value的List集合(如果transDStream=["b":2,"c":1],那么v1=[2,1])
         * @param v2表示上一个updateStateByKey的每个value保存的值
         * 所以,v1就是每次最新batch处理后的value集合,v2就是上个batch处理后缓存的value值
         */
        transDStream.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
			public Optional<Integer> call(List<Integer> v1, Optional<Integer> v2) throws Exception {
				Integer v3 = 0;
				if(v2.isPresent())
					v3 = v2.get();
				for (Integer v : v1)
					v3 += v;
				
				return Optional.of(v3);
			}
		}).print();
        
		jssc.checkpoint("file:///D:/checkpoint/");
		jssc.start();
		jssc.awaitTermination();
	}
	
	

}

 

© 著作权归作者所有

Joe_Wu
粉丝 1
博文 9
码字总数 6567
作品 0
徐汇
程序员
私信 提问
加载中

评论(0)

Spark2.1.0之基础知识

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/80303035 在阅读本文之前,读者最好已经阅读了《Spark2.1.0之初识Spark》一文,本文...

泰山不老生
2018/05/24
0
0
Spark Streaming 基本概念

介绍 Spark Streaming架构图 the micro-batch architecture of Spark Streaming Execution of Spark Streaming within Spark’s components JAVA代码示例 执行方式 1:修改log4j的日志级别为......

cloud-coder
2015/06/18
1.4K
0
Spark 学习资源收集【Updating】

(一)spark 相关安装部署、开发环境 1、Spark 伪分布式 & 全分布式 安装指南 http://my.oschina.net/leejun2005/blog/394928 2、Apache Spark探秘:三种分布式部署方式比较 http://dongxic...

大数据之路
2014/09/08
5.6K
0
[Kafka与Spark集成系列一] Spark入门

版权声明:本文为博主原创文章,未经博主朱小厮允许不得转载。 https://blog.csdn.net/u013256816/article/details/82081946 Spark是一个用来是实现快速而通用的集群计算的平台。Spark是UC ...

朱小厮
2018/08/26
0
0
[Spark]Spark RDD 指南一 引入Spark

2.3.0版本:Spark2.3.0 引入Spark 1. Java版 Spark 2.1.1适用于Java 7及更高版本。 如果您使用的是Java 8,则Spark支持使用lambda表达式来简洁地编写函数,否则可以使用org.apache.spark.ap...

sjf0115
2017/06/08
0
0

没有更多内容

加载失败,请刷新页面

加载更多

Android Binder机制 - interface_cast和asBinder讲解

研究Android底层代码时,尤其是Binder跨进程通信时,经常会发现interface_cast和asBinder,很容易被这两个函数绕晕,下面来讲解一下: interface_cast 下面根据下述ICameraClient例子进行分析...

天王盖地虎626
21分钟前
35
0
杭州哪里有开建材发票

杭州哪里有开建材发票【1.3.7 - 1.0.9.5 - 5.9.1.9.】李生,,诚、信、合、作,保、真、售、后、保、障、长、期、有、效。adb的全称为Android Debug Bridge,是And...

143770
25分钟前
48
0
Java @Deprecated Annotation(注解)

在本部分的快速指南中,我们将会查看 Java 的 deprecated API 和如何在程序中使用 @Deprecated 注解。 @Deprecated Annotation(注解) 作为程序的进化和迭代,随着时间的推移,在项目中总会...

honeymoose
43分钟前
29
0
郑州开纸制品发票

郑州开纸制品发票【132 * 50 52 * 90 89】罗生,诚、信、合、作,保、真、售、后、保、障、长、期、有、效。adb的全称为Android Debug Bridg...

zhangyongli
今天
65
0
OSChina 周四乱弹 —— 失业后的阳光太刺眼了

Osc乱弹歌单(2020)请戳(这里) 【今日歌曲】 @大又 :分享Jack Stauber的单曲《Fighter》 《Fighter》- Jack Stauber 手机党少年们想听歌,请使劲儿戳(这里) @theLovelyBugfly :笑死我...

小小编辑
今天
146
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部