文档章节

Spark SQL和DataFrame指南[中]

openthings
 openthings
发布于 2016/07/11 08:00
字数 8272
阅读 2315
收藏 16

翻译自: http://spark.apache.org/docs/1.3.0/sql-programming-guide.html

概述(Overview)

Spark SQL是Spark的一个模块,用于结构化数据处理。它提供了一个编程的抽象被称为DataFrames,也可以作为分布式SQL查询引擎。

DataFrames

DataFrame是一种以命名列方式组织的分布式数据集。它概念上相当于关系型数据库中的表,或者R/Python中的数据帧,但是具有更丰富的优化。有很多方式可以构造出一个DataFrame,例如:结构化数据文件,Hive中的tables,外部数据库或者存在的RDDs。

DataFrame的API适用于Scala、Java和Python。

该页上所有的例子使用Spark分布式中的样本数据,可以运行在spark-shell或者pyspark shell中。

入口点: SQLContext

Spark SQL中所有功能的入口点是SQLContext类,或者它子类中的一个。为了创建一个基本的SQLContext,你所需要的是一个SparkContext。

 

val sc: SparkContext // An existing SparkContext.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)// this is used to implicitly convert an RDD to a DataFrame.

import sqlContext.implicits._

 

除了基本的SQLContext,你还可以创建一个HiveContext,它提供了基本的SQLContext的所提供的功能的超集。这些功能中包括附加的特性,可以编写查询,使用更完全的HiveQL解析器,访问Hive UDFs,能够从Hive表中读取数据。想要使用HiveContext,你不需要有一个存在的Hive步骤,并且所有SQLContext可用的数据源仍旧可用。HiveContext只是单独打包,以避免包含默认Spark build中的所有Hive依赖。如果这些依赖对于你的应用不是一个问题,那么推荐使用Spark 1.3版本的HiveContext。

使用spark.sql.dialect选项,可以选择SQL的具体变种,用它来解析查询。这个参数可以使用SQLContext上的setConf方法或者在SQL中使用一组key=value命令。对于SQLContext,唯一可以的dialect是“sql”,它可以使用SparkSQL提供的一个简单的SQL解析器。在HiveContext中,默认的是“hiveql”,尽管“sql”也是可用的。因为HiveOL解析器更加完整,在大多数情况下, 推荐使用这个。

创建DataFrames

使用SQLContext,应用可以从一个已经存在的RDD、Hive表或者数据源中创建DataFrames。

例如,以下根据一个JSON文件创建出一个DataFrame:

val sc: SparkContext // An existing SparkContext.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)// Create the DataFrame

val df = sqlContext.jsonFile("examples/src/main/resources/people.json")// Show the content of the DataFrame

df.show()

// age  name

// null Michael

// 30   Andy

// 19   Justin

 // Print the schema in a tree format

df.printSchema()

// root// |-- age: long (nullable = true)

// |-- name: string (nullable = true)

 // Select only the "name" column

df.select("name").show()

// name

// Michael

// Andy

// Justin

 // Select everybody, but increment the age by 1

// df.select("name", df("age") + 1).show() //官方文档这样的,但是测试时发现这样编译不通过。下面的形式可以

df.select(df("name"),df("age")+1).show()

// name    (age + 1)

// Michael null

// Andy    31

// Justin  20

 // Select people older than 21

df.filter(df("age") > 21).show()

// age name

// 30  Andy

 // Count people by age

df.groupBy("age").count().show()

// age  count

// null 1

// 19   1

// 30   1

以编程方式运行SQL查询

SQLContext中的sql函数使应用可以以编程方式运行SQL查询,并且将结果以DataFrame形式返回。

val sqlContext = ...  // An existing SQLContext

val df = sqlContext.sql("SELECT * FROM table")

RRDs之间的互操作(Interoperating with RDDs)

Spark SQL支持两种不同的方法,用于将存在的RDDs转换成DataFrames。第一种方法使用反射来推断包含特定类型的对象的RDD的模式。在写Spark应用时,当你已知schema的情况下,这种基于反射的方式使得代码更加简介,并且效果更好。

创建DataFrames的第二种方法是通过编程接口,它允许你构建一个模式,然后将其应用到现有的RDD上。这种方式更加的繁琐,它允许你构建一个DataFrame当列以及类型未知,直到运行时才能知道时。

使用反射推断模式

Spark SQL中的Scala接口支持自动地将包含case类的RDD转换成DataFrame。case类定义了表的模式,case类的参数的名称使用反射来读取,然后称为列的名称。case类还可以嵌套或者包含复杂的类型,例如Sequences或者Arrays。这个RDD可以隐式地转换为DataFrame,然后注册成表,表可以在后续SQL语句中使用

// sc is an existing SparkContext.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)

// this is used to implicitly convert an RDD to a DataFrame.

import sqlContext.implicits._// Define the schema using a case class.

// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,

// you can use custom classes that implement the Product interface.

case class Person(name: String, age: Int)// Create an RDD of Person objects and register it as a table.

val people = sc.textFile("examples/src/main/resources/people.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()

people.registerTempTable("people")// SQL statements can be run by using the sql methods provided by sqlContext.

val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")// The results of SQL queries are DataFrames and support all the normal RDD operations.

// The columns of a row in the result can be accessed by ordinal.

teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

1、使用case类定义schema

2、创建一个SQLContext

3、导入sqlContext.implicits._,用于隐式地将RDD转换成DataFrame

4、创建一个DataFrame,并将它注册成表。

5、使用sqlContext提供的sql方法,就可以使用SQL语句来查询了。查询后返回的结果是DataFrame,它支持所有的RDD操作

以编程方式指定模式

当case类不能提前定义时(例如,记录的结构被编码在一个String中,或者不同的用户会将文本数据集和字段进行不同的解析和投影),DataFrame可以使用以下三步,以编程的方式实现:

1.Create an RDD of Rows from the original RDD;

2.Create the schema represented by a StructType matching the structure of Rows in the RDD created in Step 1.

3.Apply the schema to the RDD of Rows via createDataFrame method provided by SQLContext.

1.从原有的RDD中创建行的RDD。

2.创建一个由StructType表示的模式,StructType符合由步骤1创建的RDD的行的结构。

3.通过SQLContext提供的createDataFrame方法,将模式应用于行的RDD。

For example:

// sc is an existing SparkContext.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)// Create an RDD

val people = sc.textFile("examples/src/main/resources/people.txt")// The schema is encoded in a string

val schemaString = "name age"// Import Spark SQL data types and Row.

import org.apache.spark.sql._// Generate the schema based on the string of schema

val schema =  StructType(    schemaString.split(" ").map(fieldName => StructField(fieldName, StringType, true)))// Convert records of the RDD (people) to Rows.

val rowRDD = people.map(_.split(",")).map(p => Row(p(0), p(1).trim))// Apply the schema to the RDD.

val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)// Register the DataFrames as a table.

peopleDataFrame.registerTempTable("people")// SQL statements can be run by using the sql methods provided by sqlContext.

val results = sqlContext.sql("SELECT name FROM people")// The results of SQL queries are DataFrames and support all the normal RDD operations.

// The columns of a row in the result can be accessed by ordinal.

results.map(t => "Name: " + t(0)).collect().foreach(println)

 

-----------------------------------------------------------

    //my code

    import org.apache.spark._

    import org.apache.spark.sql._

    import org.apache.spark.sql.types.{StructType, StructField, StringType}

    val conf = new SparkConf().setMaster("local").setAppName("XX")

    val sc = new SparkContext(conf)

    val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    val schemaString = "fullName age"

    val schema = StructType(schemaString.split(" ").map(fieldName => StructField(fieldName,StringType,true)))

    val rowRDD = sc.textFile("data/people.txt").map(_.split(" ")).map(p=> Row(p(0),p(1).trim))

    val peopleDataFrame = sqlContext.createDataFrame(rowRDD, schema)

    peopleDataFrame.registerTempTable("people")

    val young = sqlContext.sql("select * from people where age <25")

    young.show()

数据源(Data Sources)

Spark SQL支持通过DataFrame接口在多种数据源上进行操作。一个DataFrame可以如同一个标准的RDDs那样进行操作,还可以注册成临时的表。将一个DataFrame注册成临时表允许你在它的数据上运行SQL查询。本节介绍使用Spark数据源装载和保存数据的常用方法,使用Spark数据源保存数据。然后进入可用于内置数据源的特定选项。

通用的加载/保存功能

在最简单的形式中,默认的数据源(parquet除非通过spark.sql.sources.default另外进行配置)将被用于所有的操作。

val df = sqlContext.load("people.parquet")
df.select("name", "age").save("namesAndAges.parquet")

手动指定选项

你还可以手动指定数据源,这些数据源将与任何额外的选项一同使用,你希望将这些选项传入到数据源中。数据源是通过它们的全名来指定的(如org.apache.spark.sql.parquet),但是对于内置的数据源,你也可以使用简短的名称(json, parquet, jdbc)。任何类型的DataFrames使用这些语法可以转化成其他的数据源:

val df = sqlContext.load("people.json", "json")

df.select("name", "age").save("namesAndAges.parquet", "parquet")

保存模式

Save操作可以可选择性地接收一个SaveModel,如果数据已经存在了,指定如何处理已经存在的数据。意识到这些保存模式没有利用任何锁,也不是原子的,这很重要。因此,如果有多个写入者试图往同一个地方写入,这是不安全的。此外,当执行一个Overwrite,在写入新的数据之前会将原来的数据进行删除。

 

Scala/Java

Python

Meaning

SaveMode.ErrorIfExists (default)

"error" (default)

When saving a DataFrame to a data source, if data already exists, an exception is expected to be thrown. 

当往一个数据源中保存一个DataFrame,如果数据已经存在,会抛出一个异常。

SaveMode.Append

"append"

When saving a DataFrame to a data source, if data/table already exists, contents of the DataFrame are expected to be appended to existing data. 

当往一个数据源中保存一个DataFrame,如果data/table已经存在,DataFrame的内容会追加到已经存在的数据后面。

SaveMode.Overwrite

"overwrite"

Overwrite mode means that when saving a DataFrame to a data source, if data/table already exists, existing data is expected to be overwritten by the contents of the DataFrame. 

Overwrite模式意味着当向数据源中保存一个DataFrame时,如果data/table已经存在了,已经存在的数据会被DataFrame中内容覆盖掉。

SaveMode.Ignore

"ignore"

Ignore mode means that when saving a DataFrame to a data source, if data already exists, the save operation is expected to not save the contents of the DataFrame and to not change the existing data. This is similar to a `CREATE TABLE IF NOT EXISTS` in SQL. 

Ignore模式意味着当向数据源中保存一个DataFrame时,如果数据已经存在,save操作不会将DataFrame的内容进行保存,也不会修改已经存在的数据。这与SQL中的`CREATE TABLE IF NOT EXISTS`相似。

 

保存为持久化表

当与HiveContext一起工作时,DataFrames也可以使用saveAsTable命令保存为持久化的表。不像registerTempTable命令,saveAsTable会将DataFrame的内容进行物化,并且在HiveMetastore中创建一个指向数据的指针。持久化表会仍旧存在即使你的Spark程序重新启动。只要你保持连接到相同的元存储( metastore)。一个持久化表的DataFrame可以通过调用SQLContext上的带有表的名称的table方法来创建。

默认情况下,saveAsTable会创建一个“管理表(managed table)”,意味着元存储控制数据的位置。当一个表被删除后,managed table会自动地删除它们的数据。

Parquet Files

Parquet 是一种柱状的格式,被许多其他数据处理系统所支持。Spark SQL支持度对Parquet文件的读和写,自动保存原有数据的模式。

以编程方式加载数据

Loading Data Programmatically

Using the data from the above example:

使用上面例子中的数据:

// sqlContext from the previous example is used in this example.

// This is used to implicitly convert an RDD to a DataFrame.

import sqlContext.implicits._val people: RDD[Person] = ...

 // An RDD of case class objects, from the previous example.// The RDD is implicitly converted to a DataFrame by implicits, allowing it to be stored using Parquet.

people.saveAsParquetFile("people.parquet")// Read in the parquet file created above.  Parquet files are self-describing so the schema is preserved.

// The result of loading a Parquet file is also a DataFrame.

val parquetFile = sqlContext.parquetFile("people.parquet")//Parquet files can also be registered as tables and then used in SQL statements.

parquetFile.registerTempTable("parquetFile")

val teenagers = sqlContext.sql("SELECT name FROM parquetFile WHERE age >= 13 AND age <= 19")

teenagers.map(t => "Name: " + t(0)).collect().foreach(println)

分区发现

在系统中,如Hive,使用表分区是一个常见的优化途径。在一个分区表中,数据经常存储在不同的目录中,对每一个分区目录中的路径中,对分区列的值进行编码。Parquet数据源现在可以自动地发现并且推断出分区的信息。例如,我们可以将之前使用的人口数据存储成下列目录结构的分区表,两个额外的列,gender和country作为分区列:

path└── to    └── table        ├── gender=male        │   ├── ...        │   │        │   ├── country=US        │   │   └── data.parquet        │   ├── country=CN        │   │   └── data.parquet        │   └── ...        └── gender=female            ├── ...            │            ├── country=US            │   └── data.parquet            ├── country=CN            │   └── data.parquet            └── ...

通过向SQLContext.parquetFile或者 SQLContext.load中传入path/to/table,Spark SQL会自动地从路径中提取分区信息。现在返回的DataFrame模式变成:

root|-- name: string (nullable = true)|-- age: long (nullable = true)|-- gender: string (nullable = true)|-- country: string (nullable = true)

注意到分区列的数据类型自动被推断出来。目前支持数字的数据类型和string类型。

模式合并

像ProtocolBuffer, Avro和Thrift那样,Parquet还支持模式演化。用户可以从一个简单的模式开始,并且根据需要逐渐地向模式中添加更多的列。这样,用户最终可能会有多个不同但是具有相互兼容的模式的Parquet文件。Parquet数据源现在可以自动地发现这种情况,并且将所有这些文件的模式进行合并。

// sqlContext from the previous example is used in this example

.// This is used to implicitly convert an RDD to a DataFrame.

import sqlContext.implicits._// Create a simple DataFrame, stored into a partition directory

val df1 = sparkContext.makeRDD(1 to 5).map(i => (i, i * 2)).toDF("single", "double")

df1.saveAsParquetFile("data/test_table/key=1")// Create another DataFrame in a new partition directory,

// adding a new column and dropping an existing column

val df2 = sparkContext.makeRDD(6 to 10).map(i => (i, i * 3)).toDF("single", "triple")

df2.saveAsParquetFile("data/test_table/key=2")// Read the partitioned table

val df3 = sqlContext.parquetFile("data/test_table")

df3.printSchema()// The final schema consists of all 3 columns in the Parquet files together

// with the partiioning column appeared in the partition directory paths.

// root

// |-- single: int (nullable = true)

// |-- double: int (nullable = true)

// |-- triple: int (nullable = true)

// |-- key : int (nullable = true)

配置

Parquet的配置可以使用SQLContext的setConf来设置或者通过使用SQL运行SET key=value命令

Property Name

Default

Meaning

spark.sql.parquet.binaryAsString

false

Some other Parquet-producing systems, in particular Impala and older versions of Spark SQL, do not differentiate between binary data and strings when writing out the Parquet schema. This flag tells Spark SQL to interpret binary data as a string to provide compatibility with these systems.

 其他的一些产生Parquet的系统,特别是Impala和SparkSQL的老版本,当将Parquet模式写出时不会区分二进制数据和字符串。这个标志告诉Spark SQL将二进制数据解析成字符串,以提供对这些系统的兼容。

spark.sql.parquet.int96AsTimestamp

true

Some Parquet-producing systems, in particular Impala, store Timestamp into INT96. Spark would also store Timestamp as INT96 because we need to avoid precision lost of the nanoseconds field. This flag tells Spark SQL to interpret INT96 data as a timestamp to provide compatibility with these systems. 

其他的一些产生Parquet的系统,特别是Impala,将时间戳存储为INT96的形式。Spark也将时间戳存储为INT96,因为我们要避免纳秒级字段的精度的损失。这个标志告诉Spark SQL将INT96数据解析为一个时间戳,以提供对这些系统的兼容。

spark.sql.parquet.cacheMetadata

true

Turns on caching of Parquet schema metadata. Can speed up querying of static data. 

打开Parquet模式的元数据的缓存。能够加快对静态数据的查询。

spark.sql.parquet.compression.codec

gzip

Sets the compression codec use when writing Parquet files. Acceptable values include: uncompressed, snappy, gzip, lzo. 

设置压缩编码解码器,当写入一个Parquet文件时。可接收的值包括:uncompressed, snappy, gzip, lzo

spark.sql.parquet.filterPushdown

false

Turn on Parquet filter pushdown optimization. This feature is turned off by default because of a known bug in Paruet 1.6.0rc3 (PARQUET-136). However, if your table doesn't contain any nullable string or binary columns, it's still safe to turn this feature on. 

打开Parquet过滤器的后进先出存储的优化。这个功能默认是被关闭的,因为一个Parquet中的一个已知的bug 1.6.0rc3 (PARQUET-136)。然而,如果你的表中不包含任何的可为空的(nullable)字符串或者二进制列,那么打开这个功能是安全的。

spark.sql.hive.convertMetastoreParquet

true

When set to false, Spark SQL will use the Hive SerDe for parquet tables instead of the built in support. 

当设置成false,Spark SQL会为parquet表使用Hive SerDe(Serialize/Deserilize)而不是内置的支持。

 

JSON数据集

Spark SQL可以自动推断出JSON数据集的模式,将它作为DataFrame进行加载。这个转换可以通过使用SQLContext中的下面两个方法中的任意一个来完成。

• jsonFile - 从一个JSON文件的目录中加载数据,文件中的每一个行都是一个JSON对象。

• jsonRDD - 从一个已经存在的RDD中加载数据,每一个RDD的元素是一个包含一个JSON对象的字符串。

注意,作为jsonFile提供deep文件不是一个典型的JSON文件。每一行必须包含一个分开的独立的有效JSON对象。因此,常规的多行JSON文件通常会失败。

// sc is an existing SparkContext.

val sqlContext = new org.apache.spark.sql.SQLContext(sc)// A JSON dataset is pointed to by path.

// The path can be either a single text file or a directory storing text files.

val path = "examples/src/main/resources/people.json"

// Create a DataFrame from the file(s) pointed to by path

val people = sqlContext.jsonFile(path)// The inferred schema can be visualized using the printSchema() method.

people.printSchema()

// root

//  |-- age: integer (nullable = true)

//  |-- name: string (nullable = true)// Register this DataFrame as a table.

people.registerTempTable("people")// SQL statements can be run by using the sql methods provided by sqlContext.

val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")// Alternatively, a DataFrame can be created for a JSON dataset represented by

// an RDD[String] storing one JSON object per string.

val anotherPeopleRDD = sc.parallelize(  """{"name":"Yin","address":{"city":"Columbus","state":"Ohio"}}""" :: Nil)

val anotherPeople = sqlContext.jsonRDD(anotherPeopleRDD)

Hive表

Spark SQL还支持对存储在Apache Hive中的数据的读和写。但是,因为Hive有大量的依赖,它不包含在默认的Spark assembly中。对Hive的支持是通过在Spark的build中添加 -Phive 和 -Phive-thriftserver 标志来完成。这个命令构建了一个新的assembly jar ,它包含Hive。注意,这个HIve assembly jar还必须出现在所有的worker节点,因为为了访问存储在Hive中的数据,它们会访问Hive的序列化和反序列化库(SerDes) 。

Hive的配置是通过将你的hive-site.xml文件放到conf/下来完成。

当使用Hive时,必须构建一个HiveContext,它继承自SQLContext,对寻找MetaStore中的表和使用HiveQL编写查询提供了支持。没有部署Hive的用户也可以创建一个HiveContext。当没有通过hive-site.xml进行配置,context会在当前目录下自动创建一个metastore_db和warehouse。

// sc is an existing SparkContext.

val sqlContext = new org.apache.spark.sql.hive.HiveContext(sc)sqlContext.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")

sqlContext.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")// Queries are expressed in HiveQL

sqlContext.sql("FROM src SELECT key, value").collect().foreach(println)

JDBC To Other Databases

Spark SQL还包含了一个数据源,它可以使用JDBC从其他数据库中读取数据。这个功能应该优先使用JdbcRDD。这是因为结果是作为DataFrame返回的,并且在Spark SQL中可以简单的被使用或加入其他数据源。JDBC数据源也能够轻松地被Java或者Python来使用,因为它不需要用户提供一个ClassTag(注意,这不同于Spark SQL JDBC服务,它允许其他应用使用Spark SQL运行查询)。

要开始使用时,你需要为你的特定数据块在Spark的类路径中添加JDBC驱动。例如,从Spark Shell中连接到postgres,你需要运行以下的命令:

SPARK_CLASSPATH=postgresql-9.3-1102-jdbc41.jar bin/spark-shell

通过使用数据源的API,可以将远程数据库中的表加载为DataFrame或者Spark SQL 临时表。以下的选项是被支持的:

Property Name

Meaning

url

The JDBC URL to connect to. 

要连接到的JDBC URL

dbtable

The JDBC table that should be read. Note that anything that is valid in a `FROM` clause of a SQL query can be used. For example, instead of a full table you could also use a subquery in parentheses. 

需要读取的JDBC表。注意,SQL查询中的“From”子句中的任何部分都是可以使用的。例如,你可以在括号中使用子查询,而不是一个完整的表。

driver

The class name of the JDBC driver needed to connect to this URL. This class with be loaded on the master and workers before running an JDBC commands to allow the driver to register itself with the JDBC subsystem. 

需要连接到的URL的JDBC驱动的类名。这个类会在运行一个JDBC命令之前被加载到master和workers上,允许驱动注册自己和JDBC子系统。

partitionColumn, lowerBound, upperBound, numPartitions

These options must all be specified if any of them is specified. They describe how to partition the table when reading in parallel from multiple workers. partitionColumn must be a numeric column from the table in question. 

如果这些选项中的一个被指定了,那么所有的选项必须被指定。它们描述了当从多个workers上并行读取时如何进行分区。partitionColumn必须是表中的一个数字列。

 

val jdbcDF = sqlContext.load("jdbc", Map(  "url" -> "jdbc:postgresql:dbserver",  "dbtable" -> "schema.tablename"))

故障排除

* 在客户端session以及所有executors上,JDBC驱动类必须对原来的类加载器是可见的。这是因为Java的DriverManager 类进行安全检查,这导致打开一个连接时,它会忽略所有对于原始来加载器不可见的驱动。一个方便的方法是在所有的worker节点上修改compute_classpath.sh以包含驱动的JARs。

* 一些数据库,例如H2,将所有的名称转化成大写的。在Spark SQL中你需要使用大写来指定那些名称。

性能调节(Performance Tuning)

对于一些工作负载来说,通过将数据缓存到内存中或者打开一些实验性的选项,可能会提高性能。

在内存中缓存数据

Spark SQL可以使用内存中的柱状格式来对表进行缓存,通过调用sqlContext.cacheTable("tableName") 或者rdataFrame.cache()。然后Spark SQL会扫描仅仅需要的列以及自动地调节压缩,以减少内存使用和GC压力。你可以调用sqlContext.uncacheTable("tableName") 来将表从内存中移除。内存缓存的配置可以使用SQLContext上的setConf或者通过使用SQL执行SET key=value命令的方式来完成。

Property Name

Default

Meaning

spark.sql.inMemoryColumnarStorage.compressed

true

When set to true Spark SQL will automatically select a compression codec for each column based on statistics of the data. 

当设置为true时,SparkSQL会根据统计出的各项数据为每一个列选择一种压缩编解码器。

spark.sql.inMemoryColumnarStorage.batchSize

10000

Controls the size of batches for columnar caching. Larger batch sizes can improve memory utilization and compression, but risk OOMs when caching data. 

为列状缓存控制批处理大小。较大的批大小可以提高内存的利用率和压缩,但是缓存数据时有OOMs的风险。

其他的配置选项

以下选项也可以用来调节执行查询时的性能。随着更多的优化被自动地执行,这些选项有可能会在将来的版本中被弃用,

Property Name

Default

Meaning

spark.sql.autoBroadcastJoinThreshold

10485760 (10 MB)

Configures the maximum size in bytes for a table that will be broadcast to all worker nodes when performing a join. By setting this value to -1 broadcasting can be disabled. Note that currently statistics are only supported for Hive Metastore tables where the command `ANALYZE TABLE <tableName> COMPUTE STATISTICS noscan` has been run. 

spark.sql.codegen

false

When true, code will be dynamically generated at runtime for expression evaluation in a specific query. For some queries with complicated expression this option can lead to significant speed-ups. However, for simple queries this can actually slow down query execution. 

spark.sql.shuffle.partitions

200

Configures the number of partitions to use when shuffling data for joins or aggregations. 

分布式SQL引擎

Spark SQL也可以使用它的JDBC/ODBC或者命令行接口作为分布式查询引擎。以这种模式,终端用户或者应用可以直接与Spark SQL交互来运行SQL查询,而不需要写任何代码。

运行Thrift JDBC/ODBC 服务

这里实现的Thrift JDBC/ODBC相当于Hive0.13中的HiveServer2.你可以用Spark或者Hive0.13自带的beeline脚本来测试JDBC服务。

在Spark目录中运行下面的命令来启动JDBC/ODBC server:

./sbin/start-thriftserver.sh

该脚本接受所有的bin/spark-submit命令行选项,外加一个--hiveconf选项来指定Hive属性。可能执行./sbin/start-thriftserver.sh --help来查看完整的可用选项列表。默认情况下,该服务会监听localhost:10000。你可以通过任一环境变量覆盖这个bahaviour,也就是:

export HIVE_SERVER2_THRIFT_PORT=<listening-port>

export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>./sbin/start-thriftserver.sh \  --master <master-uri> \  ...

或者系统属性

./sbin/start-thriftserver.sh \  --hiveconf hive.server2.thrift.port=<listening-port> \  --hiveconf hive.server2.thrift.bind.host=<listening-host> \  --master <master-uri>  ...

现在你可以使用beeline来测试Thrift JDBC/ODBC服务:

./bin/beeline

使用beeline来连接JDBC/ODBC:

beeline> !connect jdbc:hive2://localhost:10000

Beeline会向你询问用户名和密码。在非安全模式下,仅仅输入你机子的用户名以及一个空白的密码。对于安全模式,请按照beeline文档给出的指示。

通过将hive-site.xml文件放到conf/下来完成Hive的配置。

你也可以使用Hive自带的beeline脚本。

Thrift JDBC服务还支持通过HTTP传输发送Thrift RPC消息。使用下列设置作为系统属性或者通过conf/中hive-site.xml文件来启用HTTP模式:

hive.server2.transport.mode - Set this to value: httphive.server2.thrift.http.port - HTTP port number fo listen on; default is 10001hive.server2.http.endpoint - HTTP endpoint; default is cliservice

为了测试,使用beeline以http模式连接JDBC/ODBC服务:

beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>

运行Spark SQL CLI

Spark SQL CLI是一种方便的工具用来以本地模式运行Hive metastore服务,并且从命令行输入执行查询。注意,Spark SQL CLI不能和Thrift JDBC服务进行会话。

在Spark目录下,执行以下命令来启动Spark SQL CLI:

./bin/spark-sql

Hive的配置已经完成,通过将 hive-site.xml 文件放入conf/中。你可以运行 ./bin/spark-sql --help来查看完整的可用选项的列表。

迁移指南

从Spark SQL1.0-1.2升级到1.3Upgrading from Spark SQL 1.0-1.2 to 1.3。

在Spark 1.3中,我们从Spark SQL中移除了“Alpha”标签,作为其中的一部分,对可用的APIs进行了清理。从Spark 1.3以后,Spark SQL会与1.X系列中的其他版本提供二进制兼容。这种兼容性保证不包括的APIs明确地标记为不稳定。(也就是 DeveloperAPI 或者rExperimental)

重命名SchemaRDD为DataFrame

当升级到Spark SQL 1.3后,用户可以发现最大的改变是SchemaRDD重命名为DataFrame。这主要是因为DataFrame不再直接继承自RDD。而是自己实现了RDDs提供的大多数功能。DataFrames仍旧可以通过调用.rdd方法来转化成RDDs。

Scala中有一种类型别名,从 SchemaRDD到DataFrame为一些用例提供了源的兼容性。但还是建议用户更新它们的代码来使用DataFrame。Java和Python 用户需要更新它们的代码。

Java和Scala APIs的统一

Spark1.3之前,有单独的Java兼容性类(JavaSQLContext 和 JavaSchemaRDD)映射成Scala API。在Spark1.3中,Java API和Scala API进行了统一。任意语言的用户应该使用SQLContext和DataFrame。通常,这些类会尝试使用两种语言中都可用的类型(即,Array而不是语言特定的集合)。在有些情况下,如果没有相同的类型存在,则会使用函数重载来替代。

此外,Java指定的类型API被移除了。Scala和Java的用户应该使用inorg.apache.spark.sql.types中的类来以编程方式描述模式。

隐式转换的隔离以及dsl包的删除(只有Scala)

在Spark 1.3之前许多代码的例子以import sqlContext._开始,它会将sqlContext中所有的函数引入到scope中。在Spark 1.3中,我们将在SQLContext内部的RDDs转换成DataFrames转成成对象进行了隐式转化的隔离。用户现在需要写 import sqlContext.implicits._。

此外,隐式转换现在只有组成Rroducts的RDDs参数带有一个toDF方法,而不是自动地应用。

当使用DSL(现在替代为 DataFrame API)内部的方法时,用户之前会import org.apache.spark.sql.catalyst.dsl。现在使用公共的dataframe方法API应该用import org.apache.spark.sql.functions._。

为DataType删除在org.apache.spark.sql中的类型别名(只有Scala)

Spark 1.3为DataType删除了出现在根本的sql包中的类型别名。现在,用户应该引入org.apache.spark.sql.types中的类。

UDF注册移到sqlContext.udf 中(Java 和 Scala)

用于注册UDFs的函数,不是用于DataFrame DSL就是SQL,已经被移动了SQLContext中的udf对象中。

sqlCtx.udf.register("strLen", (s: String) => s.length())

Python的UDF注册没有改变。

Python中的DataTypes不再是单例了(Python DataTypes No Longer Singletons)。

当使用Python中的DataTypes,你需要创建它们(i.e. StringType()),而不是引用一个单例。

与Apache Hive的兼容性

Spark SQL被设计出来用于兼容Hive Metastore, SerDes 以及 UDFs。目前的Spark SQL是基于Hive 0.12.0和0.13.1。

部署在现有的Hive仓库中(Deploying in Existing Hive Warehouses)

The Spark SQL Thrigt JDBC服务被设计出来“立即可用” 的兼容现有的Hive安装。你不需要修改已经存在的Hive Metastore 或者更改数据位置或者表分区。

支持的Hive特性

Spark SQL 支持大量的Hive特性,例如:

· Hive query statements, including:

o SELECT

o GROUP BY

o ORDER BY

o CLUSTER BY

o SORT BY

· All Hive operators, including:

o Relational operators (=, ⇔, ==, <>, <, >, >=, <=, etc)

o Arithmetic operators (+, -, *, /, %, etc)

o Logical operators (AND, &&, OR, ||, etc)

o Complex type constructors

o Mathematical functions (sign, ln, cos, etc)

o String functions (instr, length, printf, etc)

· User defined functions (UDF)

· User defined aggregation functions (UDAF)

· User defined serialization formats (SerDes)

· Joins

o JOIN

o {LEFT|RIGHT|FULL} OUTER JOIN

o LEFT SEMI JOIN

o CROSS JOIN

· Unions

· Sub-queries

o SELECT col FROM ( SELECT a + b AS col from t1) t2

· Sampling

· Explain

· Partitioned tables

· View

· All Hive DDL Functions, including:

o CREATE TABLE

o CREATE TABLE AS SELECT

o ALTER TABLE

· Most Hive Data types, including:

o TINYINT

o SMALLINT

o INT

o BIGINT

o BOOLEAN

o FLOAT

o DOUBLE

o STRING

o BINARY

o TIMESTAMP

o DATE

o ARRAY<>

o MAP<>

o STRUCT<>

不支持的Hive功能

下面是不支持的Hive特性的列表。大多数特性很少会在Hive部署中用到。

Major Hive Features

· Tables with buckets: bucket is the hash partitioning within a Hive table partition. Spark SQL doesn’t support buckets yet.

· 带有buckets的Table:bucket是Hive表分区中的哈希分区。Spark SQL不支持buckets。

Esoteric Hive Features 

* UNION type * Unique join * Column statistics collecting: Spark SQL does not piggyback scans to collect column statistics at the moment and only supports populating the sizeInBytes field of the hive metastore.

Hive Input/Output Formats

· File format for CLI: For results showing back to the CLI, Spark SQL only supports TextOutputFormat.

· Hadoop archive

· CLI文件格式:对于结果,显示回CLI,Spark SQL 只支持TextOutputFormat

· Hadoop存档

Hive Optimizations

少量的Hive优化没有包含在Spark中。由于Spark SQL的内存计算模型它们中的有一些(例如索引)是次要的。其他的一些会来的Spark SQL版本中加入。

· Block level bitmap indexes and virtual columns 

· Automatically determine the number of reducers for joins and groupbys: Currently in Spark SQL, you need to control the degree of parallelism post-shuffle using “SET spark.sql.shuffle.partitions=[num_tasks];”.

· Meta-data only query: For queries that can be answered by using only meta data, Spark SQL still launches tasks to compute the result.

· Skew data flag: Spark SQL does not follow the skew data flags in Hive.

· STREAMTABLE hint in join: Spark SQL does not follow the STREAMTABLE hint.

· Merge multiple small files for query results: if the result output contains multiple small files, Hive can optionally merge the small files into fewer large files to avoid overflowing the HDFS metadata. Spark SQL does not support that.

· 块级别的位图索引和虚拟列(用来创建索引)

· 自动为joins和groupbys决定reducers的数量:目前在Spark SQL中,你需要使用“SET spark.sql.shuffle.partitions=[num_tasks];”来控制并行的post-shuffle的度。

· 仅元数据查询:对于仅使用元数据来回答的查询,Spark SQL还是启动任务来计算结果。

· 偏斜数据标志:Spark SQL不遵循Hive中的偏斜数据标志。

· join中的STREAMTABLE hint:Spark SQL不遵循STREAMTABLE hint。

· 为查询结果合并多个小文件:如果结果输出中包含多个小文件,Hive可以选择性地将多个小文件合并成更少的更大的文件,避免HDFS元数据的溢出。Spark SQL不支持这些。

数据类型

Spark SQL 和 DataFrames支持以下的数据类型:

· Numeric types

o ByteType: Represents 1-byte signed integer numbers. The range of numbers is from -128 to 127.

o ShortType: Represents 2-byte signed integer numbers. The range of numbers is from -32768 to 32767.

o IntegerType: Represents 4-byte signed integer numbers. The range of numbers is from -2147483648 to 2147483647.

o LongType: Represents 8-byte signed integer numbers. The range of numbers is from -9223372036854775808 to 9223372036854775807.

o FloatType: Represents 4-byte single-precision floating point numbers.

o DoubleType: Represents 8-byte double-precision floating point numbers.

o DecimalType: Represents arbitrary-precision signed decimal numbers. Backed internally by java.math.BigDecimal. A BigDecimal consists of an arbitrary precision integer unscaled value and a 32-bit integer scale.

· String type

o StringType: Represents character string values.

· Binary type

o BinaryType: Represents byte sequence values.

· Boolean type

o BooleanType: Represents boolean values.

· Datetime type

o TimestampType: Represents values comprising values of fields year, month, day, hour, minute, and second.

o DateType: Represents values comprising values of fields year, month, day.

· Complex types

o ArrayType(elementType, containsNull): Represents values comprising a sequence of elements with the type of elementType.containsNull is used to indicate if elements in a ArrayType value can have null values.

o MapType(keyType, valueType, valueContainsNull): Represents values comprising a set of key-value pairs. The data type of keys are described by keyType and the data type of values are described by valueType. For a MapType value, keys are not allowed to have nullvalues. valueContainsNull is used to indicate if values of a MapType value can have null values.

o StructType(fields): Represents values with the structure described by a sequence of StructFields (fields).

§ StructField(name, dataType, nullable): Represents a field in a StructType. The name of a field is indicated by name. The data type of a field is indicated by dataType. nullable is used to indicate if values of this fields can have null values.

Spark SQL中所有的数据类型的都在包 package org.apache.spark.sql.types中,你可以这样访问它们:

import  org.apache.spark.sql.types._

 

Data type

Value type in Scala

API to access or create a data type

ByteType

Byte

ByteType

ShortType

Short

ShortType

IntegerType

Int

IntegerType

LongType

Long

LongType

FloatType

Float

FloatType

DoubleType

Double

DoubleType

DecimalType

java.math.BigDecimal

DecimalType

StringType

String

StringType

BinaryType

Array[Byte]

BinaryType

BooleanType

Boolean

BooleanType

TimestampType

java.sql.Timestamp

TimestampType

DateType

java.sql.Date

DateType

ArrayType

scala.collection.Seq

ArrayType(elementType, [containsNull])
Note: The default value of containsNull is true.

MapType

scala.collection.Map

MapType(keyType, valueType, [valueContainsNull])
Note: The default value of valueContainsNull is true.

StructType

org.apache.spark.sql.Row

StructType(fields)
Note: fields is a Seq of StructFields. Also, two fields with the same name are not allowed.

StructField

The value type in Scala of the data type of this field (For example, Int for a StructField with the data type IntegerType)

StructField(name, dataType, nullable)

© 著作权归作者所有

openthings
粉丝 320
博文 1129
码字总数 675031
作品 1
东城
架构师
私信 提问
【Spark】Spark SQL, DataFrames and Datasets Guide(翻译文,持续更新)

本文主要是翻译Spark官网Spark SQL programming guide 。只能保证大概意思,尽量保证细节。英文水平有限,如果有错误的地方请指正,轻喷。目录导航在右上角 Spark SQL、DataFrames 和 Datase...

跑呀跑
2018/09/19
0
0
Spark 学习资源收集【Updating】

(一)spark 相关安装部署、开发环境 1、Spark 伪分布式 & 全分布式 安装指南 http://my.oschina.net/leejun2005/blog/394928 2、Apache Spark探秘:三种分布式部署方式比较 http://dongxic...

大数据之路
2014/09/08
5.1K
0
Spark 2.0 时代全面到来 —— 2.0.1 版本发布

距离Spark 2.0.0发布两个月后,Spark 2.0.1版本发布了,这是一个修正版本,共处理了300多个Issue,涉及spark稳定性和bug等方面的修复 ,它的发布意味着Spark 2.0接近生产环境使用要求,想要尝...

达尔文
2016/10/08
13.4K
22
地铁译:Spark for python developers ---Spark的数据戏法

聚焦在 Twitter 上关于Apache Spark的数据, 这些是准备用于机器学习和流式处理应用的数据。 重点是如何通过分布式网络交换代码和数据,获得 串行化, 持久化 , 调度和缓存的实战经验 。 认真使...

abel_cao
01/17
0
0
Apache Spark 2.0 最快今年4月亮相

1月才刚释出1.6版的大数据技术Spark,下一个2.0版本预计4、5月释出,将提供可运行在SQL/Dataframe上的结构化串流即时引擎,并统一化Dataset及DataFrame 大数据技术Spark今年1月才刚释出1.6版...

oschina
2016/02/29
3K
16

没有更多内容

加载失败,请刷新页面

加载更多

OSChina 周六乱弹 —— 早上儿子问我他是怎么来的

Osc乱弹歌单(2019)请戳(这里) 【今日歌曲】 @凉小生 :#今日歌曲推荐# 少点戾气,愿你和这个世界温柔以待。中岛美嘉的单曲《僕が死のうと思ったのは (曾经我也想过一了百了)》 《僕が死の...

小小编辑
今天
2.3K
15
Excption与Error包结构,OOM 你遇到过哪些情况,SOF 你遇到过哪些情况

Throwable 是 Java 中所有错误与异常的超类,Throwable 包含两个子类,Error 与 Exception 。用于指示发生了异常情况。 Java 抛出的 Throwable 可以分成三种类型。 被检查异常(checked Exc...

Garphy
今天
38
0
计算机实现原理专题--二进制减法器(二)

在计算机实现原理专题--二进制减法器(一)中说明了基本原理,现准备说明如何来实现。 首先第一步255-b运算相当于对b进行按位取反,因此可将8个非门组成如下图的形式: 由于每次做减法时,我...

FAT_mt
昨天
40
0
好程序员大数据学习路线分享函数+map映射+元祖

好程序员大数据学习路线分享函数+map映射+元祖,大数据各个平台上的语言实现 hadoop 由java实现,2003年至今,三大块:数据处理,数据存储,数据计算 存储: hbase --> 数据成表 处理: hive --> 数...

好程序员官方
昨天
61
0
tabel 中含有复选框的列 数据理解

1、el-ui中实现某一列为复选框 实现多选非常简单: 手动添加一个el-table-column,设type属性为selction即可; 2、@selection-change事件:选项发生勾选状态变化时触发该事件 <el-table @sel...

everthing
昨天
21
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部