文档章节

spark sql简单示例

cloud-coder
 cloud-coder
发布于 2015/06/16 17:36
字数 437
阅读 19924
收藏 8

运行环境

集群环境:CDH5.3.0

具体JAR版本如下:

spark版本:1.2.0-cdh5.3.0

hive版本:0.13.1-cdh5.3.0

hadoop版本:2.5.0-cdh5.3.0

spark sql的JAVA版简单示例

  1. spark sql直接查询JSON格式的数据

  2. spark sql的自定义函数

  3. spark sql查询hive上面的表

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.api.java.DataType;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.spark.sql.api.java.Row;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.hive.api.java.JavaHiveContext;


/**
 * 注意:
 * 使用JavaHiveContext时
 * 1:需要在classpath下面增加三个配置文件:hive-site.xml,core-site.xml,hdfs-site.xml
 * 2:需要增加postgresql或mysql驱动包的依赖
 * 3:需要增加hive-jdbc,hive-exec的依赖
 *
 */
public class SimpleDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("simpledemo").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaSQLContext sqlCtx = new JavaSQLContext(sc);
        JavaHiveContext hiveCtx = new JavaHiveContext(sc);
//        testQueryJson(sqlCtx);
//        testUDF(sc, sqlCtx);
        testHive(hiveCtx);
        sc.stop();
        sc.close();
    }

    //测试spark sql直接查询JSON格式的数据
    public static void testQueryJson(JavaSQLContext sqlCtx) {
        JavaSchemaRDD rdd = sqlCtx.jsonFile("file:///D:/tmp/tmp/json.txt");
        rdd.printSchema();

        // Register the input schema RDD
        rdd.registerTempTable("account");

        JavaSchemaRDD accs = sqlCtx.sql("SELECT address, email,id,name FROM account ORDER BY id LIMIT 10");
        List<Row> result = accs.collect();
        for (Row row : result) {
            System.out.println(row.getString(0) + "," + row.getString(1) + "," + row.getInt(2) + ","
                    + row.getString(3));
        }

        JavaRDD<String> names = accs.map(new Function<Row, String>() {
            @Override
            public String call(Row row) throws Exception {
                return row.getString(3);
            }
        });
        System.out.println(names.collect());
    }


    //测试spark sql的自定义函数
    public static void testUDF(JavaSparkContext sc, JavaSQLContext sqlCtx) {
        // Create a account and turn it into a Schema RDD
        ArrayList<AccountBean> accList = new ArrayList<AccountBean>();
        accList.add(new AccountBean(1, "lily", "lily@163.com", "gz tianhe"));
        JavaRDD<AccountBean> accRDD = sc.parallelize(accList);

        JavaSchemaRDD rdd = sqlCtx.applySchema(accRDD, AccountBean.class);

        rdd.registerTempTable("acc");

        // 编写自定义函数UDF
        sqlCtx.registerFunction("strlength", new UDF1<String, Integer>() {
            @Override
            public Integer call(String str) throws Exception {
                return str.length();
            }
        }, DataType.IntegerType);

        // 数据查询
        List<Row> result = sqlCtx.sql("SELECT strlength('name'),name,address FROM acc LIMIT 10").collect();
        for (Row row : result) {
            System.out.println(row.getInt(0) + "," + row.getString(1) + "," + row.getString(2));
        }
    }

    //测试spark sql查询hive上面的表
    public static void testHive(JavaHiveContext hiveCtx) {
        List<Row> result = hiveCtx.sql("SELECT foo,bar,name from pokes2 limit 10").collect();
        for (Row row : result) {
            System.out.println(row.getString(0) + "," + row.getString(1) + "," + row.getString(2));
        }
    }
}


© 著作权归作者所有

共有 人打赏支持
cloud-coder
粉丝 247
博文 191
码字总数 135000
作品 0
广州
架构师
私信 提问
加载中

评论(2)

i
iawom
D-_-D
D-_-D
JavaSQLContext 是哪个maven 依赖下的哦,都没找到呢
扩展Spark Catalyst,打造自定义的Spark SQL引擎

Apache Spark是大数据处理领域最常用的计算引擎之一,被应用在各种各样的场景中,除了易用的API,稳定高效的处理引擎,可扩展性也是Spark能够得到广泛应用的一个重要原因。Spark中最常见的扩...

李呈祥
2018/11/20
0
0
【Spark】Spark SQL, DataFrames and Datasets Guide(翻译文,持续更新)

本文主要是翻译Spark官网Spark SQL programming guide 。只能保证大概意思,尽量保证细节。英文水平有限,如果有错误的地方请指正,轻喷。目录导航在右上角 Spark SQL、DataFrames 和 Datase...

跑呀跑
2018/09/19
0
0
Spark 2.0 预览:更简单,更快,更智能

Apache Spark 2.0 技术预览在 Databricks Community Edition 发布。该预览包使用upstream branch-2.0构建,当启动Cluster时,使用预览包和选择“2.0 (Tech Preview)” 一样简单。 离最终的A...

oschina
2016/05/12
6.8K
6
hive,shark,sparkSQL,hive on spark,impala,drill比较

Hive on Mapreduce Hive的原理大家可以参考这篇大数据时代的技术hive:hive介绍,实际的一些操作可以看这篇笔记:新手的Hive指南,至于还有兴趣看Hive优化方法可以看看我总结的这篇Hive性能优...

hblt-j
2018/08/13
0
0
Apache Spark 1.3 发布,开源集群计算环境

Apache Spark 1.3 发布,1.3 版本引入了期待已久的 DataFrame API,这是 Spark 的 RDD 抽象设计来简单快速支持大数据集的变革。同时在流转换 ML 和 SQL 的大量提升。 DateFrame API 示例: ...

oschina
2015/03/14
5.7K
14

没有更多内容

加载失败,请刷新页面

加载更多

centos7安装RabbitMQ详细过程

由于RabbitMQ是基于Erlang语言开发,所以在安装RabbitMQ之前,需要先安装Erlang 1、环境: centos 7.1 内核版本3.10.0-229.el7.x86_64 Erlang 19.0.4版本 RabbitMQ 3.6.14版本 2、在线安装E...

秋至丶枫以落
6分钟前
0
0
6个使用KeePassX保护密码的技巧

虽然安全是个深奥的主题,但是你可以遵循几个简单的日常习惯来减小攻击面。本文将解释确保密码信息安全的重要性,并给出如何充分利用KeePassX的建议。 日益互联的数字世界使安全成为一个重要...

Linux就该这么学
8分钟前
0
0
2018最佳GAN论文回顾(下)

继上一篇《2018最佳GAN论文回顾(上)》,我又继续介绍了一个对于GAN的基于样式的生成器体系结构的新论文,提出了一个新的模型来应对这种挑战。 一种用于生成式对抗网络的基于生成器体系结构...

阿里云官方博客
10分钟前
0
0
UnsatisfiedLinkError sawindbg.dll

方法:搜索sawindbg.dll,然后将文件报错的目录下

洛水
46分钟前
4
0
说说不知道的Golang中参数传递

本文由云+社区发表 导言 几乎每一个C++开发人员,都被面试过有关于函数参数是值传递还是引用传递的问题,其实不止于C++,任何一个语言中,我们都需要关心函数在参数传递时的行为。在golang中...

腾讯云加社区
47分钟前
4
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部