
Spark Streaming (1): Spark transformations in Java

Joe_Wu
Published on 2017/08/08 16:39
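
The snippets below are individual methods rather than a complete class; for reference, here is a minimal sketch of the imports they rely on (Spark 2.x Java API; adjust to your Spark version):

	import java.util.ArrayList;
	import java.util.Arrays;
	import java.util.Iterator;
	import java.util.List;
	import java.util.Map;

	import org.apache.spark.SparkConf;
	import org.apache.spark.api.java.JavaPairRDD;
	import org.apache.spark.api.java.JavaRDD;
	import org.apache.spark.api.java.JavaSparkContext;
	import org.apache.spark.api.java.function.FlatMapFunction;
	import org.apache.spark.api.java.function.Function;
	import org.apache.spark.api.java.function.Function2;

	import scala.Tuple2;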

    1. map

	/**
	 * The map operator applies the given function to every element of the RDD.
	 *
	 * Before transformation:
	 *     [you, are, welcome]
	 *
	 * After transformation:
	 *     [YOU, ARE, WELCOME]
	 */
	public static void mapHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<String> list = Arrays.asList("you","are","welcome");  
        JavaRDD<String> lines = sc.parallelize(list);  
		
        JavaRDD<String> words = lines.map(new Function<String, String>() {
			private static final long serialVersionUID = 7871833419981488401L;
			public String call(String v) throws Exception {
				return v.toUpperCase();
			}
		});
        System.out.println("数据转换前:\n\t" + lines.collect() + "\n");
        System.out.println("数据转换后:\n\t" + words.collect());
        sc.close();  
	}
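
With Java 8 or later the anonymous inner class above can be replaced by a lambda; the following line is an equivalent sketch of the same map call:

	JavaRDD<String> words = lines.map(v -> v.toUpperCase());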

    2. flatMap

	/**
	 * flatMap applies the given function to every element of the RDD; each input
	 * element can be split into several pieces, and every piece is returned as a
	 * separate new element.
	 *
	 * Before transformation:
	 *     [Hello You, Hello Me, Hello World]
	 *
	 * After transformation:
	 *     [Hello, You, Hello, Me, Hello, World]
	 */
	public static void flatMapHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");  
		JavaSparkContext sc = new JavaSparkContext(conf);  
		
		List<String> list = Arrays.asList("Hello You","Hello Me","Hello World");  
        JavaRDD<String> lines = sc.parallelize(list);  
          
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
			private static final long serialVersionUID = 4232115990240573928L;
			public Iterator<String> call(String v) throws Exception {
				return Arrays.asList(v.split(" ")).iterator();
			}
		});  
        
        System.out.println("数据转换前:\n\t" + lines.collect() + "\n");
        System.out.println("数据转换后:\n\t" + words.collect());
        sc.close();  
	}  
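
The flatMap call can likewise be written as a Java 8 lambda (equivalent to the anonymous class above):

	JavaRDD<String> words = lines.flatMap(v -> Arrays.asList(v.split(" ")).iterator());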

    3. foldByKey

	/**
	 * foldByKey merges the values of each key with the given function, starting
	 * from the supplied zero value (0 here); with addition and a zero value of 0
	 * it yields the same result as reduceByKey.
	 *
	 * Before transformation:
	 *     [(name1,10), (name1,11), (name2,12), (name2,13)]
	 *
	 * After transformation:
	 *     [(name2,25), (name1,21)]
	 */
	public static void foldByKeyHandle() {
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");  
		JavaSparkContext sc = new JavaSparkContext(conf);  
		
        List<Tuple2<String, Integer>> list = new ArrayList<Tuple2<String, Integer>>();
        list.add(new Tuple2<String, Integer>("name1", 10));
        list.add(new Tuple2<String, Integer>("name1", 11));
        list.add(new Tuple2<String, Integer>("name2", 12));
        list.add(new Tuple2<String, Integer>("name2", 13));
        
        JavaPairRDD<String, Integer> lines = sc.parallelizePairs(list);  
        JavaPairRDD<String, Integer> words = lines.foldByKey(0, new Function2<Integer, Integer, Integer>() {
			public Integer call(Integer v1, Integer v2) throws Exception {
				return v1 + v2;
			}
		});
		
        System.out.println("数据转换前:\n\t" + lines.collect() + "\n");
        System.out.println("数据转换后:\n\t" + words.collect());
        sc.close(); 
	}
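
Because the zero value is 0 and the function is addition, this foldByKey call produces the same sums as reduceByKey would; as a Java 8 lambda it shrinks to the following sketch:

	JavaPairRDD<String, Integer> words = lines.foldByKey(0, (v1, v2) -> v1 + v2);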

    4. filter

	/**
	 * filter keeps only the elements that satisfy the predicate and drops the
	 * data that should not take part in the computation.
	 *
	 * Before transformation:
	 *     [you, are, welcome]
	 *
	 * After transformation:
	 *     [you]
	 */
	public static void filterHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");  
		JavaSparkContext sc = new JavaSparkContext(conf);  
		
		List<String> list = Arrays.asList("you","are","welcome");  
        JavaRDD<String> lines = sc.parallelize(list);  
        
        JavaRDD<String> words = lines.filter(new Function<String, Boolean>() {
			private static final long serialVersionUID = 8698107583647672300L;
			public Boolean call(String v) throws Exception {
				return v.equals("you");
			}
		});
        
        System.out.println("数据转换前:\n\t" + lines.collect() + "\n");
        System.out.println("数据转换后:\n\t" + words.collect());
        sc.close(); 
	}

    5. groupByKey

	/**
	 * groupByKey groups the values that share the same key into one sequence.
	 *
	 * Before transformation:
	 *     [(class1,80), (class2,100), (class1,97), (class2,89)]
	 *
	 * After transformation:
	 *     [(class1,[80, 97]), (class2,[100, 89])]
	 */
	public static <T> void groupByKeyHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");  
		JavaSparkContext sc = new JavaSparkContext(conf);  
		
		List<Tuple2<String, Integer>> list = Arrays.asList(  
		        new Tuple2<String, Integer>("class1", 80),  
		        new Tuple2<String, Integer>("class2", 100),  
		        new Tuple2<String, Integer>("class1", 97),  
		        new Tuple2<String, Integer>("class2", 89));  
		  
		JavaPairRDD<String, Integer> lines = sc.parallelizePairs(list);  
		JavaPairRDD<String, Iterable<Integer>> words = lines.groupByKey();
		
		System.out.println("数据转换前:\n\t" + lines.collect() + "\n");
        System.out.println("数据转换后:\n\t" + words.collect());
        sc.close(); 
	}
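
If the goal is to aggregate the grouped values (for example, summing the scores per class), mapValues can be applied to the grouped result, as sketched below; for plain aggregation, though, reduceByKey (next section) is usually preferable because it combines values on each partition before the shuffle:

	// Hypothetical follow-up step: sum the scores grouped per class.
	JavaPairRDD<String, Integer> totals = words.mapValues(values -> {
		int sum = 0;
		for (Integer v : values) {
			sum += v;
		}
		return sum;
	});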

    6. reduceByKey

	/**
	 * reduceByKey merges the values of each key with the given function
	 * (summation here).
	 *
	 * Before transformation:
	 *     [(class1,80), (class2,100), (class1,97), (class2,89)]
	 *
	 * After transformation:
	 *     [(class1,177), (class2,189)]
	 */
	public static <T> void reduceByKeyHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");  
		JavaSparkContext sc = new JavaSparkContext(conf);  
		
		List<Tuple2<String, Integer>> scoreList = Arrays.asList(  
		        new Tuple2<String, Integer>("class1", 80),  
		        new Tuple2<String, Integer>("class2", 100),  
		        new Tuple2<String, Integer>("class1", 97),  
		        new Tuple2<String, Integer>("class2", 89));  
		
		JavaPairRDD<String, Integer> lines = sc.parallelizePairs(scoreList);  
		JavaPairRDD<String, Integer> words = lines.reduceByKey(new Function2<Integer, Integer, Integer>() {  
			private static final long serialVersionUID = 7340267351025439164L;
			public Integer call(Integer v1, Integer v2) throws Exception {  
                return v1 + v2;  
            }  
        });  
          
		System.out.println("数据转换前:\n\t" + lines.collect() + "\n");
        System.out.println("数据转换后:\n\t" + words.collect());
        sc.close(); 
	}
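
As with the earlier operators, a Java 8 lambda makes the reduceByKey call much shorter (equivalent sketch):

	JavaPairRDD<String, Integer> words = lines.reduceByKey((v1, v2) -> v1 + v2);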

    7. sortByKey

	/**
	 * sortByKey sorts the key-value pairs by key (ascending by default).
	 *
	 * Before transformation:
	 *     [(90,MARRY), (40,BLACK), (70,JACK), (80,TOM)]
	 *
	 * After transformation:
	 *     [(40,BLACK), (70,JACK), (80,TOM), (90,MARRY)]
	 */
	public static <T> void sortByKeyHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<Tuple2<Integer, String>> list = Arrays.asList(
				new Tuple2<Integer, String>(90, "MARRY"),  
				new Tuple2<Integer, String>(40, "BLACK"),
				new Tuple2<Integer, String>(70, "JACK"),  
				new Tuple2<Integer, String>(80, "TOM")); 
		
		JavaPairRDD<Integer, String> lines = sc.parallelizePairs(list);
		JavaPairRDD<Integer, String> words = lines.sortByKey();
		
		System.out.println("数据转换前:\n\t" + lines.collect() + "\n");
        System.out.println("数据转换后:\n\t" + words.collect());
        sc.close(); 
	}
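
sortByKey sorts in ascending order by default; passing false sorts the pairs in descending key order:

	// Descending order by key.
	JavaPairRDD<Integer, String> wordsDesc = lines.sortByKey(false);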

    8. reduce

	/**
	 * reduce combines the elements pairwise with the given function until a
	 * single value remains. Strictly speaking it is an action rather than a
	 * transformation, since it returns the result to the driver.
	 *
	 * Before transformation:
	 *     [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
	 *
	 * After transformation:
	 *     55
	 */
	public static void reduceHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<Integer> list = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
		JavaRDD<Integer> lines = sc.parallelize(list);
		
		Integer words = lines.reduce(new Function2<Integer, Integer, Integer>() {
			private static final long serialVersionUID = 4540563817287193672L;
			public Integer call(Integer v1, Integer v2) throws Exception {
				return v1 + v2;
			}
		});
		
		System.out.println("数据转换前:\n\t" + lines.collect() + "\n");
        System.out.println("数据转换后:\n\t" + words);
        sc.close(); 
	}
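
The same reduction written as a Java 8 lambda (equivalent sketch):

	Integer sum = lines.reduce((v1, v2) -> v1 + v2);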

    9. cogroup

	/**
	 * cogroup groups both RDDs by key in one pass: for every key it returns the
	 * collection of values from the first RDD together with the collection of
	 * values from the second RDD.
	 *
	 * Before transformation (RDD 1):
	 *     [(1,TOM), (2,JACK), (3,MARRY)]
	 *
	 * Before transformation (RDD 2):
	 *     [(1,78), (2,87), (3,89), (1,98), (2,90), (3,95), (4,95)]
	 *
	 * After transformation:
	 *     [(4,([],[95])), (1,([TOM],[78, 98])), (3,([MARRY],[89, 95])), (2,([JACK],[87, 90]))]
	 */
	public static <T> void cogroupHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<Tuple2<Integer, String>> list1 = Arrays.asList(  
                new Tuple2<Integer, String>(1, "TOM"),  
                new Tuple2<Integer, String>(2, "JACK"),  
                new Tuple2<Integer, String>(3, "MARRY"));  
        
        List<Tuple2<Integer, Integer>> list2 = Arrays.asList(  
                new Tuple2<Integer, Integer>(1, 78),  
                new Tuple2<Integer, Integer>(2, 87),  
                new Tuple2<Integer, Integer>(3, 89),  
                new Tuple2<Integer, Integer>(1, 98),  
                new Tuple2<Integer, Integer>(2, 90),  
                new Tuple2<Integer, Integer>(3, 95),  
                new Tuple2<Integer, Integer>(4, 95));  
          
        JavaPairRDD<Integer, String> lines1 = sc.parallelizePairs(list1);  
        JavaPairRDD<Integer, Integer> lines2 = sc.parallelizePairs(list2);  
        JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> words = lines1.cogroup(lines2);  
        
        System.out.println("数据转换前:\n\t" + lines1.collect() + "\n");
        System.out.println("数据转换前:\n\t" + lines2.collect() + "\n");
        System.out.println("数据转换后:\n\t" + words.collect());
        sc.close(); 
	}

    10. countByKey

	/**
	 * countByKey counts how many elements there are for each key; it is an
	 * action that returns a Map to the driver.
	 *
	 * Before transformation:
	 *     [(name,JOS), (name,MARRY), (name,TOM), (age1,24), (age1,25), (age2,25)]
	 *
	 * After transformation:
	 *     {name=3, age1=2, age2=1}
	 */
	public static <T> void countByKeyHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<Tuple2<String, String>> list = Arrays.asList(
				new Tuple2<String, String>("name","JOS"),
				new Tuple2<String, String>("name","MARRY"),
				new Tuple2<String, String>("name","TOM"),
				new Tuple2<String, String>("age1","24"),
				new Tuple2<String, String>("age1","25"),
				new Tuple2<String, String>("age2","25"));
		
		JavaPairRDD<String, String> lines = sc.parallelizePairs(list); 
		Map<String, Long> words = lines.countByKey();
		
		System.out.println("数据转换前:\n\t" + lines.collect() + "\n");
        System.out.println("数据转换后:\n\t" + words.toString());
        sc.close(); 
	}

    11. countByValue

	/**
	 * countByValue counts how many times each distinct element occurs and
	 * returns the result as a Map. For a pair RDD, each (K,V) tuple as a whole
	 * becomes a key of the resulting Map and its occurrence count becomes the
	 * value; for an RDD of Strings, each string itself becomes a Map key.
	 *
	 * Before transformation:
	 *     [(name,JOS), (name,JOS), (name,TOM), (age1,24), (age1,25), (age2,25)]
	 *
	 * After transformation:
	 *     {(name,TOM)=1, (age1,24)=1, (name,JOS)=2, (age2,25)=1, (age1,25)=1}
	 */
	public static <T> void countByValueHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<Tuple2<String, String>> list = Arrays.asList(
				new Tuple2<String, String>("name","JOS"),
				new Tuple2<String, String>("name","JOS"),
				new Tuple2<String, String>("name","TOM"),
				new Tuple2<String, String>("age1","24"),
				new Tuple2<String, String>("age1","25"),
				new Tuple2<String, String>("age2","25"));
		
		JavaPairRDD<String, String> lines = sc.parallelizePairs(list);
		Map<Tuple2<String, String>, Long> words = lines.countByValue();
		
		System.out.println("数据转换前:\n\t" + lines.collect() + "\n");
        System.out.println("数据转换后:\n\t" + words.toString());
        sc.close();
	}

    12. countByValue (on a plain String RDD)

	/**
	 * countByValue on a plain RDD of Strings: each distinct string becomes a
	 * key of the resulting Map and its occurrence count becomes the value.
	 *
	 * Before transformation:
	 *     [name, age, high, name]
	 *
	 * After transformation:
	 *     {high=1, name=2, age=1}
	 */
	public static <T> void countByValueHandle2(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<String> list = Arrays.asList("name","age","high","name");
		
		JavaRDD<String> lines = sc.parallelize(list);
		Map<String, Long> words = lines.countByValue();
		
		System.out.println("数据转换前:\n\t" + lines.collect() + "\n");
        System.out.println("数据转换后:\n\t" + words.toString());
        sc.close();
	}

    13. union

	/**
	 * union concatenates two RDDs without removing duplicates.
	 *
	 * Before transformation (RDD 1):
	 *     [One, Two, Zero]
	 *
	 * Before transformation (RDD 2):
	 *     [One, Four]
	 *
	 * After transformation:
	 *     [One, Two, Zero, One, Four]
	 */
	public static <T> void unionHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<String> list1 = Arrays.asList("One","Two","Zero");
		List<String> list2 = Arrays.asList("One","Four");
		JavaRDD<String> lines1 = sc.parallelize(list1);
		JavaRDD<String> lines2 = sc.parallelize(list2);
		
		JavaRDD<String> words = lines1.union(lines2);
		
		System.out.println("数据转换前:\n\t" + lines1.collect() + "\n");
		System.out.println("数据转换前:\n\t" + lines2.collect() + "\n");
        System.out.println("数据转换后:\n\t" + words.collect());
        sc.close();
	}
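
Since union keeps duplicates, chaining distinct() after it gives a set-style merge (a small sketch; note that distinct triggers an extra shuffle):

	// Duplicates removed from [One, Two, Zero, One, Four]; result order is not guaranteed.
	JavaRDD<String> deduped = lines1.union(lines2).distinct();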

    14. cartesian

	/**
	 * cartesian computes the Cartesian product of two RDDs, pairing every
	 * element of the first RDD with every element of the second.
	 *
	 * Before transformation (RDD 1):
	 *     [A, B, C]
	 *
	 * Before transformation (RDD 2):
	 *     [1, 2, 3]
	 *
	 * After transformation:
	 *     [(A,1), (A,2), (A,3), (B,1), (B,2), (B,3), (C,1), (C,2), (C,3)]
	 */
	public static void cartesianHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<String> list1 = Arrays.asList("A","B","C");
		List<Integer> list2 = Arrays.asList(1,2,3);
		JavaRDD<String> lines1 = sc.parallelize(list1);
		JavaRDD<Integer> lines2 = sc.parallelize(list2);
		
		JavaPairRDD<String, Integer> words = lines1.cartesian(lines2);
		
		System.out.println("数据转换前:\n\t" + lines1.collect() + "\n");
		System.out.println("数据转换前:\n\t" + lines2.collect() + "\n");
        System.out.println("数据转换后:\n\t" + words.collect());
        sc.close();
	}

 
