文档章节

Spark Streaming(4):Spark updateStateByKey in Java

Joe_Wu
 Joe_Wu
发布于 2017/08/10 12:29
字数 349
阅读 21
收藏 0
package com.pyrrha.examples;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import kafka.serializer.StringDecoder;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public class updateStateByKey {
	
	private static final String KAFKA_TOPIC = "TopicA";
	
	public static void main(String[] args) throws Exception {
		System.setProperty("hadoop.home.dir", "D:\\checkpoint\\hadoop-common-2.2.0-bin-master");
		SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("WordsCount");
		JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.milliseconds(2000));

		Map<String, String> kafkaParams = new HashMap<String, String>();
		kafkaParams.put("bootstrap.servers", "127.0.0.1:9092");
		kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		kafkaParams.put("group.id", "lingroup");
		//kafkaParams.put("auto.offset.reset", "latest");
		
		Set<String> topics = new HashSet<String>();
        topics.add(KAFKA_TOPIC);
		
        JavaPairInputDStream<String, String> stream = org.apache.spark.streaming.kafka.KafkaUtils.
			createDirectStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);
		
        JavaPairDStream<String, Integer> transDStream = stream.flatMap(new FlatMapFunction<Tuple2<String,String>, String>() {
			public Iterator<String> call(Tuple2<String, String> t) throws Exception {
				return Arrays.asList(t._2.split(" ")).iterator();
			}
		}).filter(new Function<String, Boolean>() {
			public Boolean call(String v1) throws Exception {
				return v1.equals("a") ? false :true;
			}
		}).mapToPair(new PairFunction<String, String, Integer>() {
			public Tuple2<String, Integer> call(String t) throws Exception {
				return new Tuple2<String, Integer>(t, 1);
			}
		}).reduceByKey(new Function2<Integer, Integer, Integer>() {
			public Integer call(Integer v1, Integer v2) throws Exception {
				return v1 + v2;
			}
		}).window(new Duration(6000), new Duration(6000));
		
        /**
         * @param v1表示从上面获取的transDStream的所有value的List集合(如果transDStream=["b":2,"c":1],那么v1=[2,1])
         * @param v2表示上一个updateStateByKey的每个value保存的值
         * 所以,v1就是每次最新batch处理后的value集合,v2就是上个batch处理后缓存的value值
         */
        transDStream.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
			public Optional<Integer> call(List<Integer> v1, Optional<Integer> v2) throws Exception {
				Integer v3 = 0;
				if(v2.isPresent())
					v3 = v2.get();
				for (Integer v : v1)
					v3 += v;
				
				return Optional.of(v3);
			}
		}).print();
        
		jssc.checkpoint("file:///D:/checkpoint/");
		jssc.start();
		jssc.awaitTermination();
	}
	
	

}

 

© 著作权归作者所有

共有 人打赏支持
Joe_Wu
粉丝 1
博文 9
码字总数 6567
作品 0
徐汇
程序员
私信 提问
Spark 学习资源收集【Updating】

(一)spark 相关安装部署、开发环境 1、Spark 伪分布式 & 全分布式 安装指南 http://my.oschina.net/leejun2005/blog/394928 2、Apache Spark探秘:三种分布式部署方式比较 http://dongxic...

大数据之路
2014/09/08
0
1
Spark2.1.0之基础知识

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/80303035 在阅读本文之前,读者最好已经阅读了《Spark2.1.0之初识Spark》一文,本文...

泰山不老生
05/24
0
0
【Spark】Spark SQL, DataFrames and Datasets Guide(翻译文,持续更新)

本文主要是翻译Spark官网Spark SQL programming guide 。只能保证大概意思,尽量保证细节。英文水平有限,如果有错误的地方请指正,轻喷。目录导航在右上角 Spark SQL、DataFrames 和 Datase...

跑呀跑
09/19
0
0
Tuning Java Garbage Collection for Spark Applicati

This is a guest post from our friends in the SSG STO Big Data Technology group at Intel. Join us at the Spark Summit to hear from Intel and other companies deploying Spark in pr......

kuerant
2015/05/30
0
1
关于Spark 的一些调优选项(待完善)

各位看到的大侠们,,,,如果有什么问题,不要拍砖,后期进行完善。谢谢协助完善。 几个比较重要的配置属性: 1.手动启动集群 参数 含义 -i IP,--ip IP 要监听的IP地址或者 DNS 机器名 -p P...

Ryan-瑞恩
2015/08/28
0
3

没有更多内容

加载失败,请刷新页面

加载更多

Pycharm上Django的使用 Day8

1.添加新条目 1>编写用于添加新条目的表单 在forms.py中创建一个与模型Entry相关联的表单 1处给字段'text'指定一个空标签 2处定义小部件widgets,widgets是一个HTML表单元素 2>定义new_entry...

不会TC的猫
30分钟前
3
0
MongoDB副本集

MongoDB介绍 早期版本使用master-slave,一主一从和MySQL类似,但slave在此架构中为只读,当主库宕机后,从库不能自动切换为主 目前已经淘汰master-slave模式,改为副本集,这种模式下有一个...

chencheng-linux
43分钟前
1
0
WebService 客户端记录

https://blog.csdn.net/qiuhan/article/details/49487009

呼呼南风
43分钟前
1
0
七牛云彭垚:智能平台的创新和发展

2018 年 11 月 14 日至 11 月 18 日,第二十届中国国际高新技术成果交易会(简称高交会)在深圳成功举办,七牛云作为国内领先的以数据智能和视觉智能为核心的企业级云计算服务商受邀参展。 ...

七牛云
50分钟前
1
0
Java内存模型原理,你真的理解透彻了吗?

内存模型产生背景 在介绍 Java 内存模型之前,我们先了解一下物理计算机中的并发问题,理解这些问题可以搞清楚内存模型产生的背景。 物理机遇到的并发问题与虚拟机中的情况有不少相似之处,物...

小刀爱编程
55分钟前
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部