文档章节

Spark(八):Spark SQL 之 Data Sources And Save Modes

berg-dm
 berg-dm
发布于 2016/06/06 20:33
字数 1168
阅读 713
收藏 3

一:Data Sources(数据源):

1.1    了解数据源。

        Spark SQL 支持对各种数据源通过DataFrame接口操作。DataFrame 可以作为正常 的RDDs进行操作,也可以注册为一个临时表。
    注册DataFrame为一个表允许您在其数据运行 SQL 查询。本节介绍用于加载和保存使用 Spark数据源的数据的一般方法,然后再进入到可用的内置数据源的特定选项。

1.2    Generic Load/Save Functions(通用加载/保存功能)。

        最简单的形式,默认的数据源 (parquet除非否则配置由 spark.sql.sources.default) 将用于所有操作。

        eg:第一种读取方式:通过 parquetFile("xxx") 来读取

       首先把spark-1.6.1-bin-hadoop2.6\examples\src\main\resources下的users.parquet上传到HDFS上。

public class SparkSqlDemo4 {

	private static String appName = "Test Spark RDD";
	private static String master = "local";

	public static void main(String[] args) {
		SparkConf conf = new SparkConf();
		conf.set("spark.testing.memory", "269522560000");
		JavaSparkContext sc = new JavaSparkContext(master, appName, conf);

		//创建了 sqlContext的上下文,注意,它是DataFrame的起点。
		SQLContext sqlContext = new SQLContext(sc);

		DataFrame df = sqlContext.read().load("hdfs://192.168.226.129:9000/txt/sparkshell/users.parquet");
		
		df.select("name", "favorite_color").write().save("namesAndFavColors.parquet");

		//指定保存模式  
		//df.select("name", "favorite_color").write().mode(SaveMode.Overwrite).save("namesAndFavColors.parquet");  
		//第一种读取方式  
		DataFrame parquetFile = sqlContext.parquetFile("namesAndFavColors.parquet");

		parquetFile.registerTempTable("users");  

		DataFrame df1 = sqlContext.sql("SELECT name,favorite_color FROM users ");  
		
		df1.show();
		
		List<String> listString = df1.javaRDD().map(new Function<Row, String>() {  

			private static final long serialVersionUID = 1L;
			public String call(Row row) {  
				return "Name: " + row.getString(0) + " ,FavoriteColor: " + row.getString(1);  
			}  
		}).collect();  

		for (String string : listString) {
			System.out.println(  string );
		}
	}
}

输出结果如下:

+------+--------------+
|  name|favorite_color|
+------+--------------+
|Alyssa|          null|
|   Ben|           red|
+------+--------------+

Name: Alyssa ,FavoriteColor: null
Name: Ben ,FavoriteColor: red

1.3    Manually Specifying Options(手动指定选项):

         你可以也手动指定的数据源,将与您想要将传递给数据源的任何额外选项一起使用。

         数据源由其完全限定名称 (即 org.apache.spark.sql.parquet),

        但对于内置来源您还可以使用 他们短名称(json, parquet, jdbc)。

        DataFrames 任何类型可以转换为其他类型,使用此语法。

     eg:第二种读取方式:通过 parquet("xxx") 来读取

public class SparkSqlDemo5 {

	private static String appName = "Test Spark RDD";
	private static String master = "local";

	public static void main(String[] args) {
		SparkConf conf = new SparkConf();
		conf.set("spark.testing.memory", "269522560000");
		JavaSparkContext sc = new JavaSparkContext(master, appName, conf);

		//创建了 sqlContext的上下文,注意,它是DataFrame的起点。
		SQLContext sqlContext = new SQLContext(sc);


		DataFrame df = sqlContext.read().format("json").load("hdfs://192.168.226.129:9000/txt/sparkshell/people.json");
		df.select("id", "name","sex","age").write().format("parquet").save("people.parquet");


		DataFrame parquetFile = sqlContext.read().parquet("people.parquet");  
		
		parquetFile.registerTempTable("people");  

		DataFrame df1 = sqlContext.sql("SELECT id,name,sex,age FROM people WHERE age >= 21 AND age <= 23");  

		df1.show();
		
		df1.printSchema();

		List<String> listString = df1.javaRDD().map(new Function<Row, String>() {  

			private static final long serialVersionUID = 1L;
			public String call(Row row) {  
				return "Id: " + row.getString(0) + ", Name: "+row.getString(1) + ",Sex" +row.getString(2)+ ", Age: " + row.getLong(3);
			}
		}).collect();  

		for (String string : listString) {
			System.out.println(  string );
		}
	}
}

1.4    Run SQL on files directly(直接在文件上运行SQL)

       您也可以查询该文件直接使用 SQL,并对其进行查询,而不是使用 API 读取文件加载到DataFrame。

public class SparkSqlDemo6 {

	private static String appName = "Test Spark RDD";
	private static String master = "local";

	public static void main(String[] args) {
		SparkConf conf = new SparkConf();
		conf.set("spark.testing.memory", "269522560000");
		JavaSparkContext sc = new JavaSparkContext(master, appName, conf);

		//创建了 sqlContext的上下文,注意,它是DataFrame的起点。
		SQLContext sqlContext = new SQLContext(sc);

		// 注意 sql 语句 parquet 后面目录的符号 。 
		DataFrame df = sqlContext.sql("SELECT * FROM parquet.`hdfs://192.168.226.129:9000/txt/sparkshell/users.parquet`");
		
		df.show();
	}
}

    注意:sql语句中红色部分标记的符号。

"SELECT * FROM parquet.`hdfs://192.168.226.129:9000/txt/sparkshell/users.parquet`"

 

二:Save Modes(保存模式):

        Save操作可以可选择性地接收一个SaveModel,如果数据已经存在了,指定如何处理已经存在的数据。意识到这些保存模式没有利用任何锁,也不是原子的,这很重要。因此,如果有多个写入者试图往同一个地方写入,这是不安全的。此外,当执行一个Overwrite,在写入新的数据之前会将原来的数据进行删除。

eg:指定保存模式:
       df.select("name","favorite_color").write().mode(SaveMode.Overwrite).save("namesAndFavColors.parquet");  

Scala/Java Any Language Meaning
SaveMode.ErrorIfExists(default) "error"(default) 当保存DataFrame到一个数据源,如果数据已经存在,将会引发异常。
SaveMode.Append "append"


当保存DataFrame到一个数据源,如果数据/表已经存在,DataFrame的内容预计将追加到现有数据后面。

SaveMode.Overwrite "overwrite" 覆盖模式意味着当保存DataFrame到一个数据源,如果数据/表已存在,现有数据预计将覆盖原有的DataFrame内容。
SaveMode.Ignore "ignore" Ignore模式意味着当向数据源中保存一个DataFrame时,如果数据已经存在,save操作不会将DataFrame的内容进行保存,也不会修改已经存在的数据。这与SQL中的`CREATE TABLE IF NOT EXISTS`相似。

三:Parquet 文件

    Parquet是一种列式存储格式的文件,被许多其他数据处理系统所支持。Spark SQL支持度对Parquet文件的读和写,自动保存原有数据的模式。

 

© 著作权归作者所有

berg-dm
粉丝 26
博文 98
码字总数 88970
作品 0
深圳
程序员
私信 提问
基于spark1.3.1的spark-sql实战-01

sqlContext总的一个过程如下图所示: SQL语句经过SqlParse解析成UnresolvedLogicalPlan; 使用analyzer结合数据数据字典(catalog)进行绑定,生成resolvedLogicalPlan; 使用optimizer对res...

stark_summer
2015/05/19
404
0
Apache Spark 2.4.0 正式发布

Apache Spark 2.4 与昨天正式发布,Apache Spark 2.4 版本是 2.x 系列的第五个版本。 如果想及时了解 Spark、Hadoop或者Hbase相关的文章,欢迎关注微信公共帐号: itebloghadoop Apache Spa...

Spark
2018/11/09
0
0
Spark 学习资源收集【Updating】

(一)spark 相关安装部署、开发环境 1、Spark 伪分布式 & 全分布式 安装指南 http://my.oschina.net/leejun2005/blog/394928 2、Apache Spark探秘:三种分布式部署方式比较 http://dongxic...

大数据之路
2014/09/08
5.4K
0
你不能错过的 spark 学习资源

1. 书籍,在线文档 2. 网站 3. Databricks Blog 4. 文章,博客 5. 视频

u012608836
2018/04/12
0
0
Spark DataSource API

Spark 1.3 引入了第一版的数据源 API,我们可以使用它将常见的数据格式整合到 Spark SQL 中。但是,随着 Spark 的不断发展,这一 API 也体现出了其局限性,故而 Spark 团队不得不加入越来越多...

GordonNemo
03/07
39
0

没有更多内容

加载失败,请刷新页面

加载更多

JavaScript设计模式——适配器模式

  适配器模式是设计模式行为型模式中的一种模式;   定义:   适配器用来解决两个已有接口之间不匹配的问题,它并不需要考虑接口是如何实现,也不用考虑将来该如何修改;适配器不需要修...

有梦想的咸鱼前端
36分钟前
3
0
Andorid SQLite数据库开发基础教程(1)

Andorid SQLite数据库开发基础教程(1) Android数据库访问方式 SQLite是Android系统默认支持的文件数据库。该数据库支持SQL语言,适合开发人员上手。本教程将讲解如何开发使用SQLite的Andro...

大学霸
39分钟前
3
0
Handler简解

Handler 这里简化一下代码 以便理解 Handler不一定要在主线程建 但如Handler handler = new Handler(); 会使用当前的Looper的, 由于要更新UI 所以最好在主线程 new Handler() { mLooper = Lo...

shzwork
今天
4
0
h5获取摄像头拍照功能

完整代码展示: <!DOCTYPE html> <head> <title>HTML5 GetUserMedia Demo</title> <meta charset="utf-8"> <meta name="viewport" content="width=device-width, initial-scale=1.0, maximum......

诗书易经
今天
3
0
正向代理和反向代理

文章来源 运维公会:正向代理和反向代理 1、正向代理 (1)服务对象不同 正向代理服务器的服务对象是客户端,可以将客户端和代理服务器看作一个整体。 (2)配置方法不同 需要在客户端配置代...

运维团
今天
5
0

没有更多内容

加载失败,请刷新页面

加载更多

返回顶部
顶部