
Spark Streaming (1): Spark transformations in Java

Joe_Wu
Published on 2017/08/08 16:39

    1. map

	/**
	 * map: applies a function to every element of the RDD and returns a new RDD of the results.
	 *
	 * Before transformation:
	 * 		[you, are, welcome]
	 *
	 * After transformation:
	 * 		[YOU, ARE, WELCOME]
	 */
	public static void mapHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<String> list = Arrays.asList("you","are","welcome");  
        JavaRDD<String> lines = sc.parallelize(list);  
		
        JavaRDD<String> words = lines.map(new Function<String, String>() {
			private static final long serialVersionUID = 7871833419981488401L;
			public String call(String v) throws Exception {
				return v.toUpperCase();
			}
		});
        System.out.println("数据转换前:\n\t" + lines.collect() + "\n");
        System.out.println("数据转换后:\n\t" + words.collect());
        sc.close();  
	}
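
The same call can be written more compactly with a Java 8 lambda. A minimal sketch, assuming the project compiles with Java 8+ against the same Spark 2.x Java API:

	// Lambda form of the map example above; behavior is identical to the anonymous Function
	public static void mapHandleLambda() {
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);

		JavaRDD<String> lines = sc.parallelize(Arrays.asList("you", "are", "welcome"));
		JavaRDD<String> words = lines.map(v -> v.toUpperCase());

		System.out.println(words.collect()); // [YOU, ARE, WELCOME]
		sc.close();
	}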

    2. flatMap

	/**
	 * flatMap: applies a function to every element of the RDD; the function may split one element
	 * into several new elements, and each of them is emitted into the resulting RDD individually.
	 *
	 * Before transformation:
	 * 		[Hello You, Hello Me, Hello World]
	 *
	 * After transformation:
	 * 		[Hello, You, Hello, Me, Hello, World]
	 */
	public static void flatMapHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");  
		JavaSparkContext sc = new JavaSparkContext(conf);  
		
		List<String> list = Arrays.asList("Hello You","Hello Me","Hello World");  
        JavaRDD<String> lines = sc.parallelize(list);  
          
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
			private static final long serialVersionUID = 4232115990240573928L;
			public Iterator<String> call(String v) throws Exception {
				return Arrays.asList(v.split(" ")).iterator();
			}
		});  
        
        System.out.println("数据转换前:\n\t" + lines.collect() + "\n");
        System.out.println("数据转换后:\n\t" + words.collect());
        sc.close();  
	}  

    3. foldByKey

	/**
	 * foldByKey: merges the values of each key with an associative function, starting from the
	 * given neutral "zero" value (0 here); it behaves like reduceByKey with an initial value.
	 *
	 * Before transformation:
	 * 		[(name1,10), (name1,11), (name2,12), (name2,13)]
	 *
	 * After transformation:
	 * 		[(name2,25), (name1,21)]
	 */
	public static void foldByKeyHandle() {
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");  
		JavaSparkContext sc = new JavaSparkContext(conf);  
		
        List<Tuple2<String, Integer>> list = new ArrayList<Tuple2<String, Integer>>();
        list.add(new Tuple2<String, Integer>("name1", 10));
        list.add(new Tuple2<String, Integer>("name1", 11));
        list.add(new Tuple2<String, Integer>("name2", 12));
        list.add(new Tuple2<String, Integer>("name2", 13));
        
        JavaPairRDD<String, Integer> lines = sc.parallelizePairs(list);  
        JavaPairRDD<String, Integer> words = lines.foldByKey(0, new Function2<Integer, Integer, Integer>() {
			public Integer call(Integer v1, Integer v2) throws Exception {
				return v1 + v2;
			}
		});
		
        System.out.println("数据转换前:\n\t" + lines.collect() + "\n");
        System.out.println("数据转换后:\n\t" + words.collect());
        sc.close(); 
	}
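
The first argument is the neutral value folded in before the values of each key. Per the Spark docs it may be applied more than once (once per key per partition), so a non-neutral value changes the result. A minimal sketch that makes this visible; it assumes it is dropped into foldByKeyHandle before sc.close(), and the exact totals depend on how the pairs are partitioned:

	// Fold with an initial value of 100 over an explicit 2 partitions.
	// Because 100 is applied once per key per partition, the totals are generally
	// not just "sum + 100"; with the neutral value 0 this effect disappears.
	JavaPairRDD<String, Integer> pairs = sc.parallelizePairs(list, 2);
	JavaPairRDD<String, Integer> folded = pairs.foldByKey(100, (v1, v2) -> v1 + v2);
	System.out.println(folded.collect());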

    4. filter

	/**
	 * filter: keeps only the elements for which the predicate returns true, dropping the
	 * data that should not take part in the computation.
	 *
	 * Before transformation:
	 * 		[you, are, welcome]
	 *
	 * After transformation:
	 * 		[you]
	 */
	public static void filterHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");  
		JavaSparkContext sc = new JavaSparkContext(conf);  
		
		List<String> list = Arrays.asList("you","are","welcome");  
        JavaRDD<String> lines = sc.parallelize(list);  
        
        JavaRDD<String> words = lines.filter(new Function<String, Boolean>() {
			private static final long serialVersionUID = 8698107583647672300L;
			public Boolean call(String v) throws Exception {
				return v.equals("you");
			}
		});
        
        System.out.println("数据转换前:\n\t" + lines.collect() + "\n");
        System.out.println("数据转换后:\n\t" + words.collect());
        sc.close(); 
	}

    5. groupByKey

	/**
	 * groupByKey: groups the values that share the same key into an Iterable. Prefer
	 * reduceByKey or foldByKey when only an aggregate is needed, since those combine
	 * values before the shuffle.
	 *
	 * Before transformation:
	 * 		[(class1,80), (class2,100), (class1,97), (class2,89)]
	 *
	 * After transformation:
	 * 		[(class1,[80, 97]), (class2,[100, 89])]
	 */
	public static void groupByKeyHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");  
		JavaSparkContext sc = new JavaSparkContext(conf);  
		
		List<Tuple2<String, Integer>> list = Arrays.asList(  
		        new Tuple2<String, Integer>("class1", 80),  
		        new Tuple2<String, Integer>("class2", 100),  
		        new Tuple2<String, Integer>("class1", 97),  
		        new Tuple2<String, Integer>("class2", 89));  
		  
		JavaPairRDD<String, Integer> lines = sc.parallelizePairs(list);  
		JavaPairRDD<String, Iterable<Integer>> words = lines.groupByKey();
		
		System.out.println("数据转换前:\n\t" + lines.collect() + "\n");
        System.out.println("数据转换后:\n\t" + words.collect());
        sc.close(); 
	}

    6. reduceByKey

	/**
	 * reduceByKey: merges the values of each key with the given function (here, summing them).
	 *
	 * Before transformation:
	 * 		[(class1,80), (class2,100), (class1,97), (class2,89)]
	 *
	 * After transformation:
	 * 		[(class1,177), (class2,189)]
	 */
	public static void reduceByKeyHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");  
		JavaSparkContext sc = new JavaSparkContext(conf);  
		
		List<Tuple2<String, Integer>> scoreList = Arrays.asList(  
		        new Tuple2<String, Integer>("class1", 80),  
		        new Tuple2<String, Integer>("class2", 100),  
		        new Tuple2<String, Integer>("class1", 97),  
		        new Tuple2<String, Integer>("class2", 89));  
		
		JavaPairRDD<String, Integer> lines = sc.parallelizePairs(scoreList);  
		JavaPairRDD<String, Integer> words = lines.reduceByKey(new Function2<Integer, Integer, Integer>() {  
			private static final long serialVersionUID = 7340267351025439164L;
			public Integer call(Integer v1, Integer v2) throws Exception {  
                return v1 + v2;  
            }  
        });  
          
		System.out.println("数据转换前:\n\t" + lines.collect() + "\n");
        System.out.println("数据转换后:\n\t" + words.collect());
        sc.close(); 
	}
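
flatMap, mapToPair and reduceByKey compose into the classic word count. A minimal sketch using Java 8 lambdas, assuming the same local JavaSparkContext sc as in the examples above:

	// Split lines into words, pair each word with 1, then sum the 1s per word
	JavaRDD<String> text = sc.parallelize(Arrays.asList("Hello You", "Hello Me", "Hello World"));
	JavaPairRDD<String, Integer> counts = text
			.flatMap(line -> Arrays.asList(line.split(" ")).iterator())
			.mapToPair(word -> new Tuple2<String, Integer>(word, 1))
			.reduceByKey((a, b) -> a + b);
	System.out.println(counts.collect()); // e.g. [(Hello,3), (You,1), (Me,1), (World,1)]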

    7. sortByKey

	/**
	 * sortByKey: sorts the key-value pairs by key (ascending by default).
	 *
	 * Before transformation:
	 * 		[(90,MARRY), (40,BLACK), (70,JACK), (80,TOM)]
	 *
	 * After transformation:
	 * 		[(40,BLACK), (70,JACK), (80,TOM), (90,MARRY)]
	 */
	public static void sortByKeyHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<Tuple2<Integer, String>> list = Arrays.asList(
				new Tuple2<Integer, String>(90, "MARRY"),  
				new Tuple2<Integer, String>(40, "BLACK"),
				new Tuple2<Integer, String>(70, "JACK"),  
				new Tuple2<Integer, String>(80, "TOM")); 
		
		JavaPairRDD<Integer, String> lines = sc.parallelizePairs(list);
		JavaPairRDD<Integer, String> words = lines.sortByKey();
		
		System.out.println("数据转换前:\n\t" + lines.collect() + "\n");
        System.out.println("数据转换后:\n\t" + words.collect());
        sc.close(); 
	}
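
sortByKey also takes a boolean for the sort direction, so the same data can be sorted in descending order. A one-line variant that could replace the ascending call inside sortByKeyHandle:

	// Descending sort: pass false for the ascending flag
	JavaPairRDD<Integer, String> desc = lines.sortByKey(false);
	System.out.println(desc.collect()); // [(90,MARRY), (80,TOM), (70,JACK), (40,BLACK)]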

    8. reduce

	/**
	 * reduce: combines the elements pairwise with the given function until a single value is left.
	 * It is an action: the result is returned to the driver rather than a new RDD.
	 *
	 * Before transformation:
	 * 		[1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
	 *
	 * After transformation:
	 * 		55
	 */
	public static void reduceHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<Integer> list = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
		JavaRDD<Integer> lines = sc.parallelize(list);
		
		Integer words = lines.reduce(new Function2<Integer, Integer, Integer>() {
			private static final long serialVersionUID = 4540563817287193672L;
			public Integer call(Integer v1, Integer v2) throws Exception {
				return v1 + v2;
			}
		});
		
		System.out.println("数据转换前:\n\t" + lines.collect() + "\n");
        System.out.println("数据转换后:\n\t" + words);
        sc.close(); 
	}

    9. cogroup

	/**
	 * cogroup: groups both RDDs by key at the same time; for every key the result holds a tuple of
	 * (values from the first RDD, values from the second RDD). Keys present in only one RDD get an
	 * empty Iterable on the other side, as key 4 shows below.
	 *
	 * First RDD before transformation:
	 * 		[(1,TOM), (2,JACK), (3,MARRY)]
	 *
	 * Second RDD before transformation:
	 * 		[(1,78), (2,87), (3,89), (1,98), (2,90), (3,95), (4,95)]
	 *
	 * After transformation:
	 * 		[(4,([],[95])), (1,([TOM],[78, 98])), (3,([MARRY],[89, 95])), (2,([JACK],[87, 90]))]
	 */
	public static void cogroupHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<Tuple2<Integer, String>> list1 = Arrays.asList(  
                new Tuple2<Integer, String>(1, "TOM"),  
                new Tuple2<Integer, String>(2, "JACK"),  
                new Tuple2<Integer, String>(3, "MARRY"));  
        
        List<Tuple2<Integer, Integer>> list2 = Arrays.asList(  
                new Tuple2<Integer, Integer>(1, 78),  
                new Tuple2<Integer, Integer>(2, 87),  
                new Tuple2<Integer, Integer>(3, 89),  
                new Tuple2<Integer, Integer>(1, 98),  
                new Tuple2<Integer, Integer>(2, 90),  
                new Tuple2<Integer, Integer>(3, 95),  
                new Tuple2<Integer, Integer>(4, 95));  
          
        JavaPairRDD<Integer, String> lines1 = sc.parallelizePairs(list1);  
        JavaPairRDD<Integer, Integer> lines2 = sc.parallelizePairs(list2);  
        JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> words = lines1.cogroup(lines2);  
        
        System.out.println("数据转换前:\n\t" + lines1.collect() + "\n");
        System.out.println("数据转换前:\n\t" + lines2.collect() + "\n");
        System.out.println("数据转换后:\n\t" + words.collect());
        sc.close(); 
	}

    10. countByKey

	/**
	 * countByKey: counts how many elements share each key. It is an action: the result is a
	 * java.util.Map returned to the driver, not a new RDD.
	 *
	 * Before transformation:
	 * 		[(name,JOS), (name,MARRY), (name,TOM), (age1,24), (age1,25), (age2,25)]
	 *
	 * After transformation:
	 * 		{name=3, age1=2, age2=1}
	 */
	public static void countByKeyHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<Tuple2<String, String>> list = Arrays.asList(
				new Tuple2<String, String>("name","JOS"),
				new Tuple2<String, String>("name","MARRY"),
				new Tuple2<String, String>("name","TOM"),
				new Tuple2<String, String>("age1","24"),
				new Tuple2<String, String>("age1","25"),
				new Tuple2<String, String>("age2","25"));
		
		JavaPairRDD<String, String> lines = sc.parallelizePairs(list); 
		Map<String, Long> words = lines.countByKey();
		
		System.out.println("数据转换前:\n\t" + lines.collect() + "\n");
        System.out.println("数据转换后:\n\t" + words.toString());
        sc.close(); 
	}
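
Because countByKey collects the whole result map to the driver, a distributed alternative for very large key sets is to aggregate on the cluster instead. A minimal sketch of that pattern (not from the original post), reusing the lines pair RDD inside countByKeyHandle:

	// Count per key on the executors instead of the driver; collect() only when needed
	JavaPairRDD<String, Long> countsPerKey = lines
			.mapValues(v -> 1L)               // replace each value with a 1
			.reduceByKey((a, b) -> a + b);    // sum the 1s per key
	System.out.println(countsPerKey.collect()); // [(name,3), (age1,2), (age2,1)] (order may vary)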

    11. countByValue

	/**
	 * countByValue: counts how many times each distinct element occurs and returns the result as a
	 * java.util.Map (an action). On a pair RDD the whole Tuple2<K,V> is treated as one element, so
	 * the map key is the (K,V) pair and the map value is its occurrence count; on a plain RDD of
	 * strings the string itself becomes the map key (see the next example).
	 *
	 * Before transformation:
	 * 		[(name,JOS), (name,JOS), (name,TOM), (age1,24), (age1,25), (age2,25)]
	 *
	 * After transformation:
	 * 		{(name,TOM)=1, (age1,24)=1, (name,JOS)=2, (age2,25)=1, (age1,25)=1}
	 */
	public static void countByValueHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<Tuple2<String, String>> list = Arrays.asList(
				new Tuple2<String, String>("name","JOS"),
				new Tuple2<String, String>("name","JOS"),
				new Tuple2<String, String>("name","TOM"),
				new Tuple2<String, String>("age1","24"),
				new Tuple2<String, String>("age1","25"),
				new Tuple2<String, String>("age2","25"));
		
		JavaPairRDD<String, String> lines = sc.parallelizePairs(list);
		Map<Tuple2<String, String>, Long> words = lines.countByValue();
		
		System.out.println("数据转换前:\n\t" + lines.collect() + "\n");
        System.out.println("数据转换后:\n\t" + words.toString());
        sc.close();
	}

    12. countByValue (on a plain RDD)

	/**
	 * countByValue on a plain RDD of strings: each string is the map key and its occurrence
	 * count is the map value.
	 *
	 * Before transformation:
	 * 		[name, age, high, name]
	 *
	 * After transformation:
	 * 		{high=1, name=2, age=1}
	 */
	public static void countByValueHandle2(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<String> list = Arrays.asList("name","age","high","name");
		
		JavaRDD<String> lines = sc.parallelize(list);
		Map<String, Long> words = lines.countByValue();
		
		System.out.println("数据转换前:\n\t" + lines.collect() + "\n");
        System.out.println("数据转换后:\n\t" + words.toString());
        sc.close();
	}

    13. union

	/**
	 * union: concatenates two RDDs without removing duplicates.
	 *
	 * First RDD before transformation:
	 * 		[One, Two, Zero]
	 *
	 * Second RDD before transformation:
	 * 		[One, Four]
	 *
	 * After transformation:
	 * 		[One, Two, Zero, One, Four]
	 */
	public static void unionHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<String> list1 = Arrays.asList("One","Two","Zero");
		List<String> list2 = Arrays.asList("One","Four");
		JavaRDD<String> lines1 = sc.parallelize(list1);
		JavaRDD<String> lines2 = sc.parallelize(list2);
		
		JavaRDD<String> words = lines1.union(lines2);
		
		System.out.println("数据转换前:\n\t" + lines1.collect() + "\n");
		System.out.println("数据转换前:\n\t" + lines2.collect() + "\n");
        System.out.println("数据转换后:\n\t" + words.collect());
        sc.close();
	}
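
If duplicates are not wanted after the union, distinct() removes them. A one-line sketch that could follow the union call inside unionHandle:

	// union keeps duplicates; chain distinct() to drop them (result order may vary)
	JavaRDD<String> deduped = lines1.union(lines2).distinct();
	System.out.println(deduped.collect()); // e.g. [Zero, Four, One, Two]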

    14. cartesian

	/**
	 * cartesian: computes the Cartesian product of two RDDs, pairing every element of the first
	 * with every element of the second.
	 *
	 * First RDD before transformation:
	 * 		[A, B, C]
	 *
	 * Second RDD before transformation:
	 * 		[1, 2, 3]
	 *
	 * After transformation:
	 * 		[(A,1), (A,2), (A,3), (B,1), (B,2), (B,3), (C,1), (C,2), (C,3)]
	 */
	public static void cartesianHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<String> list1 = Arrays.asList("A","B","C");
		List<Integer> list2 = Arrays.asList(1,2,3);
		JavaRDD<String> lines1 = sc.parallelize(list1);
		JavaRDD<Integer> lines2 = sc.parallelize(list2);
		
		JavaPairRDD<String, Integer> words = lines1.cartesian(lines2);
		
		System.out.println("数据转换前:\n\t" + lines1.collect() + "\n");
		System.out.println("数据转换前:\n\t" + lines2.collect() + "\n");
        System.out.println("数据转换后:\n\t" + words.collect());
        sc.close();
	}

 

© Copyright belongs to the author.
