package week4
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SQLContext
case class Person(name: String, age: Int)
object SQLOnSpark {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("SQLOnSpark")
val sc = new SparkContext(conf)
val sqlContext = new SQLContext(sc)
import sqlContext._
val people: RDD[Person] = sc.textFile("hdfs://hadoop1:8000/dataguru/week4/people.txt")
.map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt))
people.registerAsTable("people")
val teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 10 and age <= 19")
teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
sc.stop()
}
}
****************************************************
****************************************************
****************************************************
package week4
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.hive.LocalHiveContext
object HiveOnSpark {
case class Record(key: Int, value: String)
def main(args: Array[String]) {
val sparkConf = new SparkConf().setAppName("HiveFromSpark")
val sc = new SparkContext(sparkConf)
val hiveContext = new LocalHiveContext(sc)
import hiveContext._
hql("use saledata")
hql("select c.theyear,count(distinct a.ordernumber),sum(b.amount) from tblStock a join tblStockDetail b on a.ordernumber=b.ordernumber join tbldate c on a.dateid=c.dateid group by c.theyear order by c.theyear")
.collect().foreach(println)
sc.stop()
}
}
****************************************************
****************************************************
****************************************************
package week4
import java.sql.{Connection, DriverManager, ResultSet}
object MySQL_test {
def main(args: Array[String]) {
Class.forName("com.mysql.jdbc.Driver")
val conn = DriverManager.getConnection("jdbc:mysql://hadoop3:3306/test", "hadoop", "hadoop")
try {
val statement = conn.createStatement(ResultSet.TYPE_FORWARD_ONLY,ResultSet.CONCUR_UPDATABLE )
val prep=conn.prepareStatement("insert into saledata (theyear,qty,amount) values (\"2003\",2,3)")
prep.executeUpdate()
val rs = statement.executeQuery("select theyear,qty,amount from saledata")
while (rs.next) {
val theyear = rs.getString("theyear")
val qty = rs.getString("qty")
println("theyear = %s, qtyname = %s".format(theyear, qty))
}
} catch {
case e: Exception => e.printStackTrace
}
conn.close
}
}