文档章节

spark2.2官方教程笔记-Spark SQL, DataFrames and Datasets向导

skanda
 skanda
发布于 2017/08/07 17:17
字数 1604
阅读 22
收藏 2
点赞 0
评论 0

概括

spark SQL是一个spark结构数据处理模型。不像基本的rdd api,Spark 提供的接口可以给spark提供更多更多关于数据的结构和正在执行的计算的信息。另外,spark sql在性能优化上比以往的有做改善。目前有更多的方式和spark sql交互:sql,dataset api。无论你是用哪种api/语言,计算时最终使用相同的sql引擎。

SQL

  Spark Sql的一个作用是执行sql查询。spark sql能够用来从现有的hive中读取数据。更多关于如何配置使用的问题,请参考Hibe 表章节。当使用其他编程语言运行sql,结果也将返回Dataset/DataFrame。你能通过使用命令行或者jdbc/odbc和sql接口交互。

Datasets and DataFrames

一个dataset是分布式数据集合。数据集合是一个spark1.6开始提供的接口,想要把rdd的优势(强类似,强大的lambda函数功能)和spark sql的优化执行的优势结合到一起来。dataset能够从jvm对象中创建然后使用transformations(map,flagmap,filter等等)进行转换。

开始

SparkSession

所有spark功能的入口点是SparkSession类,创建一个基本的sparksession,只要使用一句语句就话哦啊

SparkSession.builder():

import org.apache.spark.sql.SparkSession

val spark = SparkSession
  .builder()
  .appName("Spark SQL basic example")
  .config("spark.some.config.option", "some-value")
  .getOrCreate()

// For implicit conversions like converting RDDs to DataFrames
import spark.implicits._

更完整的代码可以在spark安装包

examples/src/main/scala/org/apache/spark/examples/sql/SparkSQLExample.scala

路径下查看。

在spark2.0中SparkSession提供了内部对Hive功能的支持,包括查询HiveSql,访问Hive UDFs,以及从Hive表读数据的能力。要使用这些特征,你不需要单独安装一个hive。

创建DataFrames

通过SparkSession,应用能够由现有的rdd,或者hive表,或者spark数据源创建DataFrames。举个栗子,通过json文件创建

val df = spark.read.json("examples/src/main/resources/people.json")

// Displays the content of the DataFrame to stdout
df.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

非强类型/泛型 数据集操作(又叫做DataFrame操作)

DataFrames提供一个专用的语言来操作结构化数据。

如上所述,在Spark2.0中,DataFrames只是包含Rows的数据集。相对“强类型转换”这些被称作弱类型转换。【烂英文有时候翻译起来自己都不知道官方文档想表达啥。】

还是下面的代码好理解

// This import is needed to use the $-notation
import spark.implicits._
// Print the schema in a tree format
df.printSchema()
// root
// |-- age: long (nullable = true)
// |-- name: string (nullable = true)

// Select only the "name" column
df.select("name").show()
// +-------+
// |   name|
// +-------+
// |Michael|
// |   Andy|
// | Justin|
// +-------+

// Select everybody, but increment the age by 1
df.select($"name", $"age" + 1).show()
// +-------+---------+
// |   name|(age + 1)|
// +-------+---------+
// |Michael|     null|
// |   Andy|       31|
// | Justin|       20|
// +-------+---------+

// Select people older than 21
df.filter($"age" > 21).show()
// +---+----+
// |age|name|
// +---+----+
// | 30|Andy|
// +---+----+

// Count people by age
df.groupBy("age").count().show()
// +----+-----+
// | age|count|
// +----+-----+
// |  19|    1|
// |null|    1|
// |  30|    1|
// +----+-----+

更全的操作请移步api文档

除了简单的列引用和表达式,datasets也有丰富的函数库包括字符串操作,时间计算,常用的数学操作等。具体请移步DataFrame Function Reference.

执行Sql查询

SparkSession sql功能使应用可以执行一串sql查询并且返回一个DataFrame。把sql引入了语言。

// Register the DataFrame as a SQL temporary view
df.createOrReplaceTempView("people")

val sqlDF = spark.sql("SELECT * FROM people")
sqlDF.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

全局临时视图

临时视图的生命周期在Session范围内,随着session的停止而释放。如果你要让所有的session的可以访问,那么你就要搞个全局临时视图了。这个视图绑定在系统保留的global_temp数据库中,访问前我们必须通过引用gloabal_temp来访问它。 e.g. SELECT * FROM global_temp.view1.

// Register the DataFrame as a global temporary view
df.createGlobalTempView("people")

// Global temporary view is tied to a system preserved database `global_temp`
spark.sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

// Global temporary view is cross-session
spark.newSession().sql("SELECT * FROM global_temp.people").show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

创建Datasets

 datasets和rdds类似,和java序列化相反的是,他们使用特殊的编码方式去序列化对象。

// Note: Case classes in Scala 2.10 can support only up to 22 fields. To work around this limit,
// you can use custom classes that implement the Product interface
case class Person(name: String, age: Long)

// Encoders are created for case classes
val caseClassDS = Seq(Person("Andy", 32)).toDS()
caseClassDS.show()
// +----+---+
// |name|age|
// +----+---+
// |Andy| 32|
// +----+---+

// Encoders for most common types are automatically provided by importing spark.implicits._
val primitiveDS = Seq(1, 2, 3).toDS()
primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
val path = "examples/src/main/resources/people.json"
val peopleDS = spark.read.json(path).as[Person]
peopleDS.show()
// +----+-------+
// | age|   name|
// +----+-------+
// |null|Michael|
// |  30|   Andy|
// |  19| Justin|
// +----+-------+

和rdd对象交互

两种从rdd对象转换到dataset的方法。第一种是使用反射来反推一个rdd的对象类型。第二种是通过一个现有的rdd和你构建的schema,并且这种方式允许创建泛型,到运行时再来定义。

反射推理schema

先建好schema对应的类型,在map过程加进去。show code

// For implicit conversions from RDDs to DataFrames
import spark.implicits._

// Create an RDD of Person objects from a text file, convert it to a Dataframe
val peopleDF = spark.sparkContext
  .textFile("examples/src/main/resources/people.txt")
  .map(_.split(","))
  .map(attributes => Person(attributes(0), attributes(1).trim.toInt))
  .toDF()
// Register the DataFrame as a temporary view
peopleDF.createOrReplaceTempView("people")

// SQL statements can be run by using the sql methods provided by Spark
val teenagersDF = spark.sql("SELECT name, age FROM people WHERE age BETWEEN 13 AND 19")

// The columns of a row in the result can be accessed by field index
teenagersDF.map(teenager => "Name: " + teenager(0)).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// or by field name
teenagersDF.map(teenager => "Name: " + teenager.getAs[String]("name")).show()
// +------------+
// |       value|
// +------------+
// |Name: Justin|
// +------------+

// No pre-defined encoders for Dataset[Map[K,V]], define explicitly
implicit val mapEncoder = org.apache.spark.sql.Encoders.kryo[Map[String, Any]]
// Primitive types and case classes can be also defined as
// implicit val stringIntMapEncoder: Encoder[Map[String, Any]] = ExpressionEncoder()

// row.getValuesMap[T] retrieves multiple columns at once into a Map[String, T]
teenagersDF.map(teenager => teenager.getValuesMap[Any](List("name", "age"))).collect()
// Array(Map("name" -> "Justin", "age" -> 19))

编程式定义schema

import org.apache.spark.sql.types._

// Create an RDD
val peopleRDD = spark.sparkContext.textFile("examples/src/main/resources/people.txt")

// The schema is encoded in a string
val schemaString = "name age"

// Generate the schema based on the string of schema
val fields = schemaString.split(" ")
  .map(fieldName => StructField(fieldName, StringType, nullable = true))
val schema = StructType(fields)

// Convert records of the RDD (people) to Rows
val rowRDD = peopleRDD
  .map(_.split(","))
  .map(attributes => Row(attributes(0), attributes(1).trim))

// Apply the schema to the RDD
val peopleDF = spark.createDataFrame(rowRDD, schema)

// Creates a temporary view using the DataFrame
peopleDF.createOrReplaceTempView("people")

// SQL can be run over a temporary view created using DataFrames
val results = spark.sql("SELECT name FROM people")

// The results of SQL queries are DataFrames and support all the normal RDD operations
// The columns of a row in the result can be accessed by field index or by field name
results.map(attributes => "Name: " + attributes(0)).show()
// +-------------+
// |        value|
// +-------------+
// |Name: Michael|
// |   Name: Andy|
// | Name: Justin|
// +-------------+

聚合

内建的dataframe函数提供常用的聚合函数,比如count,countDistinct,avg,max,min等等。

© 著作权归作者所有

共有 人打赏支持
skanda
粉丝 9
博文 75
码字总数 50007
作品 0
厦门
你不能错过的 spark 学习资源

1. 书籍,在线文档 2. 网站 3. Databricks Blog 4. 文章,博客 5. 视频

u012608836 ⋅ 04/12 ⋅ 0

Spark操作hive示例代码

示例代码 val sparkConf = new SparkConf().setAppName("hive example") val sc = new SparkContext(sparkConf) val hiveContext = new HiveContext(sc) val tableName="hive_test"; var spa......

守望者之父 ⋅ 06/15 ⋅ 0

Spark Streaming入门

欢迎大家前往腾讯云+社区,获取更多腾讯海量技术实践干货哦~ 本文将帮助您使用基于HBase的Apache Spark Streaming。Spark Streaming是Spark API核心的一个扩展,支持连续的数据流处理。 什么...

腾讯云加社区 ⋅ 05/16 ⋅ 0

A Vision for Making Deep Learning Simple

A Vision for Making Deep Learning Simple When MapReduce was introduced 15 years ago, it showed the world a glimpse into the future. For the first time, engineers at Silicon Vall......

openthings ⋅ 05/17 ⋅ 0

Spark Streaming 框架 - StreamingPro

概述 Spark 是一个可扩展的可编程框架,用于数据集的大规模分布式处理, 称为弹性分布式数据集(Resilient Distributed Datasets,RDD)。 Spark Streaming 是 Spark API 核心的扩展,它支持...

匿名 ⋅ 04/29 ⋅ 0

Apache Flink和Apache Spark有什么异同?它们的发展前景分别怎样?

============================= object WordCount { def main(args: Array[String]) {val env = new SparkContext("local","wordCount")val data = List("hi","how are you","hi")val dataSe......

justlpf ⋅ 05/12 ⋅ 0

教你如何成为Spark大数据高手

Spark目前被越来越多的企业使用,和Hadoop一样,Spark也是以作业的形式向集群提交任务,那么如何成为Spark大数据高手?下面就来个深度教程。 分享之前我还是要推荐下我自己创建的大数据学习交...

风火数据 ⋅ 05/20 ⋅ 0

Spark2.1.0之基础知识

在阅读本文之前,读者最好已经阅读了《Spark2.1.0之初识Spark》一文,本文将对Spark的基础知识进行介绍。但在此之前,读者先跟随本人来一次简单的时光穿梭,最后还将对Java与Scala在语言上进...

beliefer ⋅ 05/24 ⋅ 0

spark和hive storm mapreduce的比较

Spark Streaming与Storm都可以用于进行实时流计算。但是他们两者的区别是非常大的。其中区别之一 就是,Spank Streaming和Stom的计算模型完全不一样,Spark Streaming是基于RDD的,因此需要将...

necther ⋅ 04/28 ⋅ 0

Comprehensive Introduction to Apache Spark

Introduction Industry estimates that we are creating more than 2.5 Quintillion bytes of data every year. Think of it for a moment – 1 Qunitillion = 1 Million Billion! Can you i......

grasp_D ⋅ 06/15 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

MySQL 数据库设计总结

规则1:一般情况可以选择MyISAM存储引擎,如果需要事务支持必须使用InnoDB存储引擎。 注意:MyISAM存储引擎 B-tree索引有一个很大的限制:参与一个索引的所有字段的长度之和不能超过1000字节...

OSC_cnhwTY ⋅ 56分钟前 ⋅ 0

多线程(四)

线程池和Exector框架 什么是线程池? 降低资源的消耗 提高响应速度,任务:T1创建线程时间,T2任务执行时间,T3线程销毁时间,线程池没有或者减少T1和T3 提高线程的可管理性。 线程池要做些什...

这很耳东先生 ⋅ 今天 ⋅ 0

使用SpringMVC的@Validated注解验证

1、SpringMVC验证@Validated的使用 第一步:编写国际化消息资源文件 编写国际化消息资源ValidatedMessage.properties文件主要是用来显示错误的消息定制 [java] view plain copy edit.userna...

瑟青豆 ⋅ 今天 ⋅ 0

19.压缩工具gzip bzip2 xz

6月22日任务 6.1 压缩打包介绍 6.2 gzip压缩工具 6.3 bzip2压缩工具 6.4 xz压缩工具 6.1 压缩打包介绍: linux中常见的一些压缩文件 .zip .gz .bz2 .xz .tar .gz .tar .bz2 .tar.xz 建立一些文...

王鑫linux ⋅ 今天 ⋅ 0

6. Shell 函数 和 定向输出

Shell 常用函数 简洁:目前没怎么在Shell 脚本中使用过函数,哈哈,不过,以后可能会用。就像java8的函数式编程,以后获取会用吧,行吧,那咱们简单的看一下具体的使用 Shell函数格式 linux ...

AHUSKY ⋅ 今天 ⋅ 0

单片机软件定时器

之前写了一个软件定时器,发现不够优化,和友好,现在重写了 soft_timer.h #ifndef _SOFT_TIMER_H_#define _SOFT_TIMER_H_#include "sys.h"typedef void (*timer_callback_function)(vo...

猎人嘻嘻哈哈的 ⋅ 今天 ⋅ 0

好的资料搜说引擎

鸠摩搜书 简介:鸠摩搜书是一个电子书搜索引擎。它汇集了多个网盘和电子书平台的资源,真所谓大而全。而且它还支持筛选txt,pdf,mobi,epub、azw3格式文件。还显示来自不同网站的资源。对了,...

乔三爷 ⋅ 今天 ⋅ 0

Debian下安装PostgreSQL的表分区插件pg_pathman

先安装基础的编译环境 apt-get install build-essential libssl1.0-dev libkrb5-dev 将pg的bin目录加入环境变量,主要是要使用 pg_config export PATH=$PATH:/usr/lib/postgresql/10/bin 进......

玛雅牛 ⋅ 今天 ⋅ 0

inno安装

#define MyAppName "HoldChipEngin" #define MyAppVersion "1.0" #define MyAppPublisher "Hold Chip, Inc." #define MyAppURL "http://www.holdchip.com/" #define MyAppExeName "HoldChipE......

backtrackx ⋅ 今天 ⋅ 0

Linux(CentOS)下配置php运行环境及nginx解析php

【part1:搭建php环境】 1.选在自己需要安装的安装包版本,wget命令下载到服务器响应目录 http://php.net/releases/ 2.解压安装包 tar zxf php-x.x.x 3.cd到解压目录执行如下操作 cd ../php-...

硅谷课堂 ⋅ 今天 ⋅ 0

没有更多内容

加载失败,请刷新页面

加载更多

下一页

返回顶部
顶部