Spark SQL Programming

2017/03/27 22:55

1. Test data
	val acTransList = Seq("SB10001,1000", "SB10002,1200",
		"SB10003,8000", "SB10004,400", "SB10005,300", "SB10006,10000",
		"SB10007,500", "SB10008,56", "SB10009,30","SB10010,7000", "CR10001,7000",
		"SB10002,-10")
		
2. Define the Trans case class
	case class Trans(accNo: String, tranAmount: Double)
	
3. Create the SparkSession
	import org.apache.spark.SparkConf
	import org.apache.spark.sql.SparkSession

	val conf = new SparkConf().setMaster("local[2]").setAppName("SparkSQL")
	val spark = SparkSession.builder().config(conf).getOrCreate()
	val sc = spark.sparkContext
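
	In Spark 2.x the SparkSession builder can also take the master URL and application name directly, so the separate SparkConf is optional; a minimal equivalent sketch:

	// Equivalent setup using only the SparkSession builder
	val spark = SparkSession.builder()
		.master("local[2]")
		.appName("SparkSQL")
		.getOrCreate()
	val sc = spark.sparkContext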
	
4. Creating a DataFrame (two approaches)
	a. Via the createDataFrame() method
		(1) From an RDD[Trans]
			val rdd = sc.parallelize(acTransList).map(_.split(",")).map(x => Trans(x(0), x(1).toDouble))
			val acTransDF = spark.createDataFrame(rdd)
		(2) From an RDD[Row] with an explicit schema
			// StructField signature: StructField(name: String, dataType: DataType, nullable: Boolean = true, metadata: Metadata = Metadata.empty)
			import org.apache.spark.sql.Row
			import org.apache.spark.sql.types.{StructType, StructField, StringType, DoubleType}
			val schema = StructType(StructField("accNo", StringType, false) :: StructField("tranAmount", DoubleType, false) :: Nil)
			val rdd = sc.parallelize(acTransList).map(_.split(",")).map(x => Row(x(0), x(1).toDouble))
			val acTransDF = spark.createDataFrame(rdd, schema)
		(3) From a Seq[Trans]
			val acTransSeq = Seq(Trans("SB10001", 1000.0), Trans("SB10002", 1200.0))
			val acTransDF = spark.createDataFrame(acTransSeq)
	b. Via implicit RDD-to-DataFrame conversion
		import spark.implicits._
		// Without the ("accNo", "tranAmount") column names, the defaults are _1 and _2
		val acTransDF = sc.parallelize(acTransList).map(_.split(",")).map(x => (x(0), x(1).toDouble)).toDF("accNo", "tranAmount")
		
5. Display the results
	a. Print the schema
		acTransDF.printSchema()
	b. Show the data with show(numRows); the default is 20 rows
		acTransDF.show()
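
	For the test data above, printSchema() and show() produce output roughly like the following (the nullable flags depend on which construction path was used):

		root
		 |-- accNo: string (nullable = true)
		 |-- tranAmount: double (nullable = false)

		+-------+----------+
		|  accNo|tranAmount|
		+-------+----------+
		|SB10001|    1000.0|
		|SB10002|    1200.0|
		|    ...|       ...|
		+-------+----------+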

6. Create a temporary view for subsequent SQL queries
	acTransDF.createOrReplaceTempView("trans")
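
	A temporary view is scoped to the current SparkSession. In Spark 2.1+ a global temporary view can be shared across sessions; a minimal sketch (the view name trans_global is just an example):

	// Global temporary views live in the reserved global_temp database
	acTransDF.createGlobalTempView("trans_global")
	spark.sql("SELECT * FROM global_temp.trans_global").show()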

7. Basic Spark SQL operations
	// Query
	val goodTransRecords = spark.sql("SELECT accNo, tranAmount FROM trans WHERE accNo like 'SB%' AND tranAmount > 0")
	goodTransRecords.createOrReplaceTempView("goodtrans")
	goodTransRecords.show()
	// Aggregation: sum, max, min
	val aggAmount = spark.sql("SELECT sum(tranAmount), max(tranAmount), min(tranAmount) FROM goodtrans")
	aggAmount.show()
	// Deduplication (DISTINCT) with ordering
	val goodAccNos = spark.sql("SELECT DISTINCT accNo FROM trans WHERE accNo like 'SB%' ORDER BY accNo")
	// Compute sum, max, min again by mixing in the Dataset API (map/reduce on the SQL result)
	val sumAmountByMixing = goodTransRecords.map(trans => trans.getAs[Double]("tranAmount")).reduce(_ + _)
	val maxAmountByMixing = goodTransRecords.map(trans => trans.getAs[Double]("tranAmount")).reduce((a, b) => if (a > b) a else b)
	val minAmountByMixing = goodTransRecords.map(trans => trans.getAs[Double]("tranAmount")).reduce((a, b) => if (a < b) a else b)
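
	As a complementary sketch, assuming we also want to inspect the transactions rejected by the filter above:

	// Records that fail validation: non-SB account numbers or non-positive amounts
	val badTransRecords = spark.sql("SELECT accNo, tranAmount FROM trans WHERE accNo NOT LIKE 'SB%' OR tranAmount <= 0")
	badTransRecords.show()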

8. DataFrame API operations
	// Query with filters
	val goodTransRecords = acTransDF.filter("accNo like 'SB%'").filter("tranAmount > 0")
	// sum, max, min
	val aggregates = goodTransRecords.agg("tranAmount" -> "sum", "tranAmount" -> "max", "tranAmount" -> "min")
	// Filter, select, deduplicate, sort, and limit the number of returned rows
	val goodAccNos = acTransDF.filter("accNo like 'SB%'").select("accNo").distinct().orderBy("accNo").limit(3)
	// Group by account number and sum; sum() comes from org.apache.spark.sql.functions._
	import org.apache.spark.sql.functions._
	val acSummaryViaDFAPI = acTransDF.groupBy("accNo").agg(sum("tranAmount") as "TransTotal")
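
	agg() accepts any number of aggregate functions from org.apache.spark.sql.functions; a small sketch combining several of them per account (the column aliases are just examples):

	// Several aggregates per account in one pass
	val acStats = acTransDF.groupBy("accNo")
		.agg(count("tranAmount") as "txnCount", sum("tranAmount") as "total", max("tranAmount") as "largest")
	acStats.show()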

9. Reading and writing Parquet and JSON
	a. Parquet
		acTransDF.write.parquet("scala.master.parquet")
		val acTransDF = spark.read.parquet("scala.master.parquet")
		
		val usersDF = spark.read.load("examples/src/main/resources/users.parquet")
		usersDF.select("name", "favorite_color").write.save("namesAndFavColors.parquet")
		
	b. JSON
		acTransDF.write.json("/trans") // the target directory must not already exist
		val acTransDF = spark.read.json("/trans")
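
	If the output directory may already exist, a save mode avoids the failure; a minimal sketch:

		// "overwrite" replaces existing output; "append", "ignore" and "error" are the other modes
		acTransDF.write.mode("overwrite").json("/trans")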

10. Join queries
	// Prepare the data
	case class AcMaster(accNo: String, firstName: String, lastName:String)
	case class AcBal(accNo: String, balanceAmount: Double)
	def toAcMaster = (master: Seq[String]) => AcMaster(master(0), master(1), master(2))
	def toAcBal = (bal: Seq[String]) => AcBal(bal(0), bal(1).trim.toDouble)
	val acMasterList = Array("SB10001,Roger,Federer","SB10002,Pete,Sampras", "SB10003,Rafael,Nadal","SB10004,Boris,Becker", "SB10005,Ivan,Lendl")
	val acBalList = Array("SB10001,50000", "SB10002,12000","SB10003,3000", "SB10004,8500", "SB10005,5000")
	val acMasterDF = sc.parallelize(acMasterList).map(_.split(",")).map(toAcMaster(_)).toDF()
	val acBalDF = sc.parallelize(acBalList).map(_.split(",")).map(toAcBal(_)).toDF()
	acMasterDF.createOrReplaceTempView("master")
	acBalDF.createOrReplaceTempView("balance")
	acMasterDF.show()
	acBalDF.show()
	// Join via SQL
	val acDetail = spark.sql("SELECT master.accNo, firstName, lastName, balanceAmount FROM master, balance WHERE master.accNo = balance.accNo ORDER BY balanceAmount DESC")
	acDetail.show()

	// Join via the DataFrame API
	val acDetailFromAPI = acMasterDF.join(acBalDF, acMasterDF("accNo") === acBalDF("accNo"), "inner")
		.sort($"balanceAmount".desc)
		.select(acMasterDF("accNo"), acMasterDF("firstName"), acMasterDF("lastName"), acBalDF("balanceAmount"))
	acDetailFromAPI.show()
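
	The third argument to join() selects the join type; besides "inner", values such as "left_outer", "right_outer" and "full_outer" are accepted. A sketch of a left outer join that keeps master rows even when no balance row exists:

	// Left outer join: unmatched master rows get a null balanceAmount
	val acDetailLeft = acMasterDF.join(acBalDF, acMasterDF("accNo") === acBalDF("accNo"), "left_outer")
		.select(acMasterDF("accNo"), $"firstName", $"lastName", $"balanceAmount")
	acDetailLeft.show()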

11. Dataset operations are similar to DataFrame operations
	a. In the org.apache.spark.sql package object, DataFrame is defined as an alias: type DataFrame = org.apache.spark.sql.Dataset[org.apache.spark.sql.Row]
	b. A simple example
		// you can use custom classes that implement the Product interface
		case class Person(name: String, age: Long)
		// Encoders are created for case classes
		val caseClassDS = Seq(Person("Andy", 32)).toDS()
		caseClassDS.show()
		
		// Encoders for most common types are automatically provided by importing spark.implicits._
		val primitiveDS = Seq(1, 2, 3).toDS()
		primitiveDS.map(_ + 1).collect() // Returns: Array(2, 3, 4)

		// DataFrames can be converted to a Dataset by providing a class. Mapping will be done by name
		val path = "examples/src/main/resources/people.json"
		val peopleDS = spark.read.json(path).as[Person]
		peopleDS.show()
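
		The transaction DataFrame from earlier can be converted the same way and then processed with typed field access; a minimal sketch:

		// DataFrame -> Dataset[Trans]; typed lambdas replace string-based expressions
		val acTransDS = acTransDF.as[Trans]
		val goodTransDS = acTransDS.filter(t => t.accNo.startsWith("SB") && t.tranAmount > 0)
		goodTransDS.show()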

12. Connecting to an external data source (MySQL example)
	val jdbcDF = spark.read
		.format("jdbc")
		// .option("driver", "com.mysql.jdbc.Driver") // optional; Spark infers the driver from the JDBC URL
		.option("url", "jdbc:mysql://127.0.0.1:3306/mysql?useUnicode=true&characterEncoding=UTF-8&useJDBCCompliantTimezoneShift=true&useLegacyDatetimeCode=false&serverTimezone=UTC")
		.option("dbtable", "user")
		.option("user", "root")
		.option("password", "123456")
		.load()

	jdbcDF.show()
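
	Writing a DataFrame back over JDBC follows the same pattern (the table name result_table is just a placeholder):

	// Write results to MySQL; "append" adds rows, "overwrite" recreates the table
	jdbcDF.write
		.format("jdbc")
		.option("url", "jdbc:mysql://127.0.0.1:3306/mysql?serverTimezone=UTC")
		.option("dbtable", "result_table")
		.option("user", "root")
		.option("password", "123456")
		.mode("append")
		.save()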
		
13. If connecting to MySQL fails with the time-zone error below, add the serverTimezone=UTC parameter to the JDBC URL; the zone can be set to match your database configuration (GMT/UTC):

Exception in thread "main" java.sql.SQLException: The server time zone value '中国标准时间' is unrecognized or represents more than one time zone. You must configure either the server or JDBC driver (via the serverTimezone configuration property) to use a more specifc time zone value if you want to utilize time zone support.
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:545)
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:513)
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:505)
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:479)
	at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:489)
	at com.mysql.cj.jdbc.exceptions.SQLExceptionsMapping.translateException(SQLExceptionsMapping.java:69)
	at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:1606)
	at com.mysql.cj.jdbc.ConnectionImpl.<init>(ConnectionImpl.java:633)
	at com.mysql.cj.jdbc.ConnectionImpl.getInstance(ConnectionImpl.java:347)
	at com.mysql.cj.jdbc.NonRegisteringDriver.connect(NonRegisteringDriver.java:219)
	at org.apache.spark.sql.execution.datasources.jdbc.DriverWrapper.connect(DriverWrapper.scala:45)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:59)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcUtils$$anonfun$createConnectionFactory$1.apply(JdbcUtils.scala:50)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRDD$.resolveTable(JDBCRDD.scala:58)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCRelation.<init>(JDBCRelation.scala:113)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:45)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:330)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:152)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:125)
	at com.spark.sql.SparkSQL$.delayedEndpoint$com$spark$sql$SparkSQL$1(SparkSQL.scala:108)
	at com.spark.sql.SparkSQL$delayedInit$body.apply(SparkSQL.scala:14)
	at scala.Function0$class.apply$mcV$sp(Function0.scala:34)
	at scala.runtime.AbstractFunction0.apply$mcV$sp(AbstractFunction0.scala:12)
	at scala.App$$anonfun$main$1.apply(App.scala:76)
	at scala.App$$anonfun$main$1.apply(App.scala:76)
	at scala.collection.immutable.List.foreach(List.scala:381)
	at scala.collection.generic.TraversableForwarder$class.foreach(TraversableForwarder.scala:35)
	at scala.App$class.main(App.scala:76)
	at com.spark.sql.SparkSQL$.main(SparkSQL.scala:14)
	at com.spark.sql.SparkSQL.main(SparkSQL.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at com.intellij.rt.execution.application.AppMain.main(AppMain.java:144)
Caused by: com.mysql.cj.core.exceptions.InvalidConnectionAttributeException: The server time zone value '中国标准时间' is unrecognized or represents more than one time zone. You must configure either the server or JDBC driver (via the serverTimezone configuration property) to use a more specifc time zone value if you want to utilize time zone support.
	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
	at com.mysql.cj.core.exceptions.ExceptionFactory.createException(ExceptionFactory.java:54)
	at com.mysql.cj.core.exceptions.ExceptionFactory.createException(ExceptionFactory.java:73)
	at com.mysql.cj.jdbc.util.TimeUtil.getCanonicalTimezone(TimeUtil.java:118)
	at com.mysql.cj.mysqla.MysqlaSession.configureTimezone(MysqlaSession.java:293)
	at com.mysql.cj.jdbc.ConnectionImpl.initializePropsFromServer(ConnectionImpl.java:2399)
	at com.mysql.cj.jdbc.ConnectionImpl.connectOneTryOnly(ConnectionImpl.java:1739)
	at com.mysql.cj.jdbc.ConnectionImpl.createNewIO(ConnectionImpl.java:1596)
	... 28 more

Process finished with exit code 1

 
