文档章节

Spark SQL编程指南

十一月不远
 十一月不远
发布于 2014/08/27 21:02
字数 1469
阅读 1.6K
收藏 35

#Spark SQL 编程指南#

##简介## Spark SQL支持在Spark中执行SQL,或者HiveQL的关系查询表达式。它的核心组件是一个新增的RDD类型JavaSchemaRDD。JavaSchemaRDD由Row对象和表述这个行的每一列的数据类型的schema组成。一个JavaSchemaRDD类似于传统关系数据库的一个表。JavaSchemaRDD可以通过一个已存在的RDD,Parquet文件,JSON数据集,或者通过运行HiveSQL获得存储在Apache Hive上的数据创建。

Spark SQL目前是一个alpha组件。尽管我们会尽量减少API变化,但是一些API任然后再以后的发布中改变。

##入门## 在Spark中,所有关系函数功能的入口点是JavaSQLContext类。或者他的子类。要创建一个基本的JavaSQLContext,所有你需要的只是一个JavaSparkContext。

JavaSparkContext sc = ...; // An existing JavaSparkContext.
JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(sc);

##数据源## Spark SQL支持通过JavaSchemaRDD接口操作各种各样的数据源。一单一个数据集被加载,它可以被注册成一个表,甚至和来自其他源的数据连接。

###RDDs### Spark SQL支持的表的其中一个类型是由JavaBeans的RDD。BeanInfo定义了这个表的schema。现在 ,Spark SQL 不支持包括嵌套或者复杂类型例如Lists或者Arrays的JavaBeans。你可以通过创建一个实现了Serializable并且它的所有字段都有getters和setters方法的类类创建一个JavaBeans。

public static class Person implements Serializable {
  private String name;
  private int age;

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public int getAge() {
    return age;
  }

  public void setAge(int age) {
    this.age = age;
  }
}

一个schema可以被应用在一个已存在的RDD上,通过调用applySchema并且提供这个JavaBean的类对象。

// sc is an existing JavaSparkContext.
JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(sc)

// Load a text file and convert each line to a JavaBean.
JavaRDD<Person> people = sc.textFile("examples/src/main/resources/people.txt").map(
  new Function<String, Person>() {
    public Person call(String line) throws Exception {
      String[] parts = line.split(",");

      Person person = new Person();
      person.setName(parts[0]);
      person.setAge(Integer.parseInt(parts[1].trim()));

      return person;
    }
  });

// Apply a schema to an RDD of JavaBeans and register it as a table.
JavaSchemaRDD schemaPeople = sqlContext.applySchema(people, Person.class);
schemaPeople.registerAsTable("people");

// SQL can be run over RDDs that have been registered as tables.
JavaSchemaRDD teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")

// The results of SQL queries are SchemaRDDs and support all the normal RDD operations.
// The columns of a row in the result can be accessed by ordinal.
List<String> teenagerNames = teenagers.map(new Function<Row, String>() {
  public String call(Row row) {
    return "Name: " + row.getString(0);
  }
}).collect();

注意,Spark SQL目前使用一个非常简单的SQL解析器。用户如果想获得一个更加完整的SQL方言,应该看看HiveContext提供的HiveQL支持。

###Parquet Files### Parquet是一个columnar格式,并且被许多其他数据处理系统支持。Spark SQL对读写Parquet文件提供支持,并且自动保存原始数据的Schema。通过下面的例子使用数据:

// sqlContext from the previous example is used in this example.

JavaSchemaRDD schemaPeople = ... // The JavaSchemaRDD from the previous example.

// JavaSchemaRDDs can be saved as Parquet files, maintaining the schema information.
schemaPeople.saveAsParquetFile("people.parquet");

// Read in the Parquet file created above.  Parquet files are self-describing so the schema is preserved.
// The result of loading a parquet file is also a JavaSchemaRDD.
JavaSchemaRDD parquetFile = sqlContext.parquetFile("people.parquet");

//Parquet files can also be registered as tables and then used in SQL statements.
parquetFile.registerAsTable("parquetFile");
JavaSchemaRDD teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19");
List<String> teenagerNames = teenagers.map(new Function<Row, String>() {
  public String call(Row row) {
    return "Name: " + row.getString(0);
  }
}).collect();

###JSON Datasets### Spark SQL可以自动推断一个JSON数据集的schema,并加载成一个JavaSchemaRDD。这个转换可以通过JavaSQLContext中的两个方法中的一个完成:

  • jsonFile -从一个目录下的文件中加载数据,这个文件中的每一行都是一个JSON对象。

  • jsonRdd -从一个已存在的RDD加载数据,这个RDD中的每一个元素是一个包含一个JSON对象的String。

     // sc is an existing JavaSparkContext.
     JavaSQLContext sqlContext = new org.apache.spark.sql.api.java.JavaSQLContext(sc);
    
     // A JSON dataset is pointed to by path.
     // The path can be either a single text file or a directory storing text files.
     String path = "examples/src/main/resources/people.json";
     // Create a JavaSchemaRDD from the file(s) pointed to by path
     JavaSchemaRDD people = sqlContext.jsonFile(path);
    
     // The inferred schema can be visualized using the printSchema() method.
     people.printSchema();
     // root
     //  |-- age: IntegerType
     //  |-- name: StringType
    
     // Register this JavaSchemaRDD as a table.
     people.registerAsTable("people");
    
     // SQL statements can be run by using the sql methods provided by sqlContext.
     JavaSchemaRDD teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19");
    
     // Alternatively, a JavaSchemaRDD can be created for a JSON dataset represented by
     // an RDD[String] storing one JSON object per string.
     List<String> jsonData = Arrays.asList(
       "{\"name\":\"Yin\",\"address\":{\"city\":\"Columbus\",\"state\":\"Ohio\"}}");
     JavaRDD<String> anotherPeopleRDD = sc.parallelize(jsonData);
     JavaSchemaRDD anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD);
    

###Hive Tables### Spark SQL也支持读和写存储在apache Hive中的数据。然而,由于Hive有一个非常大的依赖,他没有在Spark默认宝中包括。为了使用Hive,你必须运行‘SPARK_HIVE=true sbt/sbt assembly/assembly'(或者对Maven使用 -Phive)。这个命令构建一个包含Hive的assembly。注意,这个Hive assembly 必须放在所有的工作节点上,因为它们需要访问Hive的序列化和方序列化包(SerDes),以此访问存储在Hive中的数据。

可以通过conf目录下的hive-site.xml文件完成Hive配置 。

要和Hive配合工作,你需要构造一个JavaHiveContext,它继承了JavaSQLContext,并且添加了发现MetaStore中的表和使用HiveQL编写查询的功能。此外,除了sql方法,JavaHiveContext方法还提供了一个hql方法,它允许查询使用HiveQL表达。

##Writing Language-Integrated Relational Queries## Language-Integrated查询目前只在Scala中被支持。

Spark SQL同样支持使用领域特定的语言来编写查询。再次,使用上面例子中的数据:

// sc is an existing SparkContext.
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
// Importing the SQL context gives access to all the public SQL functions and implicit conversions.
import sqlContext._
val people: RDD[Person] = ... // An RDD of case class objects, from the first example.

// The following is the same as 'SELECT name FROM people WHERE age >= 10 AND age <= 19'
val teenagers = people.where('age >= 10).where('age <= 19).select('name)
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

DSL使用Scala中得到标记来表示基础表中的表,他们使用一个前缀’标识。隐式转换这些标记为被SQL 执行引擎评估的表达式。支持这些功能的完成列表可以再ScalaDoc找到。

© 著作权归作者所有

十一月不远

十一月不远

粉丝 39
博文 78
码字总数 61436
作品 1
海淀
程序员
私信 提问
Spark 学习资源收集【Updating】

(一)spark 相关安装部署、开发环境 1、Spark 伪分布式 & 全分布式 安装指南 http://my.oschina.net/leejun2005/blog/394928 2、Apache Spark探秘:三种分布式部署方式比较 http://dongxic...

大数据之路
2014/09/08
5.5K
0
【Spark】Spark Quick Start(快速入门翻译)

本文主要是翻译Spark官网Quick Start。只能保证大概意思,尽量保证细节。英文水平有限,如果有错误的地方请指正,轻喷 目录导航在右上角,感谢两个大佬(孤傲苍狼 JavaScript自动生成博文目录...

跑呀跑
2018/09/16
0
0
【Spark】Spark SQL, DataFrames and Datasets Guide(翻译文,持续更新)

本文主要是翻译Spark官网Spark SQL programming guide 。只能保证大概意思,尽量保证细节。英文水平有限,如果有错误的地方请指正,轻喷。目录导航在右上角 Spark SQL、DataFrames 和 Datase...

跑呀跑
2018/09/19
0
0
Spark 数据分析导论-笔记

Spark Core Spark Core 实现了Spark 的基本功能,包含任务调度、内存管理、错误恢复、与存储系统交互等模块。 Spark Core 中还包含了 对弹性分布式数据集(resilient distributed dataset,简...

Java搬砖工程师
2018/12/26
78
0
Spark2.1.0之基础知识

版权声明:本文为博主原创文章,未经博主允许不得转载。 https://blog.csdn.net/beliefer/article/details/80303035 在阅读本文之前,读者最好已经阅读了《Spark2.1.0之初识Spark》一文,本文...

泰山不老生
2018/05/24
0
0

没有更多内容

加载失败,请刷新页面

加载更多

WCF - 如何增加邮件大小配额

我有一个WCF服务,它从数据库返回1000条记录到客户端。 我有一个ASP.NET WCF客户端(我在asp.net Web应用程序项目中添加了服务引用以使用WCF)。 运行客户端应用程序时收到以下消息: 已超出...

技术盛宴
39分钟前
52
0
toast组件单元测试

先看是否存在 describe('Toast', () => { it('存在.', () => { expect(Toast).to.be.exist }) }); 看属性,我们要测 ToastVue 和 plugin.js describe('Toast', () =>......

ories
47分钟前
57
0
如何将整个MySQL数据库字符集和排序规则转换为UTF-8?

如何将整个MySQL数据库字符集转换为UTF-8并将排序规则转换为UTF-8? #1楼 在命令行外壳上 如果您是命令行外壳程序之一,则可以非常快速地执行此操作。 只需填写“ dbname”:D DB="dbname"(...

javail
今天
80
0
开源矿工系统内部的层

开源矿工系统内部的层 所谓“层”、“界”、“域”、“集合”,这些词其实是在试图表达物质系统的组成结构和运动景象中的规矩,这些不同人发明的词都是来源于对同一个规律的观察、发现、表达...

NTMiner
今天
88
0
数据结构之数组-c代码实现

在上一篇文章里讲了数组的具体内容,然后自己使用c语言对数组进行了实现。 其中定义了一个结构体,定义了长度、已使用长度和地址指针。 定义alloc函数来分配内存空间 之后便是插入元素的ins...

无心的梦呓
今天
65
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部