文档章节

spark sql简单示例

cloud-coder
 cloud-coder
发布于 2015/06/16 17:36
字数 437
阅读 2.1W
收藏 8

钉钉、微博极速扩容黑科技,点击观看阿里云弹性计算年度发布会!>>>

运行环境

集群环境:CDH5.3.0

具体JAR版本如下:

spark版本:1.2.0-cdh5.3.0

hive版本:0.13.1-cdh5.3.0

hadoop版本:2.5.0-cdh5.3.0

spark sql的JAVA版简单示例

  1. spark sql直接查询JSON格式的数据

  2. spark sql的自定义函数

  3. spark sql查询hive上面的表

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.api.java.DataType;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.spark.sql.api.java.Row;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.hive.api.java.JavaHiveContext;


/**
 * 注意:
 * 使用JavaHiveContext时
 * 1:需要在classpath下面增加三个配置文件:hive-site.xml,core-site.xml,hdfs-site.xml
 * 2:需要增加postgresql或mysql驱动包的依赖
 * 3:需要增加hive-jdbc,hive-exec的依赖
 *
 */
public class SimpleDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("simpledemo").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaSQLContext sqlCtx = new JavaSQLContext(sc);
        JavaHiveContext hiveCtx = new JavaHiveContext(sc);
//        testQueryJson(sqlCtx);
//        testUDF(sc, sqlCtx);
        testHive(hiveCtx);
        sc.stop();
        sc.close();
    }

    //测试spark sql直接查询JSON格式的数据
    public static void testQueryJson(JavaSQLContext sqlCtx) {
        JavaSchemaRDD rdd = sqlCtx.jsonFile("file:///D:/tmp/tmp/json.txt");
        rdd.printSchema();

        // Register the input schema RDD
        rdd.registerTempTable("account");

        JavaSchemaRDD accs = sqlCtx.sql("SELECT address, email,id,name FROM account ORDER BY id LIMIT 10");
        List<Row> result = accs.collect();
        for (Row row : result) {
            System.out.println(row.getString(0) + "," + row.getString(1) + "," + row.getInt(2) + ","
                    + row.getString(3));
        }

        JavaRDD<String> names = accs.map(new Function<Row, String>() {
            @Override
            public String call(Row row) throws Exception {
                return row.getString(3);
            }
        });
        System.out.println(names.collect());
    }


    //测试spark sql的自定义函数
    public static void testUDF(JavaSparkContext sc, JavaSQLContext sqlCtx) {
        // Create a account and turn it into a Schema RDD
        ArrayList<AccountBean> accList = new ArrayList<AccountBean>();
        accList.add(new AccountBean(1, "lily", "lily@163.com", "gz tianhe"));
        JavaRDD<AccountBean> accRDD = sc.parallelize(accList);

        JavaSchemaRDD rdd = sqlCtx.applySchema(accRDD, AccountBean.class);

        rdd.registerTempTable("acc");

        // 编写自定义函数UDF
        sqlCtx.registerFunction("strlength", new UDF1<String, Integer>() {
            @Override
            public Integer call(String str) throws Exception {
                return str.length();
            }
        }, DataType.IntegerType);

        // 数据查询
        List<Row> result = sqlCtx.sql("SELECT strlength('name'),name,address FROM acc LIMIT 10").collect();
        for (Row row : result) {
            System.out.println(row.getInt(0) + "," + row.getString(1) + "," + row.getString(2));
        }
    }

    //测试spark sql查询hive上面的表
    public static void testHive(JavaHiveContext hiveCtx) {
        List<Row> result = hiveCtx.sql("SELECT foo,bar,name from pokes2 limit 10").collect();
        for (Row row : result) {
            System.out.println(row.getString(0) + "," + row.getString(1) + "," + row.getString(2));
        }
    }
}


cloud-coder
粉丝 251
博文 193
码字总数 141277
作品 0
广州
架构师
私信 提问
加载中
此博客有 2 条评论,请先登录后再查看。
【Spark】Spark SQL, DataFrames and Datasets Guide(翻译文,持续更新)

本文主要是翻译Spark官网Spark SQL programming guide 。只能保证大概意思,尽量保证细节。英文水平有限,如果有错误的地方请指正,轻喷。目录导航在右上角 Spark SQL、DataFrames 和 Datase...

osc_qr98xq3d
2018/09/19
1
0
[转] Spark快速入门指南 – Spark安装与基础使用

[From] https://blog.csdn.net/w405722907/article/details/77943331 Spark快速入门指南 – Spark安装与基础使用 2017年09月12日 11:35:27 阅读数:104 本教程由给力星出品,转载请注明。 Ap...

osc_ld3u7lka
2018/07/27
6
0
【Spark】SparkSQL入门解析(二)

【一】SparkSQL数据源 【1】Spark SQL的DataFrame接口支持多种数据源的操作 一个DataFrame可以进行RDDs方式的操作,也可以被注册为临时表,把DataFrame注册为临时表之后,就可以对该DataFra...

osc_y7d83ufc
04/18
8
0
Spark实战系列目录

1 Spark rdd -- action函数详解与实战 2 Spark rdd -- transformations函数详解与实战(上) 3 Spark rdd -- transformations函数详解与实战(下) 4 Spark rdd -- aggregate函数深度剖析与应用 ...

osc_9eqghyd8
2019/05/23
0
0
spark教程(二)-shell操作

spark 支持 shell 操作 shell 主要用于调试,所以简单介绍用法即可 支持多种语言的 shell 包括 scala shell、python shell、R shell、SQL shell 等 spark-shell 用于在 scala 的 shell 模式下...

osc_qukgacve
2019/10/15
1
0

没有更多内容

加载失败,请刷新页面

加载更多

再见,Eclipse...

程序员的成长之路 互联网/程序员/技术/资料共享 关注 阅读本文大概需要 6 分钟。 来源:cnblogs.com/ouyida3/p/9901312.html 最近,改用了 IDEA,同事都说我投敌了。当然,这些同事都是和我一...

良月柒
05/24
0
0
kafka分区数过多引发的弊端

点击上方蓝字“极客运维”一起运筹帷幄 上篇文章我们了解到,如果一个topic分区越多,理论上整个集群所能达到的吞吐量就越大。那么,分区数越多就越好吗?显然不是。今天我们来聊下kafka在分...

hyper-xu
01/02
0
0
ggplot2作柱形图中排列顺序和堆积顺序的设置

在使用R语言的ggplot2包作柱形图时,有时需要设置排列和堆积的顺序。下面以自带的数据集diamonds作为演示。 library(ggplot2) set.seed(1234) # 从数据集 diamonds中抽取1000个样本 diam <- ...

小石头记
2018/05/04
0
0
我为什么要冒险从工业界离职做内容平台有三AI

新搬家,顺便回答下很多朋友都问过的问题,我为什么要冒着伤害职业生涯的危险,从工业界离职做有三AI,顺便透露一下,明年我就会重新回到工业界。 从读书时代说起 首先要说起一点学生时代的背...

言有三
2019/10/27
0
0
你说,一个Java字符串到底有多少个字符?

点击上方“朱小厮的博客”,选择“设为星标” 后台回复"书",获取推荐书籍 来源:urlify.cn/qYNR3q 依照Java的文档, Java中的字符内部是以UTF-16编码方式表示的,最小值是 \u0000 (0),最大值...

osc_s6yenydw
3分钟前
0
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部