Spark Programming Basics

1. What is a Spark RDD? (see the sketch after this list)
	a. A Spark RDD is immutable
	b. A Spark RDD is distributed
	c. A Spark RDD lives in memory
	d. A Spark RDD is strongly typed
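	A minimal spark-shell sketch of these properties (the sample numbers are illustrative): transformations never modify the source RDD, they return a new one, and the element type is tracked statically.
		val nums = sc.parallelize(Array(1, 2, 3, 4, 5))   // nums: RDD[Int] -- strongly typed
		val doubled = nums.map(_ * 2)                      // returns a new RDD; nums is untouched
		nums.collect                                       // Array(1, 2, 3, 4, 5)
		doubled.collect                                    // Array(2, 4, 6, 8, 10)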

2. Enabling Spark monitoring / event logging (three ways; precedence: c > b > a)
	a. Edit $SPARK_HOME/conf/spark-defaults.conf
		spark.eventLog.enabled           true
		spark.eventLog.dir               hdfs://hadoop.master:9000/user/centos/spark/event_log
	b. Pass --conf options to ./spark-shell (or ./spark-submit)
		./spark-shell --master spark://spark.master:7077 --conf spark.eventLog.enabled=true --conf spark.eventLog.dir=hdfs://hadoop.master:9000/user/centos/spark/event_log
	c. Set it programmatically (see the sketch below)
		new SparkConf().set(key, value)
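	A minimal sketch of option c for a standalone application, reusing the event-log directory from option a (the application name is illustrative; in spark-shell the SparkContext already exists, so options a or b apply there):
		import org.apache.spark.{SparkConf, SparkContext}

		val conf = new SparkConf()
		  .setAppName("EventLogDemo")   // illustrative name
		  .set("spark.eventLog.enabled", "true")
		  .set("spark.eventLog.dir", "hdfs://hadoop.master:9000/user/centos/spark/event_log")
		val sc = new SparkContext(conf)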

3. Setting console log levels in the spark-shell interactive mode (or in an IDE such as IDEA)
	import org.apache.log4j.{Level, Logger}
	Logger.getLogger("org.apache.spark").setLevel(Level.WARN)
	Logger.getLogger("org.eclipse.jetty.server").setLevel(Level.OFF)

4. Basic Spark usage (a spark-shell walkthrough over a small list of account transactions)
	val acTransList = Array("SB10001,1000", "SB10002,1200", "SB10003,8000", "SB10004,400", "SB10005,300", "SB10006,10000", "SB10007,500", "SB10008,56", "SB10009,30","SB10010,7000", "CR10001,7000", "SB10002,-10")
	
	val acTransRDD = sc.parallelize(acTransList).map(_.split(",")).map(x => (x(0), x(1).toDouble))
	acTransRDD.collect

	val goodTransRecords = acTransRDD.filter(_._2 > 0).filter(_._1.startsWith("SB"))
	goodTransRecords.collect
	val sumAmount = goodTransRecords.map(_._2).reduce(_ + _)
	val maxAmount = goodTransRecords.map(_._2).reduce((a, b) => if (a > b) a else b)
	val minAmount = goodTransRecords.map(_._2).reduce((a, b) => if (a < b) a else b)
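	// With the sample data above these evaluate to 28486.0 (sum), 10000.0 (max) and 30.0 (min)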

	val highValueTransRecords = goodTransRecords.filter(_._2 > 1000)
	highValueTransRecords.collect

	val badAmountLambda = (trans: (String, Double)) => trans._2 <= 0
	val badAmountRecords = acTransRDD.filter(badAmountLambda)
	badAmountRecords.collect

	val badAcNoLambda = (trans: (String, Double)) => !trans._1.startsWith("SB")
	val badAccountRecords = acTransRDD.filter(badAcNoLambda)
	badAccountRecords.collect

	val badTransRecords = badAmountRecords.union(badAccountRecords)
	badTransRecords.collect

	val combineAllElements = sc.parallelize(acTransList).flatMap(trans => trans.split(","))
	combineAllElements.collect

	val allGoodAccountNos = combineAllElements.filter(_.startsWith("SB"))
	allGoodAccountNos.distinct.collect

	val accSummary = acTransRDD.reduceByKey(_ + _).sortByKey()
	accSummary.collect
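	// With the sample data, reduceByKey nets SB10002 to 1190.0 (1200 - 10) and sortByKey places CR10001 first
	accSummary.lookup("SB10002")   // Seq(1190.0) -- quick check of a single key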

5. Spark transformations and actions (a short sketch using several of these follows the list)
	a. Transformations (return a new RDD and are evaluated lazily)
		filter(fn)
		map(fn)
		flatMap(fn)
		union(other)
		join(other, [numTasks]): joins two pair RDDs on their keys
		reduceByKey(fn, [numTasks]): groups values by key and combines them with fn
		sortByKey([ascending], [numTasks]): sorts a pair RDD by key

	b. Actions (trigger computation and return a result to the driver)
		collect(): returns all elements of the RDD as an array
		reduce(fn): aggregates all elements with fn
		foreach(fn): applies fn to every element
		first(): returns the first element of the RDD
		take(n): returns the first n elements of the RDD
		countByKey(): counts the number of elements for each key
		count(): returns the number of elements in the RDD
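	A brief sketch exercising several of these against the acTransRDD built in section 4 (the account-name pairs fed to join are illustrative):
		val acNames = sc.parallelize(Array(("SB10001", "Alice"), ("SB10002", "Bob")))
		acTransRDD.join(acNames).collect   // e.g. ("SB10002", (1200.0, "Bob")), ("SB10002", (-10.0, "Bob")), ("SB10001", (1000.0, "Alice"))

		acTransRDD.first                   // ("SB10001", 1000.0)
		acTransRDD.take(3)                 // the first three (account, amount) pairs
		acTransRDD.countByKey()            // Map with SB10002 -> 2 and every other account -> 1
		acTransRDD.count                   // 12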

6. Creating an RDD from a file
Local filesystem	val textFile = sc.textFile("README.md")
HDFS			val textFile = sc.textFile("hdfs://<location in HDFS>")
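A minimal word-count sketch combining a file-based RDD with the operators above (assumes a README.md exists in the current directory; the path is illustrative):
	val textFile = sc.textFile("README.md")
	val wordCounts = textFile.flatMap(_.split("\\s+")).map(word => (word, 1)).reduceByKey(_ + _)
	wordCounts.take(5)   // a few (word, count) pairs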

 
