大数据-Spark Java版,核心算子使用简介

原创
2017/05/24 16:21
阅读数 689

package com.spark.streaming.app;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;

/**
 * spark核心算子使用介绍
 * @author
 */
public class TransformationApp {
    private static JavaSparkContext sc;

    /**
     * map、flatmap及mapPartitions map十分容易理解,他是将源JavaRDD的一个一个元素的传入call方法,并经过算法后一个一个的返回从而生成一个新的JavaRDD。
     */
    private static void mapTest() {

        SparkConf conf = new SparkConf();
        conf.set("spark.testing.memory", "2147480000");
        sc = new JavaSparkContext("local", "Spark App", conf);
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
        JavaRDD<Integer> arrayRDD = sc.parallelize(list);
        // map算子是对RDD中的每个元素都进行一次操作后生成一个新的RDD
        // map是有序操作
        JavaRDD<Integer> bRDD = arrayRDD.map(new Function<Integer, Integer>() {
            @Override
            public Integer call(Integer v1) throws Exception {
                return v1 * 2;
            }

        });
        System.out.println(arrayRDD.collect());
        System.out.println(bRDD.collect());

    }
    /**
     * 与Map的区别是,可以向原有元素中添加新的元素
     */
    private static void flatMapTest() {
        SparkConf conf = new SparkConf();
        conf.set("spark.testing.memory", "2147480000");
        sc = new JavaSparkContext("local", "Spark App", conf);
        // 演示flatMap
        int[] array = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 };
        List<Integer> list1 = new ArrayList<Integer>();
        for (Integer i : array) {
            list1.add(i);
        }
        JavaRDD<Integer> rdd = sc.parallelize(list1, 2);
        JavaRDD<Integer> result = rdd
                .flatMap(new FlatMapFunction<Integer, Integer>() {
                    @Override
                    public Iterator<Integer> call(Integer t) throws Exception {
                        List<Integer> list = new ArrayList<Integer>();
                        for (int i = 0; i < t; i++) {
                            list.add(t + i);
                        }
                        return list.iterator(); // 返回的这个list就是传入的元素及新增的内容
                    }
                });
        System.out.println(result.collect());
    }
    /**
     * 与Map 、flatMap区别可以一次性获取RDD中所有元素
     */
    private static void mapPartitionsTest() {
        SparkConf conf = new SparkConf();
        conf.set("spark.testing.memory", "21474800000000"); // 因为jvm无法获得足够的资源
        sc = new JavaSparkContext("local", "Spark App", conf); // 本地模式使用local
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
        JavaRDD<Integer> arrayRDD = sc.parallelize(list);
        // mapPartitions就是将整个RDD传入,并对整个RDD操作后传出
        JavaRDD<Double> resultRDD = arrayRDD
                .mapPartitions(new FlatMapFunction<Iterator<Integer>, Double>() {
                    @Override
                    public Iterator<Double> call(Iterator<Integer> t)
                            throws Exception {
                        final double interest = 0.1;
                        List<Double> list = new ArrayList<Double>();
                        double money = 10000;
                        // 因为有整个RDD的所有元素,所以就能循环计算,能节省很多代码及算法复杂度
                        while (t.hasNext()) {
                            money += money * interest;
                            list.add(money);
                        }
                        return list.iterator();
                    }
                });
        System.out.println(resultRDD.collect());

    }
    /**
     * 获取元素的key  value形式
     */
    private static void keyValueTest() {
        List<String> list = Arrays.asList("张三", "李四", "王五", "赵六", "田七");
        SparkConf conf = new SparkConf();
        sc = new JavaSparkContext("local", "Spark App", conf);
        conf.setMaster("local[2]");
        // conf.set("spark.testing.memory", "2147480000");
        JavaRDD<String> rdd = sc.parallelize(list, 1);
        // 算子keyBy即是传入的RDD作为值,返回值作为键
        JavaPairRDD<Integer, String> result = rdd
                .keyBy(new Function<String, Integer>() {
                    @Override
                    public Integer call(String v1) throws Exception {
                        return v1.length();
                    }

                });
        System.out.println(result.collect());
    }
    /**
     * 用于修改Value值
     */
    private static void mapValuesTest() {
        List<String> list = Arrays.asList("张三", "李四", "王五", "赵六", "田七");
        SparkConf conf = new SparkConf();
        sc = new JavaSparkContext("local", "Spark App", conf);
        conf.setMaster("local[2]");
        // conf.set("spark.testing.memory", "2147480000");
        JavaRDD<String> rdd = sc.parallelize(list, 1);
        // 算子keyBy即是传入的RDD作为值,返回值作为键
        JavaPairRDD<Integer, String> result = rdd
                .keyBy(new Function<String, Integer>() {
                    @Override
                    public Integer call(String v1) throws Exception {
                        return v1.length();
                    }

                });
        JavaPairRDD<Integer, String> result1 = result
                .mapValues(new Function<String, String>() {
                    @Override
                    public String call(String v1) throws Exception {
                        return v1 + "_改了";
                    }
                });
        System.out.println(result1.collect());
    }
    /**
     * 用于扩展Value值
     */
    private static void flatMapValuesTest() {
        List<String> list = Arrays.asList("张三", "李四", "王五", "赵六", "田七");
        SparkConf conf = new SparkConf();
        sc = new JavaSparkContext("local", "Spark App", conf);
        conf.setMaster("local[2]");
        // conf.set("spark.testing.memory", "2147480000");
        JavaRDD<String> rdd = sc.parallelize(list, 1);
        // 算子keyBy即是传入的RDD作为值,返回值作为键
        JavaPairRDD<Integer, String> result = rdd
                .keyBy(new Function<String, Integer>() {
                    @Override
                    public Integer call(String v1) throws Exception {
                        return v1.length();
                    }

                });
        JavaPairRDD<Integer, String> result1 = result
                .mapValues(new Function<String, String>() {
                    @Override
                    public String call(String v1) throws Exception {
                        return v1 + "_改了";
                    }
                });
        JavaPairRDD<Integer, String> result2 = result1
                .flatMapValues(new Function<String, Iterable<String>>() {
                    @Override
                    public Iterable<String> call(String v1) throws Exception {
                        System.out.println("遍历元素:" + v1);
                        // List<String> list = Arrays.asList("wrm", "wln",
                        // "张三");
                        List<String> list = new ArrayList();
                        list.add("wrm");
                        return list;
                    }
                });
        System.out.println(result2.collect());
    }

    /**
     * 累加所有元素值
     */
    private static void reduceTest() {
        List<Integer> list = Arrays.asList(1, 2, 3, 3, 3, 6, 7, 8, 9);
        SparkConf conf = new SparkConf();
        sc = new JavaSparkContext("local", "Spark App", conf);
        conf.setMaster("local[2]");
        JavaRDD<Integer> arrayRDD = sc.parallelize(list);
        // 运行reduce时,会两个两个的从RDD中取出对象,然后进行一定操作合并
        // 合并后的对象会和一个新的对象同时传入作为输出,直到得到最后一个结果
        Integer result = arrayRDD
                .reduce(new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer arg0, Integer arg1)
                            throws Exception {
                        // TODO Auto-generated method stub
                        return arg0 + arg1;
                    }
                });
        System.out.println(result);
    }

    /**
     * 根据Key累计值
     */
    private static void reduceByKeyTest() {

        // / List<String> list=Arrays.asList("aa","cc","aa","cc","dd");
        List<Integer> list = Arrays.asList(0, 1, 1, 2, 2, 3, 4, 3);
        SparkConf conf = new SparkConf();
        sc = new JavaSparkContext("local", "Spark App", conf);
        conf.setMaster("local[2]");
        JavaRDD<Integer> arrayRDD = sc.parallelize(list);
        // 运行reduce时,会两个两个的从RDD中取出对象,然后进行一定操作合并
        // 合并后的对象会和一个新的对象同时传入作为输出,直到得到最后一个结果\
        JavaPairRDD<String, Integer> result2 = arrayRDD
                .keyBy(new Function<Integer, String>() {
                    @Override
                    public String call(Integer v1) throws Exception {
                        String result = "A" + v1;
                        return result;
                    }
                });
        // 运行reduceByKey时,会将key值相同的组合在一起做call方法中的操作。
        JavaPairRDD<String, Integer> result3 = result2
                .reduceByKey(new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer v1, Integer v2)
                            throws Exception {
                        return v1 + v2;
                    }
                });
        System.out.println(result2.collect());

        System.out.println(result3.collect());
    }
    /**
     * 拼接,类似于数据库中的union
     */
    private static void uninTest() {
        SparkConf conf = new SparkConf();
        sc = new JavaSparkContext("local", "Spark App", conf);
        conf.setMaster("local[2]");
        List<Integer> list = Arrays.asList(1, 2, 2, 4);
        JavaRDD<Integer> student = sc.parallelize(list);
        JavaRDD<Integer> student2 = student
                .map(new Function<Integer, Integer>() {
                    @Override
                    public Integer call(Integer v1) throws Exception {
                        return v1 * 2;
                    }
                });
        JavaPairRDD<String, Integer> studentinfo = student
                .keyBy(new Function<Integer, String>() {

                    @Override
                    public String call(Integer v1) throws Exception {
                        return "A" + v1;
                    }

                });
        JavaPairRDD<String, Integer> studentinfo2 = student2
                .keyBy(new Function<Integer, String>() {

                    @Override
                    public String call(Integer v1) throws Exception {
                        return "A" + v1;
                    }

                });
        // union可以将两个RDD中所有的元素合并为一个RDD,但是得确保要合并的两个RDD的泛型是相同的。
        // 且union不去重如果要去重,可以使用distinct()方法
        // 合并
        JavaPairRDD<String, Integer> result = studentinfo.union(studentinfo2);
        System.out.println("未去重:" + result.collect());
        // 去重
        System.out.println("去重:" + result.distinct().collect());
    }

    /**
     * union只是将两个RDD简单的累加在一起,而join则不一样,join类似于hadoop中的combin操作,只是少了排序这一段,
     * 再说join之前说说groupByKey,因为join可以理解为union与groupByKey的结合:
     * groupBy是将RDD中的元素进行分组,
     * 组名是call方法中的返回值,而顾名思义groupByKey是将PairRDD中拥有相同key值得元素归为一组。即:
     */
    private static void groupByTest() {

        SparkConf conf = new SparkConf();
        sc = new JavaSparkContext("local", "Spark App", conf);
        conf.setMaster("local[2]");
        List<Integer> list = Arrays.asList(1, 2, 2, 4);
        JavaRDD<Integer> student = sc.parallelize(list);
        JavaRDD<Integer> student2 = student
                .map(new Function<Integer, Integer>() {

                    @Override
                    public Integer call(Integer v1) throws Exception {
                        return v1 * 2;
                    }
                });
        JavaPairRDD<String, Integer> studentinfo = student
                .keyBy(new Function<Integer, String>() {

                    @Override
                    public String call(Integer v1) throws Exception {
                        return "A" + v1;
                    }

                });
        JavaPairRDD<String, Integer> studentinfo2 = student2
                .keyBy(new Function<Integer, String>() {

                    @Override
                    public String call(Integer v1) throws Exception {
                        return "A" + v1;
                    }

                });

        JavaPairRDD<String, Iterable<Integer>> result = studentinfo
                .groupByKey();
        System.out.println(result.collect());

        // groupBy与join类似于hadoop中的combin操作,只是少了排序这一段
        // groupBy是将RDD中的元素进行分组,组名是call方法中的返回值
        // 而groupByKey是将PairRDD中拥有相同key值得元素归为一组
        JavaPairRDD<String, Iterable<Integer>> result1 = studentinfo
                .groupByKey();
        System.out.println(result1.collect());
        // join是将两个PairRDD合并,并将有相同key的元素分为一组,可以理解为groupByKey和Union的结合
        JavaPairRDD<String, Tuple2<Integer, Integer>> result2 = studentinfo
                .join(studentinfo2);
        System.out.println(result2.collect());

    }

    /**
     * spark的算子中还有一些用于特定计算的算子,例如sample用作抽样。他的用法即所需注意事项在我的示例中写得很详细,就不赘述了,直接上代码:
     */
    private static void OperatorSampleTest() {

        SparkConf conf = new SparkConf();
        conf.setMaster("local[2]");
        conf.set("spark.testing.memory", "2147480000"); // 因为jvm无法获得足够的资源
        sc = new JavaSparkContext("local", "Spark App", conf);

        List<Integer> list = new ArrayList<Integer>();
        for (int i = 1; i <= 100; i++) {
            list.add(i);
        }
        JavaRDD<Integer> any = sc.parallelize(list);
        // sample用来从RDD中抽取样本。他有三个参数
        // withReplacement:表示样本是否放回 true放回
        // fraction:抽取样本的比例
        // seed:随机数生成种子
        // 由于样本的抽取其实是以一个固定的算法实现的,所以要达到随机抽样需用随机数生成seed
        JavaRDD<Integer> sample = any.sample(true, 0.1, 0);
        System.out.println("seed=0:" + sample.collect());
        sample = any.sample(true, 0.1, 0);
        System.out.println("seed=0:" + sample.collect()); // 由于seed相同,所以抽出样本是相同的

        // 这里使用系统时间作为seed,发现抽出的样本是随机的
        JavaRDD<Integer> sample1 = any.sample(true, 0.1,
                System.currentTimeMillis());
        System.out.println("seed随机生成1" + sample1.collect());
        sample1 = any.sample(true, 0.1, System.currentTimeMillis());
        System.out.println("seed随机生成2" + sample1.collect());
    }
    /**
     * 笛卡尔统计
     */
    private static void operatorCartesianTest() {

        SparkConf conf = new SparkConf();
        conf.setMaster("local[2]");
        conf.set("spark.testing.memory", "2147480000"); // 因为jvm无法获得足够的资源
        sc = new JavaSparkContext("local", "Spark App", conf);

        List<Integer> list = Arrays.asList(1, 2, 3, 4);
        List<Integer> list2 = Arrays.asList(5, 6, 7, 1);
        JavaRDD<Integer> arrayRDD = sc.parallelize(list);
        JavaRDD<Integer> arrayRDD2 = sc.parallelize(list2);
        // 算子cartesian就是用来求两个RDD的笛卡尔积的。
        JavaPairRDD<Integer, Integer> result = arrayRDD.cartesian(arrayRDD2);
        System.out.println(result.collect());

    }

    public static void main(String[] args) {
        // flatMapTest();
        // mapPartitionsTest();
        // keyValueTest();
        // mapValuesTest();
        // flatMapValuesTest();
        // reduceTest();
        // reduceByKeyTest();
        // uninTest();
        // groupByTest();
        OperatorSampleTest();
    }

}
 

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部