文档章节

spark sql简单示例

cloud-coder
 cloud-coder
发布于 2015/06/16 17:36
字数 437
阅读 19714
收藏 8

运行环境

集群环境:CDH5.3.0

具体JAR版本如下:

spark版本:1.2.0-cdh5.3.0

hive版本:0.13.1-cdh5.3.0

hadoop版本:2.5.0-cdh5.3.0

spark sql的JAVA版简单示例

  1. spark sql直接查询JSON格式的数据

  2. spark sql的自定义函数

  3. spark sql查询hive上面的表

import java.util.ArrayList;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.api.java.DataType;
import org.apache.spark.sql.api.java.JavaSQLContext;
import org.apache.spark.sql.api.java.JavaSchemaRDD;
import org.apache.spark.sql.api.java.Row;
import org.apache.spark.sql.api.java.UDF1;
import org.apache.spark.sql.hive.api.java.JavaHiveContext;


/**
 * 注意:
 * 使用JavaHiveContext时
 * 1:需要在classpath下面增加三个配置文件:hive-site.xml,core-site.xml,hdfs-site.xml
 * 2:需要增加postgresql或mysql驱动包的依赖
 * 3:需要增加hive-jdbc,hive-exec的依赖
 *
 */
public class SimpleDemo {
    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("simpledemo").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        JavaSQLContext sqlCtx = new JavaSQLContext(sc);
        JavaHiveContext hiveCtx = new JavaHiveContext(sc);
//        testQueryJson(sqlCtx);
//        testUDF(sc, sqlCtx);
        testHive(hiveCtx);
        sc.stop();
        sc.close();
    }

    //测试spark sql直接查询JSON格式的数据
    public static void testQueryJson(JavaSQLContext sqlCtx) {
        JavaSchemaRDD rdd = sqlCtx.jsonFile("file:///D:/tmp/tmp/json.txt");
        rdd.printSchema();

        // Register the input schema RDD
        rdd.registerTempTable("account");

        JavaSchemaRDD accs = sqlCtx.sql("SELECT address, email,id,name FROM account ORDER BY id LIMIT 10");
        List<Row> result = accs.collect();
        for (Row row : result) {
            System.out.println(row.getString(0) + "," + row.getString(1) + "," + row.getInt(2) + ","
                    + row.getString(3));
        }

        JavaRDD<String> names = accs.map(new Function<Row, String>() {
            @Override
            public String call(Row row) throws Exception {
                return row.getString(3);
            }
        });
        System.out.println(names.collect());
    }


    //测试spark sql的自定义函数
    public static void testUDF(JavaSparkContext sc, JavaSQLContext sqlCtx) {
        // Create a account and turn it into a Schema RDD
        ArrayList<AccountBean> accList = new ArrayList<AccountBean>();
        accList.add(new AccountBean(1, "lily", "lily@163.com", "gz tianhe"));
        JavaRDD<AccountBean> accRDD = sc.parallelize(accList);

        JavaSchemaRDD rdd = sqlCtx.applySchema(accRDD, AccountBean.class);

        rdd.registerTempTable("acc");

        // 编写自定义函数UDF
        sqlCtx.registerFunction("strlength", new UDF1<String, Integer>() {
            @Override
            public Integer call(String str) throws Exception {
                return str.length();
            }
        }, DataType.IntegerType);

        // 数据查询
        List<Row> result = sqlCtx.sql("SELECT strlength('name'),name,address FROM acc LIMIT 10").collect();
        for (Row row : result) {
            System.out.println(row.getInt(0) + "," + row.getString(1) + "," + row.getString(2));
        }
    }

    //测试spark sql查询hive上面的表
    public static void testHive(JavaHiveContext hiveCtx) {
        List<Row> result = hiveCtx.sql("SELECT foo,bar,name from pokes2 limit 10").collect();
        for (Row row : result) {
            System.out.println(row.getString(0) + "," + row.getString(1) + "," + row.getString(2));
        }
    }
}


© 著作权归作者所有

共有 人打赏支持
cloud-coder
粉丝 246
博文 189
码字总数 135000
作品 0
广州
架构师
加载中

评论(2)

i
iawom
D-_-D
D-_-D
JavaSQLContext 是哪个maven 依赖下的哦,都没找到呢
hive,shark,sparkSQL,hive on spark,impala,drill比较

Hive on Mapreduce Hive的原理大家可以参考这篇大数据时代的技术hive:hive介绍,实际的一些操作可以看这篇笔记:新手的Hive指南,至于还有兴趣看Hive优化方法可以看看我总结的这篇Hive性能优...

hblt-j
08/13
0
0
如何使用Spark SQL 的JDBC server

简介 Spark SQL provides JDBC connectivity, which is useful for connecting business intelligence (BI) tools to a Spark cluster and for sharing a cluster across multipleusers. The......

cloud-coder
2015/06/17
0
0
OSS重磅推出OSS Select——使用SQL选取文件的内容【免费公测中】

摘要: OSS重磅推出OSS Select功能,可以直接使用简单的SQL语句,从OSS的文件中选取所需要的内容 对象存储OSS(Object Storage Service)具有海量、可靠、安全、高性能、低成本的特点。OSS提...

阿里云云栖社区
05/24
0
0
3.sparkSQL整合Hive

  spark SQL经常需要访问Hive metastore,Spark SQL可以通过Hive metastore获取Hive表的元数据。从Spark 1.4.0开始,Spark SQL只需简单的配置,就支持各版本Hive metastore的访问。注意,涉...

intsmaze(刘洋)
08/09
0
0
【Spark亚太研究院系列丛书】Spark实战高手之路-第3章Spark架构设计与编程模型第2节①

一、到底什么是Spark? Spark是一个通用的大数据计算平台,基于“One Stack to rule them all”的理念成功成为了一体化多元化的大数据处理平台,轻松应对大数据处理中的实时流计算、SQL交互式...

Spark亚太研究院
2014/12/29
0
0

没有更多内容

加载失败,请刷新页面

加载更多

零拷贝I:用户模式视角

英文原文地址:http://www.linuxjournal.com/article/6345。内容是关于 Zero Copy(零拷贝) 的详细介绍。在RocketMQ的Consumer 消费消息过程,使用了零拷贝技术。作用是即使被频繁调用,文件传...

Jacktanger
15分钟前
8
0
记在k8s的pod上使用apache的commons-net:ftp功能时,ftp一直上传文件失败

在k8s的pod上使用apache的commons-net:ftp功能时,一直显示: 即上传文件失败,但是在本地环境进行上传时却又显示上传成功,让人十分不解。在网上搜索了一下ftp的一些资料发现ftp共有两种模式...

helplove
16分钟前
1
0
Go map实现原理

map数据结构 Golang的map使用哈希表作为底层实现,一个哈希表里可以有多个哈希表节点,也即bucket,而每个bucket就保存了map中的一个或一组键值对。 map数据结构由runtime/map.go/hmap定义:...

恋恋美食
22分钟前
1
0
debian python library re-install

apt-get install python-aptsudo apt-get install python-pkg-resources python-setuptools --reinstall...

关上越
27分钟前
1
0
Elasticsearch地理位置总结

更多内容请参考 : https://www.felayman.com 翻译版本:https://es.xiaoleilu.com/310_Geopoints/00_Intro.html 官方原文:https://www.elastic.co/guide/en/elasticsearch/guide/current/g......

xiaomin0322
28分钟前
3
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部