spark implementation hadoop setup,cleanup

原创
2015/08/11 16:41
阅读数 35

def main(args: Array[String]) { val sc = new SparkContext("local", "xxx") val inputData = sc.textFile("hdfs://master:8020/data/spark/user-history-data") val lines = inputData.map(line => (line, line.length))

val result = lines.mapPartitions { valueIterator =>
  if (valueIterator.isEmpty) {
    Iterator[String]()
  } else {
    val transformedItem = new ListBuffer[String]() //setup ListBuffer
    val fs: FileSystem = FileSystem.get(new Configuration()) //setup FileSystem

    valueIterator.map { item =>
      transformedItem += item._1 +":"+item._2
      val outputFile = fs.create(new Path("/home/xxx/opt/data/spark/" + item._1.substring(0,item._1.indexOf("\t")) + ".txt"))
      outputFile.write((item._1 +":"+item._2).getBytes())
      if (!valueIterator.hasNext) {
        transformedItem.clear() //cleanup transformedItem
        outputFile.close() //cleanup outputFile
        fs.close() //cleanup fs
      }
      transformedItem
    }
  }
}

result.foreach(println(_))
sc.stop()

将hdfs数据: zhangsan 1 2015-07-30 20:01:01 127.0.0.1 zhangsan 2 2015-07-30 20:01:01 127.0.0.1 zhangsan 3 2015-07-30 20:01:01 127.0.0.1 zhangsan 4 2015-07-31 20:01:01 127.0.0.1 zhangsan 5 2015-07-31 20:21:01 127.0.0.1 lisi 1 2015-07-30 21:01:01 127.0.0.1 lisi 2 2015-07-30 22:01:01 127.0.0.1 lisi 3 2015-07-31 23:31:01 127.0.0.1 lisi 4 2015-07-31 22:21:01 127.0.0.1 lisi 5 2015-07-31 23:11:01 127.0.0.1 wangwu 1 2015-07-30 21:01:01 127.0.0.1 wangwu 2 2015-07-30 22:01:01 127.0.0.1 wangwu 3 2015-07-31 23:31:01 127.0.0.1 wangwu 4 2015-07-31 22:21:01 127.0.0.1 wangwu 5 2015-07-31 23:11:01 127.0.0.1

读取到spark中,并统计每行长度,再将数据写到本地的文件中(文件名称以每行第一个单词) 最终实现hadoop中setup, cleanup

强烈阅读如下链接: http://mail-archives.apache.org/mod_mbox/spark-user/201407.mbox/%3CCAPH-c_O9kQO6yJ4khXUVdO=+D4vj=JfG2tP9eqn5RPko=dRNAg@mail.gmail.com%3E http://blog.cloudera.com/blog/2014/09/how-to-translate-from-mapreduce-to-apache-spark/ http://apache-spark-user-list.1001560.n3.nabble.com/how-to-split-RDD-by-key-and-save-to-different-path-td11887.html#a11983 http://stackoverflow.com/questions/24520225/writing-to-hadoop-distributed-file-system-multiple-times-with-spark

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部