Spark Streaming (1): Spark Transformations in Java

Abstract: Java implementations of the basic Spark operators: 1. map 2. flatMap 3. foldByKey 4. filter 5. groupByKey 6. reduceByKey 7. sortByKey 8. reduce 9. cogroup 10. countByKey 11. countByValue 12. union 13. cartesian

    1. map

	/**
	 * The map operator applies a function to each element of the RDD.
	 *
	 * Before:
	 *     [you, are, welcome]
	 * After:
	 *     [YOU, ARE, WELCOME]
	 */
	public static void mapHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<String> list = Arrays.asList("you","are","welcome");  
        JavaRDD<String> lines = sc.parallelize(list);  
		
        JavaRDD<String> words = lines.map(new Function<String, String>() {
			private static final long serialVersionUID = 7871833419981488401L;
			public String call(String v) throws Exception {
				return v.toUpperCase();
			}
		});
        System.out.println("Before:\n\t" + lines.collect() + "\n");
        System.out.println("After:\n\t" + words.collect());
        sc.close();  
	}
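
With Java 8 the anonymous Function classes used in this post can be written as lambdas; a minimal sketch of the same map call (the remaining examples keep the anonymous-class style):

	// Same transformation written as a lambda; no serialVersionUID needed.
	JavaRDD<String> upper = lines.map(v -> v.toUpperCase());
	// [YOU, ARE, WELCOME]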

    2. flatMap

	/**
	 * flatMap applies a function to each element of the RDD; the function may
	 * split one element into several pieces, each of which becomes a new
	 * element of the result.
	 *
	 * Before:
	 *     [Hello You, Hello Me, Hello World]
	 * After:
	 *     [Hello, You, Hello, Me, Hello, World]
	 */
	public static void flatMapHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");  
		JavaSparkContext sc = new JavaSparkContext(conf);  
		
		List<String> list = Arrays.asList("Hello You","Hello Me","Hello World");  
        JavaRDD<String> lines = sc.parallelize(list);  
          
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
			private static final long serialVersionUID = 4232115990240573928L;
			public Iterator<String> call(String v) throws Exception {
				return Arrays.asList(v.split(" ")).iterator();
			}
		});  
        
        System.out.println("Before:\n\t" + lines.collect() + "\n");
        System.out.println("After:\n\t" + words.collect());
        sc.close();  
	}  

    3. foldByKey

	/**
	 * foldByKey aggregates the values of each key, like reduceByKey, but
	 * starts each fold from a supplied zero value (here 0).
	 *
	 * Before:
	 *     [(name1,10), (name1,11), (name2,12), (name2,13)]
	 * After:
	 *     [(name2,25), (name1,21)]
	 */
	public static void foldByKeyHandle() {
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");  
		JavaSparkContext sc = new JavaSparkContext(conf);  
		
        List<Tuple2<String, Integer>> list = new ArrayList<Tuple2<String, Integer>>();
        list.add(new Tuple2<String, Integer>("name1", 10));
        list.add(new Tuple2<String, Integer>("name1", 11));
        list.add(new Tuple2<String, Integer>("name2", 12));
        list.add(new Tuple2<String, Integer>("name2", 13));
        
        JavaPairRDD<String, Integer> lines = sc.parallelizePairs(list);  
        JavaPairRDD<String, Integer> words = lines.foldByKey(0, new Function2<Integer, Integer, Integer>() {
			public Integer call(Integer v1, Integer v2) throws Exception {
				return v1 + v2;
			}
		});
		
        System.out.println("Before:\n\t" + lines.collect() + "\n");
        System.out.println("After:\n\t" + words.collect());
        sc.close(); 
	}
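
The zero value is the starting point of each fold and is applied once per partition for each key, so it should normally be the identity of the operation (0 for addition). A hypothetical sketch on a single-partition copy of the same list, showing how a non-zero start shifts the result:

	// With one partition, the zero value 100 is folded in exactly once per
	// key; with more partitions it would be added once per partition.
	// (Output order may vary.)
	JavaPairRDD<String, Integer> oneP = sc.parallelizePairs(list, 1);
	JavaPairRDD<String, Integer> offset = oneP.foldByKey(100, (v1, v2) -> v1 + v2);
	// [(name1,121), (name2,125)]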

    4. filter

	/**
	 * filter keeps only the elements that satisfy a predicate, dropping the
	 * data that should not take part in the computation.
	 *
	 * Before:
	 *     [you, are, welcome]
	 * After:
	 *     [you]
	 */
	public static void filterHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");  
		JavaSparkContext sc = new JavaSparkContext(conf);  
		
		List<String> list = Arrays.asList("you","are","welcome");  
        JavaRDD<String> lines = sc.parallelize(list);  
        
        JavaRDD<String> words = lines.filter(new Function<String, Boolean>() {
			private static final long serialVersionUID = 8698107583647672300L;
			public Boolean call(String v) throws Exception {
return v.equals("you");
			}
		});
        
        System.out.println("Before:\n\t" + lines.collect() + "\n");
        System.out.println("After:\n\t" + words.collect());
        sc.close(); 
	}

    5. groupByKey

	/**
	 * groupByKey groups the values that share the same key.
	 *
	 * Before:
	 *     [(class1,80), (class2,100), (class1,97), (class2,89)]
	 * After:
	 *     [(class1,[80, 97]), (class2,[100, 89])]
	 */
	public static void groupByKeyHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");  
		JavaSparkContext sc = new JavaSparkContext(conf);  
		
		List<Tuple2<String, Integer>> list = Arrays.asList(  
		        new Tuple2<String, Integer>("class1", 80),  
		        new Tuple2<String, Integer>("class2", 100),  
		        new Tuple2<String, Integer>("class1", 97),  
		        new Tuple2<String, Integer>("class2", 89));  
		  
		JavaPairRDD<String, Integer> lines = sc.parallelizePairs(list);  
		JavaPairRDD<String, Iterable<Integer>> words = lines.groupByKey();
		
		System.out.println("Before:\n\t" + lines.collect() + "\n");
        System.out.println("After:\n\t" + words.collect());
        sc.close(); 
	}
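
groupByKey shuffles every value across the network, so when the groups are only a step toward an aggregate it is usually better to aggregate directly (see reduceByKey in the next section). A minimal sketch that sums the groups produced above:

	// Summing the grouped values; reduceByKey gives the same result while
	// combining values before the shuffle, which is usually cheaper.
	JavaPairRDD<String, Integer> sums = words.mapValues(vs -> {
		int total = 0;
		for (Integer v : vs) total += v;
		return total;
	});
	// [(class1,177), (class2,189)]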

    6. reduceByKey

	/**
	 * reduceByKey merges the values of each key with the given function
	 * (here, summation).
	 *
	 * Before:
	 *     [(class1,80), (class2,100), (class1,97), (class2,89)]
	 * After:
	 *     [(class1,177), (class2,189)]
	 */
	public static void reduceByKeyHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");  
		JavaSparkContext sc = new JavaSparkContext(conf);  
		
		List<Tuple2<String, Integer>> scoreList = Arrays.asList(  
		        new Tuple2<String, Integer>("class1", 80),  
		        new Tuple2<String, Integer>("class2", 100),  
		        new Tuple2<String, Integer>("class1", 97),  
		        new Tuple2<String, Integer>("class2", 89));  
		
		JavaPairRDD<String, Integer> lines = sc.parallelizePairs(scoreList);  
		JavaPairRDD<String, Integer> words = lines.reduceByKey(new Function2<Integer, Integer, Integer>() {  
			private static final long serialVersionUID = 7340267351025439164L;
			public Integer call(Integer v1, Integer v2) throws Exception {  
                return v1 + v2;  
            }  
        });  
          
		System.out.println("Before:\n\t" + lines.collect() + "\n");
        System.out.println("After:\n\t" + words.collect());
        sc.close(); 
	}
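
reduceByKey is the heart of the classic word count. An illustrative sketch chaining it with flatMap and a mapToPair step (mapToPair is not covered elsewhere in this post), assuming the same local SparkContext:

	JavaRDD<String> text = sc.parallelize(Arrays.asList("Hello You", "Hello Me"));
	JavaPairRDD<String, Integer> counts = text
			.flatMap(s -> Arrays.asList(s.split(" ")).iterator())
			.mapToPair(w -> new Tuple2<String, Integer>(w, 1))
			.reduceByKey((a, b) -> a + b);
	// [(Hello,2), (You,1), (Me,1)]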

    7. sortByKey

	/**
	 * sortByKey sorts the key-value pairs by key, ascending by default.
	 *
	 * Before:
	 *     [(90,MARRY), (40,BLACK), (70,JACK), (80,TOM)]
	 * After:
	 *     [(40,BLACK), (70,JACK), (80,TOM), (90,MARRY)]
	 */
	public static void sortByKeyHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<Tuple2<Integer, String>> list = Arrays.asList(
				new Tuple2<Integer, String>(90, "MARRY"),  
				new Tuple2<Integer, String>(40, "BLACK"),
				new Tuple2<Integer, String>(70, "JACK"),  
				new Tuple2<Integer, String>(80, "TOM")); 
		
		JavaPairRDD<Integer, String> lines = sc.parallelizePairs(list);
		JavaPairRDD<Integer, String> words = lines.sortByKey();
		
		System.out.println("Before:\n\t" + lines.collect() + "\n");
        System.out.println("After:\n\t" + words.collect());
        sc.close(); 
	}
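
sortByKey also takes a boolean ascending flag; passing false sorts in descending order:

	// Descending order.
	JavaPairRDD<Integer, String> desc = lines.sortByKey(false);
	// [(90,MARRY), (80,TOM), (70,JACK), (40,BLACK)]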

    8. reduce

	/**
	 * reduce combines the elements pairwise until a single result remains.
	 *
	 * Before:
	 *     [1, 2, 3, 4, 5, 6, 7, 8, 9, 10]
	 * After:
	 *     55
	 */
	public static void reduceHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<Integer> list = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
		JavaRDD<Integer> lines = sc.parallelize(list);
		
		Integer words = lines.reduce(new Function2<Integer, Integer, Integer>() {
			private static final long serialVersionUID = 4540563817287193672L;
			public Integer call(Integer v1, Integer v2) throws Exception {
				return v1 + v2;
			}
		});
		
		System.out.println("Before:\n\t" + lines.collect() + "\n");
        System.out.println("After:\n\t" + words);
        sc.close(); 
	}
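
reduce is an action, so the result is returned to the driver immediately, and the function must be associative and commutative because elements are combined in no fixed order. The related fold takes an explicit zero value; a minimal sketch on the same RDD:

	// fold starts each partition from the zero value, so the zero value
	// should be the identity of the operation (0 for addition).
	Integer total = lines.fold(0, (v1, v2) -> v1 + v2);
	// 55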

    9. cogroup

	/**
	 * cogroup groups both RDDs by key and then, for each key, pairs the two
	 * groups; a key present in only one RDD gets an empty group on the other
	 * side (key 4 below).
	 *
	 * Input 1:
	 *     [(1,TOM), (2,JACK), (3,MARRY)]
	 * Input 2:
	 *     [(1,78), (2,87), (3,89), (1,98), (2,90), (3,95), (4,95)]
	 * After:
	 *     [(4,([],[95])), (1,([TOM],[78, 98])), (3,([MARRY],[89, 95])), (2,([JACK],[87, 90]))]
	 */
	public static void cogroupHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<Tuple2<Integer, String>> list1 = Arrays.asList(  
                new Tuple2<Integer, String>(1, "TOM"),  
                new Tuple2<Integer, String>(2, "JACK"),  
                new Tuple2<Integer, String>(3, "MARRY"));  
        
        List<Tuple2<Integer, Integer>> list2 = Arrays.asList(  
                new Tuple2<Integer, Integer>(1, 78),  
                new Tuple2<Integer, Integer>(2, 87),  
                new Tuple2<Integer, Integer>(3, 89),  
                new Tuple2<Integer, Integer>(1, 98),  
                new Tuple2<Integer, Integer>(2, 90),  
                new Tuple2<Integer, Integer>(3, 95),  
                new Tuple2<Integer, Integer>(4, 95));  
          
        JavaPairRDD<Integer, String> lines1 = sc.parallelizePairs(list1);  
        JavaPairRDD<Integer, Integer> lines2 = sc.parallelizePairs(list2);  
        JavaPairRDD<Integer, Tuple2<Iterable<String>, Iterable<Integer>>> words = lines1.cogroup(lines2);  
        
        System.out.println("Input 1:\n\t" + lines1.collect() + "\n");
        System.out.println("Input 2:\n\t" + lines2.collect() + "\n");
        System.out.println("After:\n\t" + words.collect());
        sc.close(); 
	}
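
For comparison, join keeps only the keys that appear in both RDDs and emits one pair per matching combination, whereas cogroup above also kept key 4 with an empty left-hand group. A minimal sketch (output order may vary):

	// Inner join of the same two RDDs; key 4 disappears because it has no
	// match in lines1.
	JavaPairRDD<Integer, Tuple2<String, Integer>> joined = lines1.join(lines2);
	// [(1,(TOM,78)), (1,(TOM,98)), (2,(JACK,87)), (2,(JACK,90)), (3,(MARRY,89)), (3,(MARRY,95))]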

    10. countByKey

	/**
	 * countByKey counts the number of elements for each key.
	 *
	 * Before:
	 *     [(name,JOS), (name,MARRY), (name,TOM), (age1,24), (age1,25), (age2,25)]
	 * After:
	 *     {name=3, age1=2, age2=1}
	 */
	public static void countByKeyHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<Tuple2<String, String>> list = Arrays.asList(
				new Tuple2<String, String>("name","JOS"),
				new Tuple2<String, String>("name","MARRY"),
				new Tuple2<String, String>("name","TOM"),
				new Tuple2<String, String>("age1","24"),
				new Tuple2<String, String>("age1","25"),
				new Tuple2<String, String>("age2","25"));
		
		JavaPairRDD<String, String> lines = sc.parallelizePairs(list); 
		Map<String, Long> words = lines.countByKey();
		
		System.out.println("Before:\n\t" + lines.collect() + "\n");
        System.out.println("After:\n\t" + words.toString());
        sc.close(); 
	}
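
countByKey is an action: the whole result Map is collected to the driver, which is only safe while the number of distinct keys is small. A sketch of the same counts as a transformation, so the result stays distributed:

	// Equivalent counting as a transformation; the counts remain an RDD.
	JavaPairRDD<String, Long> counts = lines
			.mapValues(v -> 1L)
			.reduceByKey((a, b) -> a + b);
	// [(name,3), (age1,2), (age2,1)]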

    11. countByValue

	/**
	 * countByValue counts how many times each element occurs. On a pair RDD
	 * the whole Tuple2 (key and value together) becomes the key of the
	 * resulting Map, with the occurrence count as its value; on a plain
	 * String RDD (next section) the string itself becomes the Map key.
	 *
	 * Before:
	 *     [(name,JOS), (name,JOS), (name,TOM), (age1,24), (age1,25), (age2,25)]
	 * After:
	 *     {(name,TOM)=1, (age1,24)=1, (name,JOS)=2, (age2,25)=1, (age1,25)=1}
	 */
	public static void countByValueHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<Tuple2<String, String>> list = Arrays.asList(
				new Tuple2<String, String>("name","JOS"),
				new Tuple2<String, String>("name","JOS"),
				new Tuple2<String, String>("name","TOM"),
				new Tuple2<String, String>("age1","24"),
				new Tuple2<String, String>("age1","25"),
				new Tuple2<String, String>("age2","25"));
		
		JavaPairRDD<String, String> lines = sc.parallelizePairs(list);
		Map<Tuple2<String, String>, Long> words = lines.countByValue();
		
		System.out.println("Before:\n\t" + lines.collect() + "\n");
        System.out.println("After:\n\t" + words.toString());
        sc.close();
	}

    12. countByValue (plain RDD)

	/**
	 * countByValue on a plain RDD: each distinct element becomes a key of the
	 * resulting Map, with its occurrence count as the value.
	 *
	 * Before:
	 *     [name, age, high, name]
	 * After:
	 *     {high=1, name=2, age=1}
	 */
	public static void countByValueHandle2(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<String> list = Arrays.asList("name","age","high","name");
		
		JavaRDD<String> lines = sc.parallelize(list);
		Map<String, Long> words = lines.countByValue();
		
		System.out.println("Before:\n\t" + lines.collect() + "\n");
        System.out.println("After:\n\t" + words.toString());
        sc.close();
	}
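
Like countByKey, countByValue is an action that collects the entire Map to the driver. The same counts can be computed with transformations; a minimal sketch:

	// countByValue expressed as mapToPair + reduceByKey; the counts stay
	// distributed until explicitly collected.
	JavaPairRDD<String, Long> counts = lines
			.mapToPair(v -> new Tuple2<String, Long>(v, 1L))
			.reduceByKey((a, b) -> a + b);
	// [(high,1), (name,2), (age,1)]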

    13. union

	/**
	 * union concatenates two RDDs without removing duplicates.
	 *
	 * Input 1:
	 *     [One, Two, Zero]
	 * Input 2:
	 *     [One, Four]
	 * After:
	 *     [One, Two, Zero, One, Four]
	 */
	public static void unionHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<String> list1 = Arrays.asList("One","Two","Zero");
		List<String> list2 = Arrays.asList("One","Four");
		JavaRDD<String> lines1 = sc.parallelize(list1);
		JavaRDD<String> lines2 = sc.parallelize(list2);
		
		JavaRDD<String> words = lines1.union(lines2);
		
		System.out.println("Input 1:\n\t" + lines1.collect() + "\n");
		System.out.println("Input 2:\n\t" + lines2.collect() + "\n");
        System.out.println("After:\n\t" + words.collect());
        sc.close();
	}
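
If the duplicates introduced by union are unwanted, distinct() removes them at the cost of a shuffle (result order is not guaranteed):

	// Deduplicate after the union.
	JavaRDD<String> deduped = words.distinct();
	// e.g. [Two, One, Zero, Four]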

    14. cartesian

	/**
	 * cartesian computes the Cartesian product of two RDDs: every element of
	 * the first is paired with every element of the second.
	 *
	 * Input 1:
	 *     [A, B, C]
	 * Input 2:
	 *     [1, 2, 3]
	 * After:
	 *     [(A,1), (A,2), (A,3), (B,1), (B,2), (B,3), (C,1), (C,2), (C,3)]
	 */
	public static void cartesianHandle(){
		SparkConf conf = new SparkConf().setAppName("TransformationTest").setMaster("local");
		JavaSparkContext sc = new JavaSparkContext(conf);
		
		List<String> list1 = Arrays.asList("A","B","C");
		List<Integer> list2 = Arrays.asList(1,2,3);
		JavaRDD<String> lines1 = sc.parallelize(list1);
		JavaRDD<Integer> lines2 = sc.parallelize(list2);
		
		JavaPairRDD<String, Integer> words = lines1.cartesian(lines2);
		
		System.out.println("Input 1:\n\t" + lines1.collect() + "\n");
		System.out.println("Input 2:\n\t" + lines2.collect() + "\n");
        System.out.println("After:\n\t" + words.collect());
        sc.close();
	}
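
The result of cartesian has size m * n, which grows quickly; when only some of the pairs matter, filtering immediately after the cartesian keeps the following stages small. An illustrative sketch keeping only pairs with an odd number:

	JavaPairRDD<String, Integer> oddPairs = words.filter(t -> t._2() % 2 == 1);
	// [(A,1), (A,3), (B,1), (B,3), (C,1), (C,3)]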

 

Tags: Spark Streaming