标签生成器

原创
2017/09/20 18:01
阅读数 125
package com.hhb.spark.core

import org.apache.spark.{SparkConf, SparkContext}

import com.alibaba.fastjson.JSON

/**
  * 标签生成器
  * Created by dell on 2017/9/20.
  */
object TagGenerator {
  def main(args: Array[String]) {
    val conf = new SparkConf()
      .setAppName("TagGenerator").setMaster("local")
      .set("spark.testing.memory", "2147480000")
    val sc = new SparkContext(conf)

    // 加载文件
    val rdd1 = sc.textFile("c://temptags.txt")
    // 过滤文件
    val rdd2 = rdd1.map(line => {
      val arr = line.split("\\t")// 切割
      val busId = arr(0)
      val text = arr(1)
      // 将字符串转为json对象
      val jo = JSON.parseObject(text)
      val jarr = jo.getJSONArray("extInfoList")
      if(jarr != null && jarr.size() > 0){
        val v1 = jarr.getJSONObject(0)
        val arr2 = v1.getJSONArray("values")

        if (arr2 != null && arr2.size() > 0){
          var str = ""
          var i = 0
          while (i < arr2.size()) {
            str = str + arr2.getString(i) + ","
            i += 1
          }
          (busId, str.substring(0, str.length - 1))
        }
        else (busId, "")
      }
      else (busId, "")
    })

    // 过滤,没有评论的过滤掉
    val rdd3 = rdd2.filter(t => {
      t._2 != null && !"".equals(t._2)
    })

    // 按照value压扁
    val rdd4 = rdd3.flatMapValues(_.split(","))

    // 重组key busId-comm, 1
    val rdd5 = rdd4.map(t=>{
      (t._1 + "-" + t._2, 1)
    })

    // 聚合
    val rdd6 = rdd5.reduceByKey(_ + _)

    // 变换成(busId, (comm, count))
    val rdd7 = rdd6.map(t => {
      val arr = t._1.split("-")
      (arr(0), (arr(1), t._2) :: Nil)
    })

    // 安装busId进行聚合,values是list
    val rdd8 = rdd7.reduceByKey(_ ++ _)

    // 按key降序排序
    val rdd9 = rdd8.map(t => {
      val x = t._2.sortBy(t=>{
        t._2
      }).reverse.take(5)
      (t._1, x)
    })

    val rdd99 = rdd9.sortBy(t => {
      t._2(0)._2
    },false,1)

    // 变换成(busId, )
    val rdd10 = rdd99.map(t =>{
      val col = t._2
      var desc = "";
      for (tt <- col){
        desc = desc + tt._1 + "(" + tt._2 + "),"
      }
      (t._1,desc.substring(0, desc.length-1))
    })

    rdd10.foreach(println)

    sc.stop()
  }
}

展开阅读全文
打赏
0
0 收藏
分享
加载中
更多评论
打赏
0 评论
0 收藏
0
分享
返回顶部
顶部