Spark SQL Programming

kunping · Published 2017/03/27 22:55

1. Test data
	val acTransList = Seq("SB10001,1000", "SB10002,1200",
		"SB10003,8000", "SB10004,400", "SB10005,300", "SB10006,10000",
		"SB10007,500", "SB10008,56", "SB10009,30","SB10010,7000", "CR10001,7000",
		"SB10002,-10")
		
2. Define the Trans case class
	case class Trans(accNo: String, tranAmount: Double)
	
3. Create a SparkSession
	val conf = new SparkConf().setMaster("local[2]").setAppName("SparkSQL")
	val spark = SparkSession.builder().config(conf).getOrCreate()
	val sc = spark.sparkContext
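
	Note: the snippets in this post assume the usual Spark 2.x imports are already in scope; a minimal set would be
	import org.apache.spark.SparkConf
	import org.apache.spark.sql.{Row, SparkSession}
	import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}
	import org.apache.spark.sql.functions._ //sum(), desc(), etc. used in the DataFrame API examples below
	import spark.implicits._ //toDF()/toDS()/$"..."; must come after the spark value is defined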
	
4. Creating a DataFrame (two approaches)
	a. Via the createDataFrame() method
		(1) From an RDD[Trans]
			val rdd = sc.parallelize(acTransList).map(_.split(",")).map(x => Trans(x(0), x(1).toDouble))
			val acTransDF = spark.createDataFrame(rdd)
		(2) From an RDD[Row] plus an explicit schema
			//StructField signature: StructField(name: String, dataType: DataType, nullable: Boolean = true, metadata: Metadata = Metadata.empty)
			val schema = StructType(StructField("accNo", StringType, false) :: StructField("tranAmount", DoubleType, false) :: Nil)
			val rdd = sc.parallelize(acTransList).map(_.split(",")).map(x => Row(x(0), x(1).toDouble))
			val acTransDF = spark.createDataFrame(rdd, schema)
		(3) From a Seq[Trans]
			val acTransSeq = Seq(Trans("SB10001", 1000.0), Trans("SB10002", 1200.0))
			val acTransDF = spark.createDataFrame(acTransSeq)
	b. Via the implicit RDD-to-DataFrame conversion
		import spark.implicits._
		//Without the ("accNo", "tranAmount") column names, the default names are (_1, _2)
		val acTransDF = sc.parallelize(acTransList).map(_.split(",")).map(x => (x(0), x(1).toDouble)).toDF("accNo", "tranAmount")
		
5. Displaying results
	a. Print the schema
		acTransDF.printSchema()
	b. Show the data with show(numRows); the default is 20 rows
		acTransDF.show()
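		show() also accepts an explicit row count and a truncate flag (standard DataFrame API), e.g.
		acTransDF.show(5)        //first 5 rows
		acTransDF.show(5, false) //first 5 rows without truncating long column values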

6. Create a temporary view for subsequent SQL queries
	acTransDF.createOrReplaceTempView("trans")

7. Basic Spark SQL operations
	//Query
	val goodTransRecords = spark.sql("SELECT accNo, tranAmount FROM trans WHERE accNo like 'SB%' AND tranAmount > 0")
	goodTransRecords.createOrReplaceTempView("goodtrans")
	goodTransRecords.show()
	//Aggregation: sum, max, min
	val aggAmount = spark.sql("SELECT sum(tranAmount), max(tranAmount), min(tranAmount) FROM goodtrans")
	aggAmount.show()
	//Deduplication
	val goodAccNos = spark.sql("SELECT DISTINCT accNo FROM trans WHERE accNo like 'SB%' ORDER BY accNo")
	//The same sum, max, min computed via the DataFrame API
	val sumAmountByMixing = goodTransRecords.map(trans => trans.getAs[Double]("tranAmount")).reduce(_ + _)
	val maxAmountByMixing = goodTransRecords.map(trans => trans.getAs[Double]("tranAmount")).reduce((a, b) => if (a > b) a else b)
	val minAmountByMixing = goodTransRecords.map(trans => trans.getAs[Double]("tranAmount")).reduce((a, b) => if (a < b) a else b)
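	With the test data above, the positive 'SB' records total 28486 (max 10000, min 30), so the SQL route and the mixed API route should agree; a quick check:
	aggAmount.collect().foreach(println) //expect [28486.0,10000.0,30.0]
	println(s"$sumAmountByMixing $maxAmountByMixing $minAmountByMixing") //expect 28486.0 10000.0 30.0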

8. DataFrame API operations
	//Filter
	val goodTransRecords = acTransDF.filter("accNo like 'SB%'").filter("tranAmount > 0")
	//Compute sum, max, min
	val aggregates = goodTransRecords.agg("tranAmount" -> "sum", "tranAmount" -> "max", "tranAmount" -> "min")
	//Filter, select, deduplicate, sort, and limit the number of returned rows
	val goodAccNos = acTransDF.filter("accNo like 'SB%'").select("accNo").distinct().orderBy("accNo").limit(3)
	//Group by account and sum
	val acSummaryViaDFAPI = acTransDF.groupBy("accNo").agg(sum("tranAmount") as "TransTotal")
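	For comparison, the same per-account totals can be written in SQL against the "trans" view from step 6:
	val acSummaryViaSQL = spark.sql("SELECT accNo, sum(tranAmount) AS TransTotal FROM trans GROUP BY accNo")
	acSummaryViaSQL.show()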

9. Reading and writing Parquet and JSON
	a. Parquet
		acTransDF.write.parquet("scala.master.parquet")
		val acTransDF = spark.read.parquet("scala.master.parquet")
		
		val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
		usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
		
	b. JSON
		acTransDF.write.json("/trans") //the target directory must not already exist
		val acTransDF = spark.read.json("/trans")
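		If the target directory may already exist, a save mode can be supplied instead of deleting it by hand (standard DataFrameWriter API):
		acTransDF.write.mode("overwrite").json("/trans") //replaces existing output; "append", "ignore" and "error" are also accepted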

10. Join queries
	//Prepare the data
	case class AcMaster(accNo: String, firstName: String, lastName:String)
	case class AcBal(accNo: String, balanceAmount: Double)
	def toAcMaster = (master: Seq[String]) => AcMaster(master(0), master(1), master(2))
	def toAcBal = (bal: Seq[String]) => AcBal(bal(0), bal(1).trim.toDouble)
	val acMasterList = Array("SB10001,Roger,Federer","SB10002,Pete,Sampras", "SB10003,Rafael,Nadal","SB10004,Boris,Becker", "SB10005,Ivan,Lendl")
	val acBalList = Array("SB10001,50000", "SB10002,12000","SB10003,3000", "SB10004,8500", "SB10005,5000")
	val acMasterDF = sc.parallelize(acMasterList).map(_.split(",")).map(toAcMaster(_)).toDF()
	val acBalDF = sc.parallelize(acBalList).map(_.split(",")).map(toAcBal(_)).toDF()
	acMasterDF.createOrReplaceTempView("master")
	acBalDF.createOrReplaceTempView("balance")
	acMasterDF.show()
	acBalDF.show()
	//Join via SQL
	val acDetail = spark.sql("SELECT master.accNo, firstName, lastName, balanceAmount FROM master, balance WHERE master.accNo = balance.accNo ORDER BY balanceAmount DESC")
	acDetail.show()

	//The same join via the DataFrame API
	val acDetailFromAPI = acMasterDF.join(acBalDF, acMasterDF("accNo") === acBalDF("accNo"), "inner")
		.sort($"balanceAmount".desc)
		.select(acMasterDF("accNo"), acMasterDF("firstName"), acMasterDF("lastName"), acBalDF("balanceAmount"))
	acDetailFromAPI.show()
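
	A left outer join (same API, different join type) would also keep master rows that have no matching balance row; a sketch:
	val acDetailOuter = acMasterDF.join(acBalDF, acMasterDF("accNo") === acBalDF("accNo"), "left_outer")
		.select(acMasterDF("accNo"), $"firstName", $"lastName", $"balanceAmount")
	acDetailOuter.show() //unmatched accounts would show null in balanceAmount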

11. Dataset operations are similar to DataFrame operations
	a. In the org.apache.spark.sql source there is the declaration: type DataFrame = org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
	b. A simple example
		// you can use custom classes that implement the Product interface
		case class Person(name: String, age: Long)
		// Encoders are created for case classes
		val caseClassDS = Seq(Person("Andy", 32)).toDS()
		caseClassDS.show()
		
		// Encoders for most common types are automatically provided by importing spark.implicits._
		val primitiveDS = Seq(1, 2, 3).toDS()
		primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

		// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
		val path = "examples/src/main/resources/people.json"
		val peopleDS = spark.read.json(path).as[Person]
		peopleDS.show()
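
		The acTransDF built earlier can likewise be viewed as a typed Dataset by supplying the Trans case class from step 2:
		val acTransDS = acTransDF.as[Trans] //columns are mapped to Trans fields by name
		acTransDS.filter(_.tranAmount > 1000).show() //typed lambda instead of a string expression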

12. Connecting to external data sources (MySQL example)
	 val jdbcDF = spark.read
    .format("jdbc")
    //.option("driver", "com.mysql.jdbc.Driver")//此句可不加,spark会自动识别
    .option("url", "jdbc:mysql://127.0.0.1:3306/mysql?useUnicode=true&characterEncoding=UTF-8&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=UTC")
    .option("dbtable", "user")
    .option("user", "root")
    .option("password", "123456")
    .load()

	jdbcDF.show()
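
	Writing a DataFrame back to MySQL uses the same options on the writer side (a sketch; the target table "user_copy" is hypothetical):
	jdbcDF.write
		.format("jdbc")
		.option("url", "jdbc:mysql://127.0.0.1:3306/mysql?useUnicode=true&characterEncoding=UTF-8&serverTimezone=UTC")
		.option("dbtable", "user_copy") //hypothetical target table
		.option("user", "root")
		.option("password", "123456")
		.mode("append")
		.save()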
		
	

 

13. The following time zone error may appear when connecting to MySQL; adding the serverTimezone=UTC parameter to the URL fixes it. Set the time zone (e.g. GMT/UTC) to match your database server's configuration.

Exception in thread "main" java.sql.SQLException: The server time zone value '中国标准时间' is unrecognized or represents more than one time zone. You must configure either the server or JDBC driver (via the serverTimezone configuration property) to use a more specifc time zone value if you want to utilize time zone support.
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:545)
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:513)
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:505)
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:479)
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:489)
	at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:69)
	at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:1606)
	at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:633)
	at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:347)
	at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:219)
	at org.apache.spark.sql.execution.datasources.jdbc.DriverWrapper.connect(DriverWrapper.scala:45)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:59)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:50)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:58)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.<init>(JDBCRelation.scala:113)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:45)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:330)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:125)
	at com.spark.sql.SparkSQL$.delayedEndpoint$com$spark$sql$SparkSQL$1(SparkSQL.scala:108)
	at com.spark.sql.SparkSQL$delayedInit$body.apply(SparkSQL.scala:14)
	at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
	at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
	at scala.App$$anonfun$main$1.apply(App.scala:76)
	at scala.App$$anonfun$main$1.apply(App.scala:76)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
	at scala.App$class.main(App.scala:76)
	at com.spark.sql.SparkSQL$.main(SparkSQL.scala:14)
	at com.spark.sql.SparkSQL.main(SparkSQL.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: com.mysql.cj.core.exceptions.InvalidConnectionAttributeException: The server time zone value '中国标准时间' is unrecognized or represents more than one time zone. You must configure either the server or JDBC driver (via the serverTimezone configuration property) to use a more specifc time zone value if you want to utilize time zone support.
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at com.mysql.cj.core.exceptions.ExceptionFactory.createException(ExceptionFactory.java:54)
	at com.mysql.cj.core.exceptions.ExceptionFactory.createException(ExceptionFactory.java:73)
	at com.mysql.cj.jdbc.util.TimeUtil.getCanonicalTimezone(TimeUtil.java:118)
	at com.mysql.cj.mysqla.MysqlaSession.configureTimezone(MysqlaSession.java:293)
	at com.mysql.cj.jdbc.ConnectionImpl.initializePropsFromServer(ConnectionImpl.java:2399)
	at com.mysql.cj.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:1739)
	at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:1596)
	... 28 more

Process finished with exit code 1