文档章节

Spark Streaming(4):Spark updateStateByKey in Java

Joe_Wu
 Joe_Wu
发布于 2017/08/10 12:29
字数 349
阅读 14
收藏 0
点赞 0
评论 0
package com.pyrrha.examples;

import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;

import kafka.serializer.StringDecoder;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.Optional;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.Durations;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;

import scala.Tuple2;

public class updateStateByKey {
	
	private static final String KAFKA_TOPIC = "TopicA";
	
	public static void main(String[] args) throws Exception {
		System.setProperty("hadoop.home.dir", "D:\\checkpoint\\hadoop-common-2.2.0-bin-master");
		SparkConf conf = new SparkConf().setMaster("local[2]").setAppName("WordsCount");
		JavaStreamingContext jssc = new JavaStreamingContext(conf, Durations.milliseconds(2000));

		Map<String, String> kafkaParams = new HashMap<String, String>();
		kafkaParams.put("bootstrap.servers", "127.0.0.1:9092");
		kafkaParams.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		kafkaParams.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
		kafkaParams.put("group.id", "lingroup");
		//kafkaParams.put("auto.offset.reset", "latest");
		
		Set<String> topics = new HashSet<String>();
        topics.add(KAFKA_TOPIC);
		
        JavaPairInputDStream<String, String> stream = org.apache.spark.streaming.kafka.KafkaUtils.
			createDirectStream(jssc, String.class, String.class, StringDecoder.class, StringDecoder.class, kafkaParams, topics);
		
        JavaPairDStream<String, Integer> transDStream = stream.flatMap(new FlatMapFunction<Tuple2<String,String>, String>() {
			public Iterator<String> call(Tuple2<String, String> t) throws Exception {
				return Arrays.asList(t._2.split(" ")).iterator();
			}
		}).filter(new Function<String, Boolean>() {
			public Boolean call(String v1) throws Exception {
				return v1.equals("a") ? false :true;
			}
		}).mapToPair(new PairFunction<String, String, Integer>() {
			public Tuple2<String, Integer> call(String t) throws Exception {
				return new Tuple2<String, Integer>(t, 1);
			}
		}).reduceByKey(new Function2<Integer, Integer, Integer>() {
			public Integer call(Integer v1, Integer v2) throws Exception {
				return v1 + v2;
			}
		}).window(new Duration(6000), new Duration(6000));
		
        /**
         * @param v1表示从上面获取的transDStream的所有value的List集合(如果transDStream=["b":2,"c":1],那么v1=[2,1])
         * @param v2表示上一个updateStateByKey的每个value保存的值
         * 所以,v1就是每次最新batch处理后的value集合,v2就是上个batch处理后缓存的value值
         */
        transDStream.updateStateByKey(new Function2<List<Integer>, Optional<Integer>, Optional<Integer>>() {
			public Optional<Integer> call(List<Integer> v1, Optional<Integer> v2) throws Exception {
				Integer v3 = 0;
				if(v2.isPresent())
					v3 = v2.get();
				for (Integer v : v1)
					v3 += v;
				
				return Optional.of(v3);
			}
		}).print();
        
		jssc.checkpoint("file:///D:/checkpoint/");
		jssc.start();
		jssc.awaitTermination();
	}
	
	

}

 

© 著作权归作者所有

共有 人打赏支持
Joe_Wu
粉丝 1
博文 9
码字总数 6567
作品 0
徐汇
程序员
Spark2.1.0之基础知识

在阅读本文之前,读者最好已经阅读了《Spark2.1.0之初识Spark》一文,本文将对Spark的基础知识进行介绍。但在此之前,读者先跟随本人来一次简单的时光穿梭,最后还将对Java与Scala在语言上进...

beliefer
05/24
0
0
Spark 学习资源收集【Updating】

(一)spark 相关安装部署、开发环境 1、Spark 伪分布式 & 全分布式 安装指南 http://my.oschina.net/leejun2005/blog/394928 2、Apache Spark探秘:三种分布式部署方式比较 http://dongxic...

大数据之路
2014/09/08
0
1
Spark Streaming 基本概念

介绍 Spark Streaming架构图 the micro-batch architecture of Spark Streaming Execution of Spark Streaming within Spark’s components JAVA代码示例 执行方式 1:修改log4j的日志级别为......

cloud-coder
2015/06/18
0
0
spark出现GC overhead limit exceeded和java heap space

spark执行任务时出现java.lang.OutOfMemoryError: GC overhead limit exceeded和java.lang.OutOfMemoryError: java heap space 最直接的解决方式就是在spark-env.sh中将下面两个参数调节的尽...

闵开慧
2014/10/14
0
1
Spark 1.4.1 Standalone 模式部署安装配置

各节点执行如下操作(或在一个节点上操作完后 scp 到其它节点): 1、 解压spark安装程序到程序目录/bigdata/soft/spark-1.4.1,约定此目录为$SPARKHOME tar –zxvf spark-1.4-bin-hadoop2.6....

山疯
2015/08/12
0
0
Spark Streaming + Kafka Integration Guide

The Spark Streaming integration for Kafka 0.10 is similar in design to the 0.8 Direct Stream approach. It provides simple parallelism, 1:1 correspondence between Kafka partition......

刺猬一号
07/18
0
0
Apache Flink和Apache Spark有什么异同?它们的发展前景分别怎样?

============================= object WordCount { def main(args: Array[String]) {val env = new SparkContext("local","wordCount")val data = List("hi","how are you","hi")val dataSe......

justlpf
05/12
0
0
Tuning Java Garbage Collection for Spark Applicati

This is a guest post from our friends in the SSG STO Big Data Technology group at Intel. Join us at the Spark Summit to hear from Intel and other companies deploying Spark in pr......

kuerant
2015/05/30
0
1
关于Spark 的一些调优选项(待完善)

各位看到的大侠们,,,,如果有什么问题,不要拍砖,后期进行完善。谢谢协助完善。 几个比较重要的配置属性: 1.手动启动集群 参数 含义 -i IP,--ip IP 要监听的IP地址或者 DNS 机器名 -p P...

Ryan-瑞恩
2015/08/28
0
3
Spark 伪分布式 & 全分布式 安装指南

0、前言 3月31日是 Spark 五周年纪念日,从第一个公开发布的版本开始,Spark走过了不平凡的5年:从刚开始的默默无闻,到13年的鹊起,14年的大爆发。Spark核心之上有分布式的机器学习,SQL,s...

大数据之路
2015/04/02
0
5

没有更多内容

加载失败,请刷新页面

加载更多

下一页

eclipse SVN 项目重新定位

SVN 重新定位 1.方法一 首先:在Eclipse中选择Windows-> Show View->others 就会出现【SVN资源库/SVN Repositories】,选中后,点击确认; 然后:选中原有的地址,选择【重新定位/Relocate】...

qimh
11分钟前
0
0
Linux 第29课 ——Linux集群架构(下)

Linux集群架构(下) 八、DR模式搭建 8.1 准备工作 试验需求三台机器: 分发器,也叫调度器(简写为dir) 192.168.112.136 ying01 rs1 192.168.112.138 ying02 rs2 192.168.112.139 ying03 vip...

feng-01
17分钟前
0
0
轻松搭建svn版本管理工具+svnmanager管理客户端

前面的文章有写过svn版本管理工具的安装是基于svn的安装包进行安装,对于svn与apache的结合还得下svn和apache的模块进行结合过程比较繁琐,今天来介绍下通过centos的yum来安装svn能够快速安装...

javazyw
26分钟前
0
0
keepalived配置高可用集群

Linux集群概述 根据功能划分为两大类:高可用和负载均衡 高可用集群通常为两台服务器,一台工作,另外一台作为冗余,当提供服务的机器宕机,冗余将接替继续提供服务 实现高可用的开源软件有:...

TaoXu
31分钟前
0
0
mysql联表批处理操作

1 概述 mysql中的单表增删改查操作,可以说是基本中的基本. 实际工作中,常常会遇到一些基本用法难以处理的数据操作,譬如遇到主从表甚至多级关联表的情况(如一些历史问题数据的批量处理),考虑到...

社哥
34分钟前
0
0
IntelliJ IDEA 详细图解最常用的配置,适合刚刚用的新人。

刚刚使用IntelliJ IDEA 编辑器的时候,会有很多设置,会方便以后的开发,磨刀不误砍柴工。 比如:设置文件字体大小,代码自动完成提示,版本管理,本地代码历史,自动导入包,修改注释,修改...

kim_o
49分钟前
0
0
Google Java编程风格指南

目录 前言 源文件基础 源文件结构 格式 命名约定 编程实践 Javadoc 后记 前言 这份文档是Google Java编程风格规范的完整定义。当且仅当一个Java源文件符合此文档中的规则, 我们才认为它符合...

niithub
51分钟前
0
0
java.net.MalformedURLException异常说明

1.异常片段 Java代码中,在进行URL url = new URL(urllink)操作时,提示以下异常信息,该类异常主要问题出在参数urllink上面。 异常片段1 java.net.MalformedURLException at java.ne...

lqlm
51分钟前
1
0
CentOS7修改mysql5.6字符集

解决办法:CentOS7下修改MySQL数据库字符编码为UTF-8,UTF-8包含全世界所有国家所需要的字符集,是国际编码。 具体操作如下: 1.进入MySQL [root@tianqi-01 ~]# mysql -uroot -p Enter passw...

河图再现
53分钟前
0
0
DevExpress v18.1新版亮点——WPF篇(一)

用户界面套包DevExpress v18.1日前终于正式发布,本站将以连载的形式为大家介绍各版本新增内容。本文将介绍了DevExpress WPF v18.1 的新功能,快来下载试用新版本!点击下载>> Accordion Co...

Miss_Hello_World
55分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部