
Spark Streaming (1): Spark Transformations in Java

Joe_Wu
Published on 2017/08/08 16:39

    1. map

	/**
	 * map: applies a function to each element of the RDD and returns a new RDD of the
	 * results; one input element produces exactly one output element.
	 *
	 *  Before:
			[you, are, welcome]

		After:
			[YOU, ARE, WELCOME]
	 */
	public static void mapHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<String> list = Arrays.asList("you","are","welcome");  
        JavaRDD<String> lines = sc.parallelize(list);  
		
        JavaRDD<String> words = lines.map(new Function<String, String>() {
			private static final long serialVersionUID = 7871833419981488401L;
			public String call(String v) throws Exception {
				return v.toUpperCase();
			}
		});
        System.out.println("数据转换前:\n\t" + lines.collect() + "\n");
        System.out.println("数据转换后:\n\t" + words.collect());
        sc.close();  
	}
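
    The same transformation can be written more compactly with a Java 8 lambda. This is a minimal sketch, assuming Java 8+ and reusing the lines RDD and SparkConf/JavaSparkContext setup from the example above:

	// Lambda form of the map call above; same result: [YOU, ARE, WELCOME]
	JavaRDD<String> upper = lines.map(v -> v.toUpperCase());
	System.out.println(upper.collect());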

    2. flatMap

	/**
	 * flatMap: applies a function to each element of the RDD; the function may split an
	 * element into several pieces, and each piece becomes a separate element of the new RDD.
	 *
	 *  Before:
			[Hello You, Hello Me, Hello World]

		After:
			[Hello, You, Hello, Me, Hello, World]
	 */
	public static void flatMapHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");  
		JavaSparkContext sc = new JavaSparkContext(conf);  
		
		List<String> list = Arrays.asList("Hello You","Hello Me","Hello World");  
        JavaRDD<String> lines = sc.parallelize(list);  
          
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
			private static final long serialVersionUID = 4232115990240573928L;
			public Iterator<String> call(String v) throws Exception {
				return Arrays.asList(v.split(" ")).iterator();
			}
		});  
        
        System.out.println("Before:\n\t" + lines.collect() + "\n");
        System.out.println("After:\n\t" + words.collect());
        sc.close();  
	}  

    3. foldByKey

	/**
	 * foldByKey: merges the values of each key with the given function, like reduceByKey,
	 * except that the aggregation starts from the supplied neutral "zero value" (0 here).
	 *
	 *  Before:
			[(name1,10), (name1,11), (name2,12), (name2,13)]

		After:
			[(name2,25), (name1,21)]
	 */
	public static void foldByKeyHandle() {
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");  
		JavaSparkContext sc = new JavaSparkContext(conf);  
		
        List<Tuple2<String, Integer>> list = new ArrayList<Tuple2<String, Integer>>();
        list.add(new Tuple2<String, Integer>("name1", 10));
        list.add(new Tuple2<String, Integer>("name1", 11));
        list.add(new Tuple2<String, Integer>("name2", 12));
        list.add(new Tuple2<String, Integer>("name2", 13));
        
        JavaPairRDD<String, Integer> lines = sc.parallelizePairs(list);  
        JavaPairRDD<String, Integer> words = lines.foldByKey(0, new Function2<Integer, Integer, Integer>() {
			public Integer call(Integer v1, Integer v2) throws Exception {
				return v1 + v2;
			}
		});
		
        System.out.println("Before:\n\t" + lines.collect() + "\n");
        System.out.println("After:\n\t" + words.collect());
        sc.close(); 
	}
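
    To make the role of the initial value visible, here is a hedged sketch (Java 8 lambda syntax, reusing the lines RDD above) that folds with a non-zero start value. The zero value is applied once per key in each partition, so assuming the data sits in a single local partition, every key simply starts from 100:

	// Hypothetical variant: start the per-key aggregation at 100 instead of 0.
	// With one partition this yields [(name2,125), (name1,121)].
	JavaPairRDD<String, Integer> folded = lines.foldByKey(100, (v1, v2) -> v1 + v2);
	System.out.println(folded.collect());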

    4. filter

	/**
	 * filter: keeps only the elements for which the predicate returns true, dropping
	 * the data that should not take part in the computation.
	 *
	 *  Before:
			[you, are, welcome]

		After:
			[you]
	 */
	public static void filterHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");  
		JavaSparkContext sc = new JavaSparkContext(conf);  
		
		List<String> list = Arrays.asList("you","are","welcome");  
        JavaRDD<String> lines = sc.parallelize(list);  
        
        JavaRDD<String> words = lines.filter(new Function<String, Boolean>() {
			private static final long serialVersionUID = 8698107583647672300L;
			public Boolean call(String v) throws Exception {
				return v.equals("you");
			}
		});
        
        System.out.println("Before:\n\t" + lines.collect() + "\n");
        System.out.println("After:\n\t" + words.collect());
        sc.close(); 
	}

    5. groupByKey

	/**
	 * groupByKey: groups all values that share the same key into one collection per key.
	 *
	 *  Before:
			[(class1,80), (class2,100), (class1,97), (class2,89)]

		After:
			[(class1,[80, 97]), (class2,[100, 89])]
	 */
	public static <T> void groupByKeyHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");  
		JavaSparkContext sc = new JavaSparkContext(conf);  
		
		List<Tuple2<String, Integer>> list = Arrays.asList(  
		        new Tuple2<String, Integer>("class1", 80),  
		        new Tuple2<String, Integer>("class2", 100),  
		        new Tuple2<String, Integer>("class1", 97),  
		        new Tuple2<String, Integer>("class2", 89));  
		  
		JavaPairRDD<String, Integer> lines = sc.parallelizePairs(list);  
		JavaPairRDD<String, Iterable<Integer>> words = lines.groupByKey();
		
		System.out.println("Before:\n\t" + lines.collect() + "\n");
        System.out.println("After:\n\t" + words.collect());
        sc.close(); 
	}
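
    The grouped values can be post-processed with mapValues. Below is a minimal sketch (Java 8 lambdas, reusing the words RDD from this example) that sums each group. For plain aggregations like this, reduceByKey in the next example is usually preferable, because it combines values on the map side before the shuffle, whereas groupByKey ships every value across the network:

	// Sum the grouped scores per class; expected result: [(class1,177), (class2,189)]
	JavaPairRDD<String, Integer> totals = words.mapValues(vs -> {
		int sum = 0;
		for (Integer v : vs) {
			sum += v;
		}
		return sum;
	});
	System.out.println(totals.collect());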

    6. reduceByKey

	/**
	 * reduceByKey: merges the values of each key with the given function (here, summing them).
	 *
	 *  Before:
			[(class1,80), (class2,100), (class1,97), (class2,89)]

		After:
			[(class1,177), (class2,189)]
	 */
	public static <T> void reduceByKeyHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");  
		JavaSparkContext sc = new JavaSparkContext(conf);  
		
		List<Tuple2<String, Integer>> scoreList = Arrays.asList(  
		        new Tuple2<String, Integer>("class1", 80),  
		        new Tuple2<String, Integer>("class2", 100),  
		        new Tuple2<String, Integer>("class1", 97),  
		        new Tuple2<String, Integer>("class2", 89));  
		
		JavaPairRDD<String, Integer> lines = sc.parallelizePairs(scoreList);  
		JavaPairRDD<String, Integer> words = lines.reduceByKey(new Function2<Integer, Integer, Integer>() {  
			private static final long serialVersionUID = 7340267351025439164L;
			public Integer call(Integer v1, Integer v2) throws Exception {  
                return v1 + v2;  
            }  
        });  
          
		System.out.println("Before:\n\t" + lines.collect() + "\n");
        System.out.println("After:\n\t" + words.collect());
        sc.close(); 
	}

    7. sortByKey

	/**
	 * sortByKey: sorts the key-value pairs by key, in ascending order by default.
	 *
	 *  Before:
			[(90,MARRY), (40,BLACK), (70,JACK), (80,TOM)]

		After:
			[(40,BLACK), (70,JACK), (80,TOM), (90,MARRY)]
	 */
	public static <T> void sortByKeyHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<Tuple2<Integer, String>> list = Arrays.asList(
				new Tuple2<Integer, String>(90, "MARRY"),  
				new Tuple2<Integer, String>(40, "BLACK"),
				new Tuple2<Integer, String>(70, "JACK"),  
				new Tuple2<Integer, String>(80, "TOM")); 
		
		JavaPairRDD<Integer, String> lines = sc.parallelizePairs(list);
		JavaPairRDD<Integer, String> words = lines.sortByKey();
		
		System.out.println("Before:\n\t" + lines.collect() + "\n");
        System.out.println("After:\n\t" + words.collect());
        sc.close(); 
	}
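
    sortByKey also takes a boolean that controls the sort direction. A small sketch on the same lines RDD, sorting in descending order instead:

	// Pass false for descending order: [(90,MARRY), (80,TOM), (70,JACK), (40,BLACK)]
	JavaPairRDD<Integer, String> desc = lines.sortByKey(false);
	System.out.println(desc.collect());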

    8. reduce

	/**
	 * reduce: combines the elements two at a time until a single value remains.
	 * Note that reduce is an action rather than a transformation; it returns the
	 * result directly to the driver.
	 *
	 *  Before:
			[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

		After:
			55
	 */
	public static void reduceHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<Integer> list = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
		JavaRDD<Integer> lines = sc.parallelize(list);
		
		Integer words = lines.reduce(new Function2<Integer, Integer, Integer>() {
			private static final long serialVersionUID = 4540563817287193672L;
			public Integer call(Integer v1, Integer v2) throws Exception {
				return v1 + v2;
			}
		});
		
		System.out.println("Before:\n\t" + lines.collect() + "\n");
        System.out.println("After:\n\t" + words);
        sc.close(); 
	}

    9. cogroup

	/**
	 * cogroup: for each key, gathers the values from both RDDs into a tuple of
	 * (values from the first RDD, values from the second RDD); roughly the same as
	 * running groupByKey on each RDD and then grouping the two results by key.
	 *
	 *  Before (RDD 1):
			[(1,TOM), (2,JACK), (3,MARRY)]

		Before (RDD 2):
			[(1,78), (2,87), (3,89), (1,98), (2,90), (3,95), (4,95)]

		After:
			[(4,([],[95])), (1,([TOM],[78, 98])), (3,([MARRY],[89, 95])), (2,([JACK],[87, 90]))]
	 */
	public static <T> void cogroupHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<Tuple2<Integer, String>> list1 = Arrays.asList(  
                new Tuple2<Integer, String>(1, "TOM"),  
                new Tuple2<Integer, String>(2, "JACK"),  
                new Tuple2<Integer, String>(3, "MARRY"));  
        
        List<Tuple2<Integer, Integer>> list2 = Arrays.asList(  
                new Tuple2<Integer, Integer>(1, 78),  
                new Tuple2<Integer, Integer>(2, 87),  
                new Tuple2<Integer, Integer>(3, 89),  
                new Tuple2<Integer, Integer>(1, 98),  
                new Tuple2<Integer, Integer>(2, 90),  
                new Tuple2<Integer, Integer>(3, 95),  
                new Tuple2<Integer, Integer>(4, 95));  
          
        JavaPairRDD<Integer, String> lines1 = sc.parallelizePairs(list1);  
        JavaPairRDD<Integer, Integer> lines2 = sc.parallelizePairs(list2);  
        JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> words = lines1.cogroup(lines2);  
        
        System.out.println("Before (RDD 1):\n\t" + lines1.collect() + "\n");
        System.out.println("Before (RDD 2):\n\t" + lines2.collect() + "\n");
        System.out.println("After:\n\t" + words.collect());
        sc.close(); 
	}
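
    A closely related operator is join, which keeps only the keys that appear in both RDDs and flattens the value pairs. A minimal sketch reusing lines1 and lines2 from this example (output order may vary):

	// Inner join on the same inputs; key 4 is dropped because it has no entry in lines1.
	// Each matching key yields one pair per value combination, e.g. (1,(TOM,78)) and (1,(TOM,98)).
	JavaPairRDD<Integer, Tuple2<String, Integer>> joined = lines1.join(lines2);
	System.out.println(joined.collect());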

    10. countByKey

	/**
	 * countByKey: counts how many elements there are for each key and returns the
	 * result to the driver as a Map (this is an action).
	 *
	 *  Before:
			[(name,JOS), (name,MARRY), (name,TOM), (age1,24), (age1,25), (age2,25)]

		After:
			{name=3, age1=2, age2=1}
	 */
	public static <T> void countByKeyHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<Tuple2<String, String>> list = Arrays.asList(
				new Tuple2<String, String>("name","JOS"),
				new Tuple2<String, String>("name","MARRY"),
				new Tuple2<String, String>("name","TOM"),
				new Tuple2<String, String>("age1","24"),
				new Tuple2<String, String>("age1","25"),
				new Tuple2<String, String>("age2","25"));
		
		JavaPairRDD<String, String> lines = sc.parallelizePairs(list); 
		Map<String, Long> words = lines.countByKey();
		
		System.out.println("Before:\n\t" + lines.collect() + "\n");
        System.out.println("After:\n\t" + words.toString());
        sc.close(); 
	}

    11. countByValue

	/**
	 * countByValue: counts how many times each distinct element occurs and returns a Map.
	 * On a pair RDD the whole Tuple2<K,V> serves as the map key and the number of identical
	 * tuples as the value; on an RDD of plain Strings (see the next example), each distinct
	 * string is the map key and its count is the value.
	 *
	 *  Before:
			[(name,JOS), (name,JOS), (name,TOM), (age1,24), (age1,25), (age2,25)]

		After:
			{(name,TOM)=1, (age1,24)=1, (name,JOS)=2, (age2,25)=1, (age1,25)=1}
	 */
	public static <T> void countByValueHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<Tuple2<String, String>> list = Arrays.asList(
				new Tuple2<String, String>("name","JOS"),
				new Tuple2<String, String>("name","JOS"),
				new Tuple2<String, String>("name","TOM"),
				new Tuple2<String, String>("age1","24"),
				new Tuple2<String, String>("age1","25"),
				new Tuple2<String, String>("age2","25"));
		
		JavaPairRDD<String, String> lines = sc.parallelizePairs(list);
		Map<Tuple2<String, String>, Long> words = lines.countByValue();
		
		System.out.println("Before:\n\t" + lines.collect() + "\n");
        System.out.println("After:\n\t" + words.toString());
        sc.close();
	}

    12. countByValue (on a plain RDD)

	/**
	 * countByValue on an RDD of Strings: each distinct string becomes a key of the
	 * returned Map, with its number of occurrences as the value.
	 *
	 *  Before:
			[name, age, high, name]

		After:
			{high=1, name=2, age=1}
	 */
	public static <T> void countByValueHandle2(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<String> list = Arrays.asList("name","age","high","name");
		
		JavaRDD<String> lines = sc.parallelize(list);
		Map<String, Long> words = lines.countByValue();
		
		System.out.println("Before:\n\t" + lines.collect() + "\n");
        System.out.println("After:\n\t" + words.toString());
        sc.close();
	}

    13. union

	/**
	 * union: concatenates two RDDs without removing duplicates.
	 *
	 *  Before (RDD 1):
			[One, Two, Zero]

		Before (RDD 2):
			[One, Four]

		After:
			[One, Two, Zero, One, Four]
	 */
	public static <T> void unionHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<String> list1 = Arrays.asList("One","Two","Zero");
		List<String> list2 = Arrays.asList("One","Four");
		JavaRDD<String> lines1 = sc.parallelize(list1);
		JavaRDD<String> lines2 = sc.parallelize(list2);
		
		JavaRDD<String> words = lines1.union(lines2);
		
		System.out.println("Before (RDD 1):\n\t" + lines1.collect() + "\n");
		System.out.println("Before (RDD 2):\n\t" + lines2.collect() + "\n");
        System.out.println("After:\n\t" + words.collect());
        sc.close();
	}
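
    Because union keeps duplicates, distinct() can be chained when a deduplicated result is wanted. A small sketch on the same two RDDs (element order in the output is not guaranteed):

	// Deduplicated union, e.g. [Zero, One, Two, Four]
	JavaRDD<String> unique = lines1.union(lines2).distinct();
	System.out.println(unique.collect());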

    14. cartesian

	/**
	 * cartesian: computes the Cartesian product of two RDDs, pairing every element of
	 * the first RDD with every element of the second.
	 *
	 *  Before (RDD 1):
			[A, B, C]

		Before (RDD 2):
			[1, 2, 3]

		After:
			[(A,1), (A,2), (A,3), (B,1), (B,2), (B,3), (C,1), (C,2), (C,3)]
	 */
	public static void cartesianHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<String> list1 = Arrays.asList("A","B","C");
		List<Integer> list2 = Arrays.asList(1,2,3);
		JavaRDD<String> lines1 = sc.parallelize(list1);
		JavaRDD<Integer> lines2 = sc.parallelize(list2);
		
		JavaPairRDD<String, Integer> words = lines1.cartesian(lines2);
		
		System.out.println("Before (RDD 1):\n\t" + lines1.collect() + "\n");
		System.out.println("Before (RDD 2):\n\t" + lines2.collect() + "\n");
        System.out.println("After:\n\t" + words.collect());
        sc.close();
	}

 
