Spark学习笔记-SparkSQL
Spark学习笔记-SparkSQL
Endless2010 发表于6个月前
Spark学习笔记-SparkSQL
  • 发表于 6个月前
  • 阅读 5
  • 收藏 0
  • 点赞 0
  • 评论 0

【腾讯云】如何购买服务器最划算?>>>   

从json文件创建DataFrame

对于json数据,Spark可以自动创建结构类型

import org.apache.spark.sql._
var sqlContext=new SQLContext(sc)
var dataFrame=sqlContext.jsonFile("D:/account.json")
dataFrame.registerTempTable("Account"); //创建临时表,sql查询中使用
dataFrame.printSchema() 
sqlContext.sql("select * from Account where age>30").collect()

输入图片说明

输入图片说明

从文本文件创建DataFrame

文本文件没有结构,可以使用StructType,StructField来指定模式

import org.apache.spark.sql._
import org.apache.spark.sql.types._
var file=sc.textFile("D:/account.txt")
var schemaString="account_number,balance,name,age,gender"
var schema=StructType(schemaString.split(",").map(field=>StructField(field,StringType,true)))
var rowRDD=file.map(_.split(",")).map(field=>Row(field(0),field(1),field(2),field(3),field(4)))
var dataFrame=new SQLContext(sc).createDataFrame(rowRDD,schema)
dataFrame.registerTempTable("Account"); //创建临时表,指定schama
dataFrame.printSchema()
dataFrame.show() 
val sqlContext=new SQLContext(sc)
sqlContext.sql("select * from Account where age>30").collect()

输入图片说明 输入图片说明 输入图片说明

使用Case Class

Spark SQL的scala接口支持自动转换一个包含case class的RDD为一个DataFrame

import org.apache.spark.sql._
val sqlContext=new SQLContext(sc)
var file=sc.textFile("D:/account.txt")
//RDD隐士转换为DataFrame
import sqlContext.implicits._
case class AccountInfo(account_number:Int,balance:Double,name:String,age:Int,gender:String)
var lines=file.map(_.split(","))
lines.map(field=>AccountInfo(field(0).toInt,field(1).toDouble,field(2),field(3).toInt,field(4))).toDF()
dataFrame.registerTempTable("Account"); 
dataFrame.printSchema()
dataFrame.show() 
sqlContext.sql("select * from Account where age>30").collect()

输入图片说明

从MySql读取数据

注意导入mysql-connector-java-xx.jar

import org.apache.spark.sql._
val sqlContext=new SQLContext(sc)
val url = "jdbc:mysql://127.0.0.1:3306/test" 
val tableName = "account" 
val prop = new java.util.Properties 
prop.setProperty("user","root") 
prop.setProperty("password","1234") 
val dataFrame= sqlContext.read.jdbc(url,tableName,prop) 
dataFrame.registerTempTable("Account"); 
dataFrame.printSchema()
dataFrame.show() 
sqlContext.sql("select * from Account where age>30").collect().foreach(println)

输入图片说明 输入图片说明 输入图片说明

共有 人打赏支持
粉丝 1
博文 25
码字总数 23027
×
Endless2010
如果觉得我的文章对您有用,请随意打赏。您的支持将鼓励我继续创作!
* 金额(元)
¥1 ¥5 ¥10 ¥20 其他金额
打赏人
留言
* 支付类型
微信扫码支付
打赏金额:
已支付成功
打赏金额: