
Spark Streaming (1): Spark Transformations in Java

Joe_Wu
Published 2017/08/08 16:39

    1. map

	/**
	 * The map operator applies the given function to every element of the
	 * RDD and returns a new RDD of the results.
	 *
	 *  Before transformation:
			[you, are, welcome]

		After transformation:
			[YOU, ARE, WELCOME]
	 */
	public static void mapHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<String> list = Arrays.asList("you","are","welcome");  
        JavaRDD<String> lines = sc.parallelize(list);  
		
        JavaRDD<String> words = lines.map(new Function<String, String>() {
			private static final long serialVersionUID = 7871833419981488401L;
			public String call(String v) throws Exception {
				return v.toUpperCase();
			}
		});
        System.out.println("数据转换前:\n\t" + lines.collect() + "\n");
        System.out.println("数据转换后:\n\t" + words.collect());
        sc.close();  
	}
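Since org.apache.spark.api.java.function.Function is a single-method interface, the same map can be written with a Java 8 lambda. A minimal sketch, assuming Spark 2.x with Java 8 and the `lines` RDD defined above:

	// Sketch: lambda form of the map example above
	JavaRDD<String> upper = lines.map(v -> v.toUpperCase());
	// upper.collect() -> [YOU, ARE, WELCOME]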

    2. flatMap

	/**
	 * The flatMap operator applies a function to every element of the RDD;
	 * each element may be expanded into zero or more new elements, which are
	 * flattened into the resulting RDD one by one.
	 *
	 *  Before transformation:
			[Hello You, Hello Me, Hello World]

		After transformation:
			[Hello, You, Hello, Me, Hello, World]
	 */
	public static void flatMapHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");  
		JavaSparkContext sc = new JavaSparkContext(conf);  
		
		List<String> list = Arrays.asList("Hello You","Hello Me","Hello World");  
        JavaRDD<String> lines = sc.parallelize(list);  
          
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
			private static final long serialVersionUID = 4232115990240573928L;
			public Iterator<String> call(String v) throws Exception {
				return Arrays.asList(v.split(" ")).iterator();
			}
		});  
        
        System.out.println("数据转换前:\n\t" + lines.collect() + "\n");
        System.out.println("数据转换后:\n\t" + words.collect());
        sc.close();  
	}  
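flatMap is typically the first step of a word count: split each line into words, pair every word with 1, then sum the counts per key. A sketch assuming the same `lines` RDD and the Java 8 lambda syntax (reduceByKey is introduced in section 6):

	// Sketch: word count built from flatMap + mapToPair + reduceByKey
	JavaPairRDD<String, Integer> counts = lines
			.flatMap(s -> Arrays.asList(s.split(" ")).iterator())   // split lines into words
			.mapToPair(w -> new Tuple2<>(w, 1))                     // pair each word with a count of 1
			.reduceByKey((a, b) -> a + b);                          // sum the counts per word
	// counts.collect() -> [(Hello,3), (You,1), (Me,1), (World,1)] (order may vary)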

    3. foldByKey

	/**
	 * foldByKey merges the values of each key with the given function,
	 * starting from the supplied zero value (0 here), so with addition it
	 * behaves like reduceByKey with an initial value.
	 *
	 *  Before transformation:
			[(name1,10), (name1,11), (name2,12), (name2,13)]

		After transformation:
			[(name2,25), (name1,21)]
	 */
	public static void foldByKeyHandle() {
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");  
		JavaSparkContext sc = new JavaSparkContext(conf);  
		
        List<Tuple2<String, Integer>> list = new ArrayList<Tuple2<String, Integer>>();
        list.add(new Tuple2<String, Integer>("name1", 10));
        list.add(new Tuple2<String, Integer>("name1", 11));
        list.add(new Tuple2<String, Integer>("name2", 12));
        list.add(new Tuple2<String, Integer>("name2", 13));
        
        JavaPairRDD<String, Integer> lines = sc.parallelizePairs(list);  
        JavaPairRDD<String, Integer> words = lines.foldByKey(0, new Function2<Integer, Integer, Integer>() {
			public Integer call(Integer v1, Integer v2) throws Exception {
				return v1 + v2;
			}
		});
		
        System.out.println("数据转换前:\n\t" + lines.collect() + "\n");
        System.out.println("数据转换后:\n\t" + words.collect());
        sc.close(); 
	}
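Note that the zero value is not guaranteed to be applied exactly once per key: Spark may fold it in once per partition that contains the key. With 0 and addition this makes no difference, but a non-neutral zero value can change the result depending on partitioning, as in this sketch:

	// Sketch: foldByKey with a non-neutral zero value; the 100 may be added
	// once per partition per key, so the commented result assumes each key's
	// values end up in a single partition.
	JavaPairRDD<String, Integer> folded = lines.foldByKey(100, (v1, v2) -> v1 + v2);
	// e.g. [(name2,125), (name1,121)]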

    4. filter

	/**
	 * The filter operator keeps only the elements for which the predicate
	 * returns true, which is useful for dropping records that should not
	 * take part in the computation.
	 *
	 *  Before transformation:
			[you, are, welcome]

		After transformation:
			[you]
	 */
	public static void filterHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");  
		JavaSparkContext sc = new JavaSparkContext(conf);  
		
		List<String> list = Arrays.asList("you","are","welcome");  
        JavaRDD<String> lines = sc.parallelize(list);  
        
        JavaRDD<String> words = lines.filter(new Function<String, Boolean>() {
			private static final long serialVersionUID = 8698107583647672300L;
			public Boolean call(String v) throws Exception {
				return "you".equals(v);
			}
		});
        
        System.out.println("数据转换前:\n\t" + lines.collect() + "\n");
        System.out.println("数据转换后:\n\t" + words.collect());
        sc.close(); 
	}
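In practice the predicate usually rejects malformed records rather than matching one literal value. A small sketch (the blank-line rule is a hypothetical cleanup step, not part of the original example):

	// Sketch: drop null or blank strings before further processing
	JavaRDD<String> cleaned = lines.filter(s -> s != null && !s.trim().isEmpty());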

    5. groupByKey

	/**
	 * groupByKey: groups all values that share the same key into an Iterable.
	 *
	 *  Before transformation:
			[(class1,80), (class2,100), (class1,97), (class2,89)]

		After transformation:
			[(class1,[80, 97]), (class2,[100, 89])]
	 */
	public static void groupByKeyHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");  
		JavaSparkContext sc = new JavaSparkContext(conf);  
		
		List<Tuple2<String, Integer>> list = Arrays.asList(  
		        new Tuple2<String, Integer>("class1", 80),  
		        new Tuple2<String, Integer>("class2", 100),  
		        new Tuple2<String, Integer>("class1", 97),  
		        new Tuple2<String, Integer>("class2", 89));  
		  
		JavaPairRDD<String, Integer> lines = sc.parallelizePairs(list);  
		JavaPairRDD<String, Iterable<Integer>> words = lines.groupByKey();
		
		System.out.println("数据转换前:\n\t" + lines.collect() + "\n");
        System.out.println("数据转换后:\n\t" + words.collect());
        sc.close(); 
	}
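Once the values are grouped, mapValues can post-process the Iterable of each key, for example to compute the average score per class. A sketch assuming the grouped `words` RDD above:

	// Sketch: average score per class, computed from the grouped values
	JavaPairRDD<String, Double> avg = words.mapValues(scores -> {
		int sum = 0, n = 0;
		for (Integer s : scores) { sum += s; n++; }
		return (double) sum / n;
	});
	// avg.collect() -> [(class1,88.5), (class2,94.5)]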

    6. reduceByKey

	/**
	 * reduceByKey: merges the values of each key with the given function
	 * (summation here).
	 *
	 *  Before transformation:
			[(class1,80), (class2,100), (class1,97), (class2,89)]

		After transformation:
			[(class1,177), (class2,189)]
	 */
	public static void reduceByKeyHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");  
		JavaSparkContext sc = new JavaSparkContext(conf);  
		
		List<Tuple2<String, Integer>> scoreList = Arrays.asList(  
		        new Tuple2<String, Integer>("class1", 80),  
		        new Tuple2<String, Integer>("class2", 100),  
		        new Tuple2<String, Integer>("class1", 97),  
		        new Tuple2<String, Integer>("class2", 89));  
		
		JavaPairRDD<String, Integer> lines = sc.parallelizePairs(scoreList);  
		JavaPairRDD<String, Integer> words = lines.reduceByKey(new Function2<Integer, Integer, Integer>() {  
			private static final long serialVersionUID = 7340267351025439164L;
			public Integer call(Integer v1, Integer v2) throws Exception {  
                return v1 + v2;  
            }  
        });  
          
		System.out.println("数据转换前:\n\t" + lines.collect() + "\n");
        System.out.println("数据转换后:\n\t" + words.collect());
        sc.close(); 
	}
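Any associative and commutative function can be used, not just addition, and for plain aggregations reduceByKey is usually preferred over groupByKey because it combines values on the map side before the shuffle. A sketch computing the highest score per class with a method reference:

	// Sketch: highest score per class, using Math::max as the combine function
	JavaPairRDD<String, Integer> best = lines.reduceByKey(Math::max);
	// best.collect() -> [(class1,97), (class2,100)]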

    7. sortByKey

	/**
	 * sortByKey: sorts the key-value pairs by key, ascending by default.
	 *
	 *  Before transformation:
			[(90,MARRY), (40,BLACK), (70,JACK), (80,TOM)]

		After transformation:
			[(40,BLACK), (70,JACK), (80,TOM), (90,MARRY)]
	 */
	public static void sortByKeyHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<Tuple2<Integer, String>> list = Arrays.asList(
				new Tuple2<Integer, String>(90, "MARRY"),  
				new Tuple2<Integer, String>(40, "BLACK"),
				new Tuple2<Integer, String>(70, "JACK"),  
				new Tuple2<Integer, String>(80, "TOM")); 
		
		JavaPairRDD<Integer, String> lines = sc.parallelizePairs(list);
		JavaPairRDD<Integer, String> words = lines.sortByKey();
		
		System.out.println("数据转换前:\n\t" + lines.collect() + "\n");
        System.out.println("数据转换后:\n\t" + words.collect());
        sc.close(); 
	}
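sortByKey also accepts a boolean flag for the sort direction (and, in another overload, the number of output partitions). A sketch of a descending sort on the same data:

	// Sketch: sort by key in descending order
	JavaPairRDD<Integer, String> desc = lines.sortByKey(false);
	// desc.collect() -> [(90,MARRY), (80,TOM), (70,JACK), (40,BLACK)]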

    8. reduce

	/**
	 * reduce: combines the elements pairwise with the given function until a
	 * single value remains. Strictly speaking, reduce is an action rather
	 * than a transformation: it returns the result to the driver.
	 *
	 *  Before transformation:
			[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]

		After transformation:
			55
	 */
	public static void reduceHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<Integer> list = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
		JavaRDD<Integer> lines = sc.parallelize(list);
		
		Integer words = lines.reduce(new Function2<Integer, Integer, Integer>() {
			private static final long serialVersionUID = 4540563817287193672L;
			public Integer call(Integer v1, Integer v2) throws Exception {
				return v1 + v2;
			}
		});
		
		System.out.println("数据转换前:\n\t" + lines.collect() + "\n");
        System.out.println("数据转换后:\n\t" + words);
        sc.close(); 
	}
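With Java 8 the same reduction shrinks to a method reference, and the identical pattern yields other pairwise aggregations such as the maximum. A short sketch on the same `lines` RDD:

	// Sketch: the same sum, plus a maximum, written with method references
	Integer sum = lines.reduce(Integer::sum);   // 55
	Integer max = lines.reduce(Math::max);      // 10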

    9. cogroup

	/**
	 * cogroup: for every key, collects the values from both RDDs into a pair
	 * of Iterables, conceptually like running groupByKey on each RDD and then
	 * combining the two grouped results by key.
	 *
	 *  First RDD before transformation:
			[(1,TOM), (2,JACK), (3,MARRY)]

		Second RDD before transformation:
			[(1,78), (2,87), (3,89), (1,98), (2,90), (3,95), (4,95)]

		After transformation:
			[(4,([],[95])), (1,([TOM],[78, 98])), (3,([MARRY],[89, 95])), (2,([JACK],[87, 90]))]
	 */
	public static void cogroupHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<Tuple2<Integer, String>> list1 = Arrays.asList(  
                new Tuple2<Integer, String>(1, "TOM"),  
                new Tuple2<Integer, String>(2, "JACK"),  
                new Tuple2<Integer, String>(3, "MARRY"));  
        
        List<Tuple2<Integer, Integer>> list2 = Arrays.asList(  
                new Tuple2<Integer, Integer>(1, 78),  
                new Tuple2<Integer, Integer>(2, 87),  
                new Tuple2<Integer, Integer>(3, 89),  
                new Tuple2<Integer, Integer>(1, 98),  
                new Tuple2<Integer, Integer>(2, 90),  
                new Tuple2<Integer, Integer>(3, 95),  
                new Tuple2<Integer, Integer>(4, 95));  
          
        JavaPairRDD<Integer, String> lines1 = sc.parallelizePairs(list1);  
        JavaPairRDD<Integer, Integer> lines2 = sc.parallelizePairs(list2);  
        JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> words = lines1.cogroup(lines2);  
        
        System.out.println("数据转换前:\n\t" + lines1.collect() + "\n");
        System.out.println("数据转换前:\n\t" + lines2.collect() + "\n");
        System.out.println("数据转换后:\n\t" + words.collect());
        sc.close(); 
	}
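For comparison, join keeps only the keys present in both RDDs and pairs the values one by one, whereas the cogroup result above keeps key 4 with an empty name list. A sketch on the same two RDDs:

	// Sketch: inner join of the same two RDDs; key 4 is dropped
	JavaPairRDD<Integer, Tuple2<String, Integer>> joined = lines1.join(lines2);
	// e.g. [(1,(TOM,78)), (1,(TOM,98)), (2,(JACK,87)), (2,(JACK,90)), (3,(MARRY,89)), (3,(MARRY,95))]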

    10. countByKey

	/**
	 * countByKey: counts how many elements there are for each key. countByKey
	 * is an action; the result is returned to the driver as a Map.
	 *
	 *  Before transformation:
			[(name,JOS), (name,MARRY), (name,TOM), (age1,24), (age1,25), (age2,25)]

		After transformation:
			{name=3, age1=2, age2=1}
	 */
	public static void countByKeyHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<Tuple2<String, String>> list = Arrays.asList(
				new Tuple2<String, String>("name","JOS"),
				new Tuple2<String, String>("name","MARRY"),
				new Tuple2<String, String>("name","TOM"),
				new Tuple2<String, String>("age1","24"),
				new Tuple2<String, String>("age1","25"),
				new Tuple2<String, String>("age2","25"));
		
		JavaPairRDD<String, String> lines = sc.parallelizePairs(list); 
		Map<String, Long> words = lines.countByKey();
		
		System.out.println("数据转换前:\n\t" + lines.collect() + "\n");
        System.out.println("数据转换后:\n\t" + words.toString());
        sc.close(); 
	}
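Because countByKey is an action that pulls the whole Map into driver memory, a very large key set is better counted with ordinary transformations so the result stays distributed. A sketch of the equivalent:

	// Sketch: per-key counts kept as an RDD instead of a driver-side Map
	JavaPairRDD<String, Long> counts = lines.mapValues(v -> 1L).reduceByKey(Long::sum);
	// counts.collect() -> [(name,3), (age1,2), (age2,1)] (order may vary)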

    11. countByValue

	/**
	 * countByValue: counts how many times each distinct element occurs and
	 * returns the result to the driver as a Map. For a pair RDD of
	 * Tuple2<K,V>, the whole (K,V) tuple becomes the map key; for a plain RDD
	 * of Strings, the element itself becomes the map key (see the next
	 * example). The count is the map value.
	 *
	 *  Before transformation:
			[(name,JOS), (name,JOS), (name,TOM), (age1,24), (age1,25), (age2,25)]

		After transformation:
			{(name,TOM)=1, (age1,24)=1, (name,JOS)=2, (age2,25)=1, (age1,25)=1}
	 */
	public static void countByValueHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<Tuple2<String, String>> list = Arrays.asList(
				new Tuple2<String, String>("name","JOS"),
				new Tuple2<String, String>("name","JOS"),
				new Tuple2<String, String>("name","TOM"),
				new Tuple2<String, String>("age1","24"),
				new Tuple2<String, String>("age1","25"),
				new Tuple2<String, String>("age2","25"));
		
		JavaPairRDD<String, String> lines = sc.parallelizePairs(list);
		Map<Tuple2<String, String>, Long> words = lines.countByValue();
		
		System.out.println("数据转换前:\n\t" + lines.collect() + "\n");
        System.out.println("数据转换后:\n\t" + words.toString());
        sc.close();
	}

    12. countByValue (on a plain JavaRDD)

	/**
	 * countByValue on a plain JavaRDD<String>: each distinct string becomes a
	 * key of the resulting Map, and the value is how often it occurs.
	 *
	 *  Before transformation:
			[name, age, high, name]

		After transformation:
			{high=1, name=2, age=1}
	 */
	public static void countByValueHandle2(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<String> list = Arrays.asList("name","age","high","name");
		
		JavaRDD<String> lines = sc.parallelize(list);
		Map<String, Long> words = lines.countByValue();
		
		System.out.println("数据转换前:\n\t" + lines.collect() + "\n");
        System.out.println("数据转换后:\n\t" + words.toString());
        sc.close();
	}

    13. union

	/**
	 * union: concatenates two RDDs without removing duplicates.
	 *
	 *  First RDD before transformation:
			[One, Two, Zero]

		Second RDD before transformation:
			[One, Four]

		After transformation:
			[One, Two, Zero, One, Four]
	 */
	public static void unionHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<String> list1 = Arrays.asList("One","Two","Zero");
		List<String> list2 = Arrays.asList("One","Four");
		JavaRDD<String> lines1 = sc.parallelize(list1);
		JavaRDD<String> lines2 = sc.parallelize(list2);
		
		JavaRDD<String> words = lines1.union(lines2);
		
		System.out.println("数据转换前:\n\t" + lines1.collect() + "\n");
		System.out.println("数据转换前:\n\t" + lines2.collect() + "\n");
        System.out.println("数据转换后:\n\t" + words.collect());
        sc.close();
	}
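If the duplicates should be removed after the merge, chain distinct() onto the union; note that distinct triggers a shuffle. A sketch:

	// Sketch: union followed by distinct drops the duplicate "One"
	JavaRDD<String> merged = lines1.union(lines2).distinct();
	// merged.collect() -> [One, Two, Zero, Four] (order not guaranteed)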

    14. cartesian

	/**
	 * cartesian: computes the Cartesian product of two RDDs, i.e. every
	 * possible pairing of one element from each RDD.
	 *
	 *  First RDD before transformation:
			[A, B, C]

		Second RDD before transformation:
			[1, 2, 3]

		After transformation:
			[(A,1), (A,2), (A,3), (B,1), (B,2), (B,3), (C,1), (C,2), (C,3)]
	 */
	public static void cartesianHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<String> list1 = Arrays.asList("A","B","C");
		List<Integer> list2 = Arrays.asList(1,2,3);
		JavaRDD<String> lines1 = sc.parallelize(list1);
		JavaRDD<Integer> lines2 = sc.parallelize(list2);
		
		JavaPairRDD<String, Integer> words = lines1.cartesian(lines2);
		
		System.out.println("数据转换前:\n\t" + lines1.collect() + "\n");
		System.out.println("数据转换前:\n\t" + lines2.collect() + "\n");
        System.out.println("数据转换后:\n\t" + words.collect());
        sc.close();
	}
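The result has lines1.count() * lines2.count() elements (3 * 3 = 9 here), so cartesian grows quickly and should be used with care on large RDDs. A sketch checking the size with the count action:

	// Sketch: the Cartesian product contains 3 * 3 = 9 pairs for this data
	long pairs = words.count();   // 9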

 

© All rights reserved by the author.
