文档章节

Spark SQL, DataFrames and Datasets(Spark-2.1.1)指南

K_Zhiqiang
 K_Zhiqiang
发布于 2017/05/01 16:35
字数 1877
阅读 97
收藏 0

Overview(概述)

    Spark SQL是一个用于结构化数据处理的Spark模块。与基础的Spark RDD API不同,Spark SQL中的接口提供了更多的关于数据和执行计算的结构化信息。Spark SQL在内部利用这些额外的信息去执行特别的优化。可以通过SQL或者Dataset API等几种途径和Spark SQL进行交互。当计算同一个结果,会使用相同的执行引擎,与您使用哪种API或者开发语言来表达计算无关。这种统一意味着开发者可以基于提供最自然的方式来表达一个转换,很容易地在不同的API之间来回切换。

    这个页面中的所有示例使用的样本数据,包含在Spark发行版本中,并且可以在spark-shell、pyspark shell或sparkR shell中运行。

SQL

    Spark SQL是用来执行SQL查询的。Spark SQL也可以从一个已安装好的Hive中读取数据。更多关于如何配置和使用这个特征,请参考Hive Tables 部分。当从另一种编程语言中运行SQL,结果将会作为一个Dataset/DataFrame返回。你也可以通过使用命令行或者在JDBC/ODBC之上与SQL接口交互。

Datasets and DataFrames

    Dataset是一个分布式数据集合。Dataset是一个Spark 1.6增加的新的接口,提供了RDD的优势(强类型,可以使用强大的lamda函数)和Spark SQL的执行引擎最佳化的优势。Dataset可以从JVM对象构造而来,然后使用功能性的转化算子(map、flatMap、filter等等)进行操作。Dataset API可以在ScalaJava中使用。Python不支持Dataset API。但由于Python的动态特性,许多Dataset API的好处已经可以使用(例如,您可以自然地访问一行的字段通过row.columnName)。R也是相似的。

    DataFrame是一个被组织在命名的列中的Dataset。它和关系型数据库的表或者R/Python中的data frame在概念上是等价的,但是在后台拥有更丰富的优化。DataFrame可以从一系列的广泛的来源进行构造,比如:结构化的数据文件,Hive中的表,外部数据库,或者是现有的RDD。DataFrame API可以在Scala、Java、Python或者R中使用。在Scala和Java中,一个Dataset of Rows代表了一个DataFrame。在Scala API中,DataFrame就像一个Dataset[Row]的类型别名。然而在Java API中,用户需要使用Dataset[Row]来代表DataFrame。在这篇文档中,我们将经常引用Scala/Java Datasets of Rows 作为DataFrames。

Getting Started

Starting Point: SparkSession(起始点:SparkSession)

    Spark中所有方法的入口点是SparkSession类。使用SparkSession.builder()可以创建一个基本的SparkSession。

import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession
  .builder()
  .appName("Java Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate();

    在Spark repo中的"examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java"可以找到完整的示例代码。Spark 2.0中的SparkSession提供了内建的Hive特性支持,包括使用HiveQL编写查询,访问Hive UDFs,和从Hive表中读取数据。使用这些特性,您不需要设置现有的Hive。

Creating DataFrames(创建DataFrame)

    通过SparkSession,应用程序可以从一个现有的RDD、Hive表、或者Spark data sources创建DataFrame。下面通过一个JSON文件创建一个DataFrame作为示例:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

Dataset<Row> df = spark.read().json("examples/src/main/resources/people.json");

// Displays the content of the DataFrame to stdout
df.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

    在Spark repo中的"examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java"可以找到完整的示例代码。

Untyped Dataset Operations (aka DataFrame Operations)(无类型的Dataset操作)

    DataFrame 为操纵结构化数据提供了一个特定于域的语言在Scala、Java、Python和R中。

    正如上面提到的,在Spark 2.0中,对于Scala和Java API,DataFrame只是Dataset of Rows。相比于伴随强类型的Scala/Java Datasets的“类型转换”,这些操作也称为“无类型转换”。

    这里提供一些使用Dataset处理结构化数据的基本示例:

// col("...") is preferable to df.col("...")
import static org.apache.spark.sql.functions.col;

// Print the schema in a tree format
df.printSchema();
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show();
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select(col("name"), col("age").plus(1)).show();
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// Select people older than 21
df.filter(col("age").gt(21)).show();
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
df.groupBy("age").count().show();
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+

    在Spark repo中的"examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java"可以找到完整的示例代码。

    查看可以执行在Dataset之上的完整操作列表,请参考API Documentation

    除了简单的列引用和表达式,Dataset也有一个丰富的函数库,包括字符串操作、日期计算,常见的数学操作等等。查看完整的列表,请参考DataFrame Function Reference

Running SQL Queries Programmatically(以编程方式执行SQL查询)

    SparkSession的sql函数使应用程序可以通过编程的方式运行SQL查询,并使用一个DataFrame返回结果。

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people");

Dataset<Row> sqlDF = spark.sql("SELECT * FROM people");
sqlDF.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

    在Spark repo中的"examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java"可以找到完整的示例代码。

Global Temporary View(全局临时视图)

    Spark SQL的Temporary views(临时视图)是session范围有效的,且将会随着创建它的session的终止而消失。如果你想拥有一个所有session共享的,并且在Spark应用终止前始终存活的Temporary views,你可以创建一个Global temporary view(全局临时视图)。Global temporary view被绑定到一个系统维护的数据库global_temp,并且我们必须使用限定名称引用它,例如,SELECT * FROM global_temp.view1。

// Register the DataFrame as a global temporary view
df.createGlobalTempView("people");

// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

    在Spark repo中的"examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java"可以找到完整的示例代码。

Creating Datasets(创建Dataset)

    Dataset 和 RDD类似,然而它使用了一种特殊的编码器来序列化用于计算或者网络传输的对象,来替代使用Java序列化或者Kryo。编码器和标准序列化负责将一个对象序列化为字节,编码器动态生成代码,且使用了一种允许Spark执行许多操作像filtering(过滤)、sorting(排序)和hashing(散列)而不需要将字节反序列化成对象的格式。

import java.util.Arrays;
import java.util.Collections;
import java.io.Serializable;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.Encoder;
import org.apache.spark.sql.Encoders;

public static class Person implements Serializable {
  private String name;
  private int age;

  public String getName() {
    return name;
  }

  public void setName(String name) {
    this.name = name;
  }

  public int getAge() {
    return age;
  }

  public void setAge(int age) {
    this.age = age;
  }
}

// Create an instance of a Bean class
Person person = new Person();
person.setName("Andy");
person.setAge(32);

// Encoders are created for Java beans
Encoder<Person> personEncoder = Encoders.bean(Person.class);
Dataset<Person> javaBeanDS = spark.createDataset(
  Collections.singletonList(person),
  personEncoder
);
javaBeanDS.show();
// +---+----+
// |age|name|
// +---+----+
// | 32|Andy|
// +---+----+

// Encoders for most common types are provided in class Encoders
Encoder<Integer> integerEncoder = Encoders.INT();
Dataset<Integer> primitiveDS = spark.createDataset(Arrays.asList(1, 2, 3), integerEncoder);
Dataset<Integer> transformedDS = primitiveDS.map(new MapFunction<Integer, Integer>() {
  @Override
  public Integer call(Integer value) throws Exception {
    return value + 1;
  }
}, integerEncoder);
transformedDS.collect(); // Returns [2, 3, 4]

// DataFrames can be converted to a Dataset by providing a class. Mapping based on name
String path = "examples/src/main/resources/people.json";
Dataset<Person> peopleDS = spark.read().json(path).as(personEncoder);
peopleDS.show();
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

    在Spark repo中的"examples/src/main/java/org/apache/spark/examples/sql/JavaSparkSQLExample.java"可以找到完整的示例代码。

 

未完待续。。。

 

本文译自Spark官网,原文地址:http://spark.apache.org/docs/latest/sql-programming-guide.html

© 著作权归作者所有

K_Zhiqiang
粉丝 0
博文 24
码字总数 13488
作品 0
海淀
程序员
私信 提问
Spark 2.0 预览:更简单,更快,更智能

Apache Spark 2.0 技术预览在 Databricks Community Edition 发布。该预览包使用upstream branch-2.0构建,当启动Cluster时,使用预览包和选择“2.0 (Tech Preview)” 一样简单。 离最终的A...

oschina
2016/05/12
7K
6
spark2.2官方教程笔记-Spark SQL, DataFrames and Datasets向导

概括 spark SQL是一个spark结构数据处理模型。不像基本的rdd api,Spark 提供的接口可以给spark提供更多更多关于数据的结构和正在执行的计算的信息。另外,spark sql在性能优化上比以往的有做...

skanda
2017/08/07
208
0
你不能错过的 spark 学习资源

1. 书籍,在线文档 2. 网站 3. Databricks Blog 4. 文章,博客 5. 视频

u012608836
2018/04/12
0
0
在 Databricks 可获得 Spark 1.5 预览版

我们兴奋地宣布,从今天开始,Apache Spark1.5.0的预览数据砖是可用的。我们的用户现在可以选择提供集群与Spark 1.5或先前的火花版本准备好几个点击。 正式,Spark 1.5预计将在数周内公布,和社区...

stark_summer
2015/08/25
60
0
【Spark】Spark SQL, DataFrames and Datasets Guide(翻译文,持续更新)

本文主要是翻译Spark官网Spark SQL programming guide 。只能保证大概意思,尽量保证细节。英文水平有限,如果有错误的地方请指正,轻喷。目录导航在右上角 Spark SQL、DataFrames 和 Datase...

跑呀跑
2018/09/19
0
0

没有更多内容

加载失败,请刷新页面

加载更多

计算机实现原理专题--二进制减法器(二)

在计算机实现原理专题--二进制减法器(一)中说明了基本原理,现准备说明如何来实现。 首先第一步255-b运算相当于对b进行按位取反,因此可将8个非门组成如下图的形式: 由于每次做减法时,我...

FAT_mt
昨天
6
0
好程序员大数据学习路线分享函数+map映射+元祖

好程序员大数据学习路线分享函数+map映射+元祖,大数据各个平台上的语言实现 hadoop 由java实现,2003年至今,三大块:数据处理,数据存储,数据计算 存储: hbase --> 数据成表 处理: hive --> 数...

好程序员官方
昨天
7
0
tabel 中含有复选框的列 数据理解

1、el-ui中实现某一列为复选框 实现多选非常简单: 手动添加一个el-table-column,设type属性为selction即可; 2、@selection-change事件:选项发生勾选状态变化时触发该事件 <el-table @sel...

everthing
昨天
6
0
【技术分享】TestFlight测试的流程文档

上架基本需求资料 1、苹果开发者账号(如还没账号先申请-苹果开发者账号申请教程) 2、开发好的APP 通过本篇教程,可以学习到ios证书申请和打包ipa上传到appstoreconnect.apple.com进行TestF...

qtb999
昨天
10
0
再见 Spring Boot 1.X,Spring Boot 2.X 走向舞台中心

2019年8月6日,Spring 官方在其博客宣布,Spring Boot 1.x 停止维护,Spring Boot 1.x 生命周期正式结束。 其实早在2018年7月30号,Spring 官方就已经在博客进行过预告,Spring Boot 1.X 将维...

Java技术剑
昨天
18
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部