文档章节

大数据-Spark Java版,核心算子使用简介

zhizhuode
 zhizhuode
发布于 2017/05/24 16:21
字数 2062
阅读 153
收藏 0

package com.spark.streaming.app;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.api.java.function.Function2;
import scala.Tuple2;

/**
 * spark核心算子使用介绍
 * @author
 */
public class TransformationApp {
    private static JavaSparkContext sc;

    /**
     * map、flatmap及mapPartitions map十分容易理解,他是将源JavaRDD的一个一个元素的传入call方法,并经过算法后一个一个的返回从而生成一个新的JavaRDD。
     */
    private static void mapTest() {

        SparkConf conf = new SparkConf();
        conf.set("spark.testing.memory", "2147480000");
        sc = new JavaSparkContext("local", "Spark App", conf);
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
        JavaRDD<Integer> arrayRDD = sc.parallelize(list);
        // map算子是对RDD中的每个元素都进行一次操作后生成一个新的RDD
        // map是有序操作
        JavaRDD<Integer> bRDD = arrayRDD.map(new Function<Integer, Integer>() {
            @Override
            public Integer call(Integer v1) throws Exception {
                return v1 * 2;
            }

        });
        System.out.println(arrayRDD.collect());
        System.out.println(bRDD.collect());

    }
    /**
     * 与Map的区别是,可以向原有元素中添加新的元素
     */
    private static void flatMapTest() {
        SparkConf conf = new SparkConf();
        conf.set("spark.testing.memory", "2147480000");
        sc = new JavaSparkContext("local", "Spark App", conf);
        // 演示flatMap
        int[] array = new int[] { 1, 2, 3, 4, 5, 6, 7, 8, 9 };
        List<Integer> list1 = new ArrayList<Integer>();
        for (Integer i : array) {
            list1.add(i);
        }
        JavaRDD<Integer> rdd = sc.parallelize(list1, 2);
        JavaRDD<Integer> result = rdd
                .flatMap(new FlatMapFunction<Integer, Integer>() {
                    @Override
                    public Iterator<Integer> call(Integer t) throws Exception {
                        List<Integer> list = new ArrayList<Integer>();
                        for (int i = 0; i < t; i++) {
                            list.add(t + i);
                        }
                        return list.iterator(); // 返回的这个list就是传入的元素及新增的内容
                    }
                });
        System.out.println(result.collect());
    }
    /**
     * 与Map 、flatMap区别可以一次性获取RDD中所有元素
     */
    private static void mapPartitionsTest() {
        SparkConf conf = new SparkConf();
        conf.set("spark.testing.memory", "21474800000000"); // 因为jvm无法获得足够的资源
        sc = new JavaSparkContext("local", "Spark App", conf); // 本地模式使用local
        List<Integer> list = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9);
        JavaRDD<Integer> arrayRDD = sc.parallelize(list);
        // mapPartitions就是将整个RDD传入,并对整个RDD操作后传出
        JavaRDD<Double> resultRDD = arrayRDD
                .mapPartitions(new FlatMapFunction<Iterator<Integer>, Double>() {
                    @Override
                    public Iterator<Double> call(Iterator<Integer> t)
                            throws Exception {
                        final double interest = 0.1;
                        List<Double> list = new ArrayList<Double>();
                        double money = 10000;
                        // 因为有整个RDD的所有元素,所以就能循环计算,能节省很多代码及算法复杂度
                        while (t.hasNext()) {
                            money += money * interest;
                            list.add(money);
                        }
                        return list.iterator();
                    }
                });
        System.out.println(resultRDD.collect());

    }
    /**
     * 获取元素的key  value形式
     */
    private static void keyValueTest() {
        List<String> list = Arrays.asList("张三", "李四", "王五", "赵六", "田七");
        SparkConf conf = new SparkConf();
        sc = new JavaSparkContext("local", "Spark App", conf);
        conf.setMaster("local[2]");
        // conf.set("spark.testing.memory", "2147480000");
        JavaRDD<String> rdd = sc.parallelize(list, 1);
        // 算子keyBy即是传入的RDD作为值,返回值作为键
        JavaPairRDD<Integer, String> result = rdd
                .keyBy(new Function<String, Integer>() {
                    @Override
                    public Integer call(String v1) throws Exception {
                        return v1.length();
                    }

                });
        System.out.println(result.collect());
    }
    /**
     * 用于修改Value值
     */
    private static void mapValuesTest() {
        List<String> list = Arrays.asList("张三", "李四", "王五", "赵六", "田七");
        SparkConf conf = new SparkConf();
        sc = new JavaSparkContext("local", "Spark App", conf);
        conf.setMaster("local[2]");
        // conf.set("spark.testing.memory", "2147480000");
        JavaRDD<String> rdd = sc.parallelize(list, 1);
        // 算子keyBy即是传入的RDD作为值,返回值作为键
        JavaPairRDD<Integer, String> result = rdd
                .keyBy(new Function<String, Integer>() {
                    @Override
                    public Integer call(String v1) throws Exception {
                        return v1.length();
                    }

                });
        JavaPairRDD<Integer, String> result1 = result
                .mapValues(new Function<String, String>() {
                    @Override
                    public String call(String v1) throws Exception {
                        return v1 + "_改了";
                    }
                });
        System.out.println(result1.collect());
    }
    /**
     * 用于扩展Value值
     */
    private static void flatMapValuesTest() {
        List<String> list = Arrays.asList("张三", "李四", "王五", "赵六", "田七");
        SparkConf conf = new SparkConf();
        sc = new JavaSparkContext("local", "Spark App", conf);
        conf.setMaster("local[2]");
        // conf.set("spark.testing.memory", "2147480000");
        JavaRDD<String> rdd = sc.parallelize(list, 1);
        // 算子keyBy即是传入的RDD作为值,返回值作为键
        JavaPairRDD<Integer, String> result = rdd
                .keyBy(new Function<String, Integer>() {
                    @Override
                    public Integer call(String v1) throws Exception {
                        return v1.length();
                    }

                });
        JavaPairRDD<Integer, String> result1 = result
                .mapValues(new Function<String, String>() {
                    @Override
                    public String call(String v1) throws Exception {
                        return v1 + "_改了";
                    }
                });
        JavaPairRDD<Integer, String> result2 = result1
                .flatMapValues(new Function<String, Iterable<String>>() {
                    @Override
                    public Iterable<String> call(String v1) throws Exception {
                        System.out.println("遍历元素:" + v1);
                        // List<String> list = Arrays.asList("wrm", "wln",
                        // "张三");
                        List<String> list = new ArrayList();
                        list.add("wrm");
                        return list;
                    }
                });
        System.out.println(result2.collect());
    }

    /**
     * 累加所有元素值
     */
    private static void reduceTest() {
        List<Integer> list = Arrays.asList(1, 2, 3, 3, 3, 6, 7, 8, 9);
        SparkConf conf = new SparkConf();
        sc = new JavaSparkContext("local", "Spark App", conf);
        conf.setMaster("local[2]");
        JavaRDD<Integer> arrayRDD = sc.parallelize(list);
        // 运行reduce时,会两个两个的从RDD中取出对象,然后进行一定操作合并
        // 合并后的对象会和一个新的对象同时传入作为输出,直到得到最后一个结果
        Integer result = arrayRDD
                .reduce(new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer arg0, Integer arg1)
                            throws Exception {
                        // TODO Auto-generated method stub
                        return arg0 + arg1;
                    }
                });
        System.out.println(result);
    }

    /**
     * 根据Key累计值
     */
    private static void reduceByKeyTest() {

        // / List<String> list=Arrays.asList("aa","cc","aa","cc","dd");
        List<Integer> list = Arrays.asList(0, 1, 1, 2, 2, 3, 4, 3);
        SparkConf conf = new SparkConf();
        sc = new JavaSparkContext("local", "Spark App", conf);
        conf.setMaster("local[2]");
        JavaRDD<Integer> arrayRDD = sc.parallelize(list);
        // 运行reduce时,会两个两个的从RDD中取出对象,然后进行一定操作合并
        // 合并后的对象会和一个新的对象同时传入作为输出,直到得到最后一个结果\
        JavaPairRDD<String, Integer> result2 = arrayRDD
                .keyBy(new Function<Integer, String>() {
                    @Override
                    public String call(Integer v1) throws Exception {
                        String result = "A" + v1;
                        return result;
                    }
                });
        // 运行reduceByKey时,会将key值相同的组合在一起做call方法中的操作。
        JavaPairRDD<String, Integer> result3 = result2
                .reduceByKey(new Function2<Integer, Integer, Integer>() {
                    @Override
                    public Integer call(Integer v1, Integer v2)
                            throws Exception {
                        return v1 + v2;
                    }
                });
        System.out.println(result2.collect());

        System.out.println(result3.collect());
    }
    /**
     * 拼接,类似于数据库中的union
     */
    private static void uninTest() {
        SparkConf conf = new SparkConf();
        sc = new JavaSparkContext("local", "Spark App", conf);
        conf.setMaster("local[2]");
        List<Integer> list = Arrays.asList(1, 2, 2, 4);
        JavaRDD<Integer> student = sc.parallelize(list);
        JavaRDD<Integer> student2 = student
                .map(new Function<Integer, Integer>() {
                    @Override
                    public Integer call(Integer v1) throws Exception {
                        return v1 * 2;
                    }
                });
        JavaPairRDD<String, Integer> studentinfo = student
                .keyBy(new Function<Integer, String>() {

                    @Override
                    public String call(Integer v1) throws Exception {
                        return "A" + v1;
                    }

                });
        JavaPairRDD<String, Integer> studentinfo2 = student2
                .keyBy(new Function<Integer, String>() {

                    @Override
                    public String call(Integer v1) throws Exception {
                        return "A" + v1;
                    }

                });
        // union可以将两个RDD中所有的元素合并为一个RDD,但是得确保要合并的两个RDD的泛型是相同的。
        // 且union不去重如果要去重,可以使用distinct()方法
        // 合并
        JavaPairRDD<String, Integer> result = studentinfo.union(studentinfo2);
        System.out.println("未去重:" + result.collect());
        // 去重
        System.out.println("去重:" + result.distinct().collect());
    }

    /**
     * union只是将两个RDD简单的累加在一起,而join则不一样,join类似于hadoop中的combin操作,只是少了排序这一段,
     * 再说join之前说说groupByKey,因为join可以理解为union与groupByKey的结合:
     * groupBy是将RDD中的元素进行分组,
     * 组名是call方法中的返回值,而顾名思义groupByKey是将PairRDD中拥有相同key值得元素归为一组。即:
     */
    private static void groupByTest() {

        SparkConf conf = new SparkConf();
        sc = new JavaSparkContext("local", "Spark App", conf);
        conf.setMaster("local[2]");
        List<Integer> list = Arrays.asList(1, 2, 2, 4);
        JavaRDD<Integer> student = sc.parallelize(list);
        JavaRDD<Integer> student2 = student
                .map(new Function<Integer, Integer>() {

                    @Override
                    public Integer call(Integer v1) throws Exception {
                        return v1 * 2;
                    }
                });
        JavaPairRDD<String, Integer> studentinfo = student
                .keyBy(new Function<Integer, String>() {

                    @Override
                    public String call(Integer v1) throws Exception {
                        return "A" + v1;
                    }

                });
        JavaPairRDD<String, Integer> studentinfo2 = student2
                .keyBy(new Function<Integer, String>() {

                    @Override
                    public String call(Integer v1) throws Exception {
                        return "A" + v1;
                    }

                });

        JavaPairRDD<String, Iterable<Integer>> result = studentinfo
                .groupByKey();
        System.out.println(result.collect());

        // groupBy与join类似于hadoop中的combin操作,只是少了排序这一段
        // groupBy是将RDD中的元素进行分组,组名是call方法中的返回值
        // 而groupByKey是将PairRDD中拥有相同key值得元素归为一组
        JavaPairRDD<String, Iterable<Integer>> result1 = studentinfo
                .groupByKey();
        System.out.println(result1.collect());
        // join是将两个PairRDD合并,并将有相同key的元素分为一组,可以理解为groupByKey和Union的结合
        JavaPairRDD<String, Tuple2<Integer, Integer>> result2 = studentinfo
                .join(studentinfo2);
        System.out.println(result2.collect());

    }

    /**
     * spark的算子中还有一些用于特定计算的算子,例如sample用作抽样。他的用法即所需注意事项在我的示例中写得很详细,就不赘述了,直接上代码:
     */
    private static void OperatorSampleTest() {

        SparkConf conf = new SparkConf();
        conf.setMaster("local[2]");
        conf.set("spark.testing.memory", "2147480000"); // 因为jvm无法获得足够的资源
        sc = new JavaSparkContext("local", "Spark App", conf);

        List<Integer> list = new ArrayList<Integer>();
        for (int i = 1; i <= 100; i++) {
            list.add(i);
        }
        JavaRDD<Integer> any = sc.parallelize(list);
        // sample用来从RDD中抽取样本。他有三个参数
        // withReplacement:表示样本是否放回 true放回
        // fraction:抽取样本的比例
        // seed:随机数生成种子
        // 由于样本的抽取其实是以一个固定的算法实现的,所以要达到随机抽样需用随机数生成seed
        JavaRDD<Integer> sample = any.sample(true, 0.1, 0);
        System.out.println("seed=0:" + sample.collect());
        sample = any.sample(true, 0.1, 0);
        System.out.println("seed=0:" + sample.collect()); // 由于seed相同,所以抽出样本是相同的

        // 这里使用系统时间作为seed,发现抽出的样本是随机的
        JavaRDD<Integer> sample1 = any.sample(true, 0.1,
                System.currentTimeMillis());
        System.out.println("seed随机生成1" + sample1.collect());
        sample1 = any.sample(true, 0.1, System.currentTimeMillis());
        System.out.println("seed随机生成2" + sample1.collect());
    }
    /**
     * 笛卡尔统计
     */
    private static void operatorCartesianTest() {

        SparkConf conf = new SparkConf();
        conf.setMaster("local[2]");
        conf.set("spark.testing.memory", "2147480000"); // 因为jvm无法获得足够的资源
        sc = new JavaSparkContext("local", "Spark App", conf);

        List<Integer> list = Arrays.asList(1, 2, 3, 4);
        List<Integer> list2 = Arrays.asList(5, 6, 7, 1);
        JavaRDD<Integer> arrayRDD = sc.parallelize(list);
        JavaRDD<Integer> arrayRDD2 = sc.parallelize(list2);
        // 算子cartesian就是用来求两个RDD的笛卡尔积的。
        JavaPairRDD<Integer, Integer> result = arrayRDD.cartesian(arrayRDD2);
        System.out.println(result.collect());

    }

    public static void main(String[] args) {
        // flatMapTest();
        // mapPartitionsTest();
        // keyValueTest();
        // mapValuesTest();
        // flatMapValuesTest();
        // reduceTest();
        // reduceByKeyTest();
        // uninTest();
        // groupByTest();
        OperatorSampleTest();
    }

}
 

© 著作权归作者所有

zhizhuode
粉丝 1
博文 40
码字总数 39020
作品 0
海淀
高级程序员
私信 提问
阿里年薪50WJAVA工程师转大数据学习路线!

大数据有两个方向,一个是偏计算机的,另一个是偏经济的。你学过Java,所以你可以偏将计算机的。 Java程序员想转大数据可行吗?Java是全世界使用人数最多的编程语言。不少程序员选择Java做为...

JAVA丶学习
2018/04/25
0
0
Spark2.1.0之基础知识

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/80303035 在阅读本文之前,读者最好已经阅读了《Spark2.1.0之初识Spark》一文,本文...

泰山不老生
2018/05/24
0
0
[Spark性能调优] 第四章 : Spark Shuffle 中 JVM 内存使用及配置内幕详情

本课主题 JVM 內存使用架构剖析 Spark 1.6.x 和 Spark 2.x 的 JVM 剖析 Spark 1.6.x 以前 on Yarn 计算内存使用案例 Spark Unified Memory 的运行原理和机制 引言 Spark 从1.6.x 开始对 JVM ...

hblt-j
2017/11/08
65
0
[Spark]Spark RDD 指南一 引入Spark

2.3.0版本:Spark2.3.0 引入Spark 1. Java版 Spark 2.1.1适用于Java 7及更高版本。 如果您使用的是Java 8,则Spark支持使用lambda表达式来简洁地编写函数,否则可以使用org.apache.spark.ap...

sjf0115
2017/06/08
0
0
浅谈 Spark 的多语言支持

作者:郑锴,花名铁杰,阿里巴巴高级技术专家,Apache Hadoop PMC,Apache Kerby 创立者。深耕分布式系统开发和开源大数据多年,先后专注在安全,存储和计算领域。之前在 Intel,目前转战阿里...

开源大数据EMR
04/23
0
0

没有更多内容

加载失败,请刷新页面

加载更多

spring cloud

一、从面试题入手 1.1、什么事微服务 1.2、微服务之间如何独立通讯的 1.3、springCloud和Dubbo有哪些区别 1.通信机制:DUbbo基于RPC远程过程调用;微服务cloud基于http restFUL API 1.4、spr...

榴莲黑芝麻糊
55分钟前
2
0
Executor线程池原理与源码解读

线程池为线程生命周期的开销和资源不足问题提供了解决方 案。通过对多个任务重用线程,线程创建的开销被分摊到了多个任务上。 线程实现方式 Thread、Runnable、Callable //实现Runnable接口的...

小强的进阶之路
昨天
6
0
maven 环境隔离

解决问题 即 在 resource 文件夹下面 ,新增对应的资源配置文件夹,对应 开发,测试,生产的不同的配置内容 <resources> <resource> <directory>src/main/resources.${deplo......

之渊
昨天
8
0
详解箭头函数和普通函数的区别以及箭头函数的注意事项、不适用场景

箭头函数是ES6的API,相信很多人都知道,因为其语法上相对于普通函数更简洁,深受大家的喜爱。就是这种我们日常开发中一直在使用的API,大部分同学却对它的了解程度还是不够深... 普通函数和...

OBKoro1
昨天
7
0
轻量级 HTTP(s) 代理 TinyProxy

CentOS 下安装 TinyProxy yum install -y tinyproxy 启动、停止、重启 # 启动service tinyproxy start# 停止service tinyproxy stop# 重启service tinyproxy restart 相关配置 默认...

Anoyi
昨天
2
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部